Skip to content

Commit f33a696

Browse files
committed
remove removeDropedPullRequestOpaque and deleteOpaqueForDropPullRequest.
1 parent 390bfe6 commit f33a696

12 files changed

+8
-77
lines changed

src/MQClientAPIImpl.cpp

+3-19
Original file line numberDiff line numberDiff line change
@@ -430,10 +430,6 @@ void MQClientAPIImpl::sendMessageAsync(const string& addr,
430430
}
431431
}
432432

433-
void MQClientAPIImpl::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
434-
m_pRemotingClient->deleteOpaqueForDropPullRequest(mq, opaque);
435-
}
436-
437433
PullResult* MQClientAPIImpl::pullMessage(const string& addr,
438434
PullMessageRequestHeader* pRequestHeader,
439435
int timeoutMillis,
@@ -467,21 +463,9 @@ void MQClientAPIImpl::pullMessageAsync(const string& addr,
467463
void* pArg) {
468464
//<!delete in future;
469465
AsyncCallbackWrap* cbw = new PullCallbackWarp(pullCallback, this, pArg);
470-
MQMessageQueue mq;
471-
AsyncArg* pAsyncArg = static_cast<AsyncArg*>(pArg);
472-
if (pAsyncArg && pAsyncArg->pPullRequest) {
473-
mq = pAsyncArg->mq;
474-
pAsyncArg->pPullRequest->setLatestPullRequestOpaque(request.getOpaque());
475-
LOG_DEBUG("pullMessageAsync set opaque:%d, mq:%s", pAsyncArg->pPullRequest->getLatestPullRequestOpaque(),
476-
mq.toString().c_str());
477-
}
478-
479466
if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMillis) == false) {
480-
LOG_ERROR("pullMessageAsync failed of addr:%s, opaque:%d, mq:%s", addr.c_str(), request.getOpaque(),
481-
mq.toString().data());
482-
if (pAsyncArg && pAsyncArg->pPullRequest) {
483-
pAsyncArg->pPullRequest->setLatestPullRequestOpaque(0);
484-
}
467+
LOG_ERROR("pullMessageAsync failed of addr:%s, mq:%s", addr.c_str(),
468+
static_cast<AsyncArg*>(pArg)->mq.toString().data());
485469
deleteAndZero(cbw);
486470
THROW_MQEXCEPTION(MQClientException, "pullMessageAsync failed", -1);
487471
}
@@ -919,4 +903,4 @@ void MQClientAPIImpl::unlockBatchMQ(const string& addr,
919903
}
920904

921905
//<!************************************************************************
922-
} //<!end namespace;
906+
} // namespace rocketmq

src/MQClientAPIImpl.h

-1
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,6 @@ class MQClientAPIImpl {
187187
int64 timeoutMilliseconds,
188188
int maxRetryTimes = 1,
189189
int retrySendTimes = 1);
190-
void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);
191190

192191
private:
193192
SendResult sendMessageSync(const string& addr,

src/MQClientFactory.cpp

+2-17
Original file line numberDiff line numberDiff line change
@@ -988,18 +988,6 @@ void MQClientFactory::findConsumerIds(const string& topic,
988988
}
989989
}
990990

