Skip to content

Commit 46dd8f5

Browse files
authored
feat(version): add maxConsumerTimes to support higher client version (#230)
* feat(version): add maxConsumerTimes to support higher client version * feat(version): add maxConsumerTimes to support higher client version * feat(version): add maxConsumerTimes to support higher client version
1 parent 51ef3a6 commit 46dd8f5

9 files changed

+96
-4
lines changed

include/DefaultMQPushConsumer.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer {
111111
*/
112112
void setConsumeThreadCount(int threadCount);
113113
int getConsumeThreadCount() const;
114+
void setMaxReconsumeTimes(int maxReconsumeTimes);
115+
int getMaxReconsumeTimes() const;
114116

115117
/*
116118
set pullMsg thread count, default value is cpu cores
@@ -144,6 +146,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer {
144146
MQMessageListener* m_pMessageListener;
145147
int m_consumeMessageBatchMaxSize;
146148
int m_maxMsgCacheSize;
149+
int m_maxReconsumeTimes = -1;
147150
boost::asio::io_service m_async_ioService;
148151
boost::scoped_ptr<boost::thread> m_async_service_thread;
149152

src/MQClientAPIImpl.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -836,11 +836,16 @@ void MQClientAPIImpl::consumerSendMessageBack(const string addr,
836836
const string& consumerGroup,
837837
int delayLevel,
838838
int timeoutMillis,
839+
int maxReconsumeTimes,
839840
const SessionCredentials& sessionCredentials) {
840841
ConsumerSendMsgBackRequestHeader* pRequestHeader = new ConsumerSendMsgBackRequestHeader();
841842
pRequestHeader->group = consumerGroup;
842843
pRequestHeader->offset = msg.getCommitLogOffset();
843844
pRequestHeader->delayLevel = delayLevel;
845+
pRequestHeader->unitMode = false;
846+
pRequestHeader->originTopic = msg.getTopic();
847+
pRequestHeader->originMsgId = msg.getMsgId();
848+
pRequestHeader->maxReconsumeTimes = maxReconsumeTimes;
844849

845850
// string addr = socketAddress2IPPort(msg.getStoreHost());
846851
RemotingCommand request(CONSUMER_SEND_MSG_BACK, pRequestHeader);

src/MQClientAPIImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ class MQClientAPIImpl {
174174
const string& consumerGroup,
175175
int delayLevel,
176176
int timeoutMillis,
177+
int maxReconsumeTimes,
177178
const SessionCredentials& sessionCredentials);
178179

179180
virtual void lockBatchMQ(const string& addr,

src/consumer/ConsumeMessageConcurrentlyService.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,9 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
194194
msgs[i].getReconsumeTimes());
195195
if (m_pConsumer->getConsumeType() == CONSUME_PASSIVELY) {
196196
string brokerName = request->m_messageQueue.getBrokerName();
197+
if (m_pConsumer->isUseNameSpaceMode()) {
198+
MessageAccessor::withNameSpace(msgs[i], m_pConsumer->getNameSpace());
199+
}
197200
if (!m_pConsumer->sendMessageBack(msgs[i], 0, brokerName)) {
198201
LOG_WARN("Send message back fail, MQ is:%s, its msgId is:%s, index is:%d, re-consume times is:%d",
199202
(request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,

src/consumer/DefaultMQPushConsumer.cpp

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,8 @@ bool DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel, s
258258
else
259259
brokerAddr = socketAddress2IPPort(msg.getStoreHost());
260260
try {
261-
getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg, getGroupName(), delayLevel, 3000,
262-
getSessionCredentials());
261+
getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg, getGroupName(), delayLevel,
262+
getMaxReconsumeTimes(), 3000, getSessionCredentials());
263263
} catch (MQException& e) {
264264
LOG_ERROR(e.what());
265265
return false;
@@ -918,6 +918,21 @@ void DefaultMQPushConsumer::setConsumeThreadCount(int threadCount) {
918918
int DefaultMQPushConsumer::getConsumeThreadCount() const {
919919
return m_consumeThreadCount;
920920
}
921+
void DefaultMQPushConsumer::setMaxReconsumeTimes(int maxReconsumeTimes) {
922+
if (maxReconsumeTimes > 0) {
923+
m_maxReconsumeTimes = maxReconsumeTimes;
924+
} else {
925+
LOG_ERROR("set maxReconsumeTimes with invalid value");
926+
}
927+
}
928+
929+
int DefaultMQPushConsumer::getMaxReconsumeTimes() const {
930+
if (m_maxReconsumeTimes >= 0) {
931+
return m_maxReconsumeTimes;
932+
}
933+
// return 16 as default;
934+
return 16;
935+
}
921936

922937
void DefaultMQPushConsumer::setPullMsgThreadPoolCount(int threadCount) {
923938
m_pullMsgThreadPoolNum = threadCount;

src/protocol/CommandHeader.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -492,16 +492,20 @@ void ConsumerSendMsgBackRequestHeader::Encode(Json::Value& outData) {
492492
outData["group"] = group;
493493
outData["delayLevel"] = delayLevel;
494494
outData["offset"] = UtilAll::to_string(offset);
495-
#ifdef ONS
495+
outData["unitMode"] = UtilAll::to_string(unitMode);
496496
outData["originMsgId"] = originMsgId;
497497
outData["originTopic"] = originTopic;
498-
#endif
498+
outData["maxReconsumeTimes"] = maxReconsumeTimes;
499499
}
500500

501501
void ConsumerSendMsgBackRequestHeader::SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap) {
502502
requestMap.insert(pair<string, string>("group", group));
503503
requestMap.insert(pair<string, string>("delayLevel", UtilAll::to_string(delayLevel)));
504504
requestMap.insert(pair<string, string>("offset", UtilAll::to_string(offset)));
505+
requestMap.insert(pair<string, string>("unitMode", UtilAll::to_string(unitMode)));
506+
requestMap.insert(pair<string, string>("originMsgId", originMsgId));
507+
requestMap.insert(pair<string, string>("originTopic", originTopic));
508+
requestMap.insert(pair<string, string>("maxReconsumeTimes", UtilAll::to_string(maxReconsumeTimes)));
505509
}
506510
//<!***************************************************************************
507511
void GetConsumerListByGroupResponseBody::Decode(const MemoryBlock* mem, vector<string>& cids) {

src/protocol/CommandHeader.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,10 @@ class ConsumerSendMsgBackRequestHeader : public CommandHeader {
456456
string group;
457457
int delayLevel;
458458
int64 offset;
459+
bool unitMode = false;
460+
string originMsgId;
461+
string originTopic;
462+
int maxReconsumeTimes = 16;
459463
};
460464

461465
//<!***************************************************************************

test/src/MQClientAPIImpTest.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,26 @@ TEST(MQClientAPIImplTest, sendMessage) {
170170
EXPECT_EQ(result.getMessageQueue().getBrokerName(), "testBroker");
171171
EXPECT_EQ(result.getMessageQueue().getTopic(), "testTopic");
172172
}
173+
174+
TEST(MQClientAPIImplTest, consumerSendMessageBack) {
175+
SessionCredentials sc;
176+
MQMessageExt msg;
177+
MockMQClientAPIImpl* impl = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockClientAPIImpl();
178+
Mock::AllowLeak(impl);
179+
MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient();
180+
Mock::AllowLeak(pClient);
181+
RemotingCommand* pCommandFailed = new RemotingCommand(SYSTEM_ERROR, nullptr);
182+
RemotingCommand* pCommandSuccuss = new RemotingCommand(SUCCESS_VALUE, nullptr);
183+
EXPECT_CALL(*pClient, invokeSync(_, _, _))
184+
.Times(3)
185+
.WillOnce(Return(nullptr))
186+
.WillOnce(Return(pCommandFailed))
187+
.WillOnce(Return(pCommandSuccuss));
188+
EXPECT_ANY_THROW(impl->consumerSendMessageBack("127.0.0.0:10911", msg, "testGroup", 0, 1000, 16, sc));
189+
EXPECT_ANY_THROW(impl->consumerSendMessageBack("127.0.0.0:10911", msg, "testGroup", 0, 1000, 16, sc));
190+
EXPECT_NO_THROW(impl->consumerSendMessageBack("127.0.0.0:10911", msg, "testGroup", 0, 1000, 16, sc));
191+
}
192+
173193
int main(int argc, char* argv[]) {
174194
InitGoogleMock(&argc, argv);
175195
testing::GTEST_FLAG(filter) = "MQClientAPIImplTest.*";

test/src/protocol/CommandHeaderTest.cpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,43 @@ using rocketmq::UnregisterClientRequestHeader;
6767
using rocketmq::UpdateConsumerOffsetRequestHeader;
6868
using rocketmq::ViewMessageRequestHeader;
6969

70+
TEST(commandHeader, ConsumerSendMsgBackRequestHeader) {
71+
string group = "testGroup";
72+
int delayLevel = 2;
73+
int64 offset = 3027;
74+
bool unitMode = true;
75+
string originMsgId = "testOriginMsgId";
76+
string originTopic = "testTopic";
77+
int maxReconsumeTimes = 12;
78+
ConsumerSendMsgBackRequestHeader header;
79+
header.group = group;
80+
header.delayLevel = delayLevel;
81+
header.offset = offset;
82+
header.unitMode = unitMode;
83+
header.originMsgId = originMsgId;
84+
header.originTopic = originTopic;
85+
header.maxReconsumeTimes = maxReconsumeTimes;
86+
map<string, string> requestMap;
87+
header.SetDeclaredFieldOfCommandHeader(requestMap);
88+
EXPECT_EQ(requestMap["group"], group);
89+
EXPECT_EQ(requestMap["delayLevel"], "2");
90+
EXPECT_EQ(requestMap["offset"], "3027");
91+
EXPECT_EQ(requestMap["unitMode"], "1");
92+
EXPECT_EQ(requestMap["originMsgId"], originMsgId);
93+
EXPECT_EQ(requestMap["originTopic"], originTopic);
94+
EXPECT_EQ(requestMap["maxReconsumeTimes"], "12");
95+
96+
Value outData;
97+
header.Encode(outData);
98+
EXPECT_EQ(outData["group"], group);
99+
EXPECT_EQ(outData["delayLevel"], 2);
100+
EXPECT_EQ(outData["offset"], "3027");
101+
EXPECT_EQ(outData["unitMode"], "1");
102+
EXPECT_EQ(outData["originMsgId"], originMsgId);
103+
EXPECT_EQ(outData["originTopic"], originTopic);
104+
EXPECT_EQ(outData["maxReconsumeTimes"], 12);
105+
}
106+
70107
TEST(commandHeader, GetRouteInfoRequestHeader) {
71108
GetRouteInfoRequestHeader header("testTopic");
72109
map<string, string> requestMap;

0 commit comments

Comments
 (0)