Skip to content

remove removeDropedPullRequestOpaque and deleteOpaqueForDropPullRequest. #157

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 3 additions & 19 deletions src/MQClientAPIImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,6 @@ void MQClientAPIImpl::sendMessageAsync(const string& addr,
}
}

void MQClientAPIImpl::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
m_pRemotingClient->deleteOpaqueForDropPullRequest(mq, opaque);
}

PullResult* MQClientAPIImpl::pullMessage(const string& addr,
PullMessageRequestHeader* pRequestHeader,
int timeoutMillis,
Expand Down Expand Up @@ -467,21 +463,9 @@ void MQClientAPIImpl::pullMessageAsync(const string& addr,
void* pArg) {
//<!delete in future;
AsyncCallbackWrap* cbw = new PullCallbackWarp(pullCallback, this, pArg);
MQMessageQueue mq;
AsyncArg* pAsyncArg = static_cast<AsyncArg*>(pArg);
if (pAsyncArg && pAsyncArg->pPullRequest) {
mq = pAsyncArg->mq;
pAsyncArg->pPullRequest->setLatestPullRequestOpaque(request.getOpaque());
LOG_DEBUG("pullMessageAsync set opaque:%d, mq:%s", pAsyncArg->pPullRequest->getLatestPullRequestOpaque(),
mq.toString().c_str());
}

if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMillis) == false) {
LOG_ERROR("pullMessageAsync failed of addr:%s, opaque:%d, mq:%s", addr.c_str(), request.getOpaque(),
mq.toString().data());
if (pAsyncArg && pAsyncArg->pPullRequest) {
pAsyncArg->pPullRequest->setLatestPullRequestOpaque(0);
}
LOG_ERROR("pullMessageAsync failed of addr:%s, mq:%s", addr.c_str(),
static_cast<AsyncArg*>(pArg)->mq.toString().data());
deleteAndZero(cbw);
THROW_MQEXCEPTION(MQClientException, "pullMessageAsync failed", -1);
}
Expand Down Expand Up @@ -919,4 +903,4 @@ void MQClientAPIImpl::unlockBatchMQ(const string& addr,
}

//<!************************************************************************
} //<!end namespace;
} // namespace rocketmq
1 change: 0 additions & 1 deletion src/MQClientAPIImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ class MQClientAPIImpl {
int64 timeoutMilliseconds,
int maxRetryTimes = 1,
int retrySendTimes = 1);
void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);

private:
SendResult sendMessageSync(const string& addr,
Expand Down
19 changes: 2 additions & 17 deletions src/MQClientFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -988,18 +988,6 @@ void MQClientFactory::findConsumerIds(const string& topic,
}
}