991-
void MQClientFactory::removeDropedPullRequestOpaque(PullRequest* pullRequest) {
992-
// delete the opaque record that's ignore the response of this pullrequest when drop pullrequest
993-
if (!pullRequest)
994-
return;
995-
MQMessageQueue mq = pullRequest->m_messageQueue;
996-
int opaque = pullRequest->getLatestPullRequestOpaque();
997-
if (opaque > 0) {
998-
LOG_INFO("####### need delete the pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
999-
getMQClientAPIImpl()->deleteOpaqueForDropPullRequest(mq, opaque);
1000-
}
1001-
}
1002-
1003991
void MQClientFactory::resetOffset(const string& group,
1004992
const string& topic,
1005993
const map<MQMessageQueue, int64>& offsetTable) {
@@ -1012,10 +1000,7 @@ void MQClientFactory::resetOffset(const string& group,
10121000
PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq);
10131001
if (pullreq) {
10141002
pullreq->setDroped(true);
1015-
LOG_INFO("resetOffset setDroped for opaque:%d, mq:%s", pullreq->getLatestPullRequestOpaque(),
1016-
mq.toString().data());
1017-
// delete the opaque record that's ignore the response of this pullrequest when drop pullrequest
1018-
// removeDropedPullRequestOpaque(pullreq);
1003+
LOG_INFO("resetOffset setDroped for mq:%s", mq.toString().data());
10191004
pullreq->clearAllMsgs();
10201005
pullreq->updateQueueMaxOffset(it->second);
10211006
} else {
@@ -1102,4 +1087,4 @@ void MQClientFactory::getSessionCredentialsFromOneOfProducerOrConsumer(SessionCr
11021087
}
11031088

11041089
//<!************************************************************************
1105-
} //<!end namespace;
1090+
} // namespace rocketmq

src/MQClientFactory.h

-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ class MQClientFactory {
108108
void addBrokerToAddrMap(const string& brokerName, map<int, string>& brokerAddrs);
109109
map<string, map<int, string>> getBrokerAddrMap();
110110
void clearBrokerAddrMap();
111-
void removeDropedPullRequestOpaque(PullRequest* pullRequest);
112111

113112
private:
114113
void unregisterClient(const string& producerGroup,

src/common/AsyncArg.h

+1-2
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,14 @@
2121
#include "MQMessageQueue.h"
2222
#include "PullAPIWrapper.h"
2323
#include "SubscriptionData.h"
24-
#include "../consumer/PullRequest.h"
24+
2525
namespace rocketmq {
2626
//<!***************************************************************************
2727

2828
struct AsyncArg {
2929
MQMessageQueue mq;
3030
SubscriptionData subData;
3131
PullAPIWrapper* pPullWrapper;
32-
PullRequest* pPullRequest;
3332
};
3433

3534
//<!***************************************************************************

src/consumer/DefaultMQPullConsumer.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,6 @@ void DefaultMQPullConsumer::pullAsyncImpl(const MQMessageQueue& mq,
263263
arg.mq = mq;
264264
arg.subData = *pSData;
265265
arg.pPullWrapper = m_pPullAPIWrapper;
266-
arg.pPullRequest = NULL;
267266

268267
try {
269268
unique_ptr<PullResult> pullResult(m_pPullAPIWrapper->pullKernelImpl(mq, // 1

src/consumer/DefaultMQPushConsumer.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -784,7 +784,6 @@ void DefaultMQPushConsumer::pullMessageAsync(PullRequest* request) {
784784
arg.mq = messageQueue;
785785
arg.subData = *pSdata;
786786
arg.pPullWrapper = m_pPullAPIWrapper;
787-
arg.pPullRequest = request;
788787
try {
789788
request->setLastPullTimestamp(UtilAll::currentTimeMillis());
790789
m_pPullAPIWrapper->pullKernelImpl(messageQueue, // 1

src/consumer/PullRequest.cpp

-12
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ PullRequest::PullRequest(const string& groupname)
2828
m_queueOffsetMax(0),
2929
m_bDroped(false),
3030
m_bLocked(false),
31-
m_latestPullRequestOpaque(0),
3231
m_bPullMsgEventInprogress(false) {}
3332

3433
PullRequest::~PullRequest() {
@@ -46,7 +45,6 @@ PullRequest& PullRequest::operator=(const PullRequest& other) {
4645
m_messageQueue = other.m_messageQueue;
4746
m_msgTreeMap = other.m_msgTreeMap;
4847
m_msgTreeMapTemp = other.m_msgTreeMapTemp;
49-
m_latestPullRequestOpaque = other.m_latestPullRequestOpaque;
5048
}
5149
return *this;
5250
}
@@ -272,15 +270,5 @@ bool PullRequest::addPullMsgEvent() {
272270
return false;
273271
}
274272

275-
int PullRequest::getLatestPullRequestOpaque() {
276-
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
277-
return m_latestPullRequestOpaque;
278-
}
279-
280-
void PullRequest::setLatestPullRequestOpaque(int opaque) {
281-
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
282-
m_latestPullRequestOpaque = opaque;
283-
}
284-
285273
//<!***************************************************************************
286274
} //<!end namespace;

src/consumer/PullRequest.h

-3
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,6 @@ class PullRequest {
6868
boost::timed_mutex& getPullRequestCriticalSection();
6969
void removePullMsgEvent();
7070
bool addPullMsgEvent();
71-
int getLatestPullRequestOpaque();
72-
void setLatestPullRequestOpaque(int opaque);
7371

7472
public:
7573
MQMessageQueue m_messageQueue;
@@ -89,7 +87,6 @@ class PullRequest {
8987
// uint64 m_tryUnlockTimes;
9088
uint64 m_lastPullTimestamp;
9189
uint64 m_lastConsumeTimestamp;
92-
int m_latestPullRequestOpaque;
9390
boost::timed_mutex m_consumeLock;
9491
boost::atomic<bool> m_bPullMsgEventInprogress;
9592
};

src/consumer/Rebalance.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ bool RebalancePush::updateRequestTableInRebalance(const string& topic, vector<MQ
458458
it->second->clearAllMsgs(); // add clear operation to avoid bad state
459459
// when dropped pullRequest returns
460460
// normal
461-
LOG_INFO("drop mq:%s, delete opaque:%d", mqtemp.toString().c_str(), it->second->getLatestPullRequestOpaque());
461+
LOG_INFO("drop mq:%s", mqtemp.toString().c_str());
462462
}
463463
changed = true;
464464
}
@@ -634,4 +634,4 @@ void RebalancePush::removeUnnecessaryMessageQueue(const MQMessageQueue& mq) {
634634
}
635635

636636
//<!************************************************************************
637-
} //<!end namespace;
637+
} // namespace rocketmq

src/transport/TcpRemotingClient.cpp

-16
Original file line numberDiff line numberDiff line change
@@ -639,21 +639,5 @@ void TcpRemotingClient::removeAllTimerCallback() {
639639
m_asyncTimerTable.clear();
640640
}
641641

642-
void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
643-
// delete the map record of opaque<->ResponseFuture, so the answer for the pull request will
644-
// discard when receive it later
645-
std::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
646-
if (!pFuture) {
647-
pFuture = findAndDeleteResponseFuture(opaque);
648-
if (pFuture) {
649-
LOG_DEBUG("succ deleted the sync pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
650-
}
651-
} else {
652-
LOG_DEBUG("succ deleted the async pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
653-
// delete the timeout timer for opaque for pullrequest
654-
cancelTimerCallback(opaque);
655-
}
656-
}
657-
658642
//<!************************************************************************
659643
} // namespace rocketmq

src/transport/TcpRemotingClient.h

-2
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@ class TcpRemotingClient {
5858

5959
void registerProcessor(MQRequestCode requestCode, ClientRemotingProcessor* clientRemotingProcessor);
6060

61-
void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);
62-
6361
private:
6462
static void static_messageReceived(void* context, const MemoryBlock& mem, const string& addr);
6563

0 commit comments

Comments
 (0)