Skip to content

Format project with clang-format Chromium #131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions .clang-format
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
BasedOnStyle: Chromium
ColumnLimit: 120
IndentWidth: 4
DerivePointerAlignment: false
IndentCaseLabels: false
PointerAlignment: Right
SpaceAfterCStyleCast: true
TabWidth: 2
21 changes: 8 additions & 13 deletions example/AsyncProducer.cpp
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,10 @@ class MyAutoDeleteSendCallback : public AutoDeleteSendCallBack {
g_finished.notify_one();
}
}
virtual void onException(MQException& e) {
std::cout << "send Exception" << e << "\n";
}
virtual void onException(MQException& e) { std::cout << "send Exception" << e << "\n"; }
};

void AsyncProducerWorker(RocketmqSendAndConsumerArgs* info,
DefaultMQProducer* producer) {
void AsyncProducerWorker(RocketmqSendAndConsumerArgs* info, DefaultMQProducer* producer) {
while (!g_quit.load()) {
if (g_msgCount.load() <= 0) {
std::unique_lock<std::mutex> lck(g_mtx);
Expand Down Expand Up @@ -99,7 +96,8 @@ int main(int argc, char* argv[]) {

PrintRocketmqSendAndConsumerArgs(info);

if (!info.namesrv.empty()) producer.setNamesrvAddr(info.namesrv);
if (!info.namesrv.empty())
producer.setNamesrvAddr(info.namesrv);

producer.setGroupName(info.groupname);
producer.setInstanceName(info.groupname);
Expand All @@ -110,8 +108,7 @@ int main(int argc, char* argv[]) {
auto start = std::chrono::system_clock::now();
int msgcount = g_msgCount.load();
for (int j = 0; j < info.thread_count; j++) {
std::shared_ptr<std::thread> th =
std::make_shared<std::thread>(AsyncProducerWorker, &info, &producer);
std::shared_ptr<std::thread> th = std::make_shared<std::thread>(AsyncProducerWorker, &info, &producer);
work_pool.push_back(th);
}

Expand All @@ -122,12 +119,10 @@ int main(int argc, char* argv[]) {
}

auto end = std::chrono::system_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

std::cout
<< "per msg time: " << duration.count() / (double)msgcount << "ms \n"
<< "========================finished==============================\n";
std::cout << "per msg time: " << duration.count() / (double)msgcount << "ms \n"
<< "========================finished==============================\n";

producer.shutdown();
for (size_t th = 0; th != work_pool.size(); ++th) {
Expand Down
8 changes: 4 additions & 4 deletions example/AsyncPushConsumer.cpp
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class MyMsgListener : public MessageListenerConcurrently {
MyMsgListener() {}
virtual ~MyMsgListener() {}

virtual ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
virtual ConsumeStatus consumeMessage(const std::vector<MQMessageExt>& msgs) {
g_msgCount.store(g_msgCount.load() - msgs.size());
for (size_t i = 0; i < msgs.size(); ++i) {
g_tps.Increment();
Expand All @@ -51,7 +51,7 @@ class MyMsgListener : public MessageListenerConcurrently {
}
};

int main(int argc, char *argv[]) {
int main(int argc, char* argv[]) {
RocketmqSendAndConsumerArgs info;
if (!ParseArgs(argc, argv, &info)) {
exit(-1);
Expand Down Expand Up @@ -82,7 +82,7 @@ int main(int argc, char *argv[]) {

try {
consumer.start();
} catch (MQClientException &e) {
} catch (MQClientException& e) {
cout << e << endl;
}
g_tps.start();
Expand All @@ -95,7 +95,7 @@ int main(int argc, char *argv[]) {

try {
producer.send(msg);
} catch (MQException &e) {
} catch (MQException& e) {
std::cout << e << endl; // if catch excepiton , need re-send this msg by
// service
}
Expand Down
152 changes: 73 additions & 79 deletions example/BatchProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,97 +34,91 @@ std::mutex g_mtx;
std::condition_variable g_finished;
TpsReportService g_tps;

void SyncProducerWorker(RocketmqSendAndConsumerArgs* info,
DefaultMQProducer* producer) {
while (!g_quit.load()) {
if (g_msgCount.load() <= 0) {
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.notify_one();
break;
}
void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, DefaultMQProducer* producer) {
while (!g_quit.load()) {
if (g_msgCount.load() <= 0) {
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.notify_one();
break;
}

vector<MQMessage> msgs;
MQMessage msg1(info->topic, "*", info->body);
msg1.setProperty("property1", "value1");
MQMessage msg2(info->topic, "*", info->body);
msg2.setProperty("property1", "value1");
msg2.setProperty("property2", "value2");
MQMessage msg3(info->topic, "*", info->body);
msg3.setProperty("property1", "value1");
msg3.setProperty("property2", "value2");
msg3.setProperty("property3", "value3");
msgs.push_back(msg1);
msgs.push_back(msg2);
msgs.push_back(msg3);
try {
auto start = std::chrono::system_clock::now();
SendResult sendResult = producer->send(msgs);
g_tps.Increment();
--g_msgCount;
auto end = std::chrono::system_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
if (duration.count() >= 500) {
std::cout << "send RT more than: " << duration.count()
<< " ms with msgid: " << sendResult.getMsgId() << endl;
}
} catch (const MQException& e) {
std::cout << "send failed: " << e.what() << std::endl;
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.notify_one();
return;
}
vector<MQMessage> msgs;
MQMessage msg1(info->topic, "*", info->body);
msg1.setProperty("property1", "value1");
MQMessage msg2(info->topic, "*", info->body);
msg2.setProperty("property1", "value1");
msg2.setProperty("property2", "value2");
MQMessage msg3(info->topic, "*", info->body);
msg3.setProperty("property1", "value1");
msg3.setProperty("property2", "value2");
msg3.setProperty("property3", "value3");
msgs.push_back(msg1);
msgs.push_back(msg2);
msgs.push_back(msg3);
try {
auto start = std::chrono::system_clock::now();
SendResult sendResult = producer->send(msgs);
g_tps.Increment();
--g_msgCount;
auto end = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
if (duration.count() >= 500) {
std::cout << "send RT more than: " << duration.count() << " ms with msgid: " << sendResult.getMsgId() << endl;
}
} catch (const MQException& e) {
std::cout << "send failed: " << e.what() << std::endl;
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.notify_one();
return;
}
}
}

int main(int argc, char* argv[]) {
RocketmqSendAndConsumerArgs info;
if (!ParseArgs(argc, argv, &info)) {
exit(-1);
}
PrintRocketmqSendAndConsumerArgs(info);
DefaultMQProducer producer("please_rename_unique_group_name");
producer.setNamesrvAddr(info.namesrv);
producer.setNamesrvDomain(info.namesrv_domain);
producer.setGroupName(info.groupname);
producer.setInstanceName(info.groupname);
producer.setSessionCredentials("mq acesskey", "mq secretkey", "ALIYUN");
producer.setSendMsgTimeout(500);
producer.setTcpTransportTryLockTimeout(1000);
producer.setTcpTransportConnectTimeout(400);
RocketmqSendAndConsumerArgs info;
if (!ParseArgs(argc, argv, &info)) {
exit(-1);
}
PrintRocketmqSendAndConsumerArgs(info);
DefaultMQProducer producer("please_rename_unique_group_name");
producer.setNamesrvAddr(info.namesrv);
producer.setNamesrvDomain(info.namesrv_domain);
producer.setGroupName(info.groupname);
producer.setInstanceName(info.groupname);
producer.setSessionCredentials("mq acesskey", "mq secretkey", "ALIYUN");
producer.setSendMsgTimeout(500);
producer.setTcpTransportTryLockTimeout(1000);
producer.setTcpTransportConnectTimeout(400);

producer.start();
std::vector<std::shared_ptr<std::thread>> work_pool;
auto start = std::chrono::system_clock::now();
int msgcount = g_msgCount.load();
g_tps.start();
producer.start();
std::vector<std::shared_ptr<std::thread>> work_pool;
auto start = std::chrono::system_clock::now();
int msgcount = g_msgCount.load();
g_tps.start();

int threadCount = info.thread_count;
for (int j = 0; j < threadCount; j++) {
std::shared_ptr<std::thread> th =
std::make_shared<std::thread>(SyncProducerWorker, &info, &producer);
work_pool.push_back(th);
}
int threadCount = info.thread_count;
for (int j = 0; j < threadCount; j++) {
std::shared_ptr<std::thread> th = std::make_shared<std::thread>(SyncProducerWorker, &info, &producer);
work_pool.push_back(th);
}

{
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.wait(lck);
g_quit.store(true);
}
{
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.wait(lck);
g_quit.store(true);
}

auto end = std::chrono::system_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
auto end = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

std::cout
<< "per msg time: " << duration.count() / (double)msgcount << "ms \n"
std::cout << "per msg time: " << duration.count() / (double)msgcount << "ms \n"
<< "========================finished==============================\n";

for (size_t th = 0; th != work_pool.size(); ++th) {
work_pool[th]->join();
}
for (size_t th = 0; th != work_pool.size(); ++th) {
work_pool[th]->join();
}

producer.shutdown();
producer.shutdown();

return 0;
return 0;
}
87 changes: 43 additions & 44 deletions example/CAsyncProducer.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,61 +29,60 @@

void thread_sleep(unsigned milliseconds) {
#ifdef _WIN32
Sleep(milliseconds);
Sleep(milliseconds);
#else
usleep(milliseconds * 1000); // takes microseconds
usleep(milliseconds * 1000); // takes microseconds
#endif
}

void SendSuccessCallback(CSendResult result){
printf("async send success, msgid:%s\n", result.msgId);
void SendSuccessCallback(CSendResult result) {
printf("async send success, msgid:%s\n", result.msgId);
}

void SendExceptionCallback(CMQException e){
char msg[1024];
snprintf(msg, sizeof(msg), "error:%d, msg:%s, file:%s:%d", e.error, e.msg, e.file, e.line);
printf("async send exception %s\n", msg);
void SendExceptionCallback(CMQException e) {
char msg[1024];
snprintf(msg, sizeof(msg), "error:%d, msg:%s, file:%s:%d", e.error, e.msg, e.file, e.line);
printf("async send exception %s\n", msg);
}

void StartSendMessage(CProducer *producer) {
int i = 0;
int ret_code = 0;
char body[128];
CMessage *msg = CreateMessage("T_TestTopic");
SetMessageTags(msg, "Test_Tag");
SetMessageKeys(msg, "Test_Keys");
for (i = 0; i < 10; i++) {
memset(body, 0, sizeof(body));
snprintf(body, sizeof(body), "new message body, index %d", i);
SetMessageBody(msg, body);
ret_code = SendMessageAsync(producer, msg, SendSuccessCallback , SendExceptionCallback);
printf("async send message[%d] return code: %d\n", i, ret_code);
thread_sleep(1000);
}
DestroyMessage(msg);
void StartSendMessage(CProducer* producer) {
int i = 0;
int ret_code = 0;
char body[128];
CMessage* msg = CreateMessage("T_TestTopic");
SetMessageTags(msg, "Test_Tag");
SetMessageKeys(msg, "Test_Keys");
for (i = 0; i < 10; i++) {
memset(body, 0, sizeof(body));
snprintf(body, sizeof(body), "new message body, index %d", i);
SetMessageBody(msg, body);
ret_code = SendMessageAsync(producer, msg, SendSuccessCallback, SendExceptionCallback);
printf("async send message[%d] return code: %d\n", i, ret_code);
thread_sleep(1000);
}
DestroyMessage(msg);
}

void CreateProducerAndStartSendMessage(int i){
printf("Producer Initializing.....\n");
CProducer *producer = CreateProducer("Group_producer");
SetProducerNameServerAddress(producer, "127.0.0.1:9876");
if(i == 1){
SetProducerSendMsgTimeout(producer , 3);
}
StartProducer(producer);
printf("Producer start.....\n");
StartSendMessage(producer);
ShutdownProducer(producer);
DestroyProducer(producer);
printf("Producer Shutdown!\n");
void CreateProducerAndStartSendMessage(int i) {
printf("Producer Initializing.....\n");
CProducer* producer = CreateProducer("Group_producer");
SetProducerNameServerAddress(producer, "127.0.0.1:9876");
if (i == 1) {
SetProducerSendMsgTimeout(producer, 3);
}
StartProducer(producer);
printf("Producer start.....\n");
StartSendMessage(producer);
ShutdownProducer(producer);
DestroyProducer(producer);
printf("Producer Shutdown!\n");
}

int main(int argc, char *argv[]) {
printf("Send Async successCallback.....\n");
CreateProducerAndStartSendMessage(0);
int main(int argc, char* argv[]) {
printf("Send Async successCallback.....\n");
CreateProducerAndStartSendMessage(0);

printf("Send Async exceptionCallback.....\n");
CreateProducerAndStartSendMessage(1);
return 0;
printf("Send Async exceptionCallback.....\n");
CreateProducerAndStartSendMessage(1);
return 0;
}

Loading