From 24cfe7283fd038252cb796730aeb3e1feb62397d Mon Sep 17 00:00:00 2001 From: donggang123 <jonnxu@163.com> Date: Mon, 3 Jun 2019 00:06:17 +0800 Subject: [PATCH 01/14] transaction-development --- example/TransactionProducer.cpp | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 example/TransactionProducer.cpp diff --git a/example/TransactionProducer.cpp b/example/TransactionProducer.cpp new file mode 100644 index 000000000..e69de29bb From 7365439e201747d5eb03fdf8613dd4ad9b478bc3 Mon Sep 17 00:00:00 2001 From: jonnxu <jonnxu@163.com> Date: Mon, 3 Jun 2019 00:20:54 +0800 Subject: [PATCH 02/14] Send transaction message feature --- example/TransactionProducer.cpp | 149 +++++++++++++++++ include/MQMessage.h | 4 + include/SendResult.h | 10 +- include/TransactionListener.h | 48 ++++++ include/TransactionMQProducer.h | 57 +++++++ include/TransactionSendResult.h | 52 ++++++ src/MQClientAPIImpl.cpp | 18 ++- src/MQClientAPIImpl.h | 6 + src/MQClientFactory.cpp | 42 +++++ src/MQClientFactory.h | 5 +- src/message/MQMessageId.h | 10 ++ src/producer/SendResult.cpp | 13 ++ src/producer/TransactionMQProducer.cpp | 185 ++++++++++++++++++++++ src/protocol/CommandHeader.cpp | 101 ++++++++++++ src/protocol/CommandHeader.h | 59 +++++++ src/protocol/RemotingCommand.cpp | 3 + src/transport/ClientRemotingProcessor.cpp | 54 ++++++- src/transport/ClientRemotingProcessor.h | 13 ++ 18 files changed, 822 insertions(+), 7 deletions(-) create mode 100644 include/TransactionListener.h create mode 100644 include/TransactionMQProducer.h create mode 100644 include/TransactionSendResult.h create mode 100644 src/producer/TransactionMQProducer.cpp diff --git a/example/TransactionProducer.cpp b/example/TransactionProducer.cpp index e69de29bb..98d5410da 100644 --- a/example/TransactionProducer.cpp +++ b/example/TransactionProducer.cpp @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <atomic> +#include <condition_variable> +#include <iomanip> +#include <iostream> +#include <mutex> +#include <thread> +#include "TransactionListener.h" +#include "TransactionMQProducer.h" +#include "TransactionSendResult.h" +#include "common.h" + +using namespace rocketmq; + +std::atomic<bool> g_quit; +std::mutex g_mtx; +std::condition_variable g_finished; +TpsReportService g_tps; + +class MyTransactionListener : public TransactionListener { + virtual LocalTransactionState executeLocalTransaction(const MQMessage& msg, void* arg) { + + if (!arg) { + std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", state: COMMIT_MESAGE " << endl; + return LocalTransactionState::COMMIT_MESSAGE; + } + + LocalTransactionState state = (LocalTransactionState)(*(int*)arg % 3); + m_state_map[msg.getTransactionId()] = state; + std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", state: " << state << endl; + return state; + } + + virtual LocalTransactionState checkLocalTransaction(const MQMessageExt& msg) { + + string transactionId = msg.getTransactionId(); + LocalTransactionState state; + if (m_state_map.find(transactionId) != m_state_map.end() && m_state_map[transactionId] == LocalTransactionState::UNKNOW) { + state = LocalTransactionState::COMMIT_MESSAGE; + m_state_map[transactionId] = state; + } + else { + state = LocalTransactionState::UNKNOW; + } + + state = LocalTransactionState::COMMIT_MESSAGE; + + std::cout << "checkLocalTransaction transactionId:" << transactionId << ", state: " << state << endl; + return state; + } + private: + map<string, LocalTransactionState> m_state_map; +}; + +void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, TransactionMQProducer* producer) { + while (!g_quit.load()) { + if (g_msgCount.load() <= 0) { + std::this_thread::sleep_for(std::chrono::seconds(2)); + std::unique_lock<std::mutex> lck(g_mtx); + g_finished.notify_one(); + } + + MQMessage msg(info->topic, // topic + "*", // tag + info->body); // body + try { + auto start = std::chrono::system_clock::now(); + std::cout << "before sendMessageInTransaction" << endl; + LocalTransactionState state = LocalTransactionState::UNKNOW; + TransactionSendResult sendResult = producer->sendMessageInTransaction(msg, &state); + std::cout << "sendMessageInTransaction msgId: " << sendResult.getMsgId() << endl; + g_tps.Increment(); + --g_msgCount; + auto end = std::chrono::system_clock::now(); + auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start); + if (duration.count() >= 500) { + std::cout << "send RT more than: " << duration.count() << " ms with msgid: " << sendResult.getMsgId() << endl; + } + } catch (const MQException& e) { + std::cout << "send failed: " << std::endl; + } + } +} + +int main(int argc, char* argv[]) { + RocketmqSendAndConsumerArgs info; + if (!ParseArgs(argc, argv, &info)) { + exit(-1); + } + PrintRocketmqSendAndConsumerArgs(info); + TransactionMQProducer producer("please_rename_unique_group_name"); + producer.setNamesrvAddr(info.namesrv); + producer.setNamesrvDomain(info.namesrv_domain); + producer.setGroupName(info.groupname); + producer.setInstanceName(info.groupname); + producer.setSessionCredentials("mq acesskey", "mq secretkey", "ALIYUN"); + producer.setSendMsgTimeout(500); + producer.setTcpTransportTryLockTimeout(1000); + producer.setTcpTransportConnectTimeout(400); + producer.setLogLevel(eLOG_LEVEL_DEBUG); + producer.setTransactionListener(new MyTransactionListener()); + producer.start(); + std::vector<std::shared_ptr<std::thread>> work_pool; + auto start = std::chrono::system_clock::now(); + int msgcount = g_msgCount.load(); + g_tps.start(); + + int threadCount = info.thread_count; + for (int j = 0; j < threadCount; j++) { + std::shared_ptr<std::thread> th = std::make_shared<std::thread>(SyncProducerWorker, &info, &producer); + work_pool.push_back(th); + } + + { + std::unique_lock<std::mutex> lck(g_mtx); + g_finished.wait(lck); + g_quit.store(true); + } + + auto end = std::chrono::system_clock::now(); + auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start); + + std::cout << "per msg time: " << duration.count() / (double)msgcount << "ms \n" + << "========================finished==============================\n"; + + for (size_t th = 0; th != work_pool.size(); ++th) { + work_pool[th]->join(); + } + + producer.shutdown(); + + return 0; +} diff --git a/include/MQMessage.h b/include/MQMessage.h index e4a6b5e53..89ff84ed2 100644 --- a/include/MQMessage.h +++ b/include/MQMessage.h @@ -73,6 +73,9 @@ class ROCKETMQCLIENT_API MQMessage { void setBody(const char* body, int len); void setBody(const std::string& body); + void setTransactionId(const std::string& id) { m_transactionId = id; } + std::string getTransactionId() const { return m_transactionId; } + std::map<std::string, std::string> getProperties() const; void setProperties(std::map<std::string, std::string>& properties); @@ -132,6 +135,7 @@ class ROCKETMQCLIENT_API MQMessage { std::string m_topic; int m_flag; std::string m_body; + std::string m_transactionId; std::map<std::string, std::string> m_properties; }; //<!*************************************************************************** diff --git a/include/SendResult.h b/include/SendResult.h index 2f8883a83..f8be3d209 100644 --- a/include/SendResult.h +++ b/include/SendResult.h @@ -39,18 +39,26 @@ class ROCKETMQCLIENT_API SendResult { SendResult(const SendResult& other); SendResult& operator=(const SendResult& other); + void setTransactionId(const std::string& id) { + m_transactionId = id; + } + + std::string getTransactionId() { return m_transactionId; } + const std::string& getMsgId() const; const std::string& getOffsetMsgId() const; SendStatus getSendStatus() const; MQMessageQueue getMessageQueue() const; int64 getQueueOffset() const; - + std::string toString() const; + private: SendStatus m_sendStatus; std::string m_msgId; std::string m_offsetMsgId; MQMessageQueue m_messageQueue; int64 m_queueOffset; + std::string m_transactionId; }; //<!*************************************************************************** diff --git a/include/TransactionListener.h b/include/TransactionListener.h new file mode 100644 index 000000000..6756e96a7 --- /dev/null +++ b/include/TransactionListener.h @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __TRANSACTIONLISTENER_H__ +#define __TRANSACTIONLISTENER_H__ + +#include "MQMessage.h" +#include "MQMessageExt.h" +#include "TransactionSendResult.h" + +namespace rocketmq { +class ROCKETMQCLIENT_API TransactionListener { + public: + virtual ~TransactionListener() {} + /** + * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction. + * + * @param msg Half(prepare) message + * @param arg Custom business parameter + * @return Transaction state + */ + virtual LocalTransactionState executeLocalTransaction(const MQMessage& msg, void* arg) = 0; + + /** + * When no response to prepare(half) message. broker will send check message to check the transaction status, and this + * method will be invoked to get local transaction status. + * + * @param msg Check message + * @return Transaction state + */ + virtual LocalTransactionState checkLocalTransaction(const MQMessageExt& msg) = 0; +}; +} // namespace rocketmq +#endif diff --git a/include/TransactionMQProducer.h b/include/TransactionMQProducer.h new file mode 100644 index 000000000..924d04a1f --- /dev/null +++ b/include/TransactionMQProducer.h @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __TRANSACTIONMQPRODUCER_H__ +#define __TRANSACTIONMQPRODUCER_H__ + +#include <memory> +#include <string> +#include "DefaultMQProducer.h" +#include "MQMessageExt.h" +#include "TransactionListener.h" +#include "TransactionSendResult.h" + +namespace rocketmq { + +class ROCKETMQCLIENT_API TransactionMQProducer : public DefaultMQProducer { + public: + TransactionMQProducer(const std::string& producerGroup) : DefaultMQProducer(producerGroup) {} + virtual ~TransactionMQProducer() {} + void start(); + void shutdown(); + std::shared_ptr<TransactionListener> getCheckListener() { return m_transactionListener; } + void setTransactionListener(TransactionListener* listener) { m_transactionListener.reset(listener); } + TransactionSendResult sendMessageInTransaction(MQMessage& msg, void* arg); + void checkTransactionState(const std::string& addr, const MQMessageExt& message, + long m_tranStateTableOffset, + long m_commitLogOffset, + std::string m_msgId, + std::string m_transactionId, + std::string m_offsetMsgId); + + private: + void initTransactionEnv(); + void destroyTransactionEnv(); + void endTransaction(SendResult& sendResult, + LocalTransactionState& localTransactionState); + + private: + std::shared_ptr<TransactionListener> m_transactionListener; +}; +} // namespace rocketmq + +#endif diff --git a/include/TransactionSendResult.h b/include/TransactionSendResult.h new file mode 100644 index 000000000..423379eb7 --- /dev/null +++ b/include/TransactionSendResult.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __TRANSACTIONSENDRESULT_H__ +#define __TRANSACTIONSENDRESULT_H__ + +#include "SendResult.h" + +namespace rocketmq { + +enum LocalTransactionState { + COMMIT_MESSAGE, + ROLLBACK_MESSAGE, + UNKNOW, +}; + +class ROCKETMQCLIENT_API TransactionSendResult : public SendResult { + public: + TransactionSendResult() {} + + TransactionSendResult(const SendStatus& sendStatus, + const std::string& msgId, + const std::string& offsetMsgId, + const MQMessageQueue& messageQueue, + int64 queueOffset) + : SendResult(sendStatus, msgId, offsetMsgId, messageQueue, queueOffset) {} + + LocalTransactionState getLocalTransactionState() { return m_localTransactionState; } + + void setLocalTransactionState(LocalTransactionState localTransactionState) { + m_localTransactionState = localTransactionState; + } + + private: + LocalTransactionState m_localTransactionState; +}; +} // namespace rocketmq +#endif \ No newline at end of file diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp index 8ec60449f..7381cedbb 100644 --- a/src/MQClientAPIImpl.cpp +++ b/src/MQClientAPIImpl.cpp @@ -209,6 +209,20 @@ void MQClientAPIImpl::createTopic(const string& addr, THROW_MQEXCEPTION(MQBrokerException, "response is null", -1); } +void MQClientAPIImpl::endTransactionOneway( + std::string addr, + EndTransactionRequestHeader* requestHeader, + std::string remark, + const SessionCredentials& sessionCredentials) { + + RemotingCommand request(END_TRANSACTION, requestHeader); + request.setRemark(remark); + callSignatureBeforeRequest(addr, request, sessionCredentials); + request.Encode(); + m_pRemotingClient->invokeOneway(addr, request); + return; +} + SendResult MQClientAPIImpl::sendMessage(const string& addr, const string& brokerName, const MQMessage& msg, @@ -373,9 +387,9 @@ SendResult MQClientAPIImpl::sendMessageSync(const string& addr, unique_ptr<RemotingCommand> pResponse(m_pRemotingClient->invokeSync(addr, request, timeoutMillis)); if (pResponse != NULL) { try { - LOG_DEBUG("sendMessageSync success:%s to addr:%s,brokername:%s", msg.toString().c_str(), addr.c_str(), - brokerName.c_str()); SendResult result = processSendResponse(brokerName, msg, pResponse.get()); + LOG_DEBUG("sendMessageSync success:%s to addr:%s,brokername:%s, send status:%d", msg.toString().c_str(), addr.c_str(), + brokerName.c_str(), (int)result.getSendStatus()); return result; } catch (...) { LOG_ERROR("send error"); diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h index 1a5e202c8..27e95c86a 100644 --- a/src/MQClientAPIImpl.h +++ b/src/MQClientAPIImpl.h @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #ifndef __MQCLIENTAPIIMPL_H__ #define __MQCLIENTAPIIMPL_H__ #include "AsyncCallback.h" @@ -60,6 +61,11 @@ class MQClientAPIImpl { const string& defaultTopic, TopicConfig topicConfig, const SessionCredentials& sessionCredentials); + void endTransactionOneway( + std::string addr, + EndTransactionRequestHeader* requestHeader, + std::string remark, + const SessionCredentials& sessionCredentials); SendResult sendMessage(const string& addr, const string& brokerName, diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp index cfa62dd99..a033a7b45 100644 --- a/src/MQClientFactory.cpp +++ b/src/MQClientFactory.cpp @@ -15,6 +15,7 @@ * limitations under the License. */ #include "MQClientFactory.h" +#include "TransactionMQProducer.h" #include "ConsumerRunningInfo.h" #include "Logging.h" #include "MQClientManager.h" @@ -667,6 +668,29 @@ FindBrokerResult* MQClientFactory::findBrokerAddressInAdmin(const string& broker return NULL; } +void MQClientFactory::checkTransactionState(const std::string& addr, const MQMessageExt& messageExt, + const CheckTransactionStateRequestHeader& checkRequestHeader) { + string group = messageExt.getProperty(MQMessage::PROPERTY_PRODUCER_GROUP); + if (group != "") { + MQProducer* producer = selectProducer(group); + if (producer != nullptr) { + TransactionMQProducer* transProducer = dynamic_cast<TransactionMQProducer*>(producer); + if (transProducer != nullptr) { + transProducer->checkTransactionState(addr, messageExt, + checkRequestHeader.m_tranStateTableOffset, checkRequestHeader.m_commitLogOffset, checkRequestHeader.m_msgId, checkRequestHeader.m_transactionId, checkRequestHeader.m_offsetMsgId); + } else { + LOG_ERROR("checkTransactionState, producer not TransactionMQProducer failed, msg:%s", + messageExt.toString().data()); + } + } else { + LOG_ERROR("checkTransactionState, pick producer by group[%s] failed, msg:%s", group.data(), + messageExt.toString().data()); + } + } else { + LOG_ERROR("checkTransactionState, pick producer group failed, msg:%s", messageExt.toString().data()); + } +} + MQClientAPIImpl* MQClientFactory::getMQClientAPIImpl() const { return m_pClientAPIImpl.get(); } @@ -836,6 +860,24 @@ void MQClientFactory::doRebalanceByConsumerGroup(const string& consumerGroup) { } } +void MQClientFactory::endTransactionOneway(const MQMessageQueue& mq, + EndTransactionRequestHeader* requestHeader, + const SessionCredentials& sessionCredentials) { + + string brokerAddr = findBrokerAddressInPublish(mq.getBrokerName()); + string remark = ""; + if (!brokerAddr.empty()) { + try { + getMQClientAPIImpl()->endTransactionOneway(brokerAddr, requestHeader, remark, sessionCredentials); + } catch (MQException& e) { + LOG_ERROR("endTransactionOneway exception:%s", e.what()); + throw e; + } + } else { + THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1); + } +} + void MQClientFactory::unregisterClient(const string& producerGroup, const string& consumerGroup, const SessionCredentials& sessionCredentials) { diff --git a/src/MQClientFactory.h b/src/MQClientFactory.h index e5d6200fc..e6b895975 100644 --- a/src/MQClientFactory.h +++ b/src/MQClientFactory.h @@ -69,7 +69,10 @@ class MQClientFactory { int64 begin, int64 end, const SessionCredentials& session_credentials); - + void endTransactionOneway(const MQMessageQueue& mq, + EndTransactionRequestHeader* requestHeader, + const SessionCredentials& sessionCredentials); + void checkTransactionState(const std::string& addr, const MQMessageExt& message, const CheckTransactionStateRequestHeader& checkRequestHeader); MQClientAPIImpl* getMQClientAPIImpl() const; MQProducer* selectProducer(const string& group); MQConsumer* selectConsumer(const string& group); diff --git a/src/message/MQMessageId.h b/src/message/MQMessageId.h index 38d11eeae..61e1fd203 100644 --- a/src/message/MQMessageId.h +++ b/src/message/MQMessageId.h @@ -24,7 +24,17 @@ namespace rocketmq { //<!*************************************************************************** class MQMessageId { public: + + MQMessageId(){} MQMessageId(sockaddr address, int64 offset) : m_address(address), m_offset(offset) {} + MQMessageId& operator=(const MQMessageId& id) { + if (&id == this) { + return *this; + } + this->m_address = id.m_address; + this->m_offset = id.m_offset; + return *this; + } sockaddr getAddress() const { return m_address; } diff --git a/src/producer/SendResult.cpp b/src/producer/SendResult.cpp index 81ddf7661..fb3f5b8d3 100644 --- a/src/producer/SendResult.cpp +++ b/src/producer/SendResult.cpp @@ -17,6 +17,7 @@ #include "SendResult.h" #include "UtilAll.h" #include "VirtualEnvUtil.h" +#include <sstream> namespace rocketmq { //<!*************************************************************************** @@ -74,5 +75,17 @@ int64 SendResult::getQueueOffset() const { return m_queueOffset; } +std::string SendResult::toString() const { + stringstream ss; + ss << "SendResult: "; + ss << "sendStatus:" << m_sendStatus; + ss << ",msgId:" << m_msgId; + ss << ",offsetMsgId:" << m_offsetMsgId; + ss << ",queueOffset:" << m_queueOffset; + ss << ",transactionId:" << m_transactionId; + ss << ",messageQueue:" << m_messageQueue.toString(); + return ss.str(); +} + //<!************************************************************************ } //<!end namespace; diff --git a/src/producer/TransactionMQProducer.cpp b/src/producer/TransactionMQProducer.cpp new file mode 100644 index 000000000..1023a21af --- /dev/null +++ b/src/producer/TransactionMQProducer.cpp @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "TransactionMQProducer.h" +#include <string> +#include "CommandHeader.h" +#include "Logging.h" +#include "MQClientFactory.h" +#include "MQDecoder.h" +#include "MessageSysFlag.h" +#include "TransactionListener.h" +#include "TransactionSendResult.h" + +using namespace std; +namespace rocketmq { + +void TransactionMQProducer::initTransactionEnv() {} + +void TransactionMQProducer::destroyTransactionEnv() {} + +TransactionSendResult TransactionMQProducer::sendMessageInTransaction(MQMessage& msg, void* arg) { + if (nullptr == m_transactionListener) { + THROW_MQEXCEPTION(MQClientException, "transactionListener is null", -1); + } + + SendResult sendResult; + msg.setProperty(MQMessage::PROPERTY_TRANSACTION_PREPARED, "true"); + msg.setProperty(MQMessage::PROPERTY_PRODUCER_GROUP, getGroupName()); + try { + sendResult = send(msg); + } catch (MQException& e) { + THROW_MQEXCEPTION(MQClientException, e.what(), -1); + } + + LOG_DEBUG("sendMessageInTransaction result:%s", sendResult.toString().data()); + LocalTransactionState localTransactionState = LocalTransactionState::UNKNOW; + switch (sendResult.getSendStatus()) { + case SendStatus::SEND_OK: { + try { + if (sendResult.getTransactionId() != "") { + msg.setProperty("__transactionId__", sendResult.getTransactionId()); + } + string transactionId = msg.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); + if (transactionId != "") { + msg.setTransactionId(transactionId); + } + LOG_DEBUG("sendMessageInTransaction, msgId:%s, transactionId:%s", + sendResult.getMsgId().data(), transactionId.data()); + localTransactionState = m_transactionListener->executeLocalTransaction(msg, arg); + if (localTransactionState != LocalTransactionState::COMMIT_MESSAGE) { + LOG_WARN("executeLocalTransaction ret not LocalTransactionState::commit, msg:%s", msg.toString().data()); + } + } catch (MQException& e) { + THROW_MQEXCEPTION(MQClientException, e.what(), -1); + } + } break; + case SendStatus::SEND_FLUSH_DISK_TIMEOUT: + case SendStatus::SEND_FLUSH_SLAVE_TIMEOUT: + case SendStatus::SEND_SLAVE_NOT_AVAILABLE: + localTransactionState = LocalTransactionState::ROLLBACK_MESSAGE; + break; + default: + break; + } + + try { + endTransaction(sendResult, localTransactionState); + } catch (MQException& e) { + LOG_WARN("endTransaction exception:%s", e.what()); + } + + TransactionSendResult transactionSendResult(sendResult.getSendStatus(), sendResult.getMsgId(), + sendResult.getOffsetMsgId(), sendResult.getMessageQueue(), + sendResult.getQueueOffset()); + transactionSendResult.setTransactionId(msg.getTransactionId()); + transactionSendResult.setLocalTransactionState(localTransactionState); + return transactionSendResult; +} + +void TransactionMQProducer::endTransaction(SendResult& sendResult, LocalTransactionState& localTransactionState) { + MQMessageId id; + if (sendResult.getOffsetMsgId() != "") { + id = MQDecoder::decodeMessageId(sendResult.getOffsetMsgId()); + } else { + id = MQDecoder::decodeMessageId(sendResult.getMsgId()); + } + string transId = sendResult.getTransactionId(); + + int commitOrRollback = 0; + switch (localTransactionState) { + case COMMIT_MESSAGE: + commitOrRollback = MessageSysFlag::TransactionCommitType; + break; + case ROLLBACK_MESSAGE: + commitOrRollback = MessageSysFlag::TransactionRollbackType; + break; + case UNKNOW: + commitOrRollback = MessageSysFlag::TransactionNotType; + break; + default: + commitOrRollback = MessageSysFlag::TransactionNotType; + break; + } + + bool fromTransCheck = false; + EndTransactionRequestHeader* requestHeader = + new EndTransactionRequestHeader(getGroupName(), sendResult.getQueueOffset(), id.getOffset(), commitOrRollback, + fromTransCheck, sendResult.getMsgId(), transId); + LOG_DEBUG("endTransaction: msg:%s", requestHeader->toString().data()); + getFactory()->endTransactionOneway(sendResult.getMessageQueue(), requestHeader, getSessionCredentials()); +} + +void TransactionMQProducer::checkTransactionState(const std::string& addr, const MQMessageExt& message, + long m_tranStateTableOffset, + long m_commitLogOffset, + std::string m_msgId, + std::string m_transactionId, + std::string m_offsetMsgId) { + LocalTransactionState localTransactionState = m_transactionListener->checkLocalTransaction(message); + + EndTransactionRequestHeader* endHeader = new EndTransactionRequestHeader(); + endHeader->m_commitLogOffset = m_commitLogOffset; + endHeader->m_producerGroup = getGroupName(); + endHeader->m_tranStateTableOffset = m_tranStateTableOffset; + endHeader->m_fromTransactionCheck = true; + + string uniqueKey = m_transactionId; + if (m_transactionId == "") + uniqueKey = message.getMsgId(); + + endHeader->m_msgId = uniqueKey; + endHeader->m_transactionId = m_transactionId; + switch (localTransactionState) { + case COMMIT_MESSAGE: + endHeader->m_commitOrRollback = MessageSysFlag::TransactionCommitType; + break; + case ROLLBACK_MESSAGE: + endHeader->m_commitOrRollback = MessageSysFlag::TransactionRollbackType; + LOG_WARN("when broker check, client rollback this transaction, %s", endHeader->toString().data()); + break; + case UNKNOW: + endHeader->m_commitOrRollback = MessageSysFlag::TransactionNotType; + LOG_WARN("when broker check, client does not know this transaction state, %s", endHeader->toString().data()); + break; + default: + break; + } + + LOG_INFO("checkTransactionState, endTransactionOneway: uniqueKey:%s, client state:%d, end header: %s", uniqueKey.data(), localTransactionState, + endHeader->toString().data()); + + string remark; + try { + getFactory()->getMQClientAPIImpl()->endTransactionOneway(addr, endHeader, remark, getSessionCredentials()); + } catch (MQException& e) { + LOG_ERROR("endTransactionOneway exception:%s", e.what()); + throw e; + } +} + +void TransactionMQProducer::start() { + initTransactionEnv(); + DefaultMQProducer::start(); +} + +void TransactionMQProducer::shutdown() { + DefaultMQProducer::shutdown(); + destroyTransactionEnv(); +} + +} // namespace rocketmq diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp index 4e4b0bb5e..807d409cb 100644 --- a/src/protocol/CommandHeader.cpp +++ b/src/protocol/CommandHeader.cpp @@ -60,6 +60,107 @@ void CreateTopicRequestHeader::SetDeclaredFieldOfCommandHeader(map<string, strin requestMap.insert(pair<string, string>("topicFilterType", topicFilterType)); } +void CheckTransactionStateRequestHeader::Encode(Json::Value& outData) {} + +CommandHeader* CheckTransactionStateRequestHeader::Decode(Json::Value& ext) { + + CheckTransactionStateRequestHeader* h = new CheckTransactionStateRequestHeader(); + Json::Value& tempValue = ext["msgId"]; + if (tempValue.isString()) { + h->m_msgId = tempValue.asString(); + } + + tempValue = ext["transactionId"]; + if (tempValue.isString()) { + h->m_transactionId = tempValue.asString(); + } + + tempValue = ext["offsetMsgId"]; + if (tempValue.isString()) { + h->m_offsetMsgId = tempValue.asString(); + } + + tempValue = ext["tranStateTableOffset"]; + if (tempValue.isInt64()) { + h->m_tranStateTableOffset = tempValue.asInt64(); + } + + tempValue = ext["commitLogOffset"]; + if (tempValue.isInt64()) { + h->m_commitLogOffset = tempValue.asInt64(); + } + + return h; +} + +void CheckTransactionStateRequestHeader::SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap) { + requestMap.insert(pair<string, string>("msgId", m_msgId)); + requestMap.insert(pair<string, string>("transactionId", m_transactionId)); + requestMap.insert(pair<string, string>("offsetMsgId", m_offsetMsgId)); + requestMap.insert(pair<string, string>("commitLogOffset", UtilAll::to_string(m_commitLogOffset))); + requestMap.insert(pair<string, string>("tranStateTableOffset", UtilAll::to_string(m_tranStateTableOffset))); +} + +std::string CheckTransactionStateRequestHeader::toString() { + stringstream ss; + ss << "CheckTransactionStateRequestHeader:"; + ss << " msgId:" << m_msgId; + ss << " transactionId:" << m_transactionId; + ss << " offsetMsgId:" << m_offsetMsgId; + ss << " commitLogOffset:" << m_commitLogOffset; + ss << " tranStateTableOffset:" << m_tranStateTableOffset; + return ss.str(); +} + +void EndTransactionRequestHeader::Encode(Json::Value& outData) { + outData["msgId"] = m_msgId; + outData["transactionId"] = m_transactionId; + outData["producerGroup"] = m_producerGroup; + outData["tranStateTableOffset"] = UtilAll::to_string(m_tranStateTableOffset); + outData["commitLogOffset"] = UtilAll::to_string(m_commitLogOffset); + outData["commitOrRollback"] = UtilAll::to_string(m_commitOrRollback); + outData["fromTransactionCheck"] = UtilAll::to_string(m_fromTransactionCheck); + LOG_DEBUG( + "EndTransactionRequestHeader Encode msgId:%s, transactionId: %s, producerGroup is:%s, UtilAll::to_string( tranStateTableOffset) " + "is:%s,UtilAll::to_string( commitLogOffset):%s, UtilAll::to_string( commitOrRollback) " + "is:%s, UtilAll::to_string( fromTransactionCheck) is:%s", + m_msgId.c_str(), m_transactionId.c_str(), m_producerGroup.c_str(), + UtilAll::to_string(m_tranStateTableOffset).c_str(), UtilAll::to_string(m_commitLogOffset).c_str(), + UtilAll::to_string(m_commitOrRollback).c_str(), UtilAll::to_string(m_fromTransactionCheck).c_str()); +} + +void EndTransactionRequestHeader::SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap) { + LOG_DEBUG( + "EndTransactionRequestHeader SetDeclaredFieldOfCommandHeader msgId:%s, transactionId:%s, producerGroup is:%s, UtilAll::to_string( tranStateTableOffset) " + "is:%s,UtilAll::to_string( commitLogOffset):%s, UtilAll::to_string( commitOrRollback) " + "is:%s, UtilAll::to_string( fromTransactionCheck) is:%s", + m_msgId.c_str(), m_transactionId.c_str(), m_producerGroup.c_str(), + UtilAll::to_string(m_tranStateTableOffset).c_str(), UtilAll::to_string(m_commitLogOffset).c_str(), + UtilAll::to_string(m_commitOrRollback).c_str(), UtilAll::to_string(m_fromTransactionCheck).c_str()); + + + requestMap.insert(pair<string, string>("msgId", m_msgId)); + requestMap.insert(pair<string, string>("transactionId", m_transactionId)); + requestMap.insert(pair<string, string>("producerGroup", m_producerGroup)); + requestMap.insert(pair<string, string>("tranStateTableOffset", UtilAll::to_string(m_tranStateTableOffset))); + requestMap.insert(pair<string, string>("commitLogOffset", UtilAll::to_string(m_commitLogOffset))); + requestMap.insert(pair<string, string>("commitOrRollback", UtilAll::to_string(m_commitOrRollback))); + requestMap.insert(pair<string, string>("fromTransactionCheck", UtilAll::to_string(m_fromTransactionCheck))); +} + +std::string EndTransactionRequestHeader::toString() { + stringstream ss; + ss << "EndTransactionRequestHeader:"; + ss << " m_msgId:" << m_msgId; + ss << " m_transactionId:" << m_transactionId; + ss << " m_producerGroup:" << m_producerGroup; + ss << " m_tranStateTableOffset:" << m_tranStateTableOffset; + ss << " m_commitLogOffset:" << m_commitLogOffset; + ss << " m_commitOrRollback:" << m_commitOrRollback; + ss << " m_fromTransactionCheck:" << m_fromTransactionCheck; + return ss.str(); +} + //<!************************************************************************ void SendMessageRequestHeader::Encode(Json::Value& outData) { outData["producerGroup"] = producerGroup; diff --git a/src/protocol/CommandHeader.h b/src/protocol/CommandHeader.h index 2ad3e47c8..22f61005e 100644 --- a/src/protocol/CommandHeader.h +++ b/src/protocol/CommandHeader.h @@ -18,6 +18,7 @@ #ifndef __COMMANDCUSTOMHEADER_H__ #define __COMMANDCUSTOMHEADER_H__ +#include <map> #include <string> #include "MQClientException.h" #include "MessageSysFlag.h" @@ -35,6 +36,64 @@ class CommandHeader { virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap) {} }; +class CheckTransactionStateRequestHeader : public CommandHeader { + public: + CheckTransactionStateRequestHeader() {} + CheckTransactionStateRequestHeader(long tableOffset, + long commLogOffset, + const std::string& msgid, + const std::string& transactionId, + const std::string& offsetMsgId) + : m_tranStateTableOffset(tableOffset), + m_commitLogOffset(commLogOffset), + m_msgId(msgid), + m_transactionId(transactionId), + m_offsetMsgId(offsetMsgId) {} + virtual ~CheckTransactionStateRequestHeader() {} + virtual void Encode(Json::Value& outData); + static CommandHeader* Decode(Json::Value& ext); + virtual void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap); + std::string toString(); + public: + long m_tranStateTableOffset; + long m_commitLogOffset; + std::string m_msgId; + std::string m_transactionId; + std::string m_offsetMsgId; +}; + +class EndTransactionRequestHeader : public CommandHeader { + public: + EndTransactionRequestHeader() {} + EndTransactionRequestHeader(const std::string& groupName, + long tableOffset, + long commLogOffset, + int commitOrRoll, + bool fromTransCheck, + const std::string& msgid, + const std::string& transId) + : m_producerGroup(groupName), + m_tranStateTableOffset(tableOffset), + m_commitLogOffset(commLogOffset), + m_commitOrRollback(commitOrRoll), + m_fromTransactionCheck(fromTransCheck), + m_msgId(msgid), + m_transactionId(transId) {} + virtual ~EndTransactionRequestHeader() {} + virtual void Encode(Json::Value& outData); + virtual void SetDeclaredFieldOfCommandHeader(std::map<string, string>& requestMap); + std::string toString(); + + public: + std::string m_producerGroup; + long m_tranStateTableOffset; + long m_commitLogOffset; + int m_commitOrRollback; + bool m_fromTransactionCheck; + std::string m_msgId; + std::string m_transactionId; +}; + //<!************************************************************************ class GetRouteInfoRequestHeader : public CommandHeader { public: diff --git a/src/protocol/RemotingCommand.cpp b/src/protocol/RemotingCommand.cpp index f556a24cc..16e040f1f 100644 --- a/src/protocol/RemotingCommand.cpp +++ b/src/protocol/RemotingCommand.cpp @@ -242,6 +242,9 @@ void RemotingCommand::SetExtHeader(int code) { break; case NOTIFY_CONSUMER_IDS_CHANGED: m_pExtHeader.reset(NotifyConsumerIdsChangedRequestHeader::Decode(ext)); + break; + case CHECK_TRANSACTION_STATE: + m_pExtHeader.reset(CheckTransactionStateRequestHeader::Decode(ext)); default: break; } diff --git a/src/transport/ClientRemotingProcessor.cpp b/src/transport/ClientRemotingProcessor.cpp index b0be046f7..50dc164c8 100644 --- a/src/transport/ClientRemotingProcessor.cpp +++ b/src/transport/ClientRemotingProcessor.cpp @@ -28,10 +28,10 @@ ClientRemotingProcessor::ClientRemotingProcessor(MQClientFactory* mqClientFactor ClientRemotingProcessor::~ClientRemotingProcessor() {} RemotingCommand* ClientRemotingProcessor::processRequest(const string& addr, RemotingCommand* request) { - LOG_DEBUG("request Command received:processRequest"); + LOG_INFO("request Command received:processRequest, addr:%s, code:%d", addr.data(), request->getCode()); switch (request->getCode()) { case CHECK_TRANSACTION_STATE: - // return checkTransactionState( request); + return checkTransactionState(addr, request); break; case NOTIFY_CONSUMER_IDS_CHANGED: return notifyConsumerIdsChanged(request); @@ -142,8 +142,56 @@ RemotingCommand* ClientRemotingProcessor::notifyConsumerIdsChanged(RemotingComma request->SetExtHeader(request->getCode()); NotifyConsumerIdsChangedRequestHeader* requestHeader = (NotifyConsumerIdsChangedRequestHeader*)request->getCommandHeader(); - LOG_INFO("notifyConsumerIdsChanged:%s", requestHeader->getGroup().c_str()); + if (requestHeader == nullptr) { + LOG_ERROR("notifyConsumerIdsChanged requestHeader null"); + return NULL; + } + string group = requestHeader->getGroup(); + LOG_INFO("notifyConsumerIdsChanged:%s", group.c_str()); m_mqClientFactory->doRebalanceByConsumerGroup(requestHeader->getGroup()); return NULL; } + +RemotingCommand* ClientRemotingProcessor::checkTransactionState(const std::string& addr, RemotingCommand* request) { + if (!request) { + LOG_ERROR("checkTransactionState request null"); + return nullptr; + } + + LOG_INFO("checkTransactionState addr:%s, request: %s", addr.data(), request->ToString().data()); + + request->SetExtHeader(request->getCode()); + CheckTransactionStateRequestHeader* requestHeader = (CheckTransactionStateRequestHeader*)request->getCommandHeader(); + if (!requestHeader) { + LOG_ERROR("checkTransactionState CheckTransactionStateRequestHeader requestHeader null"); + return nullptr; + } + LOG_INFO("checkTransactionState request: %s", requestHeader->toString().data()); + + const MemoryBlock* block = request->GetBody(); + if (block && block->getSize() > 0) { + std::vector<MQMessageExt> mqvec; + MQDecoder::decodes(block, mqvec); + if (mqvec.size() == 0) { + LOG_ERROR("checkTransactionState decodes MQMessageExt fail, request:%s", requestHeader->toString().data()); + return nullptr; + } + + MQMessageExt& messageExt = mqvec[0]; + for (auto& pair : messageExt.getProperties()) { + LOG_INFO("checkTransactionState key:%s, value: %s", pair.first.data(), pair.second.data() ); + } + + string transactionId = messageExt.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); + if (transactionId != "") { + messageExt.setTransactionId(transactionId); + } + + m_mqClientFactory->checkTransactionState(addr, messageExt, *requestHeader); + } else { + LOG_ERROR("checkTransactionState getbody null or size 0, request Header:%s", requestHeader->toString().data()); + } + return nullptr; } + +} // namespace rocketmq diff --git a/src/transport/ClientRemotingProcessor.h b/src/transport/ClientRemotingProcessor.h index 58a041707..c88b8bb4b 100644 --- a/src/transport/ClientRemotingProcessor.h +++ b/src/transport/ClientRemotingProcessor.h @@ -33,6 +33,7 @@ class ClientRemotingProcessor { RemotingCommand* resetOffset(RemotingCommand* request); RemotingCommand* getConsumerRunningInfo(const string& addr, RemotingCommand* request); RemotingCommand* notifyConsumerIdsChanged(RemotingCommand* request); + RemotingCommand* checkTransactionState(const string& addr, RemotingCommand* request); private: MQClientFactory* m_mqClientFactory; @@ -49,6 +50,18 @@ class ResetOffsetBody { private: std::map<MQMessageQueue, int64> m_offsetTable; }; + +class CheckTransactionStateBody { + public: + CheckTransactionStateBody() {} + virtual ~CheckTransactionStateBody() { m_offsetTable.clear(); } + void setOffsetTable(MQMessageQueue mq, int64 offset); + std::map<MQMessageQueue, int64> getOffsetTable(); + static ResetOffsetBody* Decode(const MemoryBlock* mem); + + private: + std::map<MQMessageQueue, int64> m_offsetTable; +}; } #endif From b50833d8168ef4f118831caf7dd8fea36532d6a5 Mon Sep 17 00:00:00 2001 From: jonnxu <jonnxu@163.com> Date: Mon, 3 Jun 2019 01:04:34 +0800 Subject: [PATCH 03/14] update transaction producer example --- example/TransactionProducer.cpp | 27 +++++++-------------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/example/TransactionProducer.cpp b/example/TransactionProducer.cpp index 98d5410da..3f4953358 100644 --- a/example/TransactionProducer.cpp +++ b/example/TransactionProducer.cpp @@ -36,42 +36,29 @@ TpsReportService g_tps; class MyTransactionListener : public TransactionListener { virtual LocalTransactionState executeLocalTransaction(const MQMessage& msg, void* arg) { + std::cout << "executeLocalTransaction enter msg:" << msg.toString() << endl; if (!arg) { - std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", state: COMMIT_MESAGE " << endl; + std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: COMMIT_MESAGE " << endl; return LocalTransactionState::COMMIT_MESSAGE; } LocalTransactionState state = (LocalTransactionState)(*(int*)arg % 3); - m_state_map[msg.getTransactionId()] = state; - std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", state: " << state << endl; + std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: " << state << endl; return state; } virtual LocalTransactionState checkLocalTransaction(const MQMessageExt& msg) { - string transactionId = msg.getTransactionId(); - LocalTransactionState state; - if (m_state_map.find(transactionId) != m_state_map.end() && m_state_map[transactionId] == LocalTransactionState::UNKNOW) { - state = LocalTransactionState::COMMIT_MESSAGE; - m_state_map[transactionId] = state; - } - else { - state = LocalTransactionState::UNKNOW; - } - - state = LocalTransactionState::COMMIT_MESSAGE; - - std::cout << "checkLocalTransaction transactionId:" << transactionId << ", state: " << state << endl; - return state; + std::cout << "checkLocalTransaction enter msg:" << msg.toString() << endl; + std::cout << "checkLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: COMMIT_MESSAGE" << endl; + return LocalTransactionState::COMMIT_MESSAGE; } - private: - map<string, LocalTransactionState> m_state_map; }; void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, TransactionMQProducer* producer) { while (!g_quit.load()) { if (g_msgCount.load() <= 0) { - std::this_thread::sleep_for(std::chrono::seconds(2)); + std::this_thread::sleep_for(std::chrono::seconds(70)); std::unique_lock<std::mutex> lck(g_mtx); g_finished.notify_one(); } From 8d4cffcdb669f620d6ec91736702dda66535bc38 Mon Sep 17 00:00:00 2001 From: xujiang1 <xujiang1@cmschina.com.cn> Date: Mon, 3 Jun 2019 17:31:39 +0800 Subject: [PATCH 04/14] update when send complete, need break while loop for Transaction Producer Demo --- example/TransactionProducer.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/example/TransactionProducer.cpp b/example/TransactionProducer.cpp index 3f4953358..ee03a8421 100644 --- a/example/TransactionProducer.cpp +++ b/example/TransactionProducer.cpp @@ -61,6 +61,7 @@ void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, TransactionMQProducer std::this_thread::sleep_for(std::chrono::seconds(70)); std::unique_lock<std::mutex> lck(g_mtx); g_finished.notify_one(); + break; } MQMessage msg(info->topic, // topic From 02ffc3559803dc6bc725f70ba66937219666db94 Mon Sep 17 00:00:00 2001 From: xujiang1 <xujiang1@cmschina.com.cn> Date: Mon, 3 Jun 2019 17:53:19 +0800 Subject: [PATCH 05/14] update the decode error --- src/protocol/CommandHeader.cpp | 25 +++++-------------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp index 807d409cb..6f5898b43 100644 --- a/src/protocol/CommandHeader.cpp +++ b/src/protocol/CommandHeader.cpp @@ -81,13 +81,13 @@ CommandHeader* CheckTransactionStateRequestHeader::Decode(Json::Value& ext) { } tempValue = ext["tranStateTableOffset"]; - if (tempValue.isInt64()) { - h->m_tranStateTableOffset = tempValue.asInt64(); + if (tempValue.isString()) { + h->m_tranStateTableOffset = UtilAll::str2ll(tempValue.asCString()); } tempValue = ext["commitLogOffset"]; - if (tempValue.isInt64()) { - h->m_commitLogOffset = tempValue.asInt64(); + if (tempValue.isString()) { + h->m_commitLogOffset = UtilAll::str2ll(tempValue.asCString()); } return h; @@ -120,25 +120,10 @@ void EndTransactionRequestHeader::Encode(Json::Value& outData) { outData["commitLogOffset"] = UtilAll::to_string(m_commitLogOffset); outData["commitOrRollback"] = UtilAll::to_string(m_commitOrRollback); outData["fromTransactionCheck"] = UtilAll::to_string(m_fromTransactionCheck); - LOG_DEBUG( - "EndTransactionRequestHeader Encode msgId:%s, transactionId: %s, producerGroup is:%s, UtilAll::to_string( tranStateTableOffset) " - "is:%s,UtilAll::to_string( commitLogOffset):%s, UtilAll::to_string( commitOrRollback) " - "is:%s, UtilAll::to_string( fromTransactionCheck) is:%s", - m_msgId.c_str(), m_transactionId.c_str(), m_producerGroup.c_str(), - UtilAll::to_string(m_tranStateTableOffset).c_str(), UtilAll::to_string(m_commitLogOffset).c_str(), - UtilAll::to_string(m_commitOrRollback).c_str(), UtilAll::to_string(m_fromTransactionCheck).c_str()); } void EndTransactionRequestHeader::SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap) { - LOG_DEBUG( - "EndTransactionRequestHeader SetDeclaredFieldOfCommandHeader msgId:%s, transactionId:%s, producerGroup is:%s, UtilAll::to_string( tranStateTableOffset) " - "is:%s,UtilAll::to_string( commitLogOffset):%s, UtilAll::to_string( commitOrRollback) " - "is:%s, UtilAll::to_string( fromTransactionCheck) is:%s", - m_msgId.c_str(), m_transactionId.c_str(), m_producerGroup.c_str(), - UtilAll::to_string(m_tranStateTableOffset).c_str(), UtilAll::to_string(m_commitLogOffset).c_str(), - UtilAll::to_string(m_commitOrRollback).c_str(), UtilAll::to_string(m_fromTransactionCheck).c_str()); - - + requestMap.insert(pair<string, string>("msgId", m_msgId)); requestMap.insert(pair<string, string>("transactionId", m_transactionId)); requestMap.insert(pair<string, string>("producerGroup", m_producerGroup)); From 269e967273e7ab945765fc64994696fcaeee46ba Mon Sep 17 00:00:00 2001 From: jonnxu <jonnxu@163.com> Date: Sun, 14 Jul 2019 00:46:31 +0800 Subject: [PATCH 06/14] update by review --- example/TransactionProducer.cpp | 22 ++++++++-------- include/TransactionMQProducer.h | 12 ++++----- src/MQClientFactory.cpp | 2 +- src/producer/TransactionMQProducer.cpp | 35 +++++++++++++++----------- src/protocol/RemotingCommand.cpp | 1 + 5 files changed, 40 insertions(+), 32 deletions(-) diff --git a/example/TransactionProducer.cpp b/example/TransactionProducer.cpp index ee03a8421..93ac93737 100644 --- a/example/TransactionProducer.cpp +++ b/example/TransactionProducer.cpp @@ -37,20 +37,20 @@ class MyTransactionListener : public TransactionListener { virtual LocalTransactionState executeLocalTransaction(const MQMessage& msg, void* arg) { std::cout << "executeLocalTransaction enter msg:" << msg.toString() << endl; - if (!arg) { - std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: COMMIT_MESAGE " << endl; - return LocalTransactionState::COMMIT_MESSAGE; - } - - LocalTransactionState state = (LocalTransactionState)(*(int*)arg % 3); - std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: " << state << endl; - return state; + if (!arg) { + std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: COMMIT_MESAGE " << endl; + return LocalTransactionState::COMMIT_MESSAGE; + } + + LocalTransactionState state = (LocalTransactionState)(*(int*)arg % 3); + std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: " << state << endl; + return state; } virtual LocalTransactionState checkLocalTransaction(const MQMessageExt& msg) { std::cout << "checkLocalTransaction enter msg:" << msg.toString() << endl; - std::cout << "checkLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: COMMIT_MESSAGE" << endl; + std::cout << "checkLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: COMMIT_MESSAGE" << endl; return LocalTransactionState::COMMIT_MESSAGE; } }; @@ -61,7 +61,7 @@ void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, TransactionMQProducer std::this_thread::sleep_for(std::chrono::seconds(70)); std::unique_lock<std::mutex> lck(g_mtx); g_finished.notify_one(); - break; + break; } MQMessage msg(info->topic, // topic @@ -70,7 +70,7 @@ void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, TransactionMQProducer try { auto start = std::chrono::system_clock::now(); std::cout << "before sendMessageInTransaction" << endl; - LocalTransactionState state = LocalTransactionState::UNKNOW; + LocalTransactionState state = LocalTransactionState::UNKNOW; TransactionSendResult sendResult = producer->sendMessageInTransaction(msg, &state); std::cout << "sendMessageInTransaction msgId: " << sendResult.getMsgId() << endl; g_tps.Increment(); diff --git a/include/TransactionMQProducer.h b/include/TransactionMQProducer.h index 924d04a1f..4e3b18e01 100644 --- a/include/TransactionMQProducer.h +++ b/include/TransactionMQProducer.h @@ -33,15 +33,15 @@ class ROCKETMQCLIENT_API TransactionMQProducer : public DefaultMQProducer { virtual ~TransactionMQProducer() {} void start(); void shutdown(); - std::shared_ptr<TransactionListener> getCheckListener() { return m_transactionListener; } + std::shared_ptr<TransactionListener> getTransactionListener() { return m_transactionListener; } void setTransactionListener(TransactionListener* listener) { m_transactionListener.reset(listener); } TransactionSendResult sendMessageInTransaction(MQMessage& msg, void* arg); void checkTransactionState(const std::string& addr, const MQMessageExt& message, - long m_tranStateTableOffset, - long m_commitLogOffset, - std::string m_msgId, - std::string m_transactionId, - std::string m_offsetMsgId); + long tranStateTableOffset, + long commitLogOffset, + const std::string& msgId, + const std::string& transactionId, + const std::string& offsetMsgId); private: void initTransactionEnv(); diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp index a033a7b45..05357b971 100644 --- a/src/MQClientFactory.cpp +++ b/src/MQClientFactory.cpp @@ -671,7 +671,7 @@ FindBrokerResult* MQClientFactory::findBrokerAddressInAdmin(const string& broker void MQClientFactory::checkTransactionState(const std::string& addr, const MQMessageExt& messageExt, const CheckTransactionStateRequestHeader& checkRequestHeader) { string group = messageExt.getProperty(MQMessage::PROPERTY_PRODUCER_GROUP); - if (group != "") { + if (!group.empty()) { MQProducer* producer = selectProducer(group); if (producer != nullptr) { TransactionMQProducer* transProducer = dynamic_cast<TransactionMQProducer*>(producer); diff --git a/src/producer/TransactionMQProducer.cpp b/src/producer/TransactionMQProducer.cpp index 1023a21af..92b0e9fc5 100644 --- a/src/producer/TransactionMQProducer.cpp +++ b/src/producer/TransactionMQProducer.cpp @@ -49,7 +49,7 @@ TransactionSendResult TransactionMQProducer::sendMessageInTransaction(MQMessage& LOG_DEBUG("sendMessageInTransaction result:%s", sendResult.toString().data()); LocalTransactionState localTransactionState = LocalTransactionState::UNKNOW; switch (sendResult.getSendStatus()) { - case SendStatus::SEND_OK: { + case SendStatus::SEND_OK: try { if (sendResult.getTransactionId() != "") { msg.setProperty("__transactionId__", sendResult.getTransactionId()); @@ -67,7 +67,7 @@ TransactionSendResult TransactionMQProducer::sendMessageInTransaction(MQMessage& } catch (MQException& e) { THROW_MQEXCEPTION(MQClientException, e.what(), -1); } - } break; + break; case SendStatus::SEND_FLUSH_DISK_TIMEOUT: case SendStatus::SEND_FLUSH_SLAVE_TIMEOUT: case SendStatus::SEND_SLAVE_NOT_AVAILABLE: @@ -125,25 +125,32 @@ void TransactionMQProducer::endTransaction(SendResult& sendResult, LocalTransact } void TransactionMQProducer::checkTransactionState(const std::string& addr, const MQMessageExt& message, - long m_tranStateTableOffset, - long m_commitLogOffset, - std::string m_msgId, - std::string m_transactionId, - std::string m_offsetMsgId) { - LocalTransactionState localTransactionState = m_transactionListener->checkLocalTransaction(message); + long tranStateTableOffset, + long commitLogOffset, + const std::string& msgId, + const std::string& transactionId, + const std::string& offsetMsgId) { + LocalTransactionState localTransactionState = UNKNOW; + try { + m_transactionListener->checkLocalTransaction(message); + } catch (MQException& e) { + LOG_INFO("checkTransactionState, checkLocalTransaction exception: %s", e.what()); + } + EndTransactionRequestHeader* endHeader = new EndTransactionRequestHeader(); - endHeader->m_commitLogOffset = m_commitLogOffset; + endHeader->m_commitLogOffset = commitLogOffset; endHeader->m_producerGroup = getGroupName(); - endHeader->m_tranStateTableOffset = m_tranStateTableOffset; + endHeader->m_tranStateTableOffset = tranStateTableOffset; endHeader->m_fromTransactionCheck = true; - string uniqueKey = m_transactionId; - if (m_transactionId == "") - uniqueKey = message.getMsgId(); + string uniqueKey = transactionId; + if (transactionId.empty()) { + uniqueKey = message.getMsgId(); + } endHeader->m_msgId = uniqueKey; - endHeader->m_transactionId = m_transactionId; + endHeader->m_transactionId = transactionId; switch (localTransactionState) { case COMMIT_MESSAGE: endHeader->m_commitOrRollback = MessageSysFlag::TransactionCommitType; diff --git a/src/protocol/RemotingCommand.cpp b/src/protocol/RemotingCommand.cpp index 16e040f1f..08765de60 100644 --- a/src/protocol/RemotingCommand.cpp +++ b/src/protocol/RemotingCommand.cpp @@ -245,6 +245,7 @@ void RemotingCommand::SetExtHeader(int code) { break; case CHECK_TRANSACTION_STATE: m_pExtHeader.reset(CheckTransactionStateRequestHeader::Decode(ext)); + break; default: break; } From 179a7d5c71a80c706871ba3ad539378c720d789f Mon Sep 17 00:00:00 2001 From: jonnxu <jonnxu@163.com> Date: Mon, 15 Jul 2019 22:26:31 +0800 Subject: [PATCH 07/14] Add async transaction check --- example/TransactionProducer.cpp | 8 ++-- include/TransactionMQProducer.h | 25 +++++++++++-- src/producer/TransactionMQProducer.cpp | 52 +++++++++++++++++++------- 3 files changed, 63 insertions(+), 22 deletions(-) diff --git a/example/TransactionProducer.cpp b/example/TransactionProducer.cpp index 93ac93737..b8e3b3995 100644 --- a/example/TransactionProducer.cpp +++ b/example/TransactionProducer.cpp @@ -36,7 +36,6 @@ TpsReportService g_tps; class MyTransactionListener : public TransactionListener { virtual LocalTransactionState executeLocalTransaction(const MQMessage& msg, void* arg) { - std::cout << "executeLocalTransaction enter msg:" << msg.toString() << endl; if (!arg) { std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: COMMIT_MESAGE " << endl; return LocalTransactionState::COMMIT_MESSAGE; @@ -50,7 +49,6 @@ class MyTransactionListener : public TransactionListener { virtual LocalTransactionState checkLocalTransaction(const MQMessageExt& msg) { std::cout << "checkLocalTransaction enter msg:" << msg.toString() << endl; - std::cout << "checkLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: COMMIT_MESSAGE" << endl; return LocalTransactionState::COMMIT_MESSAGE; } }; @@ -58,7 +56,7 @@ class MyTransactionListener : public TransactionListener { void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, TransactionMQProducer* producer) { while (!g_quit.load()) { if (g_msgCount.load() <= 0) { - std::this_thread::sleep_for(std::chrono::seconds(70)); + std::this_thread::sleep_for(std::chrono::seconds(60)); std::unique_lock<std::mutex> lck(g_mtx); g_finished.notify_one(); break; @@ -72,7 +70,7 @@ void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, TransactionMQProducer std::cout << "before sendMessageInTransaction" << endl; LocalTransactionState state = LocalTransactionState::UNKNOW; TransactionSendResult sendResult = producer->sendMessageInTransaction(msg, &state); - std::cout << "sendMessageInTransaction msgId: " << sendResult.getMsgId() << endl; + std::cout << "after sendMessageInTransaction msgId: " << sendResult.getMsgId() << endl; g_tps.Increment(); --g_msgCount; auto end = std::chrono::system_clock::now(); @@ -81,7 +79,7 @@ void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, TransactionMQProducer std::cout << "send RT more than: " << duration.count() << " ms with msgid: " << sendResult.getMsgId() << endl; } } catch (const MQException& e) { - std::cout << "send failed: " << std::endl; + std::cout << "send failed: " << e.what() << std::endl; } } } diff --git a/include/TransactionMQProducer.h b/include/TransactionMQProducer.h index 4e3b18e01..fcd9a7c95 100644 --- a/include/TransactionMQProducer.h +++ b/include/TransactionMQProducer.h @@ -18,6 +18,11 @@ #ifndef __TRANSACTIONMQPRODUCER_H__ #define __TRANSACTIONMQPRODUCER_H__ +#include <boost/asio.hpp> +#include <boost/asio/io_service.hpp> +#include <boost/bind.hpp> +#include <boost/date_time/posix_time/posix_time.hpp> +#include <boost/weak_ptr.hpp> #include <memory> #include <string> #include "DefaultMQProducer.h" @@ -29,14 +34,16 @@ namespace rocketmq { class ROCKETMQCLIENT_API TransactionMQProducer : public DefaultMQProducer { public: - TransactionMQProducer(const std::string& producerGroup) : DefaultMQProducer(producerGroup) {} + TransactionMQProducer(const std::string& producerGroup) + : DefaultMQProducer(producerGroup), m_thread_num(1), m_ioServiceWork(m_ioService) {} virtual ~TransactionMQProducer() {} void start(); void shutdown(); std::shared_ptr<TransactionListener> getTransactionListener() { return m_transactionListener; } void setTransactionListener(TransactionListener* listener) { m_transactionListener.reset(listener); } TransactionSendResult sendMessageInTransaction(MQMessage& msg, void* arg); - void checkTransactionState(const std::string& addr, const MQMessageExt& message, + void checkTransactionState(const std::string& addr, + const MQMessageExt& message, long tranStateTableOffset, long commitLogOffset, const std::string& msgId, @@ -46,11 +53,21 @@ class ROCKETMQCLIENT_API TransactionMQProducer : public DefaultMQProducer { private: void initTransactionEnv(); void destroyTransactionEnv(); - void endTransaction(SendResult& sendResult, - LocalTransactionState& localTransactionState); + void endTransaction(SendResult& sendResult, LocalTransactionState& localTransactionState); + void checkTransactionStateImpl(const std::string& addr, + const MQMessageExt& message, + long tranStateTableOffset, + long commitLogOffset, + const std::string& msgId, + const std::string& transactionId, + const std::string& offsetMsgId); private: std::shared_ptr<TransactionListener> m_transactionListener; + int m_thread_num; + boost::thread_group m_threadpool; + boost::asio::io_service m_ioService; + boost::asio::io_service::work m_ioServiceWork; }; } // namespace rocketmq diff --git a/src/producer/TransactionMQProducer.cpp b/src/producer/TransactionMQProducer.cpp index 92b0e9fc5..86c7cd7eb 100644 --- a/src/producer/TransactionMQProducer.cpp +++ b/src/producer/TransactionMQProducer.cpp @@ -28,12 +28,19 @@ using namespace std; namespace rocketmq { -void TransactionMQProducer::initTransactionEnv() {} +void TransactionMQProducer::initTransactionEnv() { + for (int i = 0; i < m_thread_num; ++i) { + m_threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &m_ioService)); + } +} -void TransactionMQProducer::destroyTransactionEnv() {} +void TransactionMQProducer::destroyTransactionEnv() { + m_ioService.stop(); + m_threadpool.join_all(); +} TransactionSendResult TransactionMQProducer::sendMessageInTransaction(MQMessage& msg, void* arg) { - if (nullptr == m_transactionListener) { + if (!m_transactionListener) { THROW_MQEXCEPTION(MQClientException, "transactionListener is null", -1); } @@ -46,7 +53,6 @@ TransactionSendResult TransactionMQProducer::sendMessageInTransaction(MQMessage& THROW_MQEXCEPTION(MQClientException, e.what(), -1); } - LOG_DEBUG("sendMessageInTransaction result:%s", sendResult.toString().data()); LocalTransactionState localTransactionState = LocalTransactionState::UNKNOW; switch (sendResult.getSendStatus()) { case SendStatus::SEND_OK: @@ -58,8 +64,8 @@ TransactionSendResult TransactionMQProducer::sendMessageInTransaction(MQMessage& if (transactionId != "") { msg.setTransactionId(transactionId); } - LOG_DEBUG("sendMessageInTransaction, msgId:%s, transactionId:%s", - sendResult.getMsgId().data(), transactionId.data()); + LOG_DEBUG("sendMessageInTransaction, msgId:%s, transactionId:%s", sendResult.getMsgId().data(), + transactionId.data()); localTransactionState = m_transactionListener->executeLocalTransaction(msg, arg); if (localTransactionState != LocalTransactionState::COMMIT_MESSAGE) { LOG_WARN("executeLocalTransaction ret not LocalTransactionState::commit, msg:%s", msg.toString().data()); @@ -72,6 +78,7 @@ TransactionSendResult TransactionMQProducer::sendMessageInTransaction(MQMessage& case SendStatus::SEND_FLUSH_SLAVE_TIMEOUT: case SendStatus::SEND_SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState::ROLLBACK_MESSAGE; + LOG_WARN("sendMessageInTransaction, send not ok, rollback, result:%s", sendResult.toString().data()); break; default: break; @@ -124,20 +131,39 @@ void TransactionMQProducer::endTransaction(SendResult& sendResult, LocalTransact getFactory()->endTransactionOneway(sendResult.getMessageQueue(), requestHeader, getSessionCredentials()); } -void TransactionMQProducer::checkTransactionState(const std::string& addr, const MQMessageExt& message, +void TransactionMQProducer::checkTransactionState(const std::string& addr, + const MQMessageExt& message, long tranStateTableOffset, long commitLogOffset, const std::string& msgId, const std::string& transactionId, const std::string& offsetMsgId) { - LocalTransactionState localTransactionState = UNKNOW; + + LOG_DEBUG("checkTransactionState: msgId:%s, transactionId:%s", msgId.data(), transactionId.data()); + if (!m_transactionListener) { + LOG_WARN("checkTransactionState, transactionListener null"); + THROW_MQEXCEPTION(MQClientException, "checkTransactionState, transactionListener null", -1); + } + m_ioService.post(boost::bind(&TransactionMQProducer::checkTransactionStateImpl, this, addr, message, + tranStateTableOffset, commitLogOffset, msgId, transactionId, offsetMsgId)); +} + +void TransactionMQProducer::checkTransactionStateImpl(const std::string& addr, + const MQMessageExt& message, + long tranStateTableOffset, + long commitLogOffset, + const std::string& msgId, + const std::string& transactionId, + const std::string& offsetMsgId) { + LOG_DEBUG("checkTransactionStateImpl: msgId:%s, transactionId:%s", msgId.data(), transactionId.data()); + LocalTransactionState localTransactionState = UNKNOW; try { - m_transactionListener->checkLocalTransaction(message); + localTransactionState = m_transactionListener->checkLocalTransaction(message); } catch (MQException& e) { LOG_INFO("checkTransactionState, checkLocalTransaction exception: %s", e.what()); } - + EndTransactionRequestHeader* endHeader = new EndTransactionRequestHeader(); endHeader->m_commitLogOffset = commitLogOffset; endHeader->m_producerGroup = getGroupName(); @@ -148,7 +174,7 @@ void TransactionMQProducer::checkTransactionState(const std::string& addr, const if (transactionId.empty()) { uniqueKey = message.getMsgId(); } - + endHeader->m_msgId = uniqueKey; endHeader->m_transactionId = transactionId; switch (localTransactionState) { @@ -167,8 +193,8 @@ void TransactionMQProducer::checkTransactionState(const std::string& addr, const break; } - LOG_INFO("checkTransactionState, endTransactionOneway: uniqueKey:%s, client state:%d, end header: %s", uniqueKey.data(), localTransactionState, - endHeader->toString().data()); + LOG_INFO("checkTransactionState, endTransactionOneway: uniqueKey:%s, client state:%d, end header: %s", + uniqueKey.data(), localTransactionState, endHeader->toString().data()); string remark; try { From 2f15a482467b504b258a758b83e3e36690d08c5e Mon Sep 17 00:00:00 2001 From: jonnxu <jonnxu@163.com> Date: Sat, 20 Jul 2019 21:05:18 +0800 Subject: [PATCH 08/14] update by review, change UNKNOW to UNKNOWN --- example/TransactionProducer.cpp | 2 +- include/TransactionSendResult.h | 2 +- src/producer/TransactionMQProducer.cpp | 9 ++++----- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/example/TransactionProducer.cpp b/example/TransactionProducer.cpp index b8e3b3995..72ae11b9b 100644 --- a/example/TransactionProducer.cpp +++ b/example/TransactionProducer.cpp @@ -68,7 +68,7 @@ void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, TransactionMQProducer try { auto start = std::chrono::system_clock::now(); std::cout << "before sendMessageInTransaction" << endl; - LocalTransactionState state = LocalTransactionState::UNKNOW; + LocalTransactionState state = LocalTransactionState::UNKNOWN; TransactionSendResult sendResult = producer->sendMessageInTransaction(msg, &state); std::cout << "after sendMessageInTransaction msgId: " << sendResult.getMsgId() << endl; g_tps.Increment(); diff --git a/include/TransactionSendResult.h b/include/TransactionSendResult.h index 423379eb7..cf28465fb 100644 --- a/include/TransactionSendResult.h +++ b/include/TransactionSendResult.h @@ -25,7 +25,7 @@ namespace rocketmq { enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, - UNKNOW, + UNKNOWN }; class ROCKETMQCLIENT_API TransactionSendResult : public SendResult { diff --git a/src/producer/TransactionMQProducer.cpp b/src/producer/TransactionMQProducer.cpp index 86c7cd7eb..909e4f535 100644 --- a/src/producer/TransactionMQProducer.cpp +++ b/src/producer/TransactionMQProducer.cpp @@ -107,7 +107,7 @@ void TransactionMQProducer::endTransaction(SendResult& sendResult, LocalTransact } string transId = sendResult.getTransactionId(); - int commitOrRollback = 0; + int commitOrRollback = MessageSysFlag::TransactionNotType; switch (localTransactionState) { case COMMIT_MESSAGE: commitOrRollback = MessageSysFlag::TransactionCommitType; @@ -115,11 +115,10 @@ void TransactionMQProducer::endTransaction(SendResult& sendResult, LocalTransact case ROLLBACK_MESSAGE: commitOrRollback = MessageSysFlag::TransactionRollbackType; break; - case UNKNOW: + case UNKNOWN: commitOrRollback = MessageSysFlag::TransactionNotType; break; default: - commitOrRollback = MessageSysFlag::TransactionNotType; break; } @@ -157,7 +156,7 @@ void TransactionMQProducer::checkTransactionStateImpl(const std::string& addr, const std::string& transactionId, const std::string& offsetMsgId) { LOG_DEBUG("checkTransactionStateImpl: msgId:%s, transactionId:%s", msgId.data(), transactionId.data()); - LocalTransactionState localTransactionState = UNKNOW; + LocalTransactionState localTransactionState = UNKNOWN; try { localTransactionState = m_transactionListener->checkLocalTransaction(message); } catch (MQException& e) { @@ -185,7 +184,7 @@ void TransactionMQProducer::checkTransactionStateImpl(const std::string& addr, endHeader->m_commitOrRollback = MessageSysFlag::TransactionRollbackType; LOG_WARN("when broker check, client rollback this transaction, %s", endHeader->toString().data()); break; - case UNKNOW: + case UNKNOWN: endHeader->m_commitOrRollback = MessageSysFlag::TransactionNotType; LOG_WARN("when broker check, client does not know this transaction state, %s", endHeader->toString().data()); break; From f3cf8b00a85b5d6a79fa84a35b9651e728bc885d Mon Sep 17 00:00:00 2001 From: jonnxu <jonnxu@163.com> Date: Sat, 20 Jul 2019 21:14:56 +0800 Subject: [PATCH 09/14] Update by review --- src/producer/TransactionMQProducer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/producer/TransactionMQProducer.cpp b/src/producer/TransactionMQProducer.cpp index 909e4f535..4b2071fda 100644 --- a/src/producer/TransactionMQProducer.cpp +++ b/src/producer/TransactionMQProducer.cpp @@ -53,7 +53,7 @@ TransactionSendResult TransactionMQProducer::sendMessageInTransaction(MQMessage& THROW_MQEXCEPTION(MQClientException, e.what(), -1); } - LocalTransactionState localTransactionState = LocalTransactionState::UNKNOW; + LocalTransactionState localTransactionState = LocalTransactionState::UNKNOWN; switch (sendResult.getSendStatus()) { case SendStatus::SEND_OK: try { From 252fea99950680955cc1615a3c1abd7e770206ad Mon Sep 17 00:00:00 2001 From: jonnxu <jonnxu@163.com> Date: Sat, 20 Jul 2019 21:18:54 +0800 Subject: [PATCH 10/14] Update by review, change UNKNOW to UNKNOWN --- src/producer/TransactionMQProducer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/producer/TransactionMQProducer.cpp b/src/producer/TransactionMQProducer.cpp index 909e4f535..4b2071fda 100644 --- a/src/producer/TransactionMQProducer.cpp +++ b/src/producer/TransactionMQProducer.cpp @@ -53,7 +53,7 @@ TransactionSendResult TransactionMQProducer::sendMessageInTransaction(MQMessage& THROW_MQEXCEPTION(MQClientException, e.what(), -1); } - LocalTransactionState localTransactionState = LocalTransactionState::UNKNOW; + LocalTransactionState localTransactionState = LocalTransactionState::UNKNOWN; switch (sendResult.getSendStatus()) { case SendStatus::SEND_OK: try { From 02b1d51f37f62b49024fe7bdc49b65427bdb8dbd Mon Sep 17 00:00:00 2001 From: jonnxu <jonnxu@163.com> Date: Mon, 22 Jul 2019 23:25:38 +0800 Subject: [PATCH 11/14] Delete redundant code --- src/transport/ClientRemotingProcessor.cpp | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/transport/ClientRemotingProcessor.cpp b/src/transport/ClientRemotingProcessor.cpp index 50dc164c8..46396cec8 100644 --- a/src/transport/ClientRemotingProcessor.cpp +++ b/src/transport/ClientRemotingProcessor.cpp @@ -177,11 +177,6 @@ RemotingCommand* ClientRemotingProcessor::checkTransactionState(const std::strin return nullptr; } - MQMessageExt& messageExt = mqvec[0]; - for (auto& pair : messageExt.getProperties()) { - LOG_INFO("checkTransactionState key:%s, value: %s", pair.first.data(), pair.second.data() ); - } - string transactionId = messageExt.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (transactionId != "") { messageExt.setTransactionId(transactionId); From d409bfa6f67cf734265c7653c166597b59496197 Mon Sep 17 00:00:00 2001 From: jonnxu <jonnxu@163.com> Date: Tue, 23 Jul 2019 23:08:35 +0800 Subject: [PATCH 12/14] Format the codes --- example/TransactionProducer.cpp | 8 ++++---- include/MQMessage.h | 2 +- include/SendResult.h | 8 +++----- include/TransactionSendResult.h | 6 +----- src/MQClientAPIImpl.cpp | 18 ++++++++---------- src/MQClientAPIImpl.h | 13 ++++++------- src/MQClientFactory.cpp | 13 +++++++------ src/MQClientFactory.h | 6 ++++-- src/message/MQMessageId.h | 7 +++---- src/producer/SendResult.cpp | 22 +++++++++++----------- src/producer/TransactionMQProducer.cpp | 1 - src/protocol/CommandHeader.cpp | 4 +--- 12 files changed, 49 insertions(+), 59 deletions(-) diff --git a/example/TransactionProducer.cpp b/example/TransactionProducer.cpp index 72ae11b9b..1aabb0887 100644 --- a/example/TransactionProducer.cpp +++ b/example/TransactionProducer.cpp @@ -35,19 +35,19 @@ TpsReportService g_tps; class MyTransactionListener : public TransactionListener { virtual LocalTransactionState executeLocalTransaction(const MQMessage& msg, void* arg) { - if (!arg) { - std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: COMMIT_MESAGE " << endl; + std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() + << ", return state: COMMIT_MESAGE " << endl; return LocalTransactionState::COMMIT_MESSAGE; } LocalTransactionState state = (LocalTransactionState)(*(int*)arg % 3); - std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: " << state << endl; + std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: " << state + << endl; return state; } virtual LocalTransactionState checkLocalTransaction(const MQMessageExt& msg) { - std::cout << "checkLocalTransaction enter msg:" << msg.toString() << endl; return LocalTransactionState::COMMIT_MESSAGE; } diff --git a/include/MQMessage.h b/include/MQMessage.h index 89ff84ed2..70fab3613 100644 --- a/include/MQMessage.h +++ b/include/MQMessage.h @@ -74,7 +74,7 @@ class ROCKETMQCLIENT_API MQMessage { void setBody(const std::string& body); void setTransactionId(const std::string& id) { m_transactionId = id; } - std::string getTransactionId() const { return m_transactionId; } + std::string getTransactionId() const { return m_transactionId; } std::map<std::string, std::string> getProperties() const; void setProperties(std::map<std::string, std::string>& properties); diff --git a/include/SendResult.h b/include/SendResult.h index f8be3d209..870d03bc4 100644 --- a/include/SendResult.h +++ b/include/SendResult.h @@ -39,9 +39,7 @@ class ROCKETMQCLIENT_API SendResult { SendResult(const SendResult& other); SendResult& operator=(const SendResult& other); - void setTransactionId(const std::string& id) { - m_transactionId = id; - } + void setTransactionId(const std::string& id) { m_transactionId = id; } std::string getTransactionId() { return m_transactionId; } @@ -51,7 +49,7 @@ class ROCKETMQCLIENT_API SendResult { MQMessageQueue getMessageQueue() const; int64 getQueueOffset() const; std::string toString() const; - + private: SendStatus m_sendStatus; std::string m_msgId; @@ -62,5 +60,5 @@ class ROCKETMQCLIENT_API SendResult { }; //<!*************************************************************************** -} //<!end namespace; +} // namespace rocketmq #endif diff --git a/include/TransactionSendResult.h b/include/TransactionSendResult.h index cf28465fb..0bb1e480e 100644 --- a/include/TransactionSendResult.h +++ b/include/TransactionSendResult.h @@ -22,11 +22,7 @@ namespace rocketmq { -enum LocalTransactionState { - COMMIT_MESSAGE, - ROLLBACK_MESSAGE, - UNKNOWN -}; +enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOWN }; class ROCKETMQCLIENT_API TransactionSendResult : public SendResult { public: diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp index 7381cedbb..337fbe043 100644 --- a/src/MQClientAPIImpl.cpp +++ b/src/MQClientAPIImpl.cpp @@ -209,14 +209,12 @@ void MQClientAPIImpl::createTopic(const string& addr, THROW_MQEXCEPTION(MQBrokerException, "response is null", -1); } -void MQClientAPIImpl::endTransactionOneway( - std::string addr, - EndTransactionRequestHeader* requestHeader, - std::string remark, - const SessionCredentials& sessionCredentials) { - +void MQClientAPIImpl::endTransactionOneway(std::string addr, + EndTransactionRequestHeader* requestHeader, + std::string remark, + const SessionCredentials& sessionCredentials) { RemotingCommand request(END_TRANSACTION, requestHeader); - request.setRemark(remark); + request.setRemark(remark); callSignatureBeforeRequest(addr, request, sessionCredentials); request.Encode(); m_pRemotingClient->invokeOneway(addr, request); @@ -388,8 +386,8 @@ SendResult MQClientAPIImpl::sendMessageSync(const string& addr, if (pResponse != NULL) { try { SendResult result = processSendResponse(brokerName, msg, pResponse.get()); - LOG_DEBUG("sendMessageSync success:%s to addr:%s,brokername:%s, send status:%d", msg.toString().c_str(), addr.c_str(), - brokerName.c_str(), (int)result.getSendStatus()); + LOG_DEBUG("sendMessageSync success:%s to addr:%s,brokername:%s, send status:%d", msg.toString().c_str(), + addr.c_str(), brokerName.c_str(), (int)result.getSendStatus()); return result; } catch (...) { LOG_ERROR("send error"); @@ -933,4 +931,4 @@ void MQClientAPIImpl::unlockBatchMQ(const string& addr, } //<!************************************************************************ -} //<!end namespace; +} // namespace rocketmq diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h index 27e95c86a..08a8db057 100644 --- a/src/MQClientAPIImpl.h +++ b/src/MQClientAPIImpl.h @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - + #ifndef __MQCLIENTAPIIMPL_H__ #define __MQCLIENTAPIIMPL_H__ #include "AsyncCallback.h" @@ -61,11 +61,10 @@ class MQClientAPIImpl { const string& defaultTopic, TopicConfig topicConfig, const SessionCredentials& sessionCredentials); - void endTransactionOneway( - std::string addr, - EndTransactionRequestHeader* requestHeader, - std::string remark, - const SessionCredentials& sessionCredentials); + void endTransactionOneway(std::string addr, + EndTransactionRequestHeader* requestHeader, + std::string remark, + const SessionCredentials& sessionCredentials); SendResult sendMessage(const string& addr, const string& brokerName, @@ -221,6 +220,6 @@ class MQClientAPIImpl { bool m_firstFetchNameSrv; string m_mqClientId; }; -} //<!end namespace; +} // namespace rocketmq //<!*************************************************************************** #endif diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp index 05357b971..126517d28 100644 --- a/src/MQClientFactory.cpp +++ b/src/MQClientFactory.cpp @@ -15,13 +15,13 @@ * limitations under the License. */ #include "MQClientFactory.h" -#include "TransactionMQProducer.h" #include "ConsumerRunningInfo.h" #include "Logging.h" #include "MQClientManager.h" #include "PullRequest.h" #include "Rebalance.h" #include "TopicPublishInfo.h" +#include "TransactionMQProducer.h" #define MAX_BUFF_SIZE 8192 #define SAFE_BUFF_SIZE 7936 // 8192 - 256 = 7936 @@ -668,7 +668,8 @@ FindBrokerResult* MQClientFactory::findBrokerAddressInAdmin(const string& broker return NULL; } -void MQClientFactory::checkTransactionState(const std::string& addr, const MQMessageExt& messageExt, +void MQClientFactory::checkTransactionState(const std::string& addr, + const MQMessageExt& messageExt, const CheckTransactionStateRequestHeader& checkRequestHeader) { string group = messageExt.getProperty(MQMessage::PROPERTY_PRODUCER_GROUP); if (!group.empty()) { @@ -676,8 +677,9 @@ void MQClientFactory::checkTransactionState(const std::string& addr, const MQMes if (producer != nullptr) { TransactionMQProducer* transProducer = dynamic_cast<TransactionMQProducer*>(producer); if (transProducer != nullptr) { - transProducer->checkTransactionState(addr, messageExt, - checkRequestHeader.m_tranStateTableOffset, checkRequestHeader.m_commitLogOffset, checkRequestHeader.m_msgId, checkRequestHeader.m_transactionId, checkRequestHeader.m_offsetMsgId); + transProducer->checkTransactionState(addr, messageExt, checkRequestHeader.m_tranStateTableOffset, + checkRequestHeader.m_commitLogOffset, checkRequestHeader.m_msgId, + checkRequestHeader.m_transactionId, checkRequestHeader.m_offsetMsgId); } else { LOG_ERROR("checkTransactionState, producer not TransactionMQProducer failed, msg:%s", messageExt.toString().data()); @@ -863,7 +865,6 @@ void MQClientFactory::doRebalanceByConsumerGroup(const string& consumerGroup) { void MQClientFactory::endTransactionOneway(const MQMessageQueue& mq, EndTransactionRequestHeader* requestHeader, const SessionCredentials& sessionCredentials) { - string brokerAddr = findBrokerAddressInPublish(mq.getBrokerName()); string remark = ""; if (!brokerAddr.empty()) { @@ -1144,4 +1145,4 @@ void MQClientFactory::getSessionCredentialsFromOneOfProducerOrConsumer(SessionCr } //<!************************************************************************ -} //<!end namespace; +} // namespace rocketmq diff --git a/src/MQClientFactory.h b/src/MQClientFactory.h index e6b895975..067198f3d 100644 --- a/src/MQClientFactory.h +++ b/src/MQClientFactory.h @@ -72,7 +72,9 @@ class MQClientFactory { void endTransactionOneway(const MQMessageQueue& mq, EndTransactionRequestHeader* requestHeader, const SessionCredentials& sessionCredentials); - void checkTransactionState(const std::string& addr, const MQMessageExt& message, const CheckTransactionStateRequestHeader& checkRequestHeader); + void checkTransactionState(const std::string& addr, + const MQMessageExt& message, + const CheckTransactionStateRequestHeader& checkRequestHeader); MQClientAPIImpl* getMQClientAPIImpl() const; MQProducer* selectProducer(const string& group); MQConsumer* selectConsumer(const string& group); @@ -202,6 +204,6 @@ class MQClientFactory { unique_ptr<boost::thread> m_consumer_async_service_thread; }; -} //<!end namespace; +} // namespace rocketmq #endif diff --git a/src/message/MQMessageId.h b/src/message/MQMessageId.h index 61e1fd203..fbe937bad 100644 --- a/src/message/MQMessageId.h +++ b/src/message/MQMessageId.h @@ -24,12 +24,11 @@ namespace rocketmq { //<!*************************************************************************** class MQMessageId { public: - - MQMessageId(){} + MQMessageId() {} MQMessageId(sockaddr address, int64 offset) : m_address(address), m_offset(offset) {} MQMessageId& operator=(const MQMessageId& id) { if (&id == this) { - return *this; + return *this; } this->m_address = id.m_address; this->m_offset = id.m_offset; @@ -49,6 +48,6 @@ class MQMessageId { int64 m_offset; }; -} //<!end namespace; +} // namespace rocketmq #endif diff --git a/src/producer/SendResult.cpp b/src/producer/SendResult.cpp index fb3f5b8d3..6c5576902 100644 --- a/src/producer/SendResult.cpp +++ b/src/producer/SendResult.cpp @@ -15,9 +15,9 @@ * limitations under the License. */ #include "SendResult.h" +#include <sstream> #include "UtilAll.h" #include "VirtualEnvUtil.h" -#include <sstream> namespace rocketmq { //<!*************************************************************************** @@ -76,16 +76,16 @@ int64 SendResult::getQueueOffset() const { } std::string SendResult::toString() const { - stringstream ss; - ss << "SendResult: "; - ss << "sendStatus:" << m_sendStatus; - ss << ",msgId:" << m_msgId; - ss << ",offsetMsgId:" << m_offsetMsgId; - ss << ",queueOffset:" << m_queueOffset; - ss << ",transactionId:" << m_transactionId; - ss << ",messageQueue:" << m_messageQueue.toString(); - return ss.str(); + stringstream ss; + ss << "SendResult: "; + ss << "sendStatus:" << m_sendStatus; + ss << ",msgId:" << m_msgId; + ss << ",offsetMsgId:" << m_offsetMsgId; + ss << ",queueOffset:" << m_queueOffset; + ss << ",transactionId:" << m_transactionId; + ss << ",messageQueue:" << m_messageQueue.toString(); + return ss.str(); } //<!************************************************************************ -} //<!end namespace; +} // namespace rocketmq diff --git a/src/producer/TransactionMQProducer.cpp b/src/producer/TransactionMQProducer.cpp index 4b2071fda..fbd78c5ac 100644 --- a/src/producer/TransactionMQProducer.cpp +++ b/src/producer/TransactionMQProducer.cpp @@ -137,7 +137,6 @@ void TransactionMQProducer::checkTransactionState(const std::string& addr, const std::string& msgId, const std::string& transactionId, const std::string& offsetMsgId) { - LOG_DEBUG("checkTransactionState: msgId:%s, transactionId:%s", msgId.data(), transactionId.data()); if (!m_transactionListener) { LOG_WARN("checkTransactionState, transactionListener null"); diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp index 6f5898b43..f41744edd 100644 --- a/src/protocol/CommandHeader.cpp +++ b/src/protocol/CommandHeader.cpp @@ -63,7 +63,6 @@ void CreateTopicRequestHeader::SetDeclaredFieldOfCommandHeader(map<string, strin void CheckTransactionStateRequestHeader::Encode(Json::Value& outData) {} CommandHeader* CheckTransactionStateRequestHeader::Decode(Json::Value& ext) { - CheckTransactionStateRequestHeader* h = new CheckTransactionStateRequestHeader(); Json::Value& tempValue = ext["msgId"]; if (tempValue.isString()) { @@ -123,7 +122,6 @@ void EndTransactionRequestHeader::Encode(Json::Value& outData) { } void EndTransactionRequestHeader::SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap) { - requestMap.insert(pair<string, string>("msgId", m_msgId)); requestMap.insert(pair<string, string>("transactionId", m_transactionId)); requestMap.insert(pair<string, string>("producerGroup", m_producerGroup)); @@ -618,4 +616,4 @@ const string NotifyConsumerIdsChangedRequestHeader::getGroup() const { } //<!************************************************************************ -} //<!end namespace; +} // namespace rocketmq From 64c6190b1379a79d1a5d011ae8b007e22fb1501a Mon Sep 17 00:00:00 2001 From: jonnxu <jonnxu@163.com> Date: Wed, 24 Jul 2019 09:21:04 +0800 Subject: [PATCH 13/14] Format the code --- src/protocol/CommandHeader.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/protocol/CommandHeader.h b/src/protocol/CommandHeader.h index 22f61005e..4a80ecf9c 100644 --- a/src/protocol/CommandHeader.h +++ b/src/protocol/CommandHeader.h @@ -54,6 +54,7 @@ class CheckTransactionStateRequestHeader : public CommandHeader { static CommandHeader* Decode(Json::Value& ext); virtual void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap); std::string toString(); + public: long m_tranStateTableOffset; long m_commitLogOffset; @@ -482,6 +483,6 @@ class NotifyConsumerIdsChangedRequestHeader : public CommandHeader { }; //<!*************************************************************************** -} //<!end namespace; +} // namespace rocketmq #endif From a559c1ef66cf316859653994a31477a71a5c34f8 Mon Sep 17 00:00:00 2001 From: jonnxu <jonnxu@163.com> Date: Wed, 24 Jul 2019 10:23:47 +0800 Subject: [PATCH 14/14] Update code --- src/transport/ClientRemotingProcessor.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/transport/ClientRemotingProcessor.cpp b/src/transport/ClientRemotingProcessor.cpp index 46396cec8..63736c5ba 100644 --- a/src/transport/ClientRemotingProcessor.cpp +++ b/src/transport/ClientRemotingProcessor.cpp @@ -177,6 +177,7 @@ RemotingCommand* ClientRemotingProcessor::checkTransactionState(const std::strin return nullptr; } + MQMessageExt& messageExt = mqvec[0]; string transactionId = messageExt.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (transactionId != "") { messageExt.setTransactionId(transactionId);