Skip to content

Commit 643a638

Browse files
authored
Merge pull request #29 from ShannonDing/PullConsumer
Add name server domain support and refactor pull consumer.
2 parents 27228bd + 34cc8bf commit 643a638

File tree

8 files changed

+111
-39
lines changed

8 files changed

+111
-39
lines changed

example/PullConsumeMessage.c

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,57 @@ void thread_sleep(unsigned milliseconds) {
4040
#endif
4141
}
4242

43-
int main(int argc,char * argv [])
44-
{
45-
int i = 0;
43+
int main(int argc, char *argv[]) {
44+
int i = 0, j = 0;
45+
int ret = 0;
46+
int size = 0;
47+
CMessageQueue *mqs = NULL;
4648
printf("PullConsumer Initializing....\n");
47-
CPullConsumer* consumer = CreatePullConsumer("Group_Consumer_Test");
48-
SetPullConsumerNameServerAddress(consumer,"172.17.0.2:9876");
49+
CPullConsumer *consumer = CreatePullConsumer("Group_Consumer_Test");
50+
SetPullConsumerNameServerAddress(consumer, "172.17.0.2:9876");
4951
StartPullConsumer(consumer);
5052
printf("Pull Consumer Start...\n");
51-
for( i=0; i<10; i++)
52-
{
53-
printf("Now Running : %d S\n",i*10);
54-
thread_sleep(10000);
53+
for (i = 1; i <= 5; i++) {
54+
printf("FetchSubscriptionMessageQueues : %d times\n", i);
55+
ret = FetchSubscriptionMessageQueues(consumer, "T_TestTopic", &mqs, &size);
56+
if(ret != OK) {
57+
printf("Get MQ Queue Failed,ErrorCode:%d\n", ret);
58+
}
59+
printf("Get MQ Size:%d\n", size);
60+
for (j = 0; j < size; j++) {
61+
int noNewMsg = 0;
62+
long long tmpoffset = 0;
63+
printf("Pull Message For Topic:%s,Queue:%s,QueueId:%d\n", mqs[j].topic, mqs[j].brokerName, mqs[j].queueId);
64+
do {
65+
int k = 0;
66+
CPullResult pullResult = Pull(consumer, &mqs[j], "*", tmpoffset, 32);
67+
if (pullResult.pullStatus != E_BROKER_TIMEOUT) {
68+
tmpoffset = pullResult.nextBeginOffset;
69+
}
70+
printf("PullStatus:%d,MaxOffset:%lld,MinOffset:%lld,NextBegainOffset:%lld", pullResult.pullStatus,
71+
pullResult.maxOffset, pullResult.minOffset, pullResult.nextBeginOffset);
72+
switch (pullResult.pullStatus) {
73+
case E_FOUND:
74+
printf("Get Message Size:%d\n", pullResult.size);
75+
for (k = 0; k < pullResult.size; ++k) {
76+
printf("Got Message ID:%s,Body:%s\n", GetMessageId(pullResult.msgFoundList[k]),GetMessageBody(pullResult.msgFoundList[k]));
77+
}
78+
break;
79+
case E_NO_MATCHED_MSG:
80+
noNewMsg = 1;
81+
break;
82+
default:
83+
noNewMsg = 0;
84+
}
85+
ReleasePullResult(pullResult);
86+
thread_sleep(100);
87+
} while (noNewMsg == 0);
88+
thread_sleep(1000);
89+
}
90+
thread_sleep(2000);
91+
ReleaseSubscriptionMessageQueue(mqs);
5592
}
93+
thread_sleep(5000);
5694
ShutdownPullConsumer(consumer);
5795
DestroyPullConsumer(consumer);
5896
printf("PullConsumer Shutdown!\n");

include/CProducer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ ROCKETMQCLIENT_API int StartProducer(CProducer *producer);
3636
ROCKETMQCLIENT_API int ShutdownProducer(CProducer *producer);
3737

3838
ROCKETMQCLIENT_API int SetProducerNameServerAddress(CProducer *producer, const char *namesrv);
39+
ROCKETMQCLIENT_API int SetProducerNameServerDomain(CProducer *producer, const char *domain);
3940
ROCKETMQCLIENT_API int SetProducerGroupName(CProducer *producer, const char *groupName);
4041
ROCKETMQCLIENT_API int SetProducerInstanceName(CProducer *producer, const char *instanceName);
4142
ROCKETMQCLIENT_API int SetProducerSessionCredentials(CProducer *producer, const char *accessKey, const char *secretKey,

include/CPullConsumer.h

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,25 +31,26 @@ extern "C" {
3131
typedef struct CPullConsumer CPullConsumer;
3232

3333

34-
CPullConsumer *CreatePullConsumer(const char *groupId);
35-
int DestroyPullConsumer(CPullConsumer *consumer);
36-
int StartPullConsumer(CPullConsumer *consumer);
37-
int ShutdownPullConsumer(CPullConsumer *consumer);
38-
int SetPullConsumerGroupID(CPullConsumer *consumer, const char *groupId);
39-
const char *GetPullConsumerGroupID(CPullConsumer *consumer);
40-
int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char *namesrv);
41-
int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char *accessKey, const char *secretKey,
34+
ROCKETMQCLIENT_API CPullConsumer *CreatePullConsumer(const char *groupId);
35+
ROCKETMQCLIENT_API int DestroyPullConsumer(CPullConsumer *consumer);
36+
ROCKETMQCLIENT_API int StartPullConsumer(CPullConsumer *consumer);
37+
ROCKETMQCLIENT_API int ShutdownPullConsumer(CPullConsumer *consumer);
38+
ROCKETMQCLIENT_API int SetPullConsumerGroupID(CPullConsumer *consumer, const char *groupId);
39+
ROCKETMQCLIENT_API const char *GetPullConsumerGroupID(CPullConsumer *consumer);
40+
ROCKETMQCLIENT_API int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char *namesrv);
41+
ROCKETMQCLIENT_API int SetPullConsumerNameServerDomain(CPullConsumer *consumer, const char *domain);
42+
ROCKETMQCLIENT_API int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char *accessKey, const char *secretKey,
4243
const char *channel);
43-
int SetPullConsumerLogPath(CPullConsumer *consumer, const char *logPath);
44-
int SetPullConsumerLogFileNumAndSize(CPullConsumer *consumer, int fileNum, long fileSize);
45-
int SetPullConsumerLogLevel(CPullConsumer *consumer, CLogLevel level);
44+
ROCKETMQCLIENT_API int SetPullConsumerLogPath(CPullConsumer *consumer, const char *logPath);
45+
ROCKETMQCLIENT_API int SetPullConsumerLogFileNumAndSize(CPullConsumer *consumer, int fileNum, long fileSize);
46+
ROCKETMQCLIENT_API int SetPullConsumerLogLevel(CPullConsumer *consumer, CLogLevel level);
4647

47-
int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, CMessageQueue **mqs, int *size);
48-
int ReleaseSubscriptionMessageQueue(CMessageQueue *mqs);
48+
ROCKETMQCLIENT_API int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, CMessageQueue **mqs, int *size);
49+
ROCKETMQCLIENT_API int ReleaseSubscriptionMessageQueue(CMessageQueue *mqs);
4950

50-
CPullResult
51+
ROCKETMQCLIENT_API CPullResult
5152
Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char *subExpression, long long offset, int maxNums);
52-
int ReleasePullResult(CPullResult pullResult);
53+
ROCKETMQCLIENT_API int ReleasePullResult(CPullResult pullResult);
5354

5455
#ifdef __cplusplus
5556
};

include/CPullResult.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@
2424
#ifdef __cplusplus
2525
extern "C" {
2626
#endif
27-
typedef enum E_CPullStatus
28-
{
27+
typedef enum E_CPullStatus {
2928
E_FOUND,
3029
E_NO_NEW_MSG,
3130
E_NO_MATCHED_MSG,
@@ -35,11 +34,12 @@ typedef enum E_CPullStatus
3534

3635
typedef struct _CPullResult_ {
3736
CPullStatus pullStatus;
38-
long long nextBeginOffset;
39-
long long minOffset;
40-
long long maxOffset;
41-
CMessageExt** msgFoundList;
37+
long long nextBeginOffset;
38+
long long minOffset;
39+
long long maxOffset;
40+
CMessageExt **msgFoundList;
4241
int size;
42+
void *pData;
4343
} CPullResult;
4444

4545
#ifdef __cplusplus

include/CPushConsumer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ ROCKETMQCLIENT_API int ShutdownPushConsumer(CPushConsumer *consumer);
4242
ROCKETMQCLIENT_API int SetPushConsumerGroupID(CPushConsumer *consumer, const char *groupId);
4343
ROCKETMQCLIENT_API const char *GetPushConsumerGroupID(CPushConsumer *consumer);
4444
ROCKETMQCLIENT_API int SetPushConsumerNameServerAddress(CPushConsumer *consumer, const char *namesrv);
45+
ROCKETMQCLIENT_API int SetPushConsumerNameServerDomain(CPushConsumer *consumer, const char *domain);
4546
ROCKETMQCLIENT_API int Subscribe(CPushConsumer *consumer, const char *topic, const char *expression);
4647
ROCKETMQCLIENT_API int RegisterMessageCallbackOrderly(CPushConsumer *consumer, MessageCallBack pCallback);
4748
ROCKETMQCLIENT_API int RegisterMessageCallback(CPushConsumer *consumer, MessageCallBack pCallback);

src/extern/CProducer.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,13 @@ int SetProducerNameServerAddress(CProducer *producer, const char *namesrv) {
8585
((DefaultMQProducer *) producer)->setNamesrvAddr(namesrv);
8686
return OK;
8787
}
88-
88+
int SetProducerNameServerDomain(CProducer *producer, const char *domain) {
89+
if (producer == NULL) {
90+
return NULL_POINTER;
91+
}
92+
((DefaultMQProducer *) producer)->setNamesrvDomain(domain);
93+
return OK;
94+
}
8995
int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result) {
9096
//CSendResult sendResult;
9197
if (producer == NULL || msg == NULL || result == NULL) {

src/extern/CPullConsumer.cpp

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,13 @@ int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char *namesr
8080
((DefaultMQPullConsumer *) consumer)->setNamesrvAddr(namesrv);
8181
return OK;
8282
}
83+
int SetPullConsumerNameServerDomain(CPullConsumer *consumer, const char *domain) {
84+
if (consumer == NULL) {
85+
return NULL_POINTER;
86+
}
87+
((DefaultMQPullConsumer *) consumer)->setNamesrvDomain(domain);
88+
return OK;
89+
}
8390
int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char *accessKey, const char *secretKey,
8491
const char *channel) {
8592
if (consumer == NULL) {
@@ -119,24 +126,26 @@ int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, C
119126
return NULL_POINTER;
120127
}
121128
unsigned int index = 0;
129+
CMessageQueue *temMQ = NULL;
122130
std::vector<MQMessageQueue> fullMQ;
123131
try {
124132
((DefaultMQPullConsumer *) consumer)->fetchSubscribeMessageQueues(topic, fullMQ);
125133
*size = fullMQ.size();
126134
//Alloc memory to save the pointer to CPP MessageQueue, and the MessageQueues may be changed.
127135
//Thus, this memory should be released by users using @ReleaseSubscribeMessageQueue every time.
128-
*mqs = (CMessageQueue *) malloc(*size * sizeof(CMessageQueue));
129-
if (*mqs == NULL) {
136+
temMQ = (CMessageQueue *) malloc(*size * sizeof(CMessageQueue));
137+
if (temMQ == NULL) {
130138
*size = 0;
131139
*mqs = NULL;
132140
return MALLOC_FAILED;
133141
}
134142
auto iter = fullMQ.begin();
135143
for (index = 0; iter != fullMQ.end() && index <= fullMQ.size(); ++iter, index++) {
136-
strncpy(mqs[index]->topic, iter->getTopic().c_str(), MAX_TOPIC_LENGTH - 1);
137-
strncpy(mqs[index]->brokerName, iter->getBrokerName().c_str(), MAX_BROKER_NAME_ID_LENGTH - 1);
138-
mqs[index]->queueId = iter->getQueueId();
144+
strncpy(temMQ[index].topic, iter->getTopic().c_str(), MAX_TOPIC_LENGTH - 1);
145+
strncpy(temMQ[index].brokerName, iter->getBrokerName().c_str(), MAX_BROKER_NAME_ID_LENGTH - 1);
146+
temMQ[index].queueId = iter->getQueueId();
139147
}
148+
*mqs = temMQ;
140149
} catch (MQException &e) {
141150
*size = 0;
142151
*mqs = NULL;
@@ -160,7 +169,7 @@ Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char *subExpression
160169
PullResult cppPullResult;
161170
try {
162171
cppPullResult = ((DefaultMQPullConsumer *) consumer)->pull(messageQueue, subExpression, offset, maxNums);
163-
}catch (exception &e){
172+
} catch (exception &e) {
164173
cppPullResult.pullStatus = BROKER_TIMEOUT;
165174
}
166175

@@ -171,11 +180,13 @@ Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char *subExpression
171180
pullResult.minOffset = cppPullResult.minOffset;
172181
pullResult.nextBeginOffset = cppPullResult.nextBeginOffset;
173182
pullResult.size = cppPullResult.msgFoundList.size();
183+
PullResult *tmpPullResult = new PullResult(cppPullResult);
184+
pullResult.pData = tmpPullResult;
174185
//Alloc memory to save the pointer to CPP MQMessageExt, which will be release by the CPP SDK core.
175186
//Thus, this memory should be released by users using @ReleasePullResult
176187
pullResult.msgFoundList = (CMessageExt **) malloc(pullResult.size * sizeof(CMessageExt *));
177188
for (size_t i = 0; i < cppPullResult.msgFoundList.size(); i++) {
178-
MQMessageExt *msg = const_cast<MQMessageExt *>(&cppPullResult.msgFoundList[i]);
189+
MQMessageExt *msg = const_cast<MQMessageExt *>(&tmpPullResult->msgFoundList[i]);
179190
pullResult.msgFoundList[i] = (CMessageExt *) (msg);
180191
}
181192
break;
@@ -204,9 +215,16 @@ Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char *subExpression
204215
return pullResult;
205216
}
206217
int ReleasePullResult(CPullResult pullResult) {
207-
if (pullResult.size == 0 || pullResult.msgFoundList == NULL) {
218+
if (pullResult.size == 0 || pullResult.msgFoundList == NULL || pullResult.pData == NULL) {
208219
return NULL_POINTER;
209220
}
221+
if (pullResult.pData != NULL) {
222+
try {
223+
delete ((PullResult *) pullResult.pData);
224+
} catch (exception &e) {
225+
return NULL_POINTER;
226+
}
227+
}
210228
free((void *) pullResult.msgFoundList);
211229
pullResult.msgFoundList = NULL;
212230
return OK;

src/extern/CPushConsumer.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,13 @@ int SetPushConsumerNameServerAddress(CPushConsumer *consumer, const char *namesr
139139
((DefaultMQPushConsumer *) consumer)->setNamesrvAddr(namesrv);
140140
return OK;
141141
}
142+
int SetPushConsumerNameServerDomain(CPushConsumer *consumer, const char *domain) {
143+
if (consumer == NULL) {
144+
return NULL_POINTER;
145+
}
146+
((DefaultMQPushConsumer *) consumer)->setNamesrvDomain(domain);
147+
return OK;
148+
}
142149
int Subscribe(CPushConsumer *consumer, const char *topic, const char *expression) {
143150
if (consumer == NULL) {
144151
return NULL_POINTER;

0 commit comments

Comments
 (0)