void MQClientFactory::removeDropedPullRequestOpaque(PullRequest* pullRequest) {
// delete the opaque record that's ignore the response of this pullrequest when drop pullrequest
if (!pullRequest)
return;
MQMessageQueue mq = pullRequest->m_messageQueue;
int opaque = pullRequest->getLatestPullRequestOpaque();
if (opaque > 0) {
LOG_INFO("####### need delete the pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
getMQClientAPIImpl()->deleteOpaqueForDropPullRequest(mq, opaque);
}
}

void MQClientFactory::resetOffset(const string& group,
const string& topic,
const map<MQMessageQueue, int64>& offsetTable) {
Expand All @@ -1012,10 +1000,7 @@ void MQClientFactory::resetOffset(const string& group,
PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq);
if (pullreq) {
pullreq->setDroped(true);
LOG_INFO("resetOffset setDroped for opaque:%d, mq:%s", pullreq->getLatestPullRequestOpaque(),
mq.toString().data());
// delete the opaque record that's ignore the response of this pullrequest when drop pullrequest
// removeDropedPullRequestOpaque(pullreq);
LOG_INFO("resetOffset setDroped for mq:%s", mq.toString().data());
pullreq->clearAllMsgs();
pullreq->updateQueueMaxOffset(it->second);
} else {
Expand Down Expand Up @@ -1102,4 +1087,4 @@ void MQClientFactory::getSessionCredentialsFromOneOfProducerOrConsumer(SessionCr
}

//<!************************************************************************
} //<!end namespace;
} // namespace rocketmq
1 change: 0 additions & 1 deletion src/MQClientFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ class MQClientFactory {
void addBrokerToAddrMap(const string& brokerName, map<int, string>& brokerAddrs);
map<string, map<int, string>> getBrokerAddrMap();
void clearBrokerAddrMap();
void removeDropedPullRequestOpaque(PullRequest* pullRequest);

private:
void unregisterClient(const string& producerGroup,
Expand Down
3 changes: 1 addition & 2 deletions src/common/AsyncArg.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@
#include "MQMessageQueue.h"
#include "PullAPIWrapper.h"
#include "SubscriptionData.h"
#include "../consumer/PullRequest.h"

namespace rocketmq {
//<!***************************************************************************

struct AsyncArg {
MQMessageQueue mq;
SubscriptionData subData;
PullAPIWrapper* pPullWrapper;
PullRequest* pPullRequest;
};

//<!***************************************************************************
Expand Down
1 change: 0 additions & 1 deletion src/consumer/DefaultMQPullConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ void DefaultMQPullConsumer::pullAsyncImpl(const MQMessageQueue& mq,
arg.mq = mq;
arg.subData = *pSData;
arg.pPullWrapper = m_pPullAPIWrapper;
arg.pPullRequest = NULL;

try {
unique_ptr<PullResult> pullResult(m_pPullAPIWrapper->pullKernelImpl(mq, // 1
Expand Down
1 change: 0 additions & 1 deletion src/consumer/DefaultMQPushConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,6 @@ void DefaultMQPushConsumer::pullMessageAsync(PullRequest* request) {
arg.mq = messageQueue;
arg.subData = *pSdata;
arg.pPullWrapper = m_pPullAPIWrapper;
arg.pPullRequest = request;
try {
request->setLastPullTimestamp(UtilAll::currentTimeMillis());
m_pPullAPIWrapper->pullKernelImpl(messageQueue, // 1
Expand Down
12 changes: 0 additions & 12 deletions src/consumer/PullRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ PullRequest::PullRequest(const string& groupname)
m_queueOffsetMax(0),
m_bDroped(false),
m_bLocked(false),
m_latestPullRequestOpaque(0),
m_bPullMsgEventInprogress(false) {}

PullRequest::~PullRequest() {
Expand All @@ -46,7 +45,6 @@ PullRequest& PullRequest::operator=(const PullRequest& other) {
m_messageQueue = other.m_messageQueue;
m_msgTreeMap = other.m_msgTreeMap;
m_msgTreeMapTemp = other.m_msgTreeMapTemp;
m_latestPullRequestOpaque = other.m_latestPullRequestOpaque;
}
return *this;
}
Expand Down Expand Up @@ -272,15 +270,5 @@ bool PullRequest::addPullMsgEvent() {
return false;
}

int PullRequest::getLatestPullRequestOpaque() {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
return m_latestPullRequestOpaque;
}

void PullRequest::setLatestPullRequestOpaque(int opaque) {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
m_latestPullRequestOpaque = opaque;
}

//<!***************************************************************************
} //<!end namespace;
3 changes: 0 additions & 3 deletions src/consumer/PullRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ class PullRequest {
boost::timed_mutex& getPullRequestCriticalSection();
void removePullMsgEvent();
bool addPullMsgEvent();
int getLatestPullRequestOpaque();
void setLatestPullRequestOpaque(int opaque);

public:
MQMessageQueue m_messageQueue;
Expand All @@ -89,7 +87,6 @@ class PullRequest {
// uint64 m_tryUnlockTimes;
uint64 m_lastPullTimestamp;
uint64 m_lastConsumeTimestamp;
int m_latestPullRequestOpaque;
boost::timed_mutex m_consumeLock;
boost::atomic<bool> m_bPullMsgEventInprogress;
};
Expand Down
4 changes: 2 additions & 2 deletions src/consumer/Rebalance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ bool RebalancePush::updateRequestTableInRebalance(const string& topic, vector<MQ
it->second->clearAllMsgs(); // add clear operation to avoid bad state
// when dropped pullRequest returns
// normal
LOG_INFO("drop mq:%s, delete opaque:%d", mqtemp.toString().c_str(), it->second->getLatestPullRequestOpaque());
LOG_INFO("drop mq:%s", mqtemp.toString().c_str());
}
changed = true;
}
Expand Down Expand Up @@ -634,4 +634,4 @@ void RebalancePush::removeUnnecessaryMessageQueue(const MQMessageQueue& mq) {
}

//<!************************************************************************
} //<!end namespace;
} // namespace rocketmq
16 changes: 0 additions & 16 deletions src/transport/TcpRemotingClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -639,21 +639,5 @@ void TcpRemotingClient::removeAllTimerCallback() {
m_asyncTimerTable.clear();
}

void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
// delete the map record of opaque<->ResponseFuture, so the answer for the pull request will
// discard when receive it later
std::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
if (!pFuture) {
pFuture = findAndDeleteResponseFuture(opaque);
if (pFuture) {
LOG_DEBUG("succ deleted the sync pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
}
} else {
LOG_DEBUG("succ deleted the async pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
// delete the timeout timer for opaque for pullrequest
cancelTimerCallback(opaque);
}
}

//<!************************************************************************
} // namespace rocketmq
2 changes: 0 additions & 2 deletions src/transport/TcpRemotingClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ class TcpRemotingClient {

void registerProcessor(MQRequestCode requestCode, ClientRemotingProcessor* clientRemotingProcessor);

void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);

private:
static void static_messageReceived(void* context, const MemoryBlock& mem, const string& addr);

Expand Down