diff --git a/include/CCommon.h b/include/CCommon.h index 0fbcbdadf..fa6edb9c3 100644 --- a/include/CCommon.h +++ b/include/CCommon.h @@ -25,6 +25,8 @@ extern "C" { #define MAX_MESSAGE_ID_LENGTH 256 #define MAX_TOPIC_LENGTH 512 #define MAX_BROKER_NAME_ID_LENGTH 256 +#define MAX_SDK_VERSION_LENGTH 256 +#define DEFAULT_SDK_VERSION "DefaultVersion" typedef enum _CStatus_ { // Success OK = 0, diff --git a/include/CProducer.h b/include/CProducer.h index 09b23bae4..fa6a73052 100644 --- a/include/CProducer.h +++ b/include/CProducer.h @@ -47,6 +47,7 @@ ROCKETMQCLIENT_API CProducer* CreateTransactionProducer(const char* groupId, ROCKETMQCLIENT_API int DestroyProducer(CProducer* producer); ROCKETMQCLIENT_API int StartProducer(CProducer* producer); ROCKETMQCLIENT_API int ShutdownProducer(CProducer* producer); +ROCKETMQCLIENT_API const char* ShowProducerVersion(CProducer* producer); ROCKETMQCLIENT_API int SetProducerNameServerAddress(CProducer* producer, const char* namesrv); ROCKETMQCLIENT_API int SetProducerNameServerDomain(CProducer* producer, const char* domain); diff --git a/include/CPullConsumer.h b/include/CPullConsumer.h index ec960056d..60a33679d 100644 --- a/include/CPullConsumer.h +++ b/include/CPullConsumer.h @@ -33,6 +33,8 @@ ROCKETMQCLIENT_API CPullConsumer* CreatePullConsumer(const char* groupId); ROCKETMQCLIENT_API int DestroyPullConsumer(CPullConsumer* consumer); ROCKETMQCLIENT_API int StartPullConsumer(CPullConsumer* consumer); ROCKETMQCLIENT_API int ShutdownPullConsumer(CPullConsumer* consumer); +ROCKETMQCLIENT_API const char* ShowPullConsumerVersion(CPullConsumer* consumer); + ROCKETMQCLIENT_API int SetPullConsumerGroupID(CPullConsumer* consumer, const char* groupId); ROCKETMQCLIENT_API const char* GetPullConsumerGroupID(CPullConsumer* consumer); ROCKETMQCLIENT_API int SetPullConsumerNameServerAddress(CPullConsumer* consumer, const char* namesrv); diff --git a/include/CPushConsumer.h b/include/CPushConsumer.h index 4880d0f8a..e2bce18a8 100644 --- a/include/CPushConsumer.h +++ b/include/CPushConsumer.h @@ -36,6 +36,7 @@ ROCKETMQCLIENT_API CPushConsumer* CreatePushConsumer(const char* groupId); ROCKETMQCLIENT_API int DestroyPushConsumer(CPushConsumer* consumer); ROCKETMQCLIENT_API int StartPushConsumer(CPushConsumer* consumer); ROCKETMQCLIENT_API int ShutdownPushConsumer(CPushConsumer* consumer); +ROCKETMQCLIENT_API const char* ShowPushConsumerVersion(CPushConsumer* consumer); ROCKETMQCLIENT_API int SetPushConsumerGroupID(CPushConsumer* consumer, const char* groupId); ROCKETMQCLIENT_API const char* GetPushConsumerGroupID(CPushConsumer* consumer); ROCKETMQCLIENT_API int SetPushConsumerNameServerAddress(CPushConsumer* consumer, const char* namesrv); diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h index 4614244a0..991d27de5 100644 --- a/include/DefaultMQProducer.h +++ b/include/DefaultMQProducer.h @@ -35,6 +35,7 @@ class ROCKETMQCLIENT_API DefaultMQProducer { virtual void start(); virtual void shutdown(); + virtual std::string version(); virtual SendResult send(MQMessage& msg, bool bSelectActiveBroker = false); virtual SendResult send(MQMessage& msg, const MQMessageQueue& mq); diff --git a/include/DefaultMQPullConsumer.h b/include/DefaultMQPullConsumer.h index 203578f48..6ed6c62e3 100644 --- a/include/DefaultMQPullConsumer.h +++ b/include/DefaultMQPullConsumer.h @@ -41,6 +41,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer { virtual void start(); virtual void shutdown(); + virtual std::string version(); const std::string& getNamesrvAddr() const; void setNamesrvAddr(const std::string& namesrvAddr); diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h index 2b8f672ac..5b10b3df0 100644 --- a/include/DefaultMQPushConsumer.h +++ b/include/DefaultMQPushConsumer.h @@ -36,6 +36,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer { virtual void start(); virtual void shutdown(); + virtual std::string version(); const std::string& getNamesrvAddr() const; void setNamesrvAddr(const std::string& namesrvAddr); diff --git a/include/TransactionMQProducer.h b/include/TransactionMQProducer.h index 9de728a0d..f4c0281a8 100644 --- a/include/TransactionMQProducer.h +++ b/include/TransactionMQProducer.h @@ -36,6 +36,7 @@ class ROCKETMQCLIENT_API TransactionMQProducer { void start(); void shutdown(); + std::string version(); const std::string& getNamesrvAddr() const; void setNamesrvAddr(const std::string& namesrvAddr); diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp index fdbdb4aa3..f64818aad 100644 --- a/src/MQClientFactory.cpp +++ b/src/MQClientFactory.cpp @@ -1179,6 +1179,8 @@ ConsumerRunningInfo* MQClientFactory::consumerRunningInfo(const string& consumer runningInfo->setProperty( ConsumerRunningInfo::PROP_CLIENT_VERSION, MQVersion::GetVersionDesc(MQVersion::s_CurrentVersion)); // MQVersion::s_CurrentVersion )); + runningInfo->setProperty(ConsumerRunningInfo::PROP_CLIENT_SDK_VERSION, + pConsumer->getClientVersionString()); // in DefaultMQClient.cpp; return runningInfo; } diff --git a/src/MQClientFactory.h b/src/MQClientFactory.h index d36897da2..51f554ace 100644 --- a/src/MQClientFactory.h +++ b/src/MQClientFactory.h @@ -143,7 +143,6 @@ class MQClientFactory { void doRebalance(); void timerCB_doRebalance(boost::system::error_code& ec, boost::shared_ptr t); bool getSessionCredentialFromConsumerTable(SessionCredentials& sessionCredentials); - bool addConsumerToTable(const string& consumerName, MQConsumer* pMQConsumer); void eraseConsumerFromTable(const string& consumerName); int getConsumerTableSize(); void getTopicListFromConsumerSubscription(set& topicList); @@ -152,7 +151,6 @@ class MQClientFactory { // producer related operation bool getSessionCredentialFromProducerTable(SessionCredentials& sessionCredentials); - bool addProducerToTable(const string& producerName, MQProducer* pMQProducer); void eraseProducerFromTable(const string& producerName); int getProducerTableSize(); void insertProducerInfoToHeartBeatData(HeartbeatData* pHeartbeatData); @@ -171,6 +169,9 @@ class MQClientFactory { unique_ptr m_pClientAPIImpl; unique_ptr m_pClientRemotingProcessor; + bool addProducerToTable(const string& producerName, MQProducer* pMQProducer); + bool addConsumerToTable(const string& consumerName, MQConsumer* pMQConsumer); + private: string m_nameSrvDomain; // per clientId ServiceState m_serviceState; diff --git a/src/common/DefaultMQClient.cpp b/src/common/DefaultMQClient.cpp index f468b7fe5..39013fc80 100644 --- a/src/common/DefaultMQClient.cpp +++ b/src/common/DefaultMQClient.cpp @@ -15,7 +15,7 @@ * limitations under the License. */ -#include "include/DefaultMQClient.h" +#include "DefaultMQClient.h" #include "Logging.h" #include "MQClientFactory.h" #include "MQClientManager.h" @@ -24,11 +24,11 @@ #include "UtilAll.h" namespace rocketmq { - +// hard code first. #define ROCKETMQCPP_VERSION "2.0.0" -#define BUILD_DATE "02-14-2020" +#define BUILD_DATE "22:50:18 02-14-2020" // display version: strings bin/librocketmq.so |grep VERSION -const char* rocketmq_build_time = "VERSION: " ROCKETMQCPP_VERSION ", BUILD DATE: " BUILD_DATE " "; +const char* rocketmq_build_time = "CPP CORE VERSION: " ROCKETMQCPP_VERSION ", BUILD TIME: " BUILD_DATE; //shutdown(); } - +std::string DefaultMQPullConsumer::version() { + std::string versions = impl->getClientVersionString(); + /*versions.append(", PROTOCOL VERSION: ") + .append(MQVersion::GetVersionDesc(MQVersion::s_CurrentVersion)) + .append(", LANGUAGE: ") + .append(MQVersion::s_CurrentLanguage);*/ + return versions; +} // start mqclient set const std::string& DefaultMQPullConsumer::getNamesrvAddr() const { return impl->getNamesrvAddr(); diff --git a/src/consumer/DefaultMQPullConsumerImpl.cpp b/src/consumer/DefaultMQPullConsumerImpl.cpp index 640884232..cd49548be 100644 --- a/src/consumer/DefaultMQPullConsumerImpl.cpp +++ b/src/consumer/DefaultMQPullConsumerImpl.cpp @@ -63,7 +63,9 @@ void DefaultMQPullConsumerImpl::start() { sa.sa_flags = 0; sigaction(SIGPIPE, &sa, 0); #endif + LOG_INFO("###Current Pull Consumer@%s", getClientVersionString().c_str()); dealWithNameSpace(); + showClientConfigs(); switch (m_serviceState) { case CREATE_JUST: { m_serviceState = START_FAILED; diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp index dc02d9d34..5034051b5 100644 --- a/src/consumer/DefaultMQPushConsumer.cpp +++ b/src/consumer/DefaultMQPushConsumer.cpp @@ -16,6 +16,7 @@ */ #include "DefaultMQPushConsumer.h" +#include #include "DefaultMQPushConsumerImpl.h" namespace rocketmq { @@ -34,7 +35,14 @@ void DefaultMQPushConsumer::start() { void DefaultMQPushConsumer::shutdown() { impl->shutdown(); } - +std::string DefaultMQPushConsumer::version() { + std::string versions = impl->getClientVersionString(); + /*versions.append(", PROTOCOL VERSION: ") + .append(MQVersion::GetVersionDesc(MQVersion::s_CurrentVersion)) + .append(", LANGUAGE: ") + .append(MQVersion::s_CurrentLanguage);*/ + return versions; +} // ConsumeType DefaultMQPushConsumer::getConsumeType() { // return impl->getConsumeType(); //} diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp index 54500bcec..5fe6f9c3b 100644 --- a/src/consumer/DefaultMQPushConsumerImpl.cpp +++ b/src/consumer/DefaultMQPushConsumerImpl.cpp @@ -193,6 +193,7 @@ class AsyncPullCallback : public PullCallback { //producerType = CAPI_C_PRODUCER_TYPE_COMMON; defaultMQProducer->innerProducer = new DefaultMQProducer(groupId); + defaultMQProducer->version = new char[MAX_SDK_VERSION_LENGTH]; + strncpy(defaultMQProducer->version, defaultMQProducer->innerProducer->version().c_str(), MAX_SDK_VERSION_LENGTH - 1); + defaultMQProducer->version[MAX_SDK_VERSION_LENGTH - 1] = 0; defaultMQProducer->innerTransactionProducer = NULL; defaultMQProducer->listenerInner = NULL; return (CProducer*)defaultMQProducer; @@ -236,6 +240,10 @@ CProducer* CreateOrderlyProducer(const char* groupId) { DefaultProducer* defaultMQProducer = new DefaultProducer(); defaultMQProducer->producerType = CAPI_C_PRODUCER_TYPE_ORDERLY; defaultMQProducer->innerProducer = new DefaultMQProducer(groupId); + + defaultMQProducer->version = new char[MAX_SDK_VERSION_LENGTH]; + strncpy(defaultMQProducer->version, defaultMQProducer->innerProducer->version().c_str(), MAX_SDK_VERSION_LENGTH - 1); + defaultMQProducer->version[MAX_SDK_VERSION_LENGTH - 1] = 0; defaultMQProducer->innerTransactionProducer = NULL; defaultMQProducer->listenerInner = NULL; return (CProducer*)defaultMQProducer; @@ -252,6 +260,11 @@ CProducer* CreateTransactionProducer(const char* groupId, CLocalTransactionCheck defaultMQProducer->listenerInner = new LocalTransactionListenerInner((CProducer*)defaultMQProducer, callback, userData); defaultMQProducer->innerTransactionProducer->setTransactionListener(defaultMQProducer->listenerInner); + + defaultMQProducer->version = new char[MAX_SDK_VERSION_LENGTH]; + strncpy(defaultMQProducer->version, defaultMQProducer->innerTransactionProducer->version().c_str(), + MAX_SDK_VERSION_LENGTH - 1); + defaultMQProducer->version[MAX_SDK_VERSION_LENGTH - 1] = 0; return (CProducer*)defaultMQProducer; } int DestroyProducer(CProducer* pProducer) { @@ -259,6 +272,10 @@ int DestroyProducer(CProducer* pProducer) { return NULL_POINTER; } DefaultProducer* defaultMQProducer = (DefaultProducer*)pProducer; + if (defaultMQProducer->version != NULL) { + delete defaultMQProducer->version; + defaultMQProducer->version = NULL; + } if (CAPI_C_PRODUCER_TYPE_TRANSACTION == defaultMQProducer->producerType) { if (defaultMQProducer->innerTransactionProducer != NULL) { delete defaultMQProducer->innerTransactionProducer; @@ -307,6 +324,14 @@ int ShutdownProducer(CProducer* producer) { } return OK; } +const char* ShowProducerVersion(CProducer* producer) { + if (producer == NULL) { + return DEFAULT_SDK_VERSION; + } + DefaultProducer* defaultMQProducer = (DefaultProducer*)producer; + + return defaultMQProducer->version; +} int SetProducerNameServerAddress(CProducer* producer, const char* namesrv) { if (producer == NULL) { return NULL_POINTER; diff --git a/src/extern/CPullConsumer.cpp b/src/extern/CPullConsumer.cpp index f84a4bcbf..a0db24272 100644 --- a/src/extern/CPullConsumer.cpp +++ b/src/extern/CPullConsumer.cpp @@ -27,12 +27,14 @@ using namespace std; #ifdef __cplusplus extern "C" { #endif - +char VERSION_FOR_PULL_CONSUMER[MAX_SDK_VERSION_LENGTH]; CPullConsumer* CreatePullConsumer(const char* groupId) { if (groupId == NULL) { return NULL; } DefaultMQPullConsumer* defaultMQPullConsumer = new DefaultMQPullConsumer(groupId); + strncpy(VERSION_FOR_PULL_CONSUMER, defaultMQPullConsumer->version().c_str(), MAX_SDK_VERSION_LENGTH - 1); + VERSION_FOR_PULL_CONSUMER[MAX_SDK_VERSION_LENGTH - 1] = 0; return (CPullConsumer*)defaultMQPullConsumer; } int DestroyPullConsumer(CPullConsumer* consumer) { @@ -61,6 +63,13 @@ int ShutdownPullConsumer(CPullConsumer* consumer) { ((DefaultMQPullConsumer*)consumer)->shutdown(); return OK; } +const char* ShowPullConsumerVersion(CPullConsumer* consumer) { + if (consumer == NULL) { + return NULL; + } + return VERSION_FOR_PULL_CONSUMER; +} + int SetPullConsumerGroupID(CPullConsumer* consumer, const char* groupId) { if (consumer == NULL || groupId == NULL) { return NULL_POINTER; diff --git a/src/extern/CPushConsumer.cpp b/src/extern/CPushConsumer.cpp index 66ef93757..5ee89ca7e 100644 --- a/src/extern/CPushConsumer.cpp +++ b/src/extern/CPushConsumer.cpp @@ -85,13 +85,16 @@ map g_OrderListenerMap; #ifdef __cplusplus extern "C" { #endif - +char VERSION_FOR_PUSH_CONSUMER[MAX_SDK_VERSION_LENGTH]; CPushConsumer* CreatePushConsumer(const char* groupId) { if (groupId == NULL) { return NULL; } DefaultMQPushConsumer* defaultMQPushConsumer = new DefaultMQPushConsumer(groupId); defaultMQPushConsumer->setConsumeFromWhere(CONSUME_FROM_LAST_OFFSET); + + strncpy(VERSION_FOR_PUSH_CONSUMER, defaultMQPushConsumer->version().c_str(), MAX_SDK_VERSION_LENGTH - 1); + VERSION_FOR_PUSH_CONSUMER[MAX_SDK_VERSION_LENGTH - 1] = 0; return (CPushConsumer*)defaultMQPushConsumer; } int DestroyPushConsumer(CPushConsumer* consumer) { @@ -120,6 +123,13 @@ int ShutdownPushConsumer(CPushConsumer* consumer) { ((DefaultMQPushConsumer*)consumer)->shutdown(); return OK; } + +const char* ShowPushConsumerVersion(CPushConsumer* consumer) { + if (consumer == NULL) { + return NULL; + } + return VERSION_FOR_PUSH_CONSUMER; +} int SetPushConsumerGroupID(CPushConsumer* consumer, const char* groupId) { if (consumer == NULL || groupId == NULL) { return NULL_POINTER; diff --git a/src/include/DefaultMQClient.h b/src/include/DefaultMQClient.h index b1b51bdb1..a2e5ce56f 100644 --- a/src/include/DefaultMQClient.h +++ b/src/include/DefaultMQClient.h @@ -41,6 +41,7 @@ class DefaultMQClient { public: // clientid=processId-ipAddr@instanceName; + std::string getClientVersionString() const; std::string getMQClientId() const; const std::string& getNamesrvAddr() const; void setNamesrvAddr(const std::string& namesrvAddr); @@ -174,6 +175,7 @@ class DefaultMQClient { virtual void shutdown(); MQClientFactory* getFactory() const; virtual bool isServiceStateOk(); + void showClientConfigs(); protected: std::string m_namesrvAddr; diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp index f656b1e58..197729956 100644 --- a/src/producer/DefaultMQProducer.cpp +++ b/src/producer/DefaultMQProducer.cpp @@ -16,6 +16,7 @@ */ #include "DefaultMQProducer.h" +#include #include "DefaultMQProducerImpl.h" @@ -36,6 +37,17 @@ void DefaultMQProducer::shutdown() { impl->shutdown(); } +std::string DefaultMQProducer::version() { + std::string versions = impl->getClientVersionString(); + /* + versions.append(", PROTOCOL VERSION: ") + .append(MQVersion::GetVersionDesc(MQVersion::s_CurrentVersion)) + .append(", LANGUAGE: ") + .append(MQVersion::s_CurrentLanguage); + */ + return versions; +} + // start mqclient set const std::string& DefaultMQProducer::getNamesrvAddr() const { return impl->getNamesrvAddr(); diff --git a/src/producer/DefaultMQProducerImpl.cpp b/src/producer/DefaultMQProducerImpl.cpp index aff238b4e..037f81aa8 100644 --- a/src/producer/DefaultMQProducerImpl.cpp +++ b/src/producer/DefaultMQProducerImpl.cpp @@ -63,8 +63,10 @@ void DefaultMQProducerImpl::start() { sa.sa_flags = 0; sigaction(SIGPIPE, &sa, 0); #endif + LOG_WARN("###Current Producer@%s", getClientVersionString().c_str()); // we should deal with namespaced before start. dealWithNameSpace(); + logConfigs(); switch (m_serviceState) { case CREATE_JUST: { m_serviceState = START_FAILED; @@ -634,5 +636,15 @@ bool DefaultMQProducerImpl::dealWithNameSpace() { } return true; } +void DefaultMQProducerImpl::logConfigs() { + showClientConfigs(); + + LOG_WARN("SendMsgTimeout:%d ms", m_sendMsgTimeout); + LOG_WARN("CompressMsgBodyOverHowmuch:%d", m_compressMsgBodyOverHowmuch); + LOG_WARN("MaxMessageSize:%d", m_maxMessageSize); + LOG_WARN("CompressLevel:%d", m_compressLevel); + LOG_WARN("RetryTimes:%d", m_retryTimes); + LOG_WARN("RetryTimes4Async:%d", m_retryTimes4Async); +} //& msgs); bool dealWithNameSpace(); + void logConfigs(); private: int m_sendMsgTimeout; diff --git a/src/producer/TransactionMQProducer.cpp b/src/producer/TransactionMQProducer.cpp index fe2b85521..c246a8089 100644 --- a/src/producer/TransactionMQProducer.cpp +++ b/src/producer/TransactionMQProducer.cpp @@ -16,6 +16,7 @@ */ #include "TransactionMQProducer.h" +#include #include "TransactionMQProducerImpl.h" @@ -35,7 +36,16 @@ void TransactionMQProducer::start() { void TransactionMQProducer::shutdown() { impl->shutdown(); } - +std::string TransactionMQProducer::version() { + std::string versions = impl->getClientVersionString(); + /* + versions.append(", PROTOCOL VERSION: ") + .append(MQVersion::GetVersionDesc(MQVersion::s_CurrentVersion)) + .append(", LANGUAGE: ") + .append(MQVersion::s_CurrentLanguage); + */ + return versions; +} // start mqclient set const std::string& TransactionMQProducer::getNamesrvAddr() const { return impl->getNamesrvAddr(); diff --git a/src/protocol/ConsumerRunningInfo.cpp b/src/protocol/ConsumerRunningInfo.cpp index 999686bed..951cc4b7a 100644 --- a/src/protocol/ConsumerRunningInfo.cpp +++ b/src/protocol/ConsumerRunningInfo.cpp @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ #include "ConsumerRunningInfo.h" #include "UtilAll.h" @@ -23,6 +23,7 @@ const string ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE = "PROP_THREADPOOL_C const string ConsumerRunningInfo::PROP_CONSUME_ORDERLY = "PROP_CONSUMEORDERLY"; const string ConsumerRunningInfo::PROP_CONSUME_TYPE = "PROP_CONSUME_TYPE"; const string ConsumerRunningInfo::PROP_CLIENT_VERSION = "PROP_CLIENT_VERSION"; +const string ConsumerRunningInfo::PROP_CLIENT_SDK_VERSION = "PROP_CLIENT_CORE_VERSION"; const string ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP = "PROP_CONSUMER_START_TIMESTAMP"; const map ConsumerRunningInfo::getProperties() const { @@ -82,6 +83,7 @@ string ConsumerRunningInfo::encode() { outData[PROP_CONSUMER_START_TIMESTAMP] = properties[PROP_CONSUMER_START_TIMESTAMP]; outData[PROP_CONSUME_ORDERLY] = properties[PROP_CONSUME_ORDERLY]; outData[PROP_THREADPOOL_CORE_SIZE] = properties[PROP_THREADPOOL_CORE_SIZE]; + outData[PROP_CLIENT_SDK_VERSION] = properties[PROP_CLIENT_SDK_VERSION]; Json::Value root; root["jstack"] = jstack; @@ -116,4 +118,4 @@ string ConsumerRunningInfo::encode() { return finals; } -} +} // namespace rocketmq diff --git a/src/protocol/ConsumerRunningInfo.h b/src/protocol/ConsumerRunningInfo.h index 3c83834b6..de0d4b2ab 100644 --- a/src/protocol/ConsumerRunningInfo.h +++ b/src/protocol/ConsumerRunningInfo.h @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ #ifndef __CONSUMERRUNNINGINFO_H__ #define __CONSUMERRUNNINGINFO_H__ @@ -38,6 +38,7 @@ class ConsumerRunningInfo { static const string PROP_CONSUME_ORDERLY; static const string PROP_CONSUME_TYPE; static const string PROP_CLIENT_VERSION; + static const string PROP_CLIENT_SDK_VERSION; static const string PROP_CONSUMER_START_TIMESTAMP; public: @@ -61,5 +62,5 @@ class ConsumerRunningInfo { // map statusTable; string jstack; }; -} +} // namespace rocketmq #endif diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp index 4c708f7df..294c319d9 100644 --- a/src/transport/TcpRemotingClient.cpp +++ b/src/transport/TcpRemotingClient.cpp @@ -28,6 +28,8 @@ namespace rocketmq { //, int64, int, int)); @@ -49,60 +48,17 @@ class MockMQClientAPIImpl : public MQClientAPIImpl { uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout, string unitName) - : MQClientAPIImpl(mqClientId, - clientRemotingProcessor, - pullThreadNum, - tcpConnectTimeout, - tcpTransportTryLockTimeout, - unitName) { - m_processor = clientRemotingProcessor; - } - ClientRemotingProcessor* m_processor; - void reInitRemoteClient(TcpRemotingClient* client) { - m_pRemotingClient.reset(client); - m_pRemotingClient->registerProcessor(CHECK_TRANSACTION_STATE, m_processor); - m_pRemotingClient->registerProcessor(RESET_CONSUMER_CLIENT_OFFSET, m_processor); - m_pRemotingClient->registerProcessor(GET_CONSUMER_STATUS_FROM_CLIENT, m_processor); - m_pRemotingClient->registerProcessor(GET_CONSUMER_RUNNING_INFO, m_processor); - m_pRemotingClient->registerProcessor(NOTIFY_CONSUMER_IDS_CHANGED, m_processor); - m_pRemotingClient->registerProcessor(CONSUME_MESSAGE_DIRECTLY, m_processor); - } -}; -class MockMQClientAPIImplUtil { - public: - static MockMQClientAPIImplUtil* GetInstance() { - static MockMQClientAPIImplUtil instance; - return &instance; - } - MockMQClientAPIImpl* GetGtestMockClientAPIImpl() { - if (m_impl != nullptr) { - return m_impl; - } - string cid = "testClientId"; - int ptN = 1; - uint64_t tct = 3000; - uint64_t ttt = 3000; - string un = "central"; - SessionCredentials sc; - ClientRemotingProcessor* pp = new ClientRemotingProcessor(nullptr); - MockMQClientAPIImpl* impl = new MockMQClientAPIImpl(cid, pp, ptN, tct, ttt, un); - MockTcpRemotingClient* pClient = new MockTcpRemotingClient(ptN, tct, ttt); - impl->reInitRemoteClient(pClient); - m_impl = impl; - m_pClient = pClient; - return impl; - } - MockTcpRemotingClient* GetGtestMockRemotingClient() { return m_pClient; } - MockMQClientAPIImpl* m_impl = nullptr; - MockTcpRemotingClient* m_pClient = nullptr; + : MQClientAPIImpl(mqClientId) {} + void reInitRemoteClient(TcpRemotingClient* client) { m_pRemotingClient.reset(client); } }; TEST(MQClientAPIImplTest, getMaxOffset) { SessionCredentials sc; - MockMQClientAPIImpl* impl = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockClientAPIImpl(); + MockMQClientAPIImpl* impl = new MockMQClientAPIImpl("testMockAPIImpl", nullptr, 1, 2, 3, "testUnit"); Mock::AllowLeak(impl); - MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient(); + MockTcpRemotingClient* pClient = new MockTcpRemotingClient(); Mock::AllowLeak(pClient); + impl->reInitRemoteClient(pClient); GetMaxOffsetResponseHeader* pHead = new GetMaxOffsetResponseHeader(); pHead->offset = 4096; RemotingCommand* pCommandFailed = new RemotingCommand(SYSTEM_ERROR, nullptr); @@ -120,10 +76,11 @@ TEST(MQClientAPIImplTest, getMaxOffset) { TEST(MQClientAPIImplTest, getMinOffset) { SessionCredentials sc; - MockMQClientAPIImpl* impl = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockClientAPIImpl(); + MockMQClientAPIImpl* impl = new MockMQClientAPIImpl("testMockAPIImpl", nullptr, 1, 2, 3, "testUnit"); Mock::AllowLeak(impl); - MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient(); + MockTcpRemotingClient* pClient = new MockTcpRemotingClient(); Mock::AllowLeak(pClient); + impl->reInitRemoteClient(pClient); GetMinOffsetResponseHeader* pHead = new GetMinOffsetResponseHeader(); pHead->offset = 2048; RemotingCommand* pCommandFailed = new RemotingCommand(SYSTEM_ERROR, nullptr); @@ -154,11 +111,11 @@ class MyMockAutoDeleteSendCallback : public AutoDeleteSendCallBack { TEST(MQClientAPIImplTest, sendMessage) { string cid = "testClientId"; SessionCredentials sc; - MockMQClientAPIImpl* impl = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockClientAPIImpl(); + MockMQClientAPIImpl* impl = new MockMQClientAPIImpl("testMockAPIImpl", nullptr, 1, 2, 3, "testUnit"); Mock::AllowLeak(impl); - MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient(); + MockTcpRemotingClient* pClient = new MockTcpRemotingClient(); Mock::AllowLeak(pClient); - + impl->reInitRemoteClient(pClient); SendMessageResponseHeader* pHead = new SendMessageResponseHeader(); pHead->msgId = "MessageID"; pHead->queueId = 1; @@ -238,10 +195,11 @@ TEST(MQClientAPIImplTest, sendMessage) { TEST(MQClientAPIImplTest, consumerSendMessageBack) { SessionCredentials sc; MQMessageExt msg; - MockMQClientAPIImpl* impl = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockClientAPIImpl(); + MockMQClientAPIImpl* impl = new MockMQClientAPIImpl("testMockAPIImpl", nullptr, 1, 2, 3, "testUnit"); Mock::AllowLeak(impl); - MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient(); + MockTcpRemotingClient* pClient = new MockTcpRemotingClient(); Mock::AllowLeak(pClient); + impl->reInitRemoteClient(pClient); RemotingCommand* pCommandFailed = new RemotingCommand(SYSTEM_ERROR, nullptr); RemotingCommand* pCommandSuccuss = new RemotingCommand(SUCCESS_VALUE, nullptr); EXPECT_CALL(*pClient, invokeSync(_, _, _)) diff --git a/test/src/MQClientFactoryTest.cpp b/test/src/MQClientFactoryTest.cpp index 11490e58f..eb78cf757 100644 --- a/test/src/MQClientFactoryTest.cpp +++ b/test/src/MQClientFactoryTest.cpp @@ -20,17 +20,28 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" +#include "ConsumerRunningInfo.h" +#include "DefaultMQPushConsumerImpl.h" #include "MQClientFactory.h" using namespace std; using namespace rocketmq; +using rocketmq::ConsumerRunningInfo; +using rocketmq::DefaultMQPushConsumerImpl; using rocketmq::MQClientFactory; using rocketmq::TopicRouteData; using testing::_; using ::testing::InitGoogleMock; using ::testing::InitGoogleTest; +using testing::Mock; using testing::Return; +class MockPushConsumerImpl : public DefaultMQPushConsumerImpl { + public: + MockPushConsumerImpl(const std::string& groupname) : DefaultMQPushConsumerImpl() {} + MOCK_METHOD0(getConsumerRunningInfo, ConsumerRunningInfo*()); +}; + class MockMQClientAPIImpl : public MQClientAPIImpl { public: MockMQClientAPIImpl(const string& mqClientId, @@ -39,12 +50,7 @@ class MockMQClientAPIImpl : public MQClientAPIImpl { uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout, string unitName) - : MQClientAPIImpl(mqClientId, - clientRemotingProcessor, - pullThreadNum, - tcpConnectTimeout, - tcpTransportTryLockTimeout, - unitName) {} + : MQClientAPIImpl(mqClientId) {} MOCK_METHOD5(getMinOffset, int64(const string&, const string&, int, int, const SessionCredentials&)); MOCK_METHOD3(getTopicRouteInfoFromNameServer, TopicRouteData*(const string&, int, const SessionCredentials&)); @@ -56,10 +62,11 @@ class MockMQClientFactory : public MQClientFactory { uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout, string unitName) - : MQClientFactory(mqClientId, pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout, unitName) {} + : MQClientFactory(mqClientId) {} void reInitClientImpl(MQClientAPIImpl* pImpl) { m_pClientAPIImpl.reset(pImpl); } - void reInitRemotingProcessor(ClientRemotingProcessor* pImpl) { m_pClientRemotingProcessor.reset(pImpl); } - ClientRemotingProcessor* getRemotingProcessor() { return m_pClientRemotingProcessor.release(); } + void addTestConsumer(const string& consumerName, MQConsumer* pMQConsumer) { + addConsumerToTable(consumerName, pMQConsumer); + } }; TEST(MQClientFactoryTest, minOffset) { @@ -70,8 +77,8 @@ TEST(MQClientFactoryTest, minOffset) { string unitName = "central"; MockMQClientFactory* factory = new MockMQClientFactory(clientId, pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout, unitName); - MockMQClientAPIImpl* pImpl = new MockMQClientAPIImpl(clientId, factory->getRemotingProcessor(), pullThreadNum, - tcpConnectTimeout, tcpTransportTryLockTimeout, unitName); + MockMQClientAPIImpl* pImpl = new MockMQClientAPIImpl(clientId, nullptr, pullThreadNum, tcpConnectTimeout, + tcpTransportTryLockTimeout, unitName); factory->reInitClientImpl(pImpl); MQMessageQueue mq; mq.setTopic("testTopic"); @@ -100,6 +107,25 @@ TEST(MQClientFactoryTest, minOffset) { delete factory; } +TEST(MQClientFactoryTest, consumerRunningInfo) { + string clientId = "testClientId"; + int pullThreadNum = 1; + uint64_t tcpConnectTimeout = 3000; + uint64_t tcpTransportTryLockTimeout = 3000; + string unitName = "central"; + MockMQClientFactory* factory = + new MockMQClientFactory(clientId, pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout, unitName); + MockPushConsumerImpl* mockPushConsumer = new MockPushConsumerImpl(clientId); + Mock::AllowLeak(mockPushConsumer); + factory->addTestConsumer(clientId, mockPushConsumer); + ConsumerRunningInfo* info = new ConsumerRunningInfo(); + info->setJstack("Hello,JStack"); + EXPECT_CALL(*mockPushConsumer, getConsumerRunningInfo()).Times(1).WillOnce(Return(info)); + ConsumerRunningInfo* info2 = factory->consumerRunningInfo(clientId); + EXPECT_EQ(info2->getJstack(), "Hello,JStack"); + delete factory; +} + int main(int argc, char* argv[]) { InitGoogleMock(&argc, argv); return RUN_ALL_TESTS(); diff --git a/test/src/extern/CProducerTest.cpp b/test/src/extern/CProducerTest.cpp index 14f999930..7798fe6f4 100644 --- a/test/src/extern/CProducerTest.cpp +++ b/test/src/extern/CProducerTest.cpp @@ -247,7 +247,21 @@ TEST(cProducer, null) { EXPECT_EQ(SetProducerLogLevel(NULL, E_LOG_LEVEL_FATAL), NULL_POINTER); EXPECT_EQ(DestroyProducer(NULL), NULL_POINTER); } - +TEST(cProducer, version) { + CProducer* cProducer = CreateProducer("groupTestVersion"); + EXPECT_TRUE(cProducer != NULL); + string version(ShowProducerVersion(cProducer)); + EXPECT_GT(version.length(), 0); + CProducer* cProducer2 = CreateOrderlyProducer("orderGroupTestVersion"); + EXPECT_TRUE(cProducer2 != NULL); + string version2(ShowProducerVersion(cProducer2)); + EXPECT_GT(version2.length(), 0); + + CProducer* cProducer3 = CreateTransactionProducer("tranGroupTestVersion", NULL, NULL); + EXPECT_TRUE(cProducer3 != NULL); + string version3(ShowProducerVersion(cProducer3)); + EXPECT_GT(version3.length(), 0); +} int main(int argc, char* argv[]) { InitGoogleMock(&argc, argv); testing::GTEST_FLAG(throw_on_failure) = true; diff --git a/test/src/extern/CPullConsumerTest.cpp b/test/src/extern/CPullConsumerTest.cpp index bbdbbae90..0ebfde64b 100644 --- a/test/src/extern/CPullConsumerTest.cpp +++ b/test/src/extern/CPullConsumerTest.cpp @@ -196,7 +196,12 @@ TEST(cpullConsumer, null) { EXPECT_EQ(StartPullConsumer(NULL), NULL_POINTER); EXPECT_EQ(ShutdownPullConsumer(NULL), NULL_POINTER); } - +TEST(cpullConsumer, version) { + CPullConsumer* pullConsumer = CreatePullConsumer("groupTestVersion"); + EXPECT_TRUE(pullConsumer != NULL); + string version(ShowPullConsumerVersion(pullConsumer)); + EXPECT_GT(version.length(), 0); +} int main(int argc, char* argv[]) { InitGoogleMock(&argc, argv); testing::GTEST_FLAG(throw_on_failure) = true; diff --git a/test/src/extern/CPushConsumerTest.cpp b/test/src/extern/CPushConsumerTest.cpp index a2d2fc9fd..2eff2e7b0 100644 --- a/test/src/extern/CPushConsumerTest.cpp +++ b/test/src/extern/CPushConsumerTest.cpp @@ -147,7 +147,12 @@ TEST(cPushComsumer, null) { EXPECT_EQ(SetPushConsumerLogLevel(NULL, E_LOG_LEVEL_LEVEL_NUM), NULL_POINTER); EXPECT_EQ(SetPushConsumerMessageModel(NULL, BROADCASTING), NULL_POINTER); } - +TEST(cPushComsumer, version) { + CPushConsumer* pushConsumer = CreatePushConsumer("groupTestVersion"); + EXPECT_TRUE(pushConsumer != NULL); + string version(ShowPushConsumerVersion(pushConsumer)); + EXPECT_GT(version.length(), 0); +} int main(int argc, char* argv[]) { InitGoogleMock(&argc, argv); testing::GTEST_FLAG(filter) = "cPushComsumer.*"; diff --git a/test/src/protocol/ConsumerRunningInfoTest.cpp b/test/src/protocol/ConsumerRunningInfoTest.cpp index 70b305f11..8642e49e2 100644 --- a/test/src/protocol/ConsumerRunningInfoTest.cpp +++ b/test/src/protocol/ConsumerRunningInfoTest.cpp @@ -99,6 +99,7 @@ TEST(ConsumerRunningInfo, init) { info.setProperty(ConsumerRunningInfo::PROP_CONSUME_TYPE, "consume_type"); info.setProperty(ConsumerRunningInfo::PROP_CLIENT_VERSION, "client_version"); info.setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP, "127"); + info.setProperty(ConsumerRunningInfo::PROP_CLIENT_SDK_VERSION, "sdk_version"); // encode string outStr = info.encode();