Skip to content

[ISSUE #24]Catch CPP exception and convert to the error code in C API #28

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions include/CCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,22 @@ typedef enum _CStatus_{
OK = 0,
// Failed, null pointer value
NULL_POINTER = 1,
MALLOC_FAILED = 2,
PRODUCER_ERROR_CODE_START = 10,
PRODUCER_START_FAILED = 10,
PRODUCER_SEND_SYNC_FAILED = 11,
PRODUCER_SEND_ONEWAY_FAILED = 12,
PRODUCER_SEND_ORDERLY_FAILED = 13,

PUSHCONSUMER_ERROR_CODE_START = 20,
PUSHCONSUMER_START_FAILED = 20,

PULLCONSUMER_ERROR_CODE_START = 30,
PULLCONSUMER_START_FAILED = 30,
PULLCONSUMER_FETCH_MQ_FAILED = 31,
PULLCONSUMER_FETCH_MESSAGE_FAILED = 32
} CStatus;

typedef enum _CLogLevel_{
E_LOG_LEVEL_FATAL = 1,
E_LOG_LEVEL_ERROR = 2,
Expand Down
8 changes: 5 additions & 3 deletions include/CPullConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,18 @@ int SetPullConsumerGroupID(CPullConsumer *consumer, const char *groupId);
const char *GetPullConsumerGroupID(CPullConsumer *consumer);
int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char *namesrv);
int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char *accessKey, const char *secretKey,
const char *channel);
const char *channel);
int SetPullConsumerLogPath(CPullConsumer *consumer, const char *logPath);
int SetPullConsumerLogFileNumAndSize(CPullConsumer *consumer, int fileNum, long fileSize);
int SetPullConsumerLogLevel(CPullConsumer *consumer, CLogLevel level);

int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, CMessageQueue **mqs , int* size);
int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, CMessageQueue **mqs, int *size);
int ReleaseSubscriptionMessageQueue(CMessageQueue *mqs);

CPullResult Pull(CPullConsumer *consumer,const CMessageQueue *mq, const char *subExpression, long long offset, int maxNums);
CPullResult
Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char *subExpression, long long offset, int maxNums);
int ReleasePullResult(CPullResult pullResult);

