Skip to content

Commit 4089bfa

Browse files
committed
remove removeDropedPullRequestOpaque and deleteOpaqueForDropPullRequest.
1 parent 5689607 commit 4089bfa

6 files changed

+0
-38
lines changed

src/MQClientAPIImpl.cpp

-4
Original file line numberDiff line numberDiff line change
@@ -430,10 +430,6 @@ void MQClientAPIImpl::sendMessageAsync(const string& addr,
430430
}
431431
}
432432

433-
void MQClientAPIImpl::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
434-
m_pRemotingClient->deleteOpaqueForDropPullRequest(mq, opaque);
435-
}
436-
437433
PullResult* MQClientAPIImpl::pullMessage(const string& addr,
438434
PullMessageRequestHeader* pRequestHeader,
439435
int timeoutMillis,

src/MQClientAPIImpl.h

-1
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,6 @@ class MQClientAPIImpl {
187187
int64 timeoutMilliseconds,
188188
int maxRetryTimes = 1,
189189
int retrySendTimes = 1);
190-
void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);
191190

192191
private:
193192
SendResult sendMessageSync(const string& addr,

src/MQClientFactory.cpp

-14
Original file line numberDiff line numberDiff line change
@@ -988,18 +988,6 @@ void MQClientFactory::findConsumerIds(const string& topic,
988988
}
989989
}
990990

991-
void MQClientFactory::removeDropedPullRequestOpaque(PullRequest* pullRequest) {
992-
// delete the opaque record that's ignore the response of this pullrequest when drop pullrequest
993-
if (!pullRequest)
994-
return;
995-
MQMessageQueue mq = pullRequest->m_messageQueue;
996-
int opaque = pullRequest->getLatestPullRequestOpaque();
997-
if (opaque > 0) {
998-
LOG_INFO("####### need delete the pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
999-
getMQClientAPIImpl()->deleteOpaqueForDropPullRequest(mq, opaque);
1000-
}
1001-
}
1002-
1003991
void MQClientFactory::resetOffset(const string& group,
1004992
const string& topic,
1005993
const map<MQMessageQueue, int64>& offsetTable) {
@@ -1014,8 +1002,6 @@ void MQClientFactory::resetOffset(const string& group,
10141002
pullreq->setDroped(true);
10151003
LOG_INFO("resetOffset setDroped for opaque:%d, mq:%s", pullreq->getLatestPullRequestOpaque(),
10161004
mq.toString().data());
1017-
// delete the opaque record that's ignore the response of this pullrequest when drop pullrequest
1018-
// removeDropedPullRequestOpaque(pullreq);
10191005
pullreq->clearAllMsgs();
10201006
pullreq->updateQueueMaxOffset(it->second);
10211007
} else {

src/MQClientFactory.h

-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ class MQClientFactory {
108108
void addBrokerToAddrMap(const string& brokerName, map<int, string>& brokerAddrs);
109109
map<string, map<int, string>> getBrokerAddrMap();
110110
void clearBrokerAddrMap();
111-
void removeDropedPullRequestOpaque(PullRequest* pullRequest);
112111

113112
private:
114113
void unregisterClient(const string& producerGroup,

src/transport/TcpRemotingClient.cpp

-16
Original file line numberDiff line numberDiff line change
@@ -639,21 +639,5 @@ void TcpRemotingClient::removeAllTimerCallback() {
639639
m_asyncTimerTable.clear();
640640
}
641641

642-
void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
643-
// delete the map record of opaque<->ResponseFuture, so the answer for the pull request will
644-
// discard when receive it later
645-
std::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
646-
if (!pFuture) {
647-
pFuture = findAndDeleteResponseFuture(opaque);
648-
if (pFuture) {
649-
LOG_DEBUG("succ deleted the sync pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
650-
}
651-
} else {
652-
LOG_DEBUG("succ deleted the async pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
653-
// delete the timeout timer for opaque for pullrequest
654-
cancelTimerCallback(opaque);
655-
}
656-
}
657-
658642
//<!************************************************************************
659643
} // namespace rocketmq

src/transport/TcpRemotingClient.h

-2
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@ class TcpRemotingClient {
5858

5959
void registerProcessor(MQRequestCode requestCode, ClientRemotingProcessor* clientRemotingProcessor);
6060

61-
void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);
62-
6361
private:
6462
static void static_messageReceived(void* context, const MemoryBlock& mem, const string& addr);
6563

0 commit comments

Comments
 (0)