From 5fd8e3d6751ef28eb8dcc33b6146d428712d656f Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Wed, 18 Mar 2020 17:26:20 +0800 Subject: [PATCH] feat(trace): add message trace shift for C style apis. --- include/CCommon.h | 1 + include/CProducer.h | 1 + include/CPushConsumer.h | 1 + include/TransactionMQProducer.h | 3 ++- src/extern/CProducer.cpp | 18 ++++++++++++++++++ src/extern/CPushConsumer.cpp | 9 ++++++++- src/producer/TransactionMQProducer.cpp | 7 ++++++- test/src/extern/CProducerTest.cpp | 4 ++++ test/src/extern/CPushConsumerTest.cpp | 2 ++ 9 files changed, 43 insertions(+), 3 deletions(-) diff --git a/include/CCommon.h b/include/CCommon.h index fa6edb9c3..ae8a9e1b3 100644 --- a/include/CCommon.h +++ b/include/CCommon.h @@ -83,6 +83,7 @@ typedef enum _CLogLevel_ { #endif typedef enum _CMessageModel_ { BROADCASTING, CLUSTERING } CMessageModel; +typedef enum _CTraceModel_ { OPEN, CLOSE } CTraceModel; #ifdef __cplusplus } diff --git a/include/CProducer.h b/include/CProducer.h index fa6a73052..296b13f71 100644 --- a/include/CProducer.h +++ b/include/CProducer.h @@ -63,6 +63,7 @@ ROCKETMQCLIENT_API int SetProducerLogLevel(CProducer* producer, CLogLevel level) ROCKETMQCLIENT_API int SetProducerSendMsgTimeout(CProducer* producer, int timeout); ROCKETMQCLIENT_API int SetProducerCompressLevel(CProducer* producer, int level); ROCKETMQCLIENT_API int SetProducerMaxMessageSize(CProducer* producer, int size); +ROCKETMQCLIENT_API int SetProducerMessageTrace(CProducer* consumer, CTraceModel openTrace); ROCKETMQCLIENT_API int SendMessageSync(CProducer* producer, CMessage* msg, CSendResult* result); ROCKETMQCLIENT_API int SendBatchMessage(CProducer* producer, CBatchMessage* msg, CSendResult* result); diff --git a/include/CPushConsumer.h b/include/CPushConsumer.h index e2bce18a8..5ed83f3a7 100644 --- a/include/CPushConsumer.h +++ b/include/CPushConsumer.h @@ -59,6 +59,7 @@ ROCKETMQCLIENT_API int SetPushConsumerLogLevel(CPushConsumer* consumer, CLogLeve ROCKETMQCLIENT_API int SetPushConsumerMessageModel(CPushConsumer* consumer, CMessageModel messageModel); ROCKETMQCLIENT_API int SetPushConsumerMaxCacheMessageSize(CPushConsumer* consumer, int maxCacheSize); ROCKETMQCLIENT_API int SetPushConsumerMaxCacheMessageSizeInMb(CPushConsumer* consumer, int maxCacheSizeInMb); +ROCKETMQCLIENT_API int SetPushConsumerMessageTrace(CPushConsumer* consumer, CTraceModel openTrace); #ifdef __cplusplus } diff --git a/include/TransactionMQProducer.h b/include/TransactionMQProducer.h index f4c0281a8..784de6ce3 100644 --- a/include/TransactionMQProducer.h +++ b/include/TransactionMQProducer.h @@ -79,7 +79,8 @@ class ROCKETMQCLIENT_API TransactionMQProducer { void setLogLevel(elogLevel inputLevel); elogLevel getLogLevel(); void setLogFileSizeAndNum(int fileNum, long perFileSize); // perFileSize is MB unit - + void setMessageTrace(bool messageTrace); + bool getMessageTrace() const; std::shared_ptr getTransactionListener(); void setTransactionListener(TransactionListener* listener); TransactionSendResult sendMessageInTransaction(MQMessage& msg, void* arg); diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp index 00a9b0508..0b139e22f 100644 --- a/src/extern/CProducer.cpp +++ b/src/extern/CProducer.cpp @@ -797,6 +797,24 @@ int SetProducerMaxMessageSize(CProducer* producer, int size) { } return OK; } +int SetProducerMessageTrace(CProducer* producer, CTraceModel openTrace) { + if (producer == NULL) { + return NULL_POINTER; + } + DefaultProducer* defaultMQProducer = (DefaultProducer*)producer; + bool messageTrace = openTrace == OPEN ? true : false; + try { + if (CAPI_C_PRODUCER_TYPE_TRANSACTION == defaultMQProducer->producerType) { + defaultMQProducer->innerTransactionProducer->setMessageTrace(messageTrace); + } else { + defaultMQProducer->innerProducer->setMessageTrace(messageTrace); + } + } catch (exception& e) { + MQClientErrorContainer::setErr(string(e.what())); + return PRODUCER_START_FAILED; + } + return OK; +} #ifdef __cplusplus }; #endif diff --git a/src/extern/CPushConsumer.cpp b/src/extern/CPushConsumer.cpp index 5ee89ca7e..77da3afba 100644 --- a/src/extern/CPushConsumer.cpp +++ b/src/extern/CPushConsumer.cpp @@ -296,7 +296,14 @@ int SetPushConsumerLogLevel(CPushConsumer* consumer, CLogLevel level) { ((DefaultMQPushConsumer*)consumer)->setLogLevel((elogLevel)level); return OK; } - +int SetPushConsumerMessageTrace(CPushConsumer* consumer, CTraceModel openTrace) { + if (consumer == NULL) { + return NULL_POINTER; + } + bool messageTrace = openTrace == OPEN ? true : false; + ((DefaultMQPushConsumer*)consumer)->setMessageTrace(messageTrace); + return OK; +} #ifdef __cplusplus }; #endif diff --git a/src/producer/TransactionMQProducer.cpp b/src/producer/TransactionMQProducer.cpp index c246a8089..97ed1b3e8 100644 --- a/src/producer/TransactionMQProducer.cpp +++ b/src/producer/TransactionMQProducer.cpp @@ -160,7 +160,12 @@ void TransactionMQProducer::setUnitName(std::string unitName) { const std::string& TransactionMQProducer::getUnitName() const { return impl->getUnitName(); } - +void TransactionMQProducer::setMessageTrace(bool messageTrace) { + impl->setMessageTrace(messageTrace); +} +bool TransactionMQProducer::getMessageTrace() const { + return impl->getMessageTrace(); +} std::shared_ptr TransactionMQProducer::getTransactionListener() { return impl->getTransactionListener(); } diff --git a/test/src/extern/CProducerTest.cpp b/test/src/extern/CProducerTest.cpp index 7798fe6f4..9a5156fea 100644 --- a/test/src/extern/CProducerTest.cpp +++ b/test/src/extern/CProducerTest.cpp @@ -228,6 +228,10 @@ TEST(cProducer, info) { EXPECT_EQ(SetProducerSessionCredentials(cProducer, "accessKey", "secretKey", "channel"), OK); SessionCredentials sessionCredentials = defaultMQProducer->getSessionCredentials(); EXPECT_EQ(sessionCredentials.getAccessKey(), "accessKey"); + + EXPECT_EQ(SetProducerMessageTrace(cProducer, OPEN), OK); + EXPECT_EQ(defaultMQProducer->getMessageTrace(), true); + Mock::AllowLeak(defaultMQProducer); } diff --git a/test/src/extern/CPushConsumerTest.cpp b/test/src/extern/CPushConsumerTest.cpp index 2eff2e7b0..6b516ab84 100644 --- a/test/src/extern/CPushConsumerTest.cpp +++ b/test/src/extern/CPushConsumerTest.cpp @@ -118,6 +118,8 @@ TEST(cPushComsumer, info) { EXPECT_EQ(SetPushConsumerMessageModel(cpushConsumer, BROADCASTING), OK); EXPECT_EQ(mqPushConsumer->getMessageModel(), MessageModel::BROADCASTING); + EXPECT_EQ(SetPushConsumerMessageTrace(cpushConsumer, CLOSE), OK); + EXPECT_EQ(mqPushConsumer->getMessageTrace(), false); Mock::AllowLeak(mqPushConsumer); }