From e7244f66ffeb2e039724db7315e993d5589c1c6f Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Tue, 11 Dec 2018 17:55:22 +0800 Subject: [PATCH] Change PullConsumer model to CLUSTER & format example code style --- .gitignore | 6 +-- example/Producer.c | 54 +++++++++++++++---------- example/PullConsumeMessage.c | 24 ++++++++--- example/PushConsumeMessage.c | 56 ++++++++++++++------------ src/consumer/DefaultMQPullConsumer.cpp | 2 +- 5 files changed, 84 insertions(+), 58 deletions(-) diff --git a/.gitignore b/.gitignore index e1f4b40aa..5693999b6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ .idea cmake-build-debug/ -bin -libs - +bin +build +libs/signature/lib tmp_* diff --git a/example/Producer.c b/example/Producer.c index 5888fc8bc..cef8383cb 100644 --- a/example/Producer.c +++ b/example/Producer.c @@ -14,9 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef WIN32 -#include -#endif + #include #include "CProducer.h" @@ -24,35 +22,47 @@ #include "CMessage.h" #include "CSendResult.h" -void startSendMessage(CProducer* producer) -{ +#ifdef _WIN32 +#include +#else + +#include +#include + +#endif + +void thread_sleep(unsigned milliseconds) { +#ifdef _WIN32 + Sleep(milliseconds); +#else + usleep(milliseconds * 1000); // takes microseconds +#endif +} + +void startSendMessage(CProducer *producer) { int i = 0; char DestMsg[256]; - CMessage* msg = CreateMessage("T_TestTopic"); - SetMessageTags(msg,"Test_Tag"); - SetMessageKeys(msg,"Test_Keys"); + CMessage *msg = CreateMessage("T_TestTopic"); + SetMessageTags(msg, "Test_Tag"); + SetMessageKeys(msg, "Test_Keys"); CSendResult result; - for( i=0; i<10; i++) - { - printf("send one message : %d\n",i); - memset(DestMsg,0,sizeof(DestMsg)); - snprintf(DestMsg,255,"New message body: index %d",i); - SetMessageBody(msg,DestMsg); + for (i = 0; i < 10; i++) { + printf("send one message : %d\n", i); + memset(DestMsg, 0, sizeof(DestMsg)); + snprintf(DestMsg, 255, "New message body: index %d", i); + SetMessageBody(msg, DestMsg); SendMessageSync(producer, msg, &result); - printf("Msg Send ID:%s\n",result.msgId); -#ifndef WIN32 - sleep(1); -#endif + printf("Msg Send ID:%s\n", result.msgId); + thread_sleep(1000); } } -int main(int argc,char * argv [ ]) -{ +int main(int argc, char *argv[]) { printf("Producer Initializing.....\n"); - CProducer* producer = CreateProducer("Group_producer"); - SetProducerNameServerAddress(producer,"172.17.0.2:9876"); + CProducer *producer = CreateProducer("Group_producer"); + SetProducerNameServerAddress(producer, "172.17.0.2:9876"); StartProducer(producer); printf("Producer start.....\n"); startSendMessage(producer); diff --git a/example/PullConsumeMessage.c b/example/PullConsumeMessage.c index 5890cbc58..44e2abdb0 100644 --- a/example/PullConsumeMessage.c +++ b/example/PullConsumeMessage.c @@ -14,12 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef WIN32 -#include -#endif -#include - +#include #include "CPullConsumer.h" #include "CCommon.h" @@ -27,6 +23,22 @@ #include "CPullResult.h" #include "CMessageQueue.h" +#ifdef _WIN32 +#include +#else + +#include +#include + +#endif + +void thread_sleep(unsigned milliseconds) { +#ifdef _WIN32 + Sleep(milliseconds); +#else + usleep(milliseconds * 1000); // takes microseconds +#endif +} int main(int argc,char * argv []) { @@ -39,7 +51,7 @@ int main(int argc,char * argv []) for( i=0; i<10; i++) { printf("Now Running : %d S\n",i*10); - sleep(10); + thread_sleep(10000); } ShutdownPullConsumer(consumer); DestroyPullConsumer(consumer); diff --git a/example/PushConsumeMessage.c b/example/PushConsumeMessage.c index 6ab19066b..0e2906d23 100644 --- a/example/PushConsumeMessage.c +++ b/example/PushConsumeMessage.c @@ -14,48 +14,52 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef WIN32 -#include -#else -#include -void sleep(int interval) { - Sleep(interval * 10); -} -#endif -#include - +#include #include "CPushConsumer.h" #include "CCommon.h" #include "CMessageExt.h" +#ifdef _WIN32 +#include +#else + +#include +#include + +#endif + +void thread_sleep(unsigned milliseconds) { +#ifdef _WIN32 + Sleep(milliseconds); +#else + usleep(milliseconds * 1000); // takes microseconds +#endif +} -int doConsumeMessage(struct CPushConsumer * consumer, CMessageExt * msgExt) -{ +int doConsumeMessage(struct CPushConsumer *consumer, CMessageExt *msgExt) { printf("Hello,doConsumeMessage by Application!\n"); - printf("Msg Topic:%s\n",GetMessageTopic(msgExt)); - printf("Msg Tags:%s\n",GetMessageTags(msgExt)); - printf("Msg Keys:%s\n",GetMessageKeys(msgExt)); - printf("Msg Body:%s\n",GetMessageBody(msgExt)); + printf("Msg Topic:%s\n", GetMessageTopic(msgExt)); + printf("Msg Tags:%s\n", GetMessageTags(msgExt)); + printf("Msg Keys:%s\n", GetMessageKeys(msgExt)); + printf("Msg Body:%s\n", GetMessageBody(msgExt)); return E_CONSUME_SUCCESS; } -int main(int argc,char * argv []) -{ +int main(int argc, char *argv[]) { int i = 0; printf("PushConsumer Initializing....\n"); - CPushConsumer* consumer = CreatePushConsumer("Group_Consumer_Test"); - SetPushConsumerNameServerAddress(consumer,"172.17.0.2:9876"); - Subscribe(consumer,"T_TestTopic","*"); - RegisterMessageCallback(consumer,doConsumeMessage); + CPushConsumer *consumer = CreatePushConsumer("Group_Consumer_Test"); + SetPushConsumerNameServerAddress(consumer, "172.17.0.2:9876"); + Subscribe(consumer, "T_TestTopic", "*"); + RegisterMessageCallback(consumer, doConsumeMessage); StartPushConsumer(consumer); printf("Push Consumer Start...\n"); - for( i=0; i<10; i++) - { - printf("Now Running : %d S\n",i*10); - sleep(10); + for (i = 0; i < 10; i++) { + printf("Now Running : %d S\n", i * 10); + thread_sleep(10000); } ShutdownPushConsumer(consumer); DestroyPushConsumer(consumer); diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp index ff5fbbbab..4aa33f655 100755 --- a/src/consumer/DefaultMQPullConsumer.cpp +++ b/src/consumer/DefaultMQPullConsumer.cpp @@ -43,7 +43,7 @@ DefaultMQPullConsumer::DefaultMQPullConsumer(const string& groupname) string gname = groupname.empty() ? DEFAULT_CONSUMER_GROUP : groupname; setGroupName(gname); - setMessageModel(BROADCASTING); + setMessageModel(CLUSTERING); } DefaultMQPullConsumer::~DefaultMQPullConsumer() {