From 24cfe7283fd038252cb796730aeb3e1feb62397d Mon Sep 17 00:00:00 2001
From: donggang123 <jonnxu@163.com>
Date: Mon, 3 Jun 2019 00:06:17 +0800
Subject: [PATCH 01/14] transaction-development

---
 example/TransactionProducer.cpp | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 create mode 100644 example/TransactionProducer.cpp

diff --git a/example/TransactionProducer.cpp b/example/TransactionProducer.cpp
new file mode 100644
index 000000000..e69de29bb

From 7365439e201747d5eb03fdf8613dd4ad9b478bc3 Mon Sep 17 00:00:00 2001
From: jonnxu <jonnxu@163.com>
Date: Mon, 3 Jun 2019 00:20:54 +0800
Subject: [PATCH 02/14] Send transaction message feature

---
 example/TransactionProducer.cpp           | 149 +++++++++++++++++
 include/MQMessage.h                       |   4 +
 include/SendResult.h                      |  10 +-
 include/TransactionListener.h             |  48 ++++++
 include/TransactionMQProducer.h           |  57 +++++++
 include/TransactionSendResult.h           |  52 ++++++
 src/MQClientAPIImpl.cpp                   |  18 ++-
 src/MQClientAPIImpl.h                     |   6 +
 src/MQClientFactory.cpp                   |  42 +++++
 src/MQClientFactory.h                     |   5 +-
 src/message/MQMessageId.h                 |  10 ++
 src/producer/SendResult.cpp               |  13 ++
 src/producer/TransactionMQProducer.cpp    | 185 ++++++++++++++++++++++
 src/protocol/CommandHeader.cpp            | 101 ++++++++++++
 src/protocol/CommandHeader.h              |  59 +++++++
 src/protocol/RemotingCommand.cpp          |   3 +
 src/transport/ClientRemotingProcessor.cpp |  54 ++++++-
 src/transport/ClientRemotingProcessor.h   |  13 ++
 18 files changed, 822 insertions(+), 7 deletions(-)
 create mode 100644 include/TransactionListener.h
 create mode 100644 include/TransactionMQProducer.h
 create mode 100644 include/TransactionSendResult.h
 create mode 100644 src/producer/TransactionMQProducer.cpp

diff --git a/example/TransactionProducer.cpp b/example/TransactionProducer.cpp
index e69de29bb..98d5410da 100644
--- a/example/TransactionProducer.cpp
+++ b/example/TransactionProducer.cpp
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <atomic>
+#include <condition_variable>
+#include <iomanip>
+#include <iostream>
+#include <mutex>
+#include <thread>
+#include "TransactionListener.h"
+#include "TransactionMQProducer.h"
+#include "TransactionSendResult.h"
+#include "common.h"
+
+using namespace rocketmq;
+
+std::atomic<bool> g_quit;
+std::mutex g_mtx;
+std::condition_variable g_finished;
+TpsReportService g_tps;
+
+class MyTransactionListener : public TransactionListener {
+  virtual LocalTransactionState executeLocalTransaction(const MQMessage& msg, void* arg) {
+
+	if (!arg) {
+		std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", state: COMMIT_MESAGE " << endl;
+		return LocalTransactionState::COMMIT_MESSAGE;
+	}
+
+	LocalTransactionState state = (LocalTransactionState)(*(int*)arg % 3);
+	m_state_map[msg.getTransactionId()] = state;
+	std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", state: " << state << endl;
+	return state;
+  }
+
+  virtual LocalTransactionState checkLocalTransaction(const MQMessageExt& msg) {
+
+	string transactionId = msg.getTransactionId();
+	LocalTransactionState state; 
+	if (m_state_map.find(transactionId) != m_state_map.end() && m_state_map[transactionId] == LocalTransactionState::UNKNOW) {
+		state = LocalTransactionState::COMMIT_MESSAGE;
+		m_state_map[transactionId] = state;
+	}
+	else {
+		state = LocalTransactionState::UNKNOW;
+	}
+
+	state = LocalTransactionState::COMMIT_MESSAGE;
+
+	std::cout << "checkLocalTransaction  transactionId:" << transactionId << ", state: " << state  << endl;
+    return state;
+  }
+  private:
+  	map<string, LocalTransactionState>  m_state_map;
+};
+
+void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, TransactionMQProducer* producer) {
+  while (!g_quit.load()) {
+    if (g_msgCount.load() <= 0) {
+      std::this_thread::sleep_for(std::chrono::seconds(2));
+      std::unique_lock<std::mutex> lck(g_mtx);
+      g_finished.notify_one();
+    }
+
+    MQMessage msg(info->topic,  // topic
+                  "*",          // tag
+                  info->body);  // body
+    try {
+      auto start = std::chrono::system_clock::now();
+      std::cout << "before sendMessageInTransaction" << endl;
+	  LocalTransactionState state = LocalTransactionState::UNKNOW;
+      TransactionSendResult sendResult = producer->sendMessageInTransaction(msg, &state);
+      std::cout << "sendMessageInTransaction  msgId: " << sendResult.getMsgId() << endl;
+      g_tps.Increment();
+      --g_msgCount;
+      auto end = std::chrono::system_clock::now();
+      auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+      if (duration.count() >= 500) {
+        std::cout << "send RT more than: " << duration.count() << " ms with msgid: " << sendResult.getMsgId() << endl;
+      }
+    } catch (const MQException& e) {
+      std::cout << "send failed: " << std::endl;
+    }
+  }
+}
+
+int main(int argc, char* argv[]) {
+  RocketmqSendAndConsumerArgs info;
+  if (!ParseArgs(argc, argv, &info)) {
+    exit(-1);
+  }
+  PrintRocketmqSendAndConsumerArgs(info);
+  TransactionMQProducer producer("please_rename_unique_group_name");
+  producer.setNamesrvAddr(info.namesrv);
+  producer.setNamesrvDomain(info.namesrv_domain);
+  producer.setGroupName(info.groupname);
+  producer.setInstanceName(info.groupname);
+  producer.setSessionCredentials("mq acesskey", "mq secretkey", "ALIYUN");
+  producer.setSendMsgTimeout(500);
+  producer.setTcpTransportTryLockTimeout(1000);
+  producer.setTcpTransportConnectTimeout(400);
+  producer.setLogLevel(eLOG_LEVEL_DEBUG);
+  producer.setTransactionListener(new MyTransactionListener());
+  producer.start();
+  std::vector<std::shared_ptr<std::thread>> work_pool;
+  auto start = std::chrono::system_clock::now();
+  int msgcount = g_msgCount.load();
+  g_tps.start();
+
+  int threadCount = info.thread_count;
+  for (int j = 0; j < threadCount; j++) {
+    std::shared_ptr<std::thread> th = std::make_shared<std::thread>(SyncProducerWorker, &info, &producer);
+    work_pool.push_back(th);
+  }
+
+  {
+    std::unique_lock<std::mutex> lck(g_mtx);
+    g_finished.wait(lck);
+    g_quit.store(true);
+  }
+
+  auto end = std::chrono::system_clock::now();
+  auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+
+  std::cout << "per msg time: " << duration.count() / (double)msgcount << "ms \n"
+            << "========================finished==============================\n";
+
+  for (size_t th = 0; th != work_pool.size(); ++th) {
+    work_pool[th]->join();
+  }
+
+  producer.shutdown();
+
+  return 0;
+}
diff --git a/include/MQMessage.h b/include/MQMessage.h
index e4a6b5e53..89ff84ed2 100644
--- a/include/MQMessage.h
+++ b/include/MQMessage.h
@@ -73,6 +73,9 @@ class ROCKETMQCLIENT_API MQMessage {
   void setBody(const char* body, int len);
   void setBody(const std::string& body);
 
+  void setTransactionId(const std::string& id) { m_transactionId = id; }
+  std::string getTransactionId() const     { return m_transactionId; }
+
   std::map<std::string, std::string> getProperties() const;
   void setProperties(std::map<std::string, std::string>& properties);
 
@@ -132,6 +135,7 @@ class ROCKETMQCLIENT_API MQMessage {
   std::string m_topic;
   int m_flag;
   std::string m_body;
+  std::string m_transactionId;
   std::map<std::string, std::string> m_properties;
 };
 //<!***************************************************************************
diff --git a/include/SendResult.h b/include/SendResult.h
index 2f8883a83..f8be3d209 100644
--- a/include/SendResult.h
+++ b/include/SendResult.h
@@ -39,18 +39,26 @@ class ROCKETMQCLIENT_API SendResult {
   SendResult(const SendResult& other);
   SendResult& operator=(const SendResult& other);
 
+  void setTransactionId(const std::string& id) { 
+      m_transactionId = id;
+  }
+
+  std::string getTransactionId() { return m_transactionId; }
+
   const std::string& getMsgId() const;
   const std::string& getOffsetMsgId() const;
   SendStatus getSendStatus() const;
   MQMessageQueue getMessageQueue() const;
   int64 getQueueOffset() const;
-
+  std::string toString() const;
+  
  private:
   SendStatus m_sendStatus;
   std::string m_msgId;
   std::string m_offsetMsgId;
   MQMessageQueue m_messageQueue;
   int64 m_queueOffset;
+  std::string m_transactionId;
 };
 
 //<!***************************************************************************
diff --git a/include/TransactionListener.h b/include/TransactionListener.h
new file mode 100644
index 000000000..6756e96a7
--- /dev/null
+++ b/include/TransactionListener.h
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __TRANSACTIONLISTENER_H__
+#define __TRANSACTIONLISTENER_H__
+
+#include "MQMessage.h"
+#include "MQMessageExt.h"
+#include "TransactionSendResult.h"
+
+namespace rocketmq {
+class ROCKETMQCLIENT_API TransactionListener {
+ public:
+  virtual ~TransactionListener() {}
+  /**
+   * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
+   *
+   * @param msg Half(prepare) message
+   * @param arg Custom business parameter
+   * @return Transaction state
+   */
+  virtual LocalTransactionState executeLocalTransaction(const MQMessage& msg, void* arg) = 0;
+
+  /**
+   * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
+   * method will be invoked to get local transaction status.
+   *
+   * @param msg Check message
+   * @return Transaction state
+   */
+  virtual LocalTransactionState checkLocalTransaction(const MQMessageExt& msg) = 0;
+};
+}  // namespace rocketmq
+#endif
diff --git a/include/TransactionMQProducer.h b/include/TransactionMQProducer.h
new file mode 100644
index 000000000..924d04a1f
--- /dev/null
+++ b/include/TransactionMQProducer.h
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __TRANSACTIONMQPRODUCER_H__
+#define __TRANSACTIONMQPRODUCER_H__
+
+#include <memory>
+#include <string>
+#include "DefaultMQProducer.h"
+#include "MQMessageExt.h"
+#include "TransactionListener.h"
+#include "TransactionSendResult.h"
+
+namespace rocketmq {
+
+class ROCKETMQCLIENT_API TransactionMQProducer : public DefaultMQProducer {
+ public:
+  TransactionMQProducer(const std::string& producerGroup) : DefaultMQProducer(producerGroup) {}
+  virtual ~TransactionMQProducer() {}
+  void start();
+  void shutdown();
+  std::shared_ptr<TransactionListener> getCheckListener() { return m_transactionListener; }
+  void setTransactionListener(TransactionListener* listener) { m_transactionListener.reset(listener); }
+  TransactionSendResult sendMessageInTransaction(MQMessage& msg, void* arg);
+  void checkTransactionState(const std::string& addr, const MQMessageExt& message,
+                             long m_tranStateTableOffset,
+                             long m_commitLogOffset,
+                             std::string m_msgId,
+                             std::string m_transactionId,
+                             std::string m_offsetMsgId);
+
+ private:
+  void initTransactionEnv();
+  void destroyTransactionEnv();
+  void endTransaction(SendResult& sendResult,
+                      LocalTransactionState& localTransactionState);
+
+ private:
+  std::shared_ptr<TransactionListener> m_transactionListener;
+};
+}  // namespace rocketmq
+
+#endif
diff --git a/include/TransactionSendResult.h b/include/TransactionSendResult.h
new file mode 100644
index 000000000..423379eb7
--- /dev/null
+++ b/include/TransactionSendResult.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __TRANSACTIONSENDRESULT_H__
+#define __TRANSACTIONSENDRESULT_H__
+
+#include "SendResult.h"
+
+namespace rocketmq {
+
+enum LocalTransactionState {
+  COMMIT_MESSAGE,
+  ROLLBACK_MESSAGE,
+  UNKNOW,
+};
+
+class ROCKETMQCLIENT_API TransactionSendResult : public SendResult {
+ public:
+  TransactionSendResult() {}
+
+  TransactionSendResult(const SendStatus& sendStatus,
+                        const std::string& msgId,
+                        const std::string& offsetMsgId,
+                        const MQMessageQueue& messageQueue,
+                        int64 queueOffset)
+      : SendResult(sendStatus, msgId, offsetMsgId, messageQueue, queueOffset) {}
+
+  LocalTransactionState getLocalTransactionState() { return m_localTransactionState; }
+
+  void setLocalTransactionState(LocalTransactionState localTransactionState) {
+    m_localTransactionState = localTransactionState;
+  }
+
+ private:
+  LocalTransactionState m_localTransactionState;
+};
+}  // namespace rocketmq
+#endif
\ No newline at end of file
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index 8ec60449f..7381cedbb 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -209,6 +209,20 @@ void MQClientAPIImpl::createTopic(const string& addr,
   THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
 }
 
+void MQClientAPIImpl::endTransactionOneway(
+  std::string addr,
+  EndTransactionRequestHeader* requestHeader,
+  std::string remark,
+  const SessionCredentials& sessionCredentials) {
+
+  RemotingCommand request(END_TRANSACTION, requestHeader);
+  request.setRemark(remark);  
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+  m_pRemotingClient->invokeOneway(addr, request);
+  return;
+}
+
 SendResult MQClientAPIImpl::sendMessage(const string& addr,
                                         const string& brokerName,
                                         const MQMessage& msg,
@@ -373,9 +387,9 @@ SendResult MQClientAPIImpl::sendMessageSync(const string& addr,
   unique_ptr<RemotingCommand> pResponse(m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
   if (pResponse != NULL) {
     try {
-      LOG_DEBUG("sendMessageSync success:%s to addr:%s,brokername:%s", msg.toString().c_str(), addr.c_str(),
-                brokerName.c_str());
       SendResult result = processSendResponse(brokerName, msg, pResponse.get());
+      LOG_DEBUG("sendMessageSync success:%s to addr:%s,brokername:%s, send status:%d", msg.toString().c_str(), addr.c_str(),
+                brokerName.c_str(), (int)result.getSendStatus());
       return result;
     } catch (...) {
       LOG_ERROR("send error");
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 1a5e202c8..27e95c86a 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+ 
 #ifndef __MQCLIENTAPIIMPL_H__
 #define __MQCLIENTAPIIMPL_H__
 #include "AsyncCallback.h"
@@ -60,6 +61,11 @@ class MQClientAPIImpl {
                    const string& defaultTopic,
                    TopicConfig topicConfig,
                    const SessionCredentials& sessionCredentials);
+  void endTransactionOneway(
+    std::string addr,
+    EndTransactionRequestHeader* requestHeader,
+    std::string remark,
+    const SessionCredentials& sessionCredentials);
 
   SendResult sendMessage(const string& addr,
                          const string& brokerName,
diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index cfa62dd99..a033a7b45 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 #include "MQClientFactory.h"
+#include "TransactionMQProducer.h"
 #include "ConsumerRunningInfo.h"
 #include "Logging.h"
 #include "MQClientManager.h"
@@ -667,6 +668,29 @@ FindBrokerResult* MQClientFactory::findBrokerAddressInAdmin(const string& broker
   return NULL;
 }
 
+void MQClientFactory::checkTransactionState(const std::string& addr, const MQMessageExt& messageExt,
+                                            const CheckTransactionStateRequestHeader& checkRequestHeader) {
+  string group = messageExt.getProperty(MQMessage::PROPERTY_PRODUCER_GROUP);
+  if (group != "") {
+    MQProducer* producer = selectProducer(group);
+    if (producer != nullptr) {
+      TransactionMQProducer* transProducer = dynamic_cast<TransactionMQProducer*>(producer);
+      if (transProducer != nullptr) {
+        transProducer->checkTransactionState(addr, messageExt, 
+            checkRequestHeader.m_tranStateTableOffset, checkRequestHeader.m_commitLogOffset, checkRequestHeader.m_msgId, checkRequestHeader.m_transactionId, checkRequestHeader.m_offsetMsgId);
+      } else {
+        LOG_ERROR("checkTransactionState, producer not TransactionMQProducer failed, msg:%s",
+                  messageExt.toString().data());
+      }
+    } else {
+      LOG_ERROR("checkTransactionState, pick producer by group[%s] failed, msg:%s", group.data(),
+                messageExt.toString().data());
+    }
+  } else {
+    LOG_ERROR("checkTransactionState, pick producer group failed, msg:%s", messageExt.toString().data());
+  }
+}
+
 MQClientAPIImpl* MQClientFactory::getMQClientAPIImpl() const {
   return m_pClientAPIImpl.get();
 }
@@ -836,6 +860,24 @@ void MQClientFactory::doRebalanceByConsumerGroup(const string& consumerGroup) {
   }
 }
 
+void MQClientFactory::endTransactionOneway(const MQMessageQueue& mq,
+                                           EndTransactionRequestHeader* requestHeader,
+                                           const SessionCredentials& sessionCredentials) {
+  
+  string brokerAddr = findBrokerAddressInPublish(mq.getBrokerName());
+  string remark = "";
+  if (!brokerAddr.empty()) {
+    try {
+      getMQClientAPIImpl()->endTransactionOneway(brokerAddr, requestHeader, remark, sessionCredentials);
+    } catch (MQException& e) {
+      LOG_ERROR("endTransactionOneway exception:%s", e.what());
+      throw e;
+    }
+  } else {
+    THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
+  }
+}
+
 void MQClientFactory::unregisterClient(const string& producerGroup,
                                        const string& consumerGroup,
                                        const SessionCredentials& sessionCredentials) {
diff --git a/src/MQClientFactory.h b/src/MQClientFactory.h
index e5d6200fc..e6b895975 100644
--- a/src/MQClientFactory.h
+++ b/src/MQClientFactory.h
@@ -69,7 +69,10 @@ class MQClientFactory {
                            int64 begin,
                            int64 end,
                            const SessionCredentials& session_credentials);
-
+  void endTransactionOneway(const MQMessageQueue& mq,
+                            EndTransactionRequestHeader* requestHeader,
+                            const SessionCredentials& sessionCredentials);
+  void checkTransactionState(const std::string& addr, const MQMessageExt& message, const CheckTransactionStateRequestHeader& checkRequestHeader);
   MQClientAPIImpl* getMQClientAPIImpl() const;
   MQProducer* selectProducer(const string& group);
   MQConsumer* selectConsumer(const string& group);
diff --git a/src/message/MQMessageId.h b/src/message/MQMessageId.h
index 38d11eeae..61e1fd203 100644
--- a/src/message/MQMessageId.h
+++ b/src/message/MQMessageId.h
@@ -24,7 +24,17 @@ namespace rocketmq {
 //<!***************************************************************************
 class MQMessageId {
  public:
+    
+  MQMessageId(){}
   MQMessageId(sockaddr address, int64 offset) : m_address(address), m_offset(offset) {}
+  MQMessageId& operator=(const MQMessageId& id) {
+    if (&id == this) {
+        return *this;
+    }
+    this->m_address = id.m_address;
+    this->m_offset = id.m_offset;
+    return *this;
+  }
 
   sockaddr getAddress() const { return m_address; }
 
diff --git a/src/producer/SendResult.cpp b/src/producer/SendResult.cpp
index 81ddf7661..fb3f5b8d3 100644
--- a/src/producer/SendResult.cpp
+++ b/src/producer/SendResult.cpp
@@ -17,6 +17,7 @@
 #include "SendResult.h"
 #include "UtilAll.h"
 #include "VirtualEnvUtil.h"
+#include <sstream>
 
 namespace rocketmq {
 //<!***************************************************************************
@@ -74,5 +75,17 @@ int64 SendResult::getQueueOffset() const {
   return m_queueOffset;
 }
 
+std::string SendResult::toString() const {
+    stringstream ss;
+    ss << "SendResult: ";
+    ss << "sendStatus:" << m_sendStatus;
+    ss << ",msgId:" << m_msgId;
+    ss << ",offsetMsgId:" << m_offsetMsgId;
+    ss << ",queueOffset:" << m_queueOffset;
+    ss << ",transactionId:" << m_transactionId;   
+    ss << ",messageQueue:" << m_messageQueue.toString();
+    return ss.str();
+}
+
 //<!************************************************************************
 }  //<!end namespace;
diff --git a/src/producer/TransactionMQProducer.cpp b/src/producer/TransactionMQProducer.cpp
new file mode 100644
index 000000000..1023a21af
--- /dev/null
+++ b/src/producer/TransactionMQProducer.cpp
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TransactionMQProducer.h"
+#include <string>
+#include "CommandHeader.h"
+#include "Logging.h"
+#include "MQClientFactory.h"
+#include "MQDecoder.h"
+#include "MessageSysFlag.h"
+#include "TransactionListener.h"
+#include "TransactionSendResult.h"
+
+using namespace std;
+namespace rocketmq {
+
+void TransactionMQProducer::initTransactionEnv() {}
+
+void TransactionMQProducer::destroyTransactionEnv() {}
+
+TransactionSendResult TransactionMQProducer::sendMessageInTransaction(MQMessage& msg, void* arg) {
+  if (nullptr == m_transactionListener) {
+    THROW_MQEXCEPTION(MQClientException, "transactionListener is null", -1);
+  }
+
+  SendResult sendResult;
+  msg.setProperty(MQMessage::PROPERTY_TRANSACTION_PREPARED, "true");
+  msg.setProperty(MQMessage::PROPERTY_PRODUCER_GROUP, getGroupName());
+  try {
+    sendResult = send(msg);
+  } catch (MQException& e) {
+    THROW_MQEXCEPTION(MQClientException, e.what(), -1);
+  }
+
+  LOG_DEBUG("sendMessageInTransaction result:%s", sendResult.toString().data());
+  LocalTransactionState localTransactionState = LocalTransactionState::UNKNOW;
+  switch (sendResult.getSendStatus()) {
+    case SendStatus::SEND_OK: {
+      try {
+        if (sendResult.getTransactionId() != "") {
+          msg.setProperty("__transactionId__", sendResult.getTransactionId());
+        }
+        string transactionId = msg.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+        if (transactionId != "") {
+          msg.setTransactionId(transactionId);
+        }
+        LOG_DEBUG("sendMessageInTransaction, msgId:%s, transactionId:%s",
+                  sendResult.getMsgId().data(), transactionId.data());
+        localTransactionState = m_transactionListener->executeLocalTransaction(msg, arg);
+        if (localTransactionState != LocalTransactionState::COMMIT_MESSAGE) {
+          LOG_WARN("executeLocalTransaction ret not LocalTransactionState::commit, msg:%s", msg.toString().data());
+        }
+      } catch (MQException& e) {
+        THROW_MQEXCEPTION(MQClientException, e.what(), -1);
+      }
+    } break;
+    case SendStatus::SEND_FLUSH_DISK_TIMEOUT:
+    case SendStatus::SEND_FLUSH_SLAVE_TIMEOUT:
+    case SendStatus::SEND_SLAVE_NOT_AVAILABLE:
+      localTransactionState = LocalTransactionState::ROLLBACK_MESSAGE;
+      break;
+    default:
+      break;
+  }
+
+  try {
+    endTransaction(sendResult, localTransactionState);
+  } catch (MQException& e) {
+    LOG_WARN("endTransaction exception:%s", e.what());
+  }
+
+  TransactionSendResult transactionSendResult(sendResult.getSendStatus(), sendResult.getMsgId(),
+                                              sendResult.getOffsetMsgId(), sendResult.getMessageQueue(),
+                                              sendResult.getQueueOffset());
+  transactionSendResult.setTransactionId(msg.getTransactionId());
+  transactionSendResult.setLocalTransactionState(localTransactionState);
+  return transactionSendResult;
+}
+
+void TransactionMQProducer::endTransaction(SendResult& sendResult, LocalTransactionState& localTransactionState) {
+  MQMessageId id;
+  if (sendResult.getOffsetMsgId() != "") {
+    id = MQDecoder::decodeMessageId(sendResult.getOffsetMsgId());
+  } else {
+    id = MQDecoder::decodeMessageId(sendResult.getMsgId());
+  }
+  string transId = sendResult.getTransactionId();
+
+  int commitOrRollback = 0;
+  switch (localTransactionState) {
+    case COMMIT_MESSAGE:
+      commitOrRollback = MessageSysFlag::TransactionCommitType;
+      break;
+    case ROLLBACK_MESSAGE:
+      commitOrRollback = MessageSysFlag::TransactionRollbackType;
+      break;
+    case UNKNOW:
+      commitOrRollback = MessageSysFlag::TransactionNotType;
+      break;
+    default:
+      commitOrRollback = MessageSysFlag::TransactionNotType;
+      break;
+  }
+
+  bool fromTransCheck = false;
+  EndTransactionRequestHeader* requestHeader =
+      new EndTransactionRequestHeader(getGroupName(), sendResult.getQueueOffset(), id.getOffset(), commitOrRollback,
+                                      fromTransCheck, sendResult.getMsgId(), transId);
+  LOG_DEBUG("endTransaction: msg:%s", requestHeader->toString().data());
+  getFactory()->endTransactionOneway(sendResult.getMessageQueue(), requestHeader, getSessionCredentials());
+}
+
+void TransactionMQProducer::checkTransactionState(const std::string& addr, const MQMessageExt& message,
+                                                  long m_tranStateTableOffset,
+                                                  long m_commitLogOffset,
+                                                  std::string m_msgId,
+                                                  std::string m_transactionId,
+                                                  std::string m_offsetMsgId) {
+  LocalTransactionState localTransactionState = m_transactionListener->checkLocalTransaction(message);
+
+  EndTransactionRequestHeader* endHeader = new EndTransactionRequestHeader();
+  endHeader->m_commitLogOffset = m_commitLogOffset;
+  endHeader->m_producerGroup = getGroupName();
+  endHeader->m_tranStateTableOffset = m_tranStateTableOffset;
+  endHeader->m_fromTransactionCheck = true;
+
+  string uniqueKey = m_transactionId;
+  if (m_transactionId == "")
+  	uniqueKey = message.getMsgId();
+  
+  endHeader->m_msgId = uniqueKey;
+  endHeader->m_transactionId = m_transactionId;
+  switch (localTransactionState) {
+    case COMMIT_MESSAGE:
+      endHeader->m_commitOrRollback = MessageSysFlag::TransactionCommitType;
+      break;
+    case ROLLBACK_MESSAGE:
+      endHeader->m_commitOrRollback = MessageSysFlag::TransactionRollbackType;
+      LOG_WARN("when broker check, client rollback this transaction, %s", endHeader->toString().data());
+      break;
+    case UNKNOW:
+      endHeader->m_commitOrRollback = MessageSysFlag::TransactionNotType;
+      LOG_WARN("when broker check, client does not know this transaction state, %s", endHeader->toString().data());
+      break;
+    default:
+      break;
+  }
+
+  LOG_INFO("checkTransactionState, endTransactionOneway: uniqueKey:%s, client state:%d, end header: %s", uniqueKey.data(), localTransactionState,
+    endHeader->toString().data());
+
+  string remark;
+  try {
+    getFactory()->getMQClientAPIImpl()->endTransactionOneway(addr, endHeader, remark, getSessionCredentials());
+  } catch (MQException& e) {
+    LOG_ERROR("endTransactionOneway exception:%s", e.what());
+    throw e;
+  }
+}
+
+void TransactionMQProducer::start() {
+  initTransactionEnv();
+  DefaultMQProducer::start();
+}
+
+void TransactionMQProducer::shutdown() {
+  DefaultMQProducer::shutdown();
+  destroyTransactionEnv();
+}
+
+}  // namespace rocketmq
diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp
index 4e4b0bb5e..807d409cb 100644
--- a/src/protocol/CommandHeader.cpp
+++ b/src/protocol/CommandHeader.cpp
@@ -60,6 +60,107 @@ void CreateTopicRequestHeader::SetDeclaredFieldOfCommandHeader(map<string, strin
   requestMap.insert(pair<string, string>("topicFilterType", topicFilterType));
 }
 
+void CheckTransactionStateRequestHeader::Encode(Json::Value& outData) {}
+
+CommandHeader* CheckTransactionStateRequestHeader::Decode(Json::Value& ext) {
+
+  CheckTransactionStateRequestHeader* h = new CheckTransactionStateRequestHeader();
+  Json::Value& tempValue = ext["msgId"];
+  if (tempValue.isString()) {
+    h->m_msgId = tempValue.asString();
+  }
+
+  tempValue = ext["transactionId"];
+  if (tempValue.isString()) {
+    h->m_transactionId = tempValue.asString();
+  }
+
+  tempValue = ext["offsetMsgId"];
+  if (tempValue.isString()) {
+    h->m_offsetMsgId = tempValue.asString();
+  }
+
+  tempValue = ext["tranStateTableOffset"];
+  if (tempValue.isInt64()) {
+    h->m_tranStateTableOffset = tempValue.asInt64();
+  }
+
+  tempValue = ext["commitLogOffset"];
+  if (tempValue.isInt64()) {
+    h->m_commitLogOffset = tempValue.asInt64();
+  }
+
+  return h;
+}
+
+void CheckTransactionStateRequestHeader::SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap) {
+  requestMap.insert(pair<string, string>("msgId", m_msgId));
+  requestMap.insert(pair<string, string>("transactionId", m_transactionId));
+  requestMap.insert(pair<string, string>("offsetMsgId", m_offsetMsgId));
+  requestMap.insert(pair<string, string>("commitLogOffset", UtilAll::to_string(m_commitLogOffset)));
+  requestMap.insert(pair<string, string>("tranStateTableOffset", UtilAll::to_string(m_tranStateTableOffset)));
+}
+
+std::string CheckTransactionStateRequestHeader::toString() {
+  stringstream ss;
+  ss << "CheckTransactionStateRequestHeader:";
+  ss << " msgId:" << m_msgId;
+  ss << " transactionId:" << m_transactionId;
+  ss << " offsetMsgId:" << m_offsetMsgId;
+  ss << " commitLogOffset:" << m_commitLogOffset;
+  ss << " tranStateTableOffset:" << m_tranStateTableOffset;
+  return ss.str();
+}
+
+void EndTransactionRequestHeader::Encode(Json::Value& outData) {
+  outData["msgId"] = m_msgId;
+  outData["transactionId"] = m_transactionId;
+  outData["producerGroup"] = m_producerGroup;
+  outData["tranStateTableOffset"] = UtilAll::to_string(m_tranStateTableOffset);
+  outData["commitLogOffset"] = UtilAll::to_string(m_commitLogOffset);
+  outData["commitOrRollback"] = UtilAll::to_string(m_commitOrRollback);
+  outData["fromTransactionCheck"] = UtilAll::to_string(m_fromTransactionCheck);
+  LOG_DEBUG(
+		"EndTransactionRequestHeader Encode msgId:%s, transactionId: %s, producerGroup is:%s, UtilAll::to_string( tranStateTableOffset) "
+		"is:%s,UtilAll::to_string( commitLogOffset):%s, UtilAll::to_string( commitOrRollback) "
+		"is:%s, UtilAll::to_string( fromTransactionCheck) is:%s",
+		m_msgId.c_str(), m_transactionId.c_str(), m_producerGroup.c_str(),
+		UtilAll::to_string(m_tranStateTableOffset).c_str(), UtilAll::to_string(m_commitLogOffset).c_str(),
+		UtilAll::to_string(m_commitOrRollback).c_str(), UtilAll::to_string(m_fromTransactionCheck).c_str());  
+}
+
+void EndTransactionRequestHeader::SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap) {
+  LOG_DEBUG(
+		"EndTransactionRequestHeader SetDeclaredFieldOfCommandHeader msgId:%s, transactionId:%s, producerGroup is:%s, UtilAll::to_string( tranStateTableOffset) "
+		"is:%s,UtilAll::to_string( commitLogOffset):%s, UtilAll::to_string( commitOrRollback) "
+		"is:%s, UtilAll::to_string( fromTransactionCheck) is:%s",
+		m_msgId.c_str(), m_transactionId.c_str(), m_producerGroup.c_str(),
+		UtilAll::to_string(m_tranStateTableOffset).c_str(), UtilAll::to_string(m_commitLogOffset).c_str(),
+		UtilAll::to_string(m_commitOrRollback).c_str(), UtilAll::to_string(m_fromTransactionCheck).c_str());
+
+	
+  requestMap.insert(pair<string, string>("msgId", m_msgId));
+  requestMap.insert(pair<string, string>("transactionId", m_transactionId));
+  requestMap.insert(pair<string, string>("producerGroup", m_producerGroup));
+  requestMap.insert(pair<string, string>("tranStateTableOffset", UtilAll::to_string(m_tranStateTableOffset)));
+  requestMap.insert(pair<string, string>("commitLogOffset", UtilAll::to_string(m_commitLogOffset)));
+  requestMap.insert(pair<string, string>("commitOrRollback", UtilAll::to_string(m_commitOrRollback)));
+  requestMap.insert(pair<string, string>("fromTransactionCheck", UtilAll::to_string(m_fromTransactionCheck)));
+}
+
+std::string EndTransactionRequestHeader::toString() {
+  stringstream ss;
+  ss << "EndTransactionRequestHeader:";
+  ss << " m_msgId:" << m_msgId;
+  ss << " m_transactionId:" << m_transactionId;
+  ss << " m_producerGroup:" << m_producerGroup;
+  ss << " m_tranStateTableOffset:" << m_tranStateTableOffset;
+  ss << " m_commitLogOffset:" << m_commitLogOffset;
+  ss << " m_commitOrRollback:" << m_commitOrRollback;
+  ss << " m_fromTransactionCheck:" << m_fromTransactionCheck;
+  return ss.str();
+}
+
 //<!************************************************************************
 void SendMessageRequestHeader::Encode(Json::Value& outData) {
   outData["producerGroup"] = producerGroup;
diff --git a/src/protocol/CommandHeader.h b/src/protocol/CommandHeader.h
index 2ad3e47c8..22f61005e 100644
--- a/src/protocol/CommandHeader.h
+++ b/src/protocol/CommandHeader.h
@@ -18,6 +18,7 @@
 #ifndef __COMMANDCUSTOMHEADER_H__
 #define __COMMANDCUSTOMHEADER_H__
 
+#include <map>
 #include <string>
 #include "MQClientException.h"
 #include "MessageSysFlag.h"
@@ -35,6 +36,64 @@ class CommandHeader {
   virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap) {}
 };
 
+class CheckTransactionStateRequestHeader : public CommandHeader {
+ public:
+  CheckTransactionStateRequestHeader() {}
+  CheckTransactionStateRequestHeader(long tableOffset,
+                                     long commLogOffset,
+                                     const std::string& msgid,
+                                     const std::string& transactionId,
+                                     const std::string& offsetMsgId)
+      : m_tranStateTableOffset(tableOffset),
+        m_commitLogOffset(commLogOffset),
+        m_msgId(msgid),
+        m_transactionId(transactionId),
+        m_offsetMsgId(offsetMsgId) {}
+  virtual ~CheckTransactionStateRequestHeader() {}
+  virtual void Encode(Json::Value& outData);
+  static CommandHeader* Decode(Json::Value& ext);
+  virtual void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap);
+  std::string toString();
+ public:
+  long m_tranStateTableOffset;
+  long m_commitLogOffset;
+  std::string m_msgId;
+  std::string m_transactionId;
+  std::string m_offsetMsgId;
+};
+
+class EndTransactionRequestHeader : public CommandHeader {
+ public:
+  EndTransactionRequestHeader() {}
+  EndTransactionRequestHeader(const std::string& groupName,
+                              long tableOffset,
+                              long commLogOffset,
+                              int commitOrRoll,
+                              bool fromTransCheck,
+                              const std::string& msgid,
+                              const std::string& transId)
+      : m_producerGroup(groupName),
+        m_tranStateTableOffset(tableOffset),
+        m_commitLogOffset(commLogOffset),
+        m_commitOrRollback(commitOrRoll),
+        m_fromTransactionCheck(fromTransCheck),
+        m_msgId(msgid),
+        m_transactionId(transId) {}
+  virtual ~EndTransactionRequestHeader() {}
+  virtual void Encode(Json::Value& outData);
+  virtual void SetDeclaredFieldOfCommandHeader(std::map<string, string>& requestMap);
+  std::string toString();
+
+ public:
+  std::string m_producerGroup;
+  long m_tranStateTableOffset;
+  long m_commitLogOffset;
+  int m_commitOrRollback;
+  bool m_fromTransactionCheck;
+  std::string m_msgId;
+  std::string m_transactionId;
+};
+
 //<!************************************************************************
 class GetRouteInfoRequestHeader : public CommandHeader {
  public:
diff --git a/src/protocol/RemotingCommand.cpp b/src/protocol/RemotingCommand.cpp
index f556a24cc..16e040f1f 100644
--- a/src/protocol/RemotingCommand.cpp
+++ b/src/protocol/RemotingCommand.cpp
@@ -242,6 +242,9 @@ void RemotingCommand::SetExtHeader(int code) {
           break;
         case NOTIFY_CONSUMER_IDS_CHANGED:
           m_pExtHeader.reset(NotifyConsumerIdsChangedRequestHeader::Decode(ext));
+          break;
+        case CHECK_TRANSACTION_STATE:
+          m_pExtHeader.reset(CheckTransactionStateRequestHeader::Decode(ext));
         default:
           break;
       }
diff --git a/src/transport/ClientRemotingProcessor.cpp b/src/transport/ClientRemotingProcessor.cpp
index b0be046f7..50dc164c8 100644
--- a/src/transport/ClientRemotingProcessor.cpp
+++ b/src/transport/ClientRemotingProcessor.cpp
@@ -28,10 +28,10 @@ ClientRemotingProcessor::ClientRemotingProcessor(MQClientFactory* mqClientFactor
 ClientRemotingProcessor::~ClientRemotingProcessor() {}
 
 RemotingCommand* ClientRemotingProcessor::processRequest(const string& addr, RemotingCommand* request) {
-  LOG_DEBUG("request Command received:processRequest");
+  LOG_INFO("request Command received:processRequest, addr:%s, code:%d", addr.data(), request->getCode());
   switch (request->getCode()) {
     case CHECK_TRANSACTION_STATE:
-      //  return checkTransactionState( request);
+      return checkTransactionState(addr, request);
       break;
     case NOTIFY_CONSUMER_IDS_CHANGED:
       return notifyConsumerIdsChanged(request);
@@ -142,8 +142,56 @@ RemotingCommand* ClientRemotingProcessor::notifyConsumerIdsChanged(RemotingComma
   request->SetExtHeader(request->getCode());
   NotifyConsumerIdsChangedRequestHeader* requestHeader =
       (NotifyConsumerIdsChangedRequestHeader*)request->getCommandHeader();
-  LOG_INFO("notifyConsumerIdsChanged:%s", requestHeader->getGroup().c_str());
+  if (requestHeader == nullptr) {
+    LOG_ERROR("notifyConsumerIdsChanged requestHeader null");
+    return NULL;
+  }
+  string group = requestHeader->getGroup();
+  LOG_INFO("notifyConsumerIdsChanged:%s", group.c_str());
   m_mqClientFactory->doRebalanceByConsumerGroup(requestHeader->getGroup());
   return NULL;
 }
+
+RemotingCommand* ClientRemotingProcessor::checkTransactionState(const std::string& addr, RemotingCommand* request) {
+  if (!request) {
+    LOG_ERROR("checkTransactionState request null");
+    return nullptr;
+  }
+
+  LOG_INFO("checkTransactionState addr:%s, request: %s", addr.data(), request->ToString().data());
+
+  request->SetExtHeader(request->getCode());
+  CheckTransactionStateRequestHeader* requestHeader = (CheckTransactionStateRequestHeader*)request->getCommandHeader();
+  if (!requestHeader) {
+    LOG_ERROR("checkTransactionState CheckTransactionStateRequestHeader requestHeader null");
+    return nullptr;
+  }
+  LOG_INFO("checkTransactionState request: %s", requestHeader->toString().data());
+
+  const MemoryBlock* block = request->GetBody();
+  if (block && block->getSize() > 0) {
+    std::vector<MQMessageExt> mqvec;
+    MQDecoder::decodes(block, mqvec);
+    if (mqvec.size() == 0) {
+      LOG_ERROR("checkTransactionState decodes MQMessageExt fail, request:%s", requestHeader->toString().data());
+      return nullptr;
+    }
+
+    MQMessageExt& messageExt = mqvec[0];
+	for (auto& pair : messageExt.getProperties()) {
+	    LOG_INFO("checkTransactionState key:%s, value: %s", pair.first.data(), pair.second.data() );		
+	}
+
+    string transactionId = messageExt.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+    if (transactionId != "") {
+      messageExt.setTransactionId(transactionId);
+    }
+
+    m_mqClientFactory->checkTransactionState(addr, messageExt, *requestHeader);
+  } else {
+    LOG_ERROR("checkTransactionState getbody null or size 0, request Header:%s", requestHeader->toString().data());
+  }
+  return nullptr;
 }
+
+}  // namespace rocketmq
diff --git a/src/transport/ClientRemotingProcessor.h b/src/transport/ClientRemotingProcessor.h
index 58a041707..c88b8bb4b 100644
--- a/src/transport/ClientRemotingProcessor.h
+++ b/src/transport/ClientRemotingProcessor.h
@@ -33,6 +33,7 @@ class ClientRemotingProcessor {
   RemotingCommand* resetOffset(RemotingCommand* request);
   RemotingCommand* getConsumerRunningInfo(const string& addr, RemotingCommand* request);
   RemotingCommand* notifyConsumerIdsChanged(RemotingCommand* request);
+  RemotingCommand* checkTransactionState(const string& addr, RemotingCommand* request);
 
  private:
   MQClientFactory* m_mqClientFactory;
@@ -49,6 +50,18 @@ class ResetOffsetBody {
  private:
   std::map<MQMessageQueue, int64> m_offsetTable;
 };
+
+class CheckTransactionStateBody {
+ public:
+  CheckTransactionStateBody() {}
+  virtual ~CheckTransactionStateBody() { m_offsetTable.clear(); }
+  void setOffsetTable(MQMessageQueue mq, int64 offset);
+  std::map<MQMessageQueue, int64> getOffsetTable();
+  static ResetOffsetBody* Decode(const MemoryBlock* mem);
+
+ private:
+  std::map<MQMessageQueue, int64> m_offsetTable;
+};
 }
 
 #endif

From b50833d8168ef4f118831caf7dd8fea36532d6a5 Mon Sep 17 00:00:00 2001
From: jonnxu <jonnxu@163.com>
Date: Mon, 3 Jun 2019 01:04:34 +0800
Subject: [PATCH 03/14] update transaction producer example

---
 example/TransactionProducer.cpp | 27 +++++++--------------------
 1 file changed, 7 insertions(+), 20 deletions(-)

diff --git a/example/TransactionProducer.cpp b/example/TransactionProducer.cpp
index 98d5410da..3f4953358 100644
--- a/example/TransactionProducer.cpp
+++ b/example/TransactionProducer.cpp
@@ -36,42 +36,29 @@ TpsReportService g_tps;
 class MyTransactionListener : public TransactionListener {
   virtual LocalTransactionState executeLocalTransaction(const MQMessage& msg, void* arg) {
 
+    std::cout << "executeLocalTransaction enter msg:" << msg.toString() << endl;
 	if (!arg) {
-		std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", state: COMMIT_MESAGE " << endl;
+		std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: COMMIT_MESAGE " << endl;
 		return LocalTransactionState::COMMIT_MESSAGE;
 	}
 
 	LocalTransactionState state = (LocalTransactionState)(*(int*)arg % 3);
-	m_state_map[msg.getTransactionId()] = state;
-	std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", state: " << state << endl;
+	std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: " << state << endl;
 	return state;
   }
 
   virtual LocalTransactionState checkLocalTransaction(const MQMessageExt& msg) {
 
-	string transactionId = msg.getTransactionId();
-	LocalTransactionState state; 
-	if (m_state_map.find(transactionId) != m_state_map.end() && m_state_map[transactionId] == LocalTransactionState::UNKNOW) {
-		state = LocalTransactionState::COMMIT_MESSAGE;
-		m_state_map[transactionId] = state;
-	}
-	else {
-		state = LocalTransactionState::UNKNOW;
-	}
-
-	state = LocalTransactionState::COMMIT_MESSAGE;
-
-	std::cout << "checkLocalTransaction  transactionId:" << transactionId << ", state: " << state  << endl;
-    return state;
+    std::cout << "checkLocalTransaction enter msg:" << msg.toString() << endl;
+	std::cout << "checkLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: COMMIT_MESSAGE" << endl;
+    return LocalTransactionState::COMMIT_MESSAGE;
   }
-  private:
-  	map<string, LocalTransactionState>  m_state_map;
 };
 
 void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, TransactionMQProducer* producer) {
   while (!g_quit.load()) {
     if (g_msgCount.load() <= 0) {
-      std::this_thread::sleep_for(std::chrono::seconds(2));
+      std::this_thread::sleep_for(std::chrono::seconds(70));
       std::unique_lock<std::mutex> lck(g_mtx);
       g_finished.notify_one();
     }

From 8d4cffcdb669f620d6ec91736702dda66535bc38 Mon Sep 17 00:00:00 2001
From: xujiang1 <xujiang1@cmschina.com.cn>
Date: Mon, 3 Jun 2019 17:31:39 +0800
Subject: [PATCH 04/14] update when send complete, need break while loop for
 Transaction Producer Demo

---
 example/TransactionProducer.cpp | 1 +
 1 file changed, 1 insertion(+)

diff --git a/example/TransactionProducer.cpp b/example/TransactionProducer.cpp
index 3f4953358..ee03a8421 100644
--- a/example/TransactionProducer.cpp
+++ b/example/TransactionProducer.cpp
@@ -61,6 +61,7 @@ void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, TransactionMQProducer
       std::this_thread::sleep_for(std::chrono::seconds(70));
       std::unique_lock<std::mutex> lck(g_mtx);
       g_finished.notify_one();
+	  break;
     }
 
     MQMessage msg(info->topic,  // topic

From 02ffc3559803dc6bc725f70ba66937219666db94 Mon Sep 17 00:00:00 2001
From: xujiang1 <xujiang1@cmschina.com.cn>
Date: Mon, 3 Jun 2019 17:53:19 +0800
Subject: [PATCH 05/14] update the decode error

---
 src/protocol/CommandHeader.cpp | 25 +++++--------------------
 1 file changed, 5 insertions(+), 20 deletions(-)

diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp
index 807d409cb..6f5898b43 100644
--- a/src/protocol/CommandHeader.cpp
+++ b/src/protocol/CommandHeader.cpp
@@ -81,13 +81,13 @@ CommandHeader* CheckTransactionStateRequestHeader::Decode(Json::Value& ext) {
   }
 
   tempValue = ext["tranStateTableOffset"];
-  if (tempValue.isInt64()) {
-    h->m_tranStateTableOffset = tempValue.asInt64();
+  if (tempValue.isString()) {
+    h->m_tranStateTableOffset = UtilAll::str2ll(tempValue.asCString());
   }
 
   tempValue = ext["commitLogOffset"];
-  if (tempValue.isInt64()) {
-    h->m_commitLogOffset = tempValue.asInt64();
+  if (tempValue.isString()) {
+    h->m_commitLogOffset = UtilAll::str2ll(tempValue.asCString());
   }
 
   return h;
@@ -120,25 +120,10 @@ void EndTransactionRequestHeader::Encode(Json::Value& outData) {
   outData["commitLogOffset"] = UtilAll::to_string(m_commitLogOffset);
   outData["commitOrRollback"] = UtilAll::to_string(m_commitOrRollback);
   outData["fromTransactionCheck"] = UtilAll::to_string(m_fromTransactionCheck);
-  LOG_DEBUG(
-		"EndTransactionRequestHeader Encode msgId:%s, transactionId: %s, producerGroup is:%s, UtilAll::to_string( tranStateTableOffset) "
-		"is:%s,UtilAll::to_string( commitLogOffset):%s, UtilAll::to_string( commitOrRollback) "
-		"is:%s, UtilAll::to_string( fromTransactionCheck) is:%s",
-		m_msgId.c_str(), m_transactionId.c_str(), m_producerGroup.c_str(),
-		UtilAll::to_string(m_tranStateTableOffset).c_str(), UtilAll::to_string(m_commitLogOffset).c_str(),
-		UtilAll::to_string(m_commitOrRollback).c_str(), UtilAll::to_string(m_fromTransactionCheck).c_str());  
 }
 
 void EndTransactionRequestHeader::SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap) {
-  LOG_DEBUG(
-		"EndTransactionRequestHeader SetDeclaredFieldOfCommandHeader msgId:%s, transactionId:%s, producerGroup is:%s, UtilAll::to_string( tranStateTableOffset) "
-		"is:%s,UtilAll::to_string( commitLogOffset):%s, UtilAll::to_string( commitOrRollback) "
-		"is:%s, UtilAll::to_string( fromTransactionCheck) is:%s",
-		m_msgId.c_str(), m_transactionId.c_str(), m_producerGroup.c_str(),
-		UtilAll::to_string(m_tranStateTableOffset).c_str(), UtilAll::to_string(m_commitLogOffset).c_str(),
-		UtilAll::to_string(m_commitOrRollback).c_str(), UtilAll::to_string(m_fromTransactionCheck).c_str());
-
-	
+
   requestMap.insert(pair<string, string>("msgId", m_msgId));
   requestMap.insert(pair<string, string>("transactionId", m_transactionId));
   requestMap.insert(pair<string, string>("producerGroup", m_producerGroup));

From 269e967273e7ab945765fc64994696fcaeee46ba Mon Sep 17 00:00:00 2001
From: jonnxu <jonnxu@163.com>
Date: Sun, 14 Jul 2019 00:46:31 +0800
Subject: [PATCH 06/14] update by review

---
 example/TransactionProducer.cpp        | 22 ++++++++--------
 include/TransactionMQProducer.h        | 12 ++++-----
 src/MQClientFactory.cpp                |  2 +-
 src/producer/TransactionMQProducer.cpp | 35 +++++++++++++++-----------
 src/protocol/RemotingCommand.cpp       |  1 +
 5 files changed, 40 insertions(+), 32 deletions(-)

diff --git a/example/TransactionProducer.cpp b/example/TransactionProducer.cpp
index ee03a8421..93ac93737 100644
--- a/example/TransactionProducer.cpp
+++ b/example/TransactionProducer.cpp
@@ -37,20 +37,20 @@ class MyTransactionListener : public TransactionListener {
   virtual LocalTransactionState executeLocalTransaction(const MQMessage& msg, void* arg) {
 
     std::cout << "executeLocalTransaction enter msg:" << msg.toString() << endl;
-	if (!arg) {
-		std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: COMMIT_MESAGE " << endl;
-		return LocalTransactionState::COMMIT_MESSAGE;
-	}
-
-	LocalTransactionState state = (LocalTransactionState)(*(int*)arg % 3);
-	std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: " << state << endl;
-	return state;
+    if (!arg) {
+      std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: COMMIT_MESAGE " << endl;
+      return LocalTransactionState::COMMIT_MESSAGE;
+    }
+
+    LocalTransactionState state = (LocalTransactionState)(*(int*)arg % 3);
+    std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: " << state << endl;
+    return state;
   }
 
   virtual LocalTransactionState checkLocalTransaction(const MQMessageExt& msg) {
 
     std::cout << "checkLocalTransaction enter msg:" << msg.toString() << endl;
-	std::cout << "checkLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: COMMIT_MESSAGE" << endl;
+    std::cout << "checkLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: COMMIT_MESSAGE" << endl;
     return LocalTransactionState::COMMIT_MESSAGE;
   }
 };
@@ -61,7 +61,7 @@ void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, TransactionMQProducer
       std::this_thread::sleep_for(std::chrono::seconds(70));
       std::unique_lock<std::mutex> lck(g_mtx);
       g_finished.notify_one();
-	  break;
+      break;
     }
 
     MQMessage msg(info->topic,  // topic
@@ -70,7 +70,7 @@ void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, TransactionMQProducer
     try {
       auto start = std::chrono::system_clock::now();
       std::cout << "before sendMessageInTransaction" << endl;
-	  LocalTransactionState state = LocalTransactionState::UNKNOW;
+      LocalTransactionState state = LocalTransactionState::UNKNOW;
       TransactionSendResult sendResult = producer->sendMessageInTransaction(msg, &state);
       std::cout << "sendMessageInTransaction  msgId: " << sendResult.getMsgId() << endl;
       g_tps.Increment();
diff --git a/include/TransactionMQProducer.h b/include/TransactionMQProducer.h
index 924d04a1f..4e3b18e01 100644
--- a/include/TransactionMQProducer.h
+++ b/include/TransactionMQProducer.h
@@ -33,15 +33,15 @@ class ROCKETMQCLIENT_API TransactionMQProducer : public DefaultMQProducer {
   virtual ~TransactionMQProducer() {}
   void start();
   void shutdown();
-  std::shared_ptr<TransactionListener> getCheckListener() { return m_transactionListener; }
+  std::shared_ptr<TransactionListener> getTransactionListener() { return m_transactionListener; }
   void setTransactionListener(TransactionListener* listener) { m_transactionListener.reset(listener); }
   TransactionSendResult sendMessageInTransaction(MQMessage& msg, void* arg);
   void checkTransactionState(const std::string& addr, const MQMessageExt& message,
-                             long m_tranStateTableOffset,
-                             long m_commitLogOffset,
-                             std::string m_msgId,
-                             std::string m_transactionId,
-                             std::string m_offsetMsgId);
+                             long tranStateTableOffset,
+                             long commitLogOffset,
+                             const std::string& msgId,
+                             const std::string& transactionId,
+                             const std::string& offsetMsgId);
 
  private:
   void initTransactionEnv();
diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index a033a7b45..05357b971 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -671,7 +671,7 @@ FindBrokerResult* MQClientFactory::findBrokerAddressInAdmin(const string& broker
 void MQClientFactory::checkTransactionState(const std::string& addr, const MQMessageExt& messageExt,
                                             const CheckTransactionStateRequestHeader& checkRequestHeader) {
   string group = messageExt.getProperty(MQMessage::PROPERTY_PRODUCER_GROUP);
-  if (group != "") {
+  if (!group.empty()) {
     MQProducer* producer = selectProducer(group);
     if (producer != nullptr) {
       TransactionMQProducer* transProducer = dynamic_cast<TransactionMQProducer*>(producer);
diff --git a/src/producer/TransactionMQProducer.cpp b/src/producer/TransactionMQProducer.cpp
index 1023a21af..92b0e9fc5 100644
--- a/src/producer/TransactionMQProducer.cpp
+++ b/src/producer/TransactionMQProducer.cpp
@@ -49,7 +49,7 @@ TransactionSendResult TransactionMQProducer::sendMessageInTransaction(MQMessage&
   LOG_DEBUG("sendMessageInTransaction result:%s", sendResult.toString().data());
   LocalTransactionState localTransactionState = LocalTransactionState::UNKNOW;
   switch (sendResult.getSendStatus()) {
-    case SendStatus::SEND_OK: {
+    case SendStatus::SEND_OK:
       try {
         if (sendResult.getTransactionId() != "") {
           msg.setProperty("__transactionId__", sendResult.getTransactionId());
@@ -67,7 +67,7 @@ TransactionSendResult TransactionMQProducer::sendMessageInTransaction(MQMessage&
       } catch (MQException& e) {
         THROW_MQEXCEPTION(MQClientException, e.what(), -1);
       }
-    } break;
+      break;
     case SendStatus::SEND_FLUSH_DISK_TIMEOUT:
     case SendStatus::SEND_FLUSH_SLAVE_TIMEOUT:
     case SendStatus::SEND_SLAVE_NOT_AVAILABLE:
@@ -125,25 +125,32 @@ void TransactionMQProducer::endTransaction(SendResult& sendResult, LocalTransact
 }
 
 void TransactionMQProducer::checkTransactionState(const std::string& addr, const MQMessageExt& message,
-                                                  long m_tranStateTableOffset,
-                                                  long m_commitLogOffset,
-                                                  std::string m_msgId,
-                                                  std::string m_transactionId,
-                                                  std::string m_offsetMsgId) {
-  LocalTransactionState localTransactionState = m_transactionListener->checkLocalTransaction(message);
+                                                  long tranStateTableOffset,
+                                                  long commitLogOffset,
+                                                  const std::string& msgId,
+                                                  const std::string& transactionId,
+                                                  const std::string& offsetMsgId) {
+  LocalTransactionState localTransactionState = UNKNOW;
 
+  try {
+    m_transactionListener->checkLocalTransaction(message);
+  } catch (MQException& e) {
+    LOG_INFO("checkTransactionState, checkLocalTransaction exception: %s", e.what());
+  }
+  
   EndTransactionRequestHeader* endHeader = new EndTransactionRequestHeader();
-  endHeader->m_commitLogOffset = m_commitLogOffset;
+  endHeader->m_commitLogOffset = commitLogOffset;
   endHeader->m_producerGroup = getGroupName();
-  endHeader->m_tranStateTableOffset = m_tranStateTableOffset;
+  endHeader->m_tranStateTableOffset = tranStateTableOffset;
   endHeader->m_fromTransactionCheck = true;
 
-  string uniqueKey = m_transactionId;
-  if (m_transactionId == "")
-  	uniqueKey = message.getMsgId();
+  string uniqueKey = transactionId;
+  if (transactionId.empty()) {
+    uniqueKey = message.getMsgId();
+  }
   
   endHeader->m_msgId = uniqueKey;
-  endHeader->m_transactionId = m_transactionId;
+  endHeader->m_transactionId = transactionId;
   switch (localTransactionState) {
     case COMMIT_MESSAGE:
       endHeader->m_commitOrRollback = MessageSysFlag::TransactionCommitType;
diff --git a/src/protocol/RemotingCommand.cpp b/src/protocol/RemotingCommand.cpp
index 16e040f1f..08765de60 100644
--- a/src/protocol/RemotingCommand.cpp
+++ b/src/protocol/RemotingCommand.cpp
@@ -245,6 +245,7 @@ void RemotingCommand::SetExtHeader(int code) {
           break;
         case CHECK_TRANSACTION_STATE:
           m_pExtHeader.reset(CheckTransactionStateRequestHeader::Decode(ext));
+          break;
         default:
           break;
       }

From 179a7d5c71a80c706871ba3ad539378c720d789f Mon Sep 17 00:00:00 2001
From: jonnxu <jonnxu@163.com>
Date: Mon, 15 Jul 2019 22:26:31 +0800
Subject: [PATCH 07/14] Add async transaction check

---
 example/TransactionProducer.cpp        |  8 ++--
 include/TransactionMQProducer.h        | 25 +++++++++++--
 src/producer/TransactionMQProducer.cpp | 52 +++++++++++++++++++-------
 3 files changed, 63 insertions(+), 22 deletions(-)

diff --git a/example/TransactionProducer.cpp b/example/TransactionProducer.cpp
index 93ac93737..b8e3b3995 100644
--- a/example/TransactionProducer.cpp
+++ b/example/TransactionProducer.cpp
@@ -36,7 +36,6 @@ TpsReportService g_tps;
 class MyTransactionListener : public TransactionListener {
   virtual LocalTransactionState executeLocalTransaction(const MQMessage& msg, void* arg) {
 
-    std::cout << "executeLocalTransaction enter msg:" << msg.toString() << endl;
     if (!arg) {
       std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: COMMIT_MESAGE " << endl;
       return LocalTransactionState::COMMIT_MESSAGE;
@@ -50,7 +49,6 @@ class MyTransactionListener : public TransactionListener {
   virtual LocalTransactionState checkLocalTransaction(const MQMessageExt& msg) {
 
     std::cout << "checkLocalTransaction enter msg:" << msg.toString() << endl;
-    std::cout << "checkLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: COMMIT_MESSAGE" << endl;
     return LocalTransactionState::COMMIT_MESSAGE;
   }
 };
@@ -58,7 +56,7 @@ class MyTransactionListener : public TransactionListener {
 void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, TransactionMQProducer* producer) {
   while (!g_quit.load()) {
     if (g_msgCount.load() <= 0) {
-      std::this_thread::sleep_for(std::chrono::seconds(70));
+      std::this_thread::sleep_for(std::chrono::seconds(60));
       std::unique_lock<std::mutex> lck(g_mtx);
       g_finished.notify_one();
       break;
@@ -72,7 +70,7 @@ void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, TransactionMQProducer
       std::cout << "before sendMessageInTransaction" << endl;
       LocalTransactionState state = LocalTransactionState::UNKNOW;
       TransactionSendResult sendResult = producer->sendMessageInTransaction(msg, &state);
-      std::cout << "sendMessageInTransaction  msgId: " << sendResult.getMsgId() << endl;
+      std::cout << "after sendMessageInTransaction msgId: " << sendResult.getMsgId() << endl;
       g_tps.Increment();
       --g_msgCount;
       auto end = std::chrono::system_clock::now();
@@ -81,7 +79,7 @@ void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, TransactionMQProducer
         std::cout << "send RT more than: " << duration.count() << " ms with msgid: " << sendResult.getMsgId() << endl;
       }
     } catch (const MQException& e) {
-      std::cout << "send failed: " << std::endl;
+      std::cout << "send failed: " << e.what() << std::endl;
     }
   }
 }
diff --git a/include/TransactionMQProducer.h b/include/TransactionMQProducer.h
index 4e3b18e01..fcd9a7c95 100644
--- a/include/TransactionMQProducer.h
+++ b/include/TransactionMQProducer.h
@@ -18,6 +18,11 @@
 #ifndef __TRANSACTIONMQPRODUCER_H__
 #define __TRANSACTIONMQPRODUCER_H__
 
+#include <boost/asio.hpp>
+#include <boost/asio/io_service.hpp>
+#include <boost/bind.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/weak_ptr.hpp>
 #include <memory>
 #include <string>
 #include "DefaultMQProducer.h"
@@ -29,14 +34,16 @@ namespace rocketmq {
 
 class ROCKETMQCLIENT_API TransactionMQProducer : public DefaultMQProducer {
  public:
-  TransactionMQProducer(const std::string& producerGroup) : DefaultMQProducer(producerGroup) {}
+  TransactionMQProducer(const std::string& producerGroup)
+      : DefaultMQProducer(producerGroup), m_thread_num(1), m_ioServiceWork(m_ioService) {}
   virtual ~TransactionMQProducer() {}
   void start();
   void shutdown();
   std::shared_ptr<TransactionListener> getTransactionListener() { return m_transactionListener; }
   void setTransactionListener(TransactionListener* listener) { m_transactionListener.reset(listener); }
   TransactionSendResult sendMessageInTransaction(MQMessage& msg, void* arg);
-  void checkTransactionState(const std::string& addr, const MQMessageExt& message,
+  void checkTransactionState(const std::string& addr,
+                             const MQMessageExt& message,
                              long tranStateTableOffset,
                              long commitLogOffset,
                              const std::string& msgId,
@@ -46,11 +53,21 @@ class ROCKETMQCLIENT_API TransactionMQProducer : public DefaultMQProducer {
  private:
   void initTransactionEnv();
   void destroyTransactionEnv();
-  void endTransaction(SendResult& sendResult,
-                      LocalTransactionState& localTransactionState);
+  void endTransaction(SendResult& sendResult, LocalTransactionState& localTransactionState);
+  void checkTransactionStateImpl(const std::string& addr,
+                                 const MQMessageExt& message,
+                                 long tranStateTableOffset,
+                                 long commitLogOffset,
+                                 const std::string& msgId,
+                                 const std::string& transactionId,
+                                 const std::string& offsetMsgId);
 
  private:
   std::shared_ptr<TransactionListener> m_transactionListener;
+  int m_thread_num;
+  boost::thread_group m_threadpool;
+  boost::asio::io_service m_ioService;
+  boost::asio::io_service::work m_ioServiceWork;
 };
 }  // namespace rocketmq
 
diff --git a/src/producer/TransactionMQProducer.cpp b/src/producer/TransactionMQProducer.cpp
index 92b0e9fc5..86c7cd7eb 100644
--- a/src/producer/TransactionMQProducer.cpp
+++ b/src/producer/TransactionMQProducer.cpp
@@ -28,12 +28,19 @@
 using namespace std;
 namespace rocketmq {
 
-void TransactionMQProducer::initTransactionEnv() {}
+void TransactionMQProducer::initTransactionEnv() {
+  for (int i = 0; i < m_thread_num; ++i) {
+    m_threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &m_ioService));
+  }
+}
 
-void TransactionMQProducer::destroyTransactionEnv() {}
+void TransactionMQProducer::destroyTransactionEnv() {
+  m_ioService.stop();
+  m_threadpool.join_all();
+}
 
 TransactionSendResult TransactionMQProducer::sendMessageInTransaction(MQMessage& msg, void* arg) {
-  if (nullptr == m_transactionListener) {
+  if (!m_transactionListener) {
     THROW_MQEXCEPTION(MQClientException, "transactionListener is null", -1);
   }
 
@@ -46,7 +53,6 @@ TransactionSendResult TransactionMQProducer::sendMessageInTransaction(MQMessage&
     THROW_MQEXCEPTION(MQClientException, e.what(), -1);
   }
 
-  LOG_DEBUG("sendMessageInTransaction result:%s", sendResult.toString().data());
   LocalTransactionState localTransactionState = LocalTransactionState::UNKNOW;
   switch (sendResult.getSendStatus()) {
     case SendStatus::SEND_OK:
@@ -58,8 +64,8 @@ TransactionSendResult TransactionMQProducer::sendMessageInTransaction(MQMessage&
         if (transactionId != "") {
           msg.setTransactionId(transactionId);
         }
-        LOG_DEBUG("sendMessageInTransaction, msgId:%s, transactionId:%s",
-                  sendResult.getMsgId().data(), transactionId.data());
+        LOG_DEBUG("sendMessageInTransaction, msgId:%s, transactionId:%s", sendResult.getMsgId().data(),
+                  transactionId.data());
         localTransactionState = m_transactionListener->executeLocalTransaction(msg, arg);
         if (localTransactionState != LocalTransactionState::COMMIT_MESSAGE) {
           LOG_WARN("executeLocalTransaction ret not LocalTransactionState::commit, msg:%s", msg.toString().data());
@@ -72,6 +78,7 @@ TransactionSendResult TransactionMQProducer::sendMessageInTransaction(MQMessage&
     case SendStatus::SEND_FLUSH_SLAVE_TIMEOUT:
     case SendStatus::SEND_SLAVE_NOT_AVAILABLE:
       localTransactionState = LocalTransactionState::ROLLBACK_MESSAGE;
+      LOG_WARN("sendMessageInTransaction, send not ok, rollback, result:%s", sendResult.toString().data());
       break;
     default:
       break;
@@ -124,20 +131,39 @@ void TransactionMQProducer::endTransaction(SendResult& sendResult, LocalTransact
   getFactory()->endTransactionOneway(sendResult.getMessageQueue(), requestHeader, getSessionCredentials());
 }
 
-void TransactionMQProducer::checkTransactionState(const std::string& addr, const MQMessageExt& message,
+void TransactionMQProducer::checkTransactionState(const std::string& addr,
+                                                  const MQMessageExt& message,
                                                   long tranStateTableOffset,
                                                   long commitLogOffset,
                                                   const std::string& msgId,
                                                   const std::string& transactionId,
                                                   const std::string& offsetMsgId) {
-  LocalTransactionState localTransactionState = UNKNOW;
+                                                  
+  LOG_DEBUG("checkTransactionState: msgId:%s, transactionId:%s", msgId.data(), transactionId.data());
+  if (!m_transactionListener) {
+    LOG_WARN("checkTransactionState, transactionListener null");
+    THROW_MQEXCEPTION(MQClientException, "checkTransactionState, transactionListener null", -1);
+  }
 
+  m_ioService.post(boost::bind(&TransactionMQProducer::checkTransactionStateImpl, this, addr, message,
+                               tranStateTableOffset, commitLogOffset, msgId, transactionId, offsetMsgId));
+}
+
+void TransactionMQProducer::checkTransactionStateImpl(const std::string& addr,
+                                                      const MQMessageExt& message,
+                                                      long tranStateTableOffset,
+                                                      long commitLogOffset,
+                                                      const std::string& msgId,
+                                                      const std::string& transactionId,
+                                                      const std::string& offsetMsgId) {
+  LOG_DEBUG("checkTransactionStateImpl: msgId:%s, transactionId:%s", msgId.data(), transactionId.data());
+  LocalTransactionState localTransactionState = UNKNOW;
   try {
-    m_transactionListener->checkLocalTransaction(message);
+    localTransactionState = m_transactionListener->checkLocalTransaction(message);
   } catch (MQException& e) {
     LOG_INFO("checkTransactionState, checkLocalTransaction exception: %s", e.what());
   }
-  
+
   EndTransactionRequestHeader* endHeader = new EndTransactionRequestHeader();
   endHeader->m_commitLogOffset = commitLogOffset;
   endHeader->m_producerGroup = getGroupName();
@@ -148,7 +174,7 @@ void TransactionMQProducer::checkTransactionState(const std::string& addr, const
   if (transactionId.empty()) {
     uniqueKey = message.getMsgId();
   }
-  
+
   endHeader->m_msgId = uniqueKey;
   endHeader->m_transactionId = transactionId;
   switch (localTransactionState) {
@@ -167,8 +193,8 @@ void TransactionMQProducer::checkTransactionState(const std::string& addr, const
       break;
   }
 
-  LOG_INFO("checkTransactionState, endTransactionOneway: uniqueKey:%s, client state:%d, end header: %s", uniqueKey.data(), localTransactionState,
-    endHeader->toString().data());
+  LOG_INFO("checkTransactionState, endTransactionOneway: uniqueKey:%s, client state:%d, end header: %s",
+           uniqueKey.data(), localTransactionState, endHeader->toString().data());
 
   string remark;
   try {

From 2f15a482467b504b258a758b83e3e36690d08c5e Mon Sep 17 00:00:00 2001
From: jonnxu <jonnxu@163.com>
Date: Sat, 20 Jul 2019 21:05:18 +0800
Subject: [PATCH 08/14] update by review, change UNKNOW to UNKNOWN

---
 example/TransactionProducer.cpp        | 2 +-
 include/TransactionSendResult.h        | 2 +-
 src/producer/TransactionMQProducer.cpp | 9 ++++-----
 3 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/example/TransactionProducer.cpp b/example/TransactionProducer.cpp
index b8e3b3995..72ae11b9b 100644
--- a/example/TransactionProducer.cpp
+++ b/example/TransactionProducer.cpp
@@ -68,7 +68,7 @@ void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, TransactionMQProducer
     try {
       auto start = std::chrono::system_clock::now();
       std::cout << "before sendMessageInTransaction" << endl;
-      LocalTransactionState state = LocalTransactionState::UNKNOW;
+      LocalTransactionState state = LocalTransactionState::UNKNOWN;
       TransactionSendResult sendResult = producer->sendMessageInTransaction(msg, &state);
       std::cout << "after sendMessageInTransaction msgId: " << sendResult.getMsgId() << endl;
       g_tps.Increment();
diff --git a/include/TransactionSendResult.h b/include/TransactionSendResult.h
index 423379eb7..cf28465fb 100644
--- a/include/TransactionSendResult.h
+++ b/include/TransactionSendResult.h
@@ -25,7 +25,7 @@ namespace rocketmq {
 enum LocalTransactionState {
   COMMIT_MESSAGE,
   ROLLBACK_MESSAGE,
-  UNKNOW,
+  UNKNOWN
 };
 
 class ROCKETMQCLIENT_API TransactionSendResult : public SendResult {
diff --git a/src/producer/TransactionMQProducer.cpp b/src/producer/TransactionMQProducer.cpp
index 86c7cd7eb..909e4f535 100644
--- a/src/producer/TransactionMQProducer.cpp
+++ b/src/producer/TransactionMQProducer.cpp
@@ -107,7 +107,7 @@ void TransactionMQProducer::endTransaction(SendResult& sendResult, LocalTransact
   }
   string transId = sendResult.getTransactionId();
 
-  int commitOrRollback = 0;
+  int commitOrRollback = MessageSysFlag::TransactionNotType;
   switch (localTransactionState) {
     case COMMIT_MESSAGE:
       commitOrRollback = MessageSysFlag::TransactionCommitType;
@@ -115,11 +115,10 @@ void TransactionMQProducer::endTransaction(SendResult& sendResult, LocalTransact
     case ROLLBACK_MESSAGE:
       commitOrRollback = MessageSysFlag::TransactionRollbackType;
       break;
-    case UNKNOW:
+    case UNKNOWN:
       commitOrRollback = MessageSysFlag::TransactionNotType;
       break;
     default:
-      commitOrRollback = MessageSysFlag::TransactionNotType;
       break;
   }
 
@@ -157,7 +156,7 @@ void TransactionMQProducer::checkTransactionStateImpl(const std::string& addr,
                                                       const std::string& transactionId,
                                                       const std::string& offsetMsgId) {
   LOG_DEBUG("checkTransactionStateImpl: msgId:%s, transactionId:%s", msgId.data(), transactionId.data());
-  LocalTransactionState localTransactionState = UNKNOW;
+  LocalTransactionState localTransactionState = UNKNOWN;
   try {
     localTransactionState = m_transactionListener->checkLocalTransaction(message);
   } catch (MQException& e) {
@@ -185,7 +184,7 @@ void TransactionMQProducer::checkTransactionStateImpl(const std::string& addr,
       endHeader->m_commitOrRollback = MessageSysFlag::TransactionRollbackType;
       LOG_WARN("when broker check, client rollback this transaction, %s", endHeader->toString().data());
       break;
-    case UNKNOW:
+    case UNKNOWN:
       endHeader->m_commitOrRollback = MessageSysFlag::TransactionNotType;
       LOG_WARN("when broker check, client does not know this transaction state, %s", endHeader->toString().data());
       break;

From f3cf8b00a85b5d6a79fa84a35b9651e728bc885d Mon Sep 17 00:00:00 2001
From: jonnxu <jonnxu@163.com>
Date: Sat, 20 Jul 2019 21:14:56 +0800
Subject: [PATCH 09/14] Update by review

---
 src/producer/TransactionMQProducer.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/producer/TransactionMQProducer.cpp b/src/producer/TransactionMQProducer.cpp
index 909e4f535..4b2071fda 100644
--- a/src/producer/TransactionMQProducer.cpp
+++ b/src/producer/TransactionMQProducer.cpp
@@ -53,7 +53,7 @@ TransactionSendResult TransactionMQProducer::sendMessageInTransaction(MQMessage&
     THROW_MQEXCEPTION(MQClientException, e.what(), -1);
   }
 
-  LocalTransactionState localTransactionState = LocalTransactionState::UNKNOW;
+  LocalTransactionState localTransactionState = LocalTransactionState::UNKNOWN;
   switch (sendResult.getSendStatus()) {
     case SendStatus::SEND_OK:
       try {

From 252fea99950680955cc1615a3c1abd7e770206ad Mon Sep 17 00:00:00 2001
From: jonnxu <jonnxu@163.com>
Date: Sat, 20 Jul 2019 21:18:54 +0800
Subject: [PATCH 10/14] Update by review, change UNKNOW to UNKNOWN

---
 src/producer/TransactionMQProducer.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/producer/TransactionMQProducer.cpp b/src/producer/TransactionMQProducer.cpp
index 909e4f535..4b2071fda 100644
--- a/src/producer/TransactionMQProducer.cpp
+++ b/src/producer/TransactionMQProducer.cpp
@@ -53,7 +53,7 @@ TransactionSendResult TransactionMQProducer::sendMessageInTransaction(MQMessage&
     THROW_MQEXCEPTION(MQClientException, e.what(), -1);
   }
 
-  LocalTransactionState localTransactionState = LocalTransactionState::UNKNOW;
+  LocalTransactionState localTransactionState = LocalTransactionState::UNKNOWN;
   switch (sendResult.getSendStatus()) {
     case SendStatus::SEND_OK:
       try {

From 02b1d51f37f62b49024fe7bdc49b65427bdb8dbd Mon Sep 17 00:00:00 2001
From: jonnxu <jonnxu@163.com>
Date: Mon, 22 Jul 2019 23:25:38 +0800
Subject: [PATCH 11/14] Delete redundant code

---
 src/transport/ClientRemotingProcessor.cpp | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/src/transport/ClientRemotingProcessor.cpp b/src/transport/ClientRemotingProcessor.cpp
index 50dc164c8..46396cec8 100644
--- a/src/transport/ClientRemotingProcessor.cpp
+++ b/src/transport/ClientRemotingProcessor.cpp
@@ -177,11 +177,6 @@ RemotingCommand* ClientRemotingProcessor::checkTransactionState(const std::strin
       return nullptr;
     }
 
-    MQMessageExt& messageExt = mqvec[0];
-	for (auto& pair : messageExt.getProperties()) {
-	    LOG_INFO("checkTransactionState key:%s, value: %s", pair.first.data(), pair.second.data() );		
-	}
-
     string transactionId = messageExt.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
     if (transactionId != "") {
       messageExt.setTransactionId(transactionId);

From d409bfa6f67cf734265c7653c166597b59496197 Mon Sep 17 00:00:00 2001
From: jonnxu <jonnxu@163.com>
Date: Tue, 23 Jul 2019 23:08:35 +0800
Subject: [PATCH 12/14] Format the codes

---
 example/TransactionProducer.cpp        |  8 ++++----
 include/MQMessage.h                    |  2 +-
 include/SendResult.h                   |  8 +++-----
 include/TransactionSendResult.h        |  6 +-----
 src/MQClientAPIImpl.cpp                | 18 ++++++++----------
 src/MQClientAPIImpl.h                  | 13 ++++++-------
 src/MQClientFactory.cpp                | 13 +++++++------
 src/MQClientFactory.h                  |  6 ++++--
 src/message/MQMessageId.h              |  7 +++----
 src/producer/SendResult.cpp            | 22 +++++++++++-----------
 src/producer/TransactionMQProducer.cpp |  1 -
 src/protocol/CommandHeader.cpp         |  4 +---
 12 files changed, 49 insertions(+), 59 deletions(-)

diff --git a/example/TransactionProducer.cpp b/example/TransactionProducer.cpp
index 72ae11b9b..1aabb0887 100644
--- a/example/TransactionProducer.cpp
+++ b/example/TransactionProducer.cpp
@@ -35,19 +35,19 @@ TpsReportService g_tps;
 
 class MyTransactionListener : public TransactionListener {
   virtual LocalTransactionState executeLocalTransaction(const MQMessage& msg, void* arg) {
-
     if (!arg) {
-      std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: COMMIT_MESAGE " << endl;
+      std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId()
+                << ", return state: COMMIT_MESAGE " << endl;
       return LocalTransactionState::COMMIT_MESSAGE;
     }
 
     LocalTransactionState state = (LocalTransactionState)(*(int*)arg % 3);
-    std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: " << state << endl;
+    std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: " << state
+              << endl;
     return state;
   }
 
   virtual LocalTransactionState checkLocalTransaction(const MQMessageExt& msg) {
-
     std::cout << "checkLocalTransaction enter msg:" << msg.toString() << endl;
     return LocalTransactionState::COMMIT_MESSAGE;
   }
diff --git a/include/MQMessage.h b/include/MQMessage.h
index 89ff84ed2..70fab3613 100644
--- a/include/MQMessage.h
+++ b/include/MQMessage.h
@@ -74,7 +74,7 @@ class ROCKETMQCLIENT_API MQMessage {
   void setBody(const std::string& body);
 
   void setTransactionId(const std::string& id) { m_transactionId = id; }
-  std::string getTransactionId() const     { return m_transactionId; }
+  std::string getTransactionId() const { return m_transactionId; }
 
   std::map<std::string, std::string> getProperties() const;
   void setProperties(std::map<std::string, std::string>& properties);
diff --git a/include/SendResult.h b/include/SendResult.h
index f8be3d209..870d03bc4 100644
--- a/include/SendResult.h
+++ b/include/SendResult.h
@@ -39,9 +39,7 @@ class ROCKETMQCLIENT_API SendResult {
   SendResult(const SendResult& other);
   SendResult& operator=(const SendResult& other);
 
-  void setTransactionId(const std::string& id) { 
-      m_transactionId = id;
-  }
+  void setTransactionId(const std::string& id) { m_transactionId = id; }
 
   std::string getTransactionId() { return m_transactionId; }
 
@@ -51,7 +49,7 @@ class ROCKETMQCLIENT_API SendResult {
   MQMessageQueue getMessageQueue() const;
   int64 getQueueOffset() const;
   std::string toString() const;
-  
+
  private:
   SendStatus m_sendStatus;
   std::string m_msgId;
@@ -62,5 +60,5 @@ class ROCKETMQCLIENT_API SendResult {
 };
 
 //<!***************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
 #endif
diff --git a/include/TransactionSendResult.h b/include/TransactionSendResult.h
index cf28465fb..0bb1e480e 100644
--- a/include/TransactionSendResult.h
+++ b/include/TransactionSendResult.h
@@ -22,11 +22,7 @@
 
 namespace rocketmq {
 
-enum LocalTransactionState {
-  COMMIT_MESSAGE,
-  ROLLBACK_MESSAGE,
-  UNKNOWN
-};
+enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOWN };
 
 class ROCKETMQCLIENT_API TransactionSendResult : public SendResult {
  public:
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index 7381cedbb..337fbe043 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -209,14 +209,12 @@ void MQClientAPIImpl::createTopic(const string& addr,
   THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
 }
 
-void MQClientAPIImpl::endTransactionOneway(
-  std::string addr,
-  EndTransactionRequestHeader* requestHeader,
-  std::string remark,
-  const SessionCredentials& sessionCredentials) {
-
+void MQClientAPIImpl::endTransactionOneway(std::string addr,
+                                           EndTransactionRequestHeader* requestHeader,
+                                           std::string remark,
+                                           const SessionCredentials& sessionCredentials) {
   RemotingCommand request(END_TRANSACTION, requestHeader);
-  request.setRemark(remark);  
+  request.setRemark(remark);
   callSignatureBeforeRequest(addr, request, sessionCredentials);
   request.Encode();
   m_pRemotingClient->invokeOneway(addr, request);
@@ -388,8 +386,8 @@ SendResult MQClientAPIImpl::sendMessageSync(const string& addr,
   if (pResponse != NULL) {
     try {
       SendResult result = processSendResponse(brokerName, msg, pResponse.get());
-      LOG_DEBUG("sendMessageSync success:%s to addr:%s,brokername:%s, send status:%d", msg.toString().c_str(), addr.c_str(),
-                brokerName.c_str(), (int)result.getSendStatus());
+      LOG_DEBUG("sendMessageSync success:%s to addr:%s,brokername:%s, send status:%d", msg.toString().c_str(),
+                addr.c_str(), brokerName.c_str(), (int)result.getSendStatus());
       return result;
     } catch (...) {
       LOG_ERROR("send error");
@@ -933,4 +931,4 @@ void MQClientAPIImpl::unlockBatchMQ(const string& addr,
 }
 
 //<!************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 27e95c86a..08a8db057 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
+
 #ifndef __MQCLIENTAPIIMPL_H__
 #define __MQCLIENTAPIIMPL_H__
 #include "AsyncCallback.h"
@@ -61,11 +61,10 @@ class MQClientAPIImpl {
                    const string& defaultTopic,
                    TopicConfig topicConfig,
                    const SessionCredentials& sessionCredentials);
-  void endTransactionOneway(
-    std::string addr,
-    EndTransactionRequestHeader* requestHeader,
-    std::string remark,
-    const SessionCredentials& sessionCredentials);
+  void endTransactionOneway(std::string addr,
+                            EndTransactionRequestHeader* requestHeader,
+                            std::string remark,
+                            const SessionCredentials& sessionCredentials);
 
   SendResult sendMessage(const string& addr,
                          const string& brokerName,
@@ -221,6 +220,6 @@ class MQClientAPIImpl {
   bool m_firstFetchNameSrv;
   string m_mqClientId;
 };
-}  //<!end namespace;
+}  // namespace rocketmq
 //<!***************************************************************************
 #endif
diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index 05357b971..126517d28 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 #include "MQClientFactory.h"
-#include "TransactionMQProducer.h"
 #include "ConsumerRunningInfo.h"
 #include "Logging.h"
 #include "MQClientManager.h"
 #include "PullRequest.h"
 #include "Rebalance.h"
 #include "TopicPublishInfo.h"
+#include "TransactionMQProducer.h"
 
 #define MAX_BUFF_SIZE 8192
 #define SAFE_BUFF_SIZE 7936  // 8192 - 256 = 7936
@@ -668,7 +668,8 @@ FindBrokerResult* MQClientFactory::findBrokerAddressInAdmin(const string& broker
   return NULL;
 }
 
-void MQClientFactory::checkTransactionState(const std::string& addr, const MQMessageExt& messageExt,
+void MQClientFactory::checkTransactionState(const std::string& addr,
+                                            const MQMessageExt& messageExt,
                                             const CheckTransactionStateRequestHeader& checkRequestHeader) {
   string group = messageExt.getProperty(MQMessage::PROPERTY_PRODUCER_GROUP);
   if (!group.empty()) {
@@ -676,8 +677,9 @@ void MQClientFactory::checkTransactionState(const std::string& addr, const MQMes
     if (producer != nullptr) {
       TransactionMQProducer* transProducer = dynamic_cast<TransactionMQProducer*>(producer);
       if (transProducer != nullptr) {
-        transProducer->checkTransactionState(addr, messageExt, 
-            checkRequestHeader.m_tranStateTableOffset, checkRequestHeader.m_commitLogOffset, checkRequestHeader.m_msgId, checkRequestHeader.m_transactionId, checkRequestHeader.m_offsetMsgId);
+        transProducer->checkTransactionState(addr, messageExt, checkRequestHeader.m_tranStateTableOffset,
+                                             checkRequestHeader.m_commitLogOffset, checkRequestHeader.m_msgId,
+                                             checkRequestHeader.m_transactionId, checkRequestHeader.m_offsetMsgId);
       } else {
         LOG_ERROR("checkTransactionState, producer not TransactionMQProducer failed, msg:%s",
                   messageExt.toString().data());
@@ -863,7 +865,6 @@ void MQClientFactory::doRebalanceByConsumerGroup(const string& consumerGroup) {
 void MQClientFactory::endTransactionOneway(const MQMessageQueue& mq,
                                            EndTransactionRequestHeader* requestHeader,
                                            const SessionCredentials& sessionCredentials) {
-  
   string brokerAddr = findBrokerAddressInPublish(mq.getBrokerName());
   string remark = "";
   if (!brokerAddr.empty()) {
@@ -1144,4 +1145,4 @@ void MQClientFactory::getSessionCredentialsFromOneOfProducerOrConsumer(SessionCr
 }
 
 //<!************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
diff --git a/src/MQClientFactory.h b/src/MQClientFactory.h
index e6b895975..067198f3d 100644
--- a/src/MQClientFactory.h
+++ b/src/MQClientFactory.h
@@ -72,7 +72,9 @@ class MQClientFactory {
   void endTransactionOneway(const MQMessageQueue& mq,
                             EndTransactionRequestHeader* requestHeader,
                             const SessionCredentials& sessionCredentials);
-  void checkTransactionState(const std::string& addr, const MQMessageExt& message, const CheckTransactionStateRequestHeader& checkRequestHeader);
+  void checkTransactionState(const std::string& addr,
+                             const MQMessageExt& message,
+                             const CheckTransactionStateRequestHeader& checkRequestHeader);
   MQClientAPIImpl* getMQClientAPIImpl() const;
   MQProducer* selectProducer(const string& group);
   MQConsumer* selectConsumer(const string& group);
@@ -202,6 +204,6 @@ class MQClientFactory {
   unique_ptr<boost::thread> m_consumer_async_service_thread;
 };
 
-}  //<!end namespace;
+}  // namespace rocketmq
 
 #endif
diff --git a/src/message/MQMessageId.h b/src/message/MQMessageId.h
index 61e1fd203..fbe937bad 100644
--- a/src/message/MQMessageId.h
+++ b/src/message/MQMessageId.h
@@ -24,12 +24,11 @@ namespace rocketmq {
 //<!***************************************************************************
 class MQMessageId {
  public:
-    
-  MQMessageId(){}
+  MQMessageId() {}
   MQMessageId(sockaddr address, int64 offset) : m_address(address), m_offset(offset) {}
   MQMessageId& operator=(const MQMessageId& id) {
     if (&id == this) {
-        return *this;
+      return *this;
     }
     this->m_address = id.m_address;
     this->m_offset = id.m_offset;
@@ -49,6 +48,6 @@ class MQMessageId {
   int64 m_offset;
 };
 
-}  //<!end namespace;
+}  // namespace rocketmq
 
 #endif
diff --git a/src/producer/SendResult.cpp b/src/producer/SendResult.cpp
index fb3f5b8d3..6c5576902 100644
--- a/src/producer/SendResult.cpp
+++ b/src/producer/SendResult.cpp
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 #include "SendResult.h"
+#include <sstream>
 #include "UtilAll.h"
 #include "VirtualEnvUtil.h"
-#include <sstream>
 
 namespace rocketmq {
 //<!***************************************************************************
@@ -76,16 +76,16 @@ int64 SendResult::getQueueOffset() const {
 }
 
 std::string SendResult::toString() const {
-    stringstream ss;
-    ss << "SendResult: ";
-    ss << "sendStatus:" << m_sendStatus;
-    ss << ",msgId:" << m_msgId;
-    ss << ",offsetMsgId:" << m_offsetMsgId;
-    ss << ",queueOffset:" << m_queueOffset;
-    ss << ",transactionId:" << m_transactionId;   
-    ss << ",messageQueue:" << m_messageQueue.toString();
-    return ss.str();
+  stringstream ss;
+  ss << "SendResult: ";
+  ss << "sendStatus:" << m_sendStatus;
+  ss << ",msgId:" << m_msgId;
+  ss << ",offsetMsgId:" << m_offsetMsgId;
+  ss << ",queueOffset:" << m_queueOffset;
+  ss << ",transactionId:" << m_transactionId;
+  ss << ",messageQueue:" << m_messageQueue.toString();
+  return ss.str();
 }
 
 //<!************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
diff --git a/src/producer/TransactionMQProducer.cpp b/src/producer/TransactionMQProducer.cpp
index 4b2071fda..fbd78c5ac 100644
--- a/src/producer/TransactionMQProducer.cpp
+++ b/src/producer/TransactionMQProducer.cpp
@@ -137,7 +137,6 @@ void TransactionMQProducer::checkTransactionState(const std::string& addr,
                                                   const std::string& msgId,
                                                   const std::string& transactionId,
                                                   const std::string& offsetMsgId) {
-                                                  
   LOG_DEBUG("checkTransactionState: msgId:%s, transactionId:%s", msgId.data(), transactionId.data());
   if (!m_transactionListener) {
     LOG_WARN("checkTransactionState, transactionListener null");
diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp
index 6f5898b43..f41744edd 100644
--- a/src/protocol/CommandHeader.cpp
+++ b/src/protocol/CommandHeader.cpp
@@ -63,7 +63,6 @@ void CreateTopicRequestHeader::SetDeclaredFieldOfCommandHeader(map<string, strin
 void CheckTransactionStateRequestHeader::Encode(Json::Value& outData) {}
 
 CommandHeader* CheckTransactionStateRequestHeader::Decode(Json::Value& ext) {
-
   CheckTransactionStateRequestHeader* h = new CheckTransactionStateRequestHeader();
   Json::Value& tempValue = ext["msgId"];
   if (tempValue.isString()) {
@@ -123,7 +122,6 @@ void EndTransactionRequestHeader::Encode(Json::Value& outData) {
 }
 
 void EndTransactionRequestHeader::SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap) {
-
   requestMap.insert(pair<string, string>("msgId", m_msgId));
   requestMap.insert(pair<string, string>("transactionId", m_transactionId));
   requestMap.insert(pair<string, string>("producerGroup", m_producerGroup));
@@ -618,4 +616,4 @@ const string NotifyConsumerIdsChangedRequestHeader::getGroup() const {
 }
 
 //<!************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq

From 64c6190b1379a79d1a5d011ae8b007e22fb1501a Mon Sep 17 00:00:00 2001
From: jonnxu <jonnxu@163.com>
Date: Wed, 24 Jul 2019 09:21:04 +0800
Subject: [PATCH 13/14] Format the code

---
 src/protocol/CommandHeader.h | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/protocol/CommandHeader.h b/src/protocol/CommandHeader.h
index 22f61005e..4a80ecf9c 100644
--- a/src/protocol/CommandHeader.h
+++ b/src/protocol/CommandHeader.h
@@ -54,6 +54,7 @@ class CheckTransactionStateRequestHeader : public CommandHeader {
   static CommandHeader* Decode(Json::Value& ext);
   virtual void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap);
   std::string toString();
+
  public:
   long m_tranStateTableOffset;
   long m_commitLogOffset;
@@ -482,6 +483,6 @@ class NotifyConsumerIdsChangedRequestHeader : public CommandHeader {
 };
 
 //<!***************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
 
 #endif

From a559c1ef66cf316859653994a31477a71a5c34f8 Mon Sep 17 00:00:00 2001
From: jonnxu <jonnxu@163.com>
Date: Wed, 24 Jul 2019 10:23:47 +0800
Subject: [PATCH 14/14] Update code

---
 src/transport/ClientRemotingProcessor.cpp | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/transport/ClientRemotingProcessor.cpp b/src/transport/ClientRemotingProcessor.cpp
index 46396cec8..63736c5ba 100644
--- a/src/transport/ClientRemotingProcessor.cpp
+++ b/src/transport/ClientRemotingProcessor.cpp
@@ -177,6 +177,7 @@ RemotingCommand* ClientRemotingProcessor::checkTransactionState(const std::strin
       return nullptr;
     }
 
+    MQMessageExt& messageExt = mqvec[0];
     string transactionId = messageExt.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
     if (transactionId != "") {
       messageExt.setTransactionId(transactionId);