#ifdef __cplusplus
};
#endif
Expand Down
111 changes: 65 additions & 46 deletions src/extern/CProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,23 @@
extern "C" {
#endif
using namespace rocketmq;
using namespace std;

class SelectMessageQueue : public MessageQueueSelector {
public:
SelectMessageQueue(QueueSelectorCallback callback){
public:
SelectMessageQueue(QueueSelectorCallback callback) {
m_pCallback = callback;
}

MQMessageQueue select(const std::vector<MQMessageQueue> &mqs,
const MQMessage &msg, void *arg) {
CMessage * message = (CMessage *) &msg;
const MQMessage &msg, void *arg) {
CMessage *message = (CMessage *) &msg;
//Get the index of sending MQMessageQueue through callback function.
int index = m_pCallback(mqs.size(),message,arg);
int index = m_pCallback(mqs.size(), message, arg);
return mqs[index];
}
private:

private:
QueueSelectorCallback m_pCallback;
};

Expand All @@ -62,7 +64,11 @@ int StartProducer(CProducer *producer) {
if (producer == NULL) {
return NULL_POINTER;
}
((DefaultMQProducer *) producer)->start();
try {
((DefaultMQProducer *) producer)->start();
} catch (exception &e) {
return PRODUCER_START_FAILED;
}
return OK;
}
int ShutdownProducer(CProducer *producer) {
Expand All @@ -85,57 +91,70 @@ int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result) {
if (producer == NULL || msg == NULL || result == NULL) {
return NULL_POINTER;
}
DefaultMQProducer *defaultMQProducer = (DefaultMQProducer *) producer;
MQMessage *message = (MQMessage *) msg;
SendResult sendResult = defaultMQProducer->send(*message);
switch (sendResult.getSendStatus()) {
case SEND_OK:
result->sendStatus = E_SEND_OK;
break;
case SEND_FLUSH_DISK_TIMEOUT:
result->sendStatus = E_SEND_FLUSH_DISK_TIMEOUT;
break;
case SEND_FLUSH_SLAVE_TIMEOUT:
result->sendStatus = E_SEND_FLUSH_SLAVE_TIMEOUT;
break;
case SEND_SLAVE_NOT_AVAILABLE:
result->sendStatus = E_SEND_SLAVE_NOT_AVAILABLE;
break;
default:
result->sendStatus = E_SEND_OK;
break;
}
result->offset = sendResult.getQueueOffset();
//strcpy(result->msgId, sendResult.getMsgId().c_str());
strncpy(result->msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1);
result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
return OK;
}

int SendMessageOneway(CProducer *producer,CMessage *msg) {
try {
DefaultMQProducer *defaultMQProducer = (DefaultMQProducer *) producer;
MQMessage *message = (MQMessage *) msg;
SendResult sendResult = defaultMQProducer->send(*message);
switch (sendResult.getSendStatus()) {
case SEND_OK:
result->sendStatus = E_SEND_OK;
break;
case SEND_FLUSH_DISK_TIMEOUT:
result->sendStatus = E_SEND_FLUSH_DISK_TIMEOUT;
break;
case SEND_FLUSH_SLAVE_TIMEOUT:
result->sendStatus = E_SEND_FLUSH_SLAVE_TIMEOUT;
break;
case SEND_SLAVE_NOT_AVAILABLE:
result->sendStatus = E_SEND_SLAVE_NOT_AVAILABLE;
break;
default:
result->sendStatus = E_SEND_OK;
break;
}
result->offset = sendResult.getQueueOffset();
strncpy(result->msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1);
result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
} catch (exception &e) {
return PRODUCER_SEND_SYNC_FAILED;
}
return OK;
}

int SendMessageOneway(CProducer *producer, CMessage *msg) {
if (producer == NULL || msg == NULL) {
return NULL_POINTER;
}
DefaultMQProducer *defaultMQProducer = (DefaultMQProducer *) producer;
MQMessage *message = (MQMessage *) msg;
defaultMQProducer->sendOneway(*message);
try {
defaultMQProducer->sendOneway(*message);
} catch (exception &e) {
return PRODUCER_SEND_ONEWAY_FAILED;
}
return OK;
}

int SendMessageOrderly(CProducer *producer, CMessage *msg, QueueSelectorCallback callback, void *arg, int autoRetryTimes, CSendResult *result) {
if(producer == NULL || msg == NULL || callback == NULL || arg == NULL || result == NULL){
int
SendMessageOrderly(CProducer *producer, CMessage *msg, QueueSelectorCallback callback, void *arg, int autoRetryTimes,
CSendResult *result) {
if (producer == NULL || msg == NULL || callback == NULL || arg == NULL || result == NULL) {
return NULL_POINTER;
}
DefaultMQProducer *defaultMQProducer = (DefaultMQProducer *) producer;
MQMessage *message = (MQMessage *) msg;
//Constructing SelectMessageQueue objects through function pointer callback
SelectMessageQueue selectMessageQueue(callback);
SendResult sendResult = defaultMQProducer->send(*message,&selectMessageQueue,arg,autoRetryTimes);
//Convert SendStatus to CSendStatus
result->sendStatus = CSendStatus((int)sendResult.getSendStatus());
result->offset = sendResult.getQueueOffset();
strncpy(result->msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1);
result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
try {
//Constructing SelectMessageQueue objects through function pointer callback
SelectMessageQueue selectMessageQueue(callback);
SendResult sendResult = defaultMQProducer->send(*message, &selectMessageQueue, arg, autoRetryTimes);
//Convert SendStatus to CSendStatus
result->sendStatus = CSendStatus((int) sendResult.getSendStatus());
result->offset = sendResult.getQueueOffset();
strncpy(result->msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1);
result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
} catch (exception &e) {
return PRODUCER_SEND_ORDERLY_FAILED;
}
return OK;
}

Expand Down
18 changes: 14 additions & 4 deletions src/extern/CPullConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ int StartPullConsumer(CPullConsumer *consumer) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPullConsumer *) consumer)->start();
try {
((DefaultMQPullConsumer *) consumer)->start();
} catch (exception &e) {
return PULLCONSUMER_START_FAILED;
}
return OK;
}
int ShutdownPullConsumer(CPullConsumer *consumer) {
Expand Down Expand Up @@ -125,7 +129,7 @@ int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, C
if (*mqs == NULL) {
*size = 0;
*mqs = NULL;
return NULL_POINTER;
return MALLOC_FAILED;
}
auto iter = fullMQ.begin();
for (index = 0; iter != fullMQ.end() && index <= fullMQ.size(); ++iter, index++) {
Expand All @@ -136,6 +140,7 @@ int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, C
} catch (MQException &e) {
*size = 0;
*mqs = NULL;
return PULLCONSUMER_FETCH_MQ_FAILED;
}
return OK;
}
Expand All @@ -147,12 +152,17 @@ int ReleaseSubscriptionMessageQueue(CMessageQueue *mqs) {
mqs = NULL;
return OK;
}
CPullResult Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char *subExpression, long long offset, int maxNums) {
CPullResult
Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char *subExpression, long long offset, int maxNums) {
CPullResult pullResult;
memset(&pullResult, 0, sizeof(CPullResult));
MQMessageQueue messageQueue(mq->topic, mq->brokerName, mq->queueId);
PullResult cppPullResult;
cppPullResult = ((DefaultMQPullConsumer *) consumer)->pull(messageQueue, subExpression, offset, maxNums);
try {
cppPullResult = ((DefaultMQPullConsumer *) consumer)->pull(messageQueue, subExpression, offset, maxNums);
}catch (exception &e){
cppPullResult.pullStatus = BROKER_TIMEOUT;
}

switch (cppPullResult.pullStatus) {
case FOUND: {
Expand Down
36 changes: 21 additions & 15 deletions src/extern/CPushConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ class MessageListenerInner : public MessageListenerConcurrently {
CPushConsumer *m_pconsumer;
};

class MessageListenerOrderlyInner:public MessageListenerOrderly {
class MessageListenerOrderlyInner : public MessageListenerOrderly {
public:
MessageListenerOrderlyInner(CPushConsumer *consumer, MessageCallBack pCallback){
MessageListenerOrderlyInner(CPushConsumer *consumer, MessageCallBack pCallback) {
m_pconsumer = consumer;
m_pMsgReceiveCallback = pCallback;
}

ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
if (m_pMsgReceiveCallback == NULL) {
return RECONSUME_LATER;
Expand All @@ -72,6 +73,7 @@ class MessageListenerOrderlyInner:public MessageListenerOrderly {
}
return CONSUME_SUCCESS;
}

private:
MessageCallBack m_pMsgReceiveCallback;
CPushConsumer *m_pconsumer;
Expand Down Expand Up @@ -103,7 +105,11 @@ int StartPushConsumer(CPushConsumer *consumer) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPushConsumer *) consumer)->start();
try {
((DefaultMQPushConsumer *) consumer)->start();
} catch (exception &e) {
return PUSHCONSUMER_START_FAILED;
}
return OK;
}
int ShutdownPushConsumer(CPushConsumer *consumer) {
Expand Down Expand Up @@ -152,10 +158,10 @@ int RegisterMessageCallback(CPushConsumer *consumer, MessageCallBack pCallback)
}

int RegisterMessageCallbackOrderly(CPushConsumer *consumer, MessageCallBack pCallback) {
if(consumer == NULL || pCallback == NULL){
if (consumer == NULL || pCallback == NULL) {
return NULL_POINTER;
}
MessageListenerOrderlyInner *messageListenerOrderlyInner = new MessageListenerOrderlyInner(consumer,pCallback);
MessageListenerOrderlyInner *messageListenerOrderlyInner = new MessageListenerOrderlyInner(consumer, pCallback);
((DefaultMQPushConsumer *) consumer)->registerMessageListener(messageListenerOrderlyInner);
g_OrderListenerMap[consumer] = messageListenerOrderlyInner;
return OK;
Expand All @@ -164,13 +170,13 @@ int RegisterMessageCallbackOrderly(CPushConsumer *consumer, MessageCallBack pCal

int UnregisterMessageCallbackOrderly(CPushConsumer *consumer) {
if (consumer == NULL) {
return NULL_POINTER;
return NULL_POINTER;
}
map<CPushConsumer *,MessageListenerOrderlyInner *>::iterator iter;
map<CPushConsumer *, MessageListenerOrderlyInner *>::iterator iter;
iter = g_OrderListenerMap.find(consumer);
if(iter != g_OrderListenerMap.end()){
if (iter != g_OrderListenerMap.end()) {
MessageListenerOrderlyInner *listenerInner = iter->second;
if(listenerInner != NULL){
if (listenerInner != NULL) {
delete listenerInner;
}
g_OrderListenerMap.erase(iter);
Expand All @@ -195,11 +201,11 @@ int UnregisterMessageCallback(CPushConsumer *consumer) {
return OK;
}

int SetPushConsumerMessageModel(CPushConsumer *consumer, CMessageModel messageModel){
if(consumer == NULL){
int SetPushConsumerMessageModel(CPushConsumer *consumer, CMessageModel messageModel) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPushConsumer *) consumer)->setMessageModel(MessageModel((int)messageModel));
((DefaultMQPushConsumer *) consumer)->setMessageModel(MessageModel((int) messageModel));
return OK;
}
int SetPushConsumerThreadCount(CPushConsumer *consumer, int threadCount) {
Expand All @@ -226,7 +232,7 @@ int SetPushConsumerInstanceName(CPushConsumer *consumer, const char *instanceNam
}

int SetPushConsumerSessionCredentials(CPushConsumer *consumer, const char *accessKey, const char *secretKey,
const char *channel) {
const char *channel) {
if (consumer == NULL) {
return NULL_POINTER;
}
Expand All @@ -247,15 +253,15 @@ int SetPushConsumerLogFileNumAndSize(CPushConsumer *consumer, int fileNum, long
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPushConsumer *) consumer)->setLogFileSizeAndNum(fileNum,fileSize);
((DefaultMQPushConsumer *) consumer)->setLogFileSizeAndNum(fileNum, fileSize);
return OK;
}

int SetPushConsumerLogLevel(CPushConsumer *consumer, CLogLevel level) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPushConsumer *) consumer)->setLogLevel((elogLevel)level);
((DefaultMQPushConsumer *) consumer)->setLogLevel((elogLevel) level);
return OK;
}

Expand Down