Skip to content

feat(version): add api to get SDK versions #261

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Feb 27, 2020
2 changes: 2 additions & 0 deletions include/CCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ extern "C" {
#define MAX_MESSAGE_ID_LENGTH 256
#define MAX_TOPIC_LENGTH 512
#define MAX_BROKER_NAME_ID_LENGTH 256
#define MAX_SDK_VERSION_LENGTH 256
#define DEFAULT_SDK_VERSION "DefaultVersion"
typedef enum _CStatus_ {
// Success
OK = 0,
Expand Down
1 change: 1 addition & 0 deletions include/CProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ ROCKETMQCLIENT_API CProducer* CreateTransactionProducer(const char* groupId,
ROCKETMQCLIENT_API int DestroyProducer(CProducer* producer);
ROCKETMQCLIENT_API int StartProducer(CProducer* producer);
ROCKETMQCLIENT_API int ShutdownProducer(CProducer* producer);
ROCKETMQCLIENT_API const char* ShowProducerVersion(CProducer* producer);

ROCKETMQCLIENT_API int SetProducerNameServerAddress(CProducer* producer, const char* namesrv);
ROCKETMQCLIENT_API int SetProducerNameServerDomain(CProducer* producer, const char* domain);
Expand Down
2 changes: 2 additions & 0 deletions include/CPullConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ ROCKETMQCLIENT_API CPullConsumer* CreatePullConsumer(const char* groupId);
ROCKETMQCLIENT_API int DestroyPullConsumer(CPullConsumer* consumer);
ROCKETMQCLIENT_API int StartPullConsumer(CPullConsumer* consumer);
ROCKETMQCLIENT_API int ShutdownPullConsumer(CPullConsumer* consumer);
ROCKETMQCLIENT_API const char* ShowPullConsumerVersion(CPullConsumer* consumer);

ROCKETMQCLIENT_API int SetPullConsumerGroupID(CPullConsumer* consumer, const char* groupId);
ROCKETMQCLIENT_API const char* GetPullConsumerGroupID(CPullConsumer* consumer);
ROCKETMQCLIENT_API int SetPullConsumerNameServerAddress(CPullConsumer* consumer, const char* namesrv);
Expand Down
1 change: 1 addition & 0 deletions include/CPushConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ ROCKETMQCLIENT_API CPushConsumer* CreatePushConsumer(const char* groupId);
ROCKETMQCLIENT_API int DestroyPushConsumer(CPushConsumer* consumer);
ROCKETMQCLIENT_API int StartPushConsumer(CPushConsumer* consumer);
ROCKETMQCLIENT_API int ShutdownPushConsumer(CPushConsumer* consumer);
ROCKETMQCLIENT_API const char* ShowPushConsumerVersion(CPushConsumer* consumer);
ROCKETMQCLIENT_API int SetPushConsumerGroupID(CPushConsumer* consumer, const char* groupId);
ROCKETMQCLIENT_API const char* GetPushConsumerGroupID(CPushConsumer* consumer);
ROCKETMQCLIENT_API int SetPushConsumerNameServerAddress(CPushConsumer* consumer, const char* namesrv);
Expand Down
1 change: 1 addition & 0 deletions include/DefaultMQProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class ROCKETMQCLIENT_API DefaultMQProducer {

virtual void start();
virtual void shutdown();
virtual std::string version();

virtual SendResult send(MQMessage& msg, bool bSelectActiveBroker = false);
virtual SendResult send(MQMessage& msg, const MQMessageQueue& mq);
Expand Down
1 change: 1 addition & 0 deletions include/DefaultMQPullConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer {

virtual void start();
virtual void shutdown();
virtual std::string version();

const std::string& getNamesrvAddr() const;
void setNamesrvAddr(const std::string& namesrvAddr);
Expand Down
1 change: 1 addition & 0 deletions include/DefaultMQPushConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer {

virtual void start();
virtual void shutdown();
virtual std::string version();

const std::string& getNamesrvAddr() const;
void setNamesrvAddr(const std::string& namesrvAddr);
Expand Down
1 change: 1 addition & 0 deletions include/TransactionMQProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class ROCKETMQCLIENT_API TransactionMQProducer {

void start();
void shutdown();
std::string version();

const std::string& getNamesrvAddr() const;
void setNamesrvAddr(const std::string& namesrvAddr);
Expand Down
2 changes: 2 additions & 0 deletions src/MQClientFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,8 @@ ConsumerRunningInfo* MQClientFactory::consumerRunningInfo(const string& consumer
runningInfo->setProperty(
ConsumerRunningInfo::PROP_CLIENT_VERSION,
MQVersion::GetVersionDesc(MQVersion::s_CurrentVersion)); // MQVersion::s_CurrentVersion ));
runningInfo->setProperty(ConsumerRunningInfo::PROP_CLIENT_SDK_VERSION,
pConsumer->getClientVersionString()); // in DefaultMQClient.cpp;

return runningInfo;
}
Expand Down
5 changes: 3 additions & 2 deletions src/MQClientFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ class MQClientFactory {
void doRebalance();
void timerCB_doRebalance(boost::system::error_code& ec, boost::shared_ptr<boost::asio::deadline_timer> t);
bool getSessionCredentialFromConsumerTable(SessionCredentials& sessionCredentials);
bool addConsumerToTable(const string& consumerName, MQConsumer* pMQConsumer);
void eraseConsumerFromTable(const string& consumerName);
int getConsumerTableSize();
void getTopicListFromConsumerSubscription(set<string>& topicList);
Expand All @@ -152,7 +151,6 @@ class MQClientFactory {

// producer related operation
bool getSessionCredentialFromProducerTable(SessionCredentials& sessionCredentials);
bool addProducerToTable(const string& producerName, MQProducer* pMQProducer);
void eraseProducerFromTable(const string& producerName);
int getProducerTableSize();
void insertProducerInfoToHeartBeatData(HeartbeatData* pHeartbeatData);
Expand All @@ -171,6 +169,9 @@ class MQClientFactory {
unique_ptr<MQClientAPIImpl> m_pClientAPIImpl;
unique_ptr<ClientRemotingProcessor> m_pClientRemotingProcessor;

bool addProducerToTable(const string& producerName, MQProducer* pMQProducer);
bool addConsumerToTable(const string& consumerName, MQConsumer* pMQConsumer);

private:
string m_nameSrvDomain; // per clientId
ServiceState m_serviceState;
Expand Down
28 changes: 23 additions & 5 deletions src/common/DefaultMQClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

#include "include/DefaultMQClient.h"
#include "DefaultMQClient.h"
#include "Logging.h"
#include "MQClientFactory.h"
#include "MQClientManager.h"
Expand All @@ -24,11 +24,11 @@
#include "UtilAll.h"

namespace rocketmq {

// hard code first.
#define ROCKETMQCPP_VERSION "2.0.0"
#define BUILD_DATE "02-14-2020"
#define BUILD_DATE "22:50:18 02-14-2020"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ToDo, hope to be fixed in the near long time.

// display version: strings bin/librocketmq.so |grep VERSION
const char* rocketmq_build_time = "VERSION: " ROCKETMQCPP_VERSION ", BUILD DATE: " BUILD_DATE " ";
const char* rocketmq_build_time = "CPP CORE VERSION: " ROCKETMQCPP_VERSION ", BUILD TIME: " BUILD_DATE;

//<!************************************************************************
DefaultMQClient::DefaultMQClient() {
Expand Down Expand Up @@ -56,6 +56,11 @@ string DefaultMQClient::getMQClientId() const {
// return processId + "-" + clientIP + "@" + m_instanceName;
return clientIP + "@" + processId + "#" + m_instanceName;
}
// version
string DefaultMQClient::getClientVersionString() const {
string version(rocketmq_build_time);
return version;
}

//<!groupName;
const string& DefaultMQClient::getGroupName() const {
Expand Down Expand Up @@ -222,6 +227,19 @@ void DefaultMQClient::setSessionCredentials(const string& input_accessKey,
const SessionCredentials& DefaultMQClient::getSessionCredentials() const {
return m_SessionCredentials;
}

void DefaultMQClient::showClientConfigs() {
// LOG_WARN("*****************************************************************************");
LOG_WARN("ClientID:%s", getMQClientId().c_str());
LOG_WARN("GroupName:%s", m_GroupName.c_str());
LOG_WARN("NameServer:%s", m_namesrvAddr.c_str());
LOG_WARN("NameServerDomain:%s", m_namesrvDomain.c_str());
LOG_WARN("NameSpace:%s", m_nameSpace.c_str());
LOG_WARN("InstanceName:%s", m_instanceName.c_str());
LOG_WARN("UnitName:%s", m_unitName.c_str());
LOG_WARN("PullThreadNum:%d", m_pullThreadNum);
LOG_WARN("TcpConnectTimeout:%lld ms", m_tcpConnectTimeout);
LOG_WARN("TcpTransportTryLockTimeout:%lld s", m_tcpTransportTryLockTimeout);
// LOG_WARN("*****************************************************************************");
}
//<!************************************************************************
} // namespace rocketmq
10 changes: 9 additions & 1 deletion src/consumer/DefaultMQPullConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "DefaultMQPullConsumer.h"
#include "DefaultMQPullConsumerImpl.h"
#include "MQVersion.h"

namespace rocketmq {

Expand All @@ -34,7 +35,14 @@ void DefaultMQPullConsumer::start() {
void DefaultMQPullConsumer::shutdown() {
impl->shutdown();
}

std::string DefaultMQPullConsumer::version() {
std::string versions = impl->getClientVersionString();
/*versions.append(", PROTOCOL VERSION: ")
.append(MQVersion::GetVersionDesc(MQVersion::s_CurrentVersion))
.append(", LANGUAGE: ")
.append(MQVersion::s_CurrentLanguage);*/
return versions;
}
// start mqclient set
const std::string& DefaultMQPullConsumer::getNamesrvAddr() const {
return impl->getNamesrvAddr();
Expand Down
2 changes: 2 additions & 0 deletions src/consumer/DefaultMQPullConsumerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ void DefaultMQPullConsumerImpl::start() {
sa.sa_flags = 0;
sigaction(SIGPIPE, &sa, 0);
#endif
LOG_INFO("###Current Pull Consumer@%s", getClientVersionString().c_str());
dealWithNameSpace();
showClientConfigs();
switch (m_serviceState) {
case CREATE_JUST: {
m_serviceState = START_FAILED;
Expand Down
10 changes: 9 additions & 1 deletion src/consumer/DefaultMQPushConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/

#include "DefaultMQPushConsumer.h"
#include <MQVersion.h>
#include "DefaultMQPushConsumerImpl.h"

namespace rocketmq {
Expand All @@ -34,7 +35,14 @@ void DefaultMQPushConsumer::start() {
void DefaultMQPushConsumer::shutdown() {
impl->shutdown();
}

std::string DefaultMQPushConsumer::version() {
std::string versions = impl->getClientVersionString();
/*versions.append(", PROTOCOL VERSION: ")
.append(MQVersion::GetVersionDesc(MQVersion::s_CurrentVersion))
.append(", LANGUAGE: ")
.append(MQVersion::s_CurrentLanguage);*/
return versions;
}
// ConsumeType DefaultMQPushConsumer::getConsumeType() {
// return impl->getConsumeType();
//}
Expand Down
42 changes: 42 additions & 0 deletions src/consumer/DefaultMQPushConsumerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ class AsyncPullCallback : public PullCallback {
//<!***************************************************************************
static boost::mutex m_asyncCallbackLock;

DefaultMQPushConsumerImpl::DefaultMQPushConsumerImpl() {}
DefaultMQPushConsumerImpl::DefaultMQPushConsumerImpl(const string& groupname)
: m_consumeFromWhere(CONSUME_FROM_LAST_OFFSET),
m_pOffsetStore(NULL),
Expand Down Expand Up @@ -307,8 +308,10 @@ void DefaultMQPushConsumerImpl::start() {
sa.sa_flags = 0;
sigaction(SIGPIPE, &sa, 0);
#endif
LOG_WARN("###Current Push Consumer@%s", getClientVersionString().c_str());
// deal with name space before start
dealWithNameSpace();
logConfigs();
switch (m_serviceState) {
case CREATE_JUST: {
m_serviceState = START_FAILED;
Expand Down Expand Up @@ -1038,5 +1041,44 @@ bool DefaultMQPushConsumerImpl::dealWithNameSpace() {

return true;
}
void DefaultMQPushConsumerImpl::logConfigs() {
showClientConfigs();

LOG_WARN("MessageModel:%d", m_messageModel);
LOG_WARN("MessageModel:%s", m_messageModel == BROADCASTING ? "BROADCASTING" : "CLUSTERING");

LOG_WARN("ConsumeFromWhere:%d", m_consumeFromWhere);
switch (m_consumeFromWhere) {
case CONSUME_FROM_FIRST_OFFSET:
LOG_WARN("ConsumeFromWhere:%s", "CONSUME_FROM_FIRST_OFFSET");
break;
case CONSUME_FROM_LAST_OFFSET:
LOG_WARN("ConsumeFromWhere:%s", "CONSUME_FROM_LAST_OFFSET");
break;

case CONSUME_FROM_TIMESTAMP:
LOG_WARN("ConsumeFromWhere:%s", "CONSUME_FROM_TIMESTAMP");
break;
case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
LOG_WARN("ConsumeFromWhere:%s", "CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST");
break;
case CONSUME_FROM_MAX_OFFSET:
LOG_WARN("ConsumeFromWhere:%s", "CONSUME_FROM_MAX_OFFSET");
break;
case CONSUME_FROM_MIN_OFFSET:
LOG_WARN("ConsumeFromWhere:%s", "CONSUME_FROM_MAX_OFFSET");
break;
default:
LOG_WARN("ConsumeFromWhere:%s", "UnKnown.");
break;
}
LOG_WARN("ConsumeThreadCount:%d", m_consumeThreadCount);
LOG_WARN("ConsumeMessageBatchMaxSize:%d", m_consumeMessageBatchMaxSize);
LOG_WARN("MaxMsgCacheSizePerQueue:%d", m_maxMsgCacheSize);
LOG_WARN("MaxReconsumeTimes:%d", m_maxReconsumeTimes);
LOG_WARN("PullMsgThreadPoolNum:%d", m_pullMsgThreadPoolNum);
LOG_WARN("AsyncPullMode:%s", m_asyncPull ? "true" : "false");
LOG_WARN("AsyncPullTimeout:%d ms", m_asyncPullTimeout);
}
//<!************************************************************************
} // namespace rocketmq
2 changes: 2 additions & 0 deletions src/consumer/DefaultMQPushConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class ConsumerRunningInfo;
//<!***************************************************************************
class DefaultMQPushConsumerImpl : public MQConsumer {
public:
DefaultMQPushConsumerImpl();
DefaultMQPushConsumerImpl(const std::string& groupname);
void boost_asio_work();
virtual ~DefaultMQPushConsumerImpl();
Expand Down Expand Up @@ -133,6 +134,7 @@ class DefaultMQPushConsumerImpl : public MQConsumer {
void copySubscription();
void updateTopicSubscribeInfoWhenSubscriptionChanged();
bool dealWithNameSpace();
void logConfigs();

private:
uint64_t m_startTime;
Expand Down
25 changes: 25 additions & 0 deletions src/extern/CProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ typedef struct __DefaultProducer__ {
TransactionMQProducer* innerTransactionProducer;
LocalTransactionListenerInner* listenerInner;
int producerType;
char* version;
} DefaultProducer;
CProducer* CreateProducer(const char* groupId) {
if (groupId == NULL) {
Expand All @@ -224,6 +225,9 @@ CProducer* CreateProducer(const char* groupId) {
DefaultProducer* defaultMQProducer = new DefaultProducer();
defaultMQProducer->producerType = CAPI_C_PRODUCER_TYPE_COMMON;
defaultMQProducer->innerProducer = new DefaultMQProducer(groupId);
defaultMQProducer->version = new char[MAX_SDK_VERSION_LENGTH];
strncpy(defaultMQProducer->version, defaultMQProducer->innerProducer->version().c_str(), MAX_SDK_VERSION_LENGTH - 1);
defaultMQProducer->version[MAX_SDK_VERSION_LENGTH - 1] = 0;
defaultMQProducer->innerTransactionProducer = NULL;
defaultMQProducer->listenerInner = NULL;
return (CProducer*)defaultMQProducer;
Expand All @@ -236,6 +240,10 @@ CProducer* CreateOrderlyProducer(const char* groupId) {
DefaultProducer* defaultMQProducer = new DefaultProducer();
defaultMQProducer->producerType = CAPI_C_PRODUCER_TYPE_ORDERLY;
defaultMQProducer->innerProducer = new DefaultMQProducer(groupId);

defaultMQProducer->version = new char[MAX_SDK_VERSION_LENGTH];
strncpy(defaultMQProducer->version, defaultMQProducer->innerProducer->version().c_str(), MAX_SDK_VERSION_LENGTH - 1);
defaultMQProducer->version[MAX_SDK_VERSION_LENGTH - 1] = 0;
defaultMQProducer->innerTransactionProducer = NULL;
defaultMQProducer->listenerInner = NULL;
return (CProducer*)defaultMQProducer;
Expand All @@ -252,13 +260,22 @@ CProducer* CreateTransactionProducer(const char* groupId, CLocalTransactionCheck
defaultMQProducer->listenerInner =
new LocalTransactionListenerInner((CProducer*)defaultMQProducer, callback, userData);
defaultMQProducer->innerTransactionProducer->setTransactionListener(defaultMQProducer->listenerInner);

defaultMQProducer->version = new char[MAX_SDK_VERSION_LENGTH];
strncpy(defaultMQProducer->version, defaultMQProducer->innerTransactionProducer->version().c_str(),
MAX_SDK_VERSION_LENGTH - 1);
defaultMQProducer->version[MAX_SDK_VERSION_LENGTH - 1] = 0;
return (CProducer*)defaultMQProducer;
}
int DestroyProducer(CProducer* pProducer) {
if (pProducer == NULL) {
return NULL_POINTER;
}
DefaultProducer* defaultMQProducer = (DefaultProducer*)pProducer;
if (defaultMQProducer->version != NULL) {
delete defaultMQProducer->version;
defaultMQProducer->version = NULL;
}
if (CAPI_C_PRODUCER_TYPE_TRANSACTION == defaultMQProducer->producerType) {
if (defaultMQProducer->innerTransactionProducer != NULL) {
delete defaultMQProducer->innerTransactionProducer;
Expand Down Expand Up @@ -307,6 +324,14 @@ int ShutdownProducer(CProducer* producer) {
}
return OK;
}
const char* ShowProducerVersion(CProducer* producer) {
if (producer == NULL) {
return DEFAULT_SDK_VERSION;
}
DefaultProducer* defaultMQProducer = (DefaultProducer*)producer;

return defaultMQProducer->version;
}
int SetProducerNameServerAddress(CProducer* producer, const char* namesrv) {
if (producer == NULL) {
return NULL_POINTER;
Expand Down
11 changes: 10 additions & 1 deletion src/extern/CPullConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ using namespace std;
#ifdef __cplusplus
extern "C" {
#endif

char VERSION_FOR_PULL_CONSUMER[MAX_SDK_VERSION_LENGTH];
CPullConsumer* CreatePullConsumer(const char* groupId) {
if (groupId == NULL) {
return NULL;
}
DefaultMQPullConsumer* defaultMQPullConsumer = new DefaultMQPullConsumer(groupId);
strncpy(VERSION_FOR_PULL_CONSUMER, defaultMQPullConsumer->version().c_str(), MAX_SDK_VERSION_LENGTH - 1);
VERSION_FOR_PULL_CONSUMER[MAX_SDK_VERSION_LENGTH - 1] = 0;
return (CPullConsumer*)defaultMQPullConsumer;
}
int DestroyPullConsumer(CPullConsumer* consumer) {
Expand Down Expand Up @@ -61,6 +63,13 @@ int ShutdownPullConsumer(CPullConsumer* consumer) {
((DefaultMQPullConsumer*)consumer)->shutdown();
return OK;
}
const char* ShowPullConsumerVersion(CPullConsumer* consumer) {
if (consumer == NULL) {
return NULL;
}
return VERSION_FOR_PULL_CONSUMER;
}

int SetPullConsumerGroupID(CPullConsumer* consumer, const char* groupId) {
if (consumer == NULL || groupId == NULL) {
return NULL_POINTER;
Expand Down
Loading