-
Notifications
You must be signed in to change notification settings - Fork 165
[ISSUE #137] split TcpRemotingClient::m_ioService into m_dispatchService and m_handleService #156
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,17 +29,29 @@ namespace rocketmq { | |
|
||
//<!************************************************************************ | ||
TcpRemotingClient::TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout) | ||
: m_pullThreadNum(pullThreadNum), | ||
: m_dispatchThreadNum(1), | ||
m_pullThreadNum(pullThreadNum), | ||
m_tcpConnectTimeout(tcpConnectTimeout), | ||
m_tcpTransportTryLockTimeout(tcpTransportTryLockTimeout), | ||
m_namesrvIndex(0), | ||
m_ioServiceWork(m_ioService) { | ||
m_dispatchServiceWork(m_dispatchService), | ||
m_handleServiceWork(m_handleService) { | ||
#if !defined(WIN32) && !defined(__APPLE__) | ||
string taskName = UtilAll::getProcessName(); | ||
prctl(PR_SET_NAME, "DispatchTP", 0, 0, 0); | ||
#endif | ||
for (int i = 0; i != m_dispatchThreadNum; ++i) { | ||
m_dispatchThreadPool.create_thread(boost::bind(&boost::asio::io_service::run, &m_dispatchService)); | ||
} | ||
#if !defined(WIN32) && !defined(__APPLE__) | ||
prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0); | ||
#endif | ||
|
||
#if !defined(WIN32) && !defined(__APPLE__) | ||
prctl(PR_SET_NAME, "NetworkTP", 0, 0, 0); | ||
#endif | ||
for (int i = 0; i != m_pullThreadNum; ++i) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the same as pre suggestion |
||
m_threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &m_ioService)); | ||
m_handleThreadPool.create_thread(boost::bind(&boost::asio::io_service::run, &m_handleService)); | ||
} | ||
#if !defined(WIN32) && !defined(__APPLE__) | ||
prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0); | ||
|
@@ -48,7 +60,7 @@ TcpRemotingClient::TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeo | |
LOG_INFO("m_tcpConnectTimeout:%ju, m_tcpTransportTryLockTimeout:%ju, m_pullThreadNum:%d", m_tcpConnectTimeout, | ||
m_tcpTransportTryLockTimeout, m_pullThreadNum); | ||
|
||
m_async_service_thread.reset(new boost::thread(boost::bind(&TcpRemotingClient::boost_asio_work, this))); | ||
m_timerServiceThread.reset(new boost::thread(boost::bind(&TcpRemotingClient::boost_asio_work, this))); | ||
} | ||
|
||
void TcpRemotingClient::boost_asio_work() { | ||
|
@@ -59,9 +71,9 @@ void TcpRemotingClient::boost_asio_work() { | |
#endif | ||
|
||
// avoid async io service stops after first timer timeout callback | ||
boost::asio::io_service::work work(m_async_ioService); | ||
boost::asio::io_service::work work(m_timerService); | ||
|
||
m_async_ioService.run(); | ||
m_timerService.run(); | ||
} | ||
|
||
TcpRemotingClient::~TcpRemotingClient() { | ||
|
@@ -75,20 +87,24 @@ TcpRemotingClient::~TcpRemotingClient() { | |
void TcpRemotingClient::stopAllTcpTransportThread() { | ||
LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread Begin"); | ||
|
||
m_async_ioService.stop(); | ||
m_async_service_thread->interrupt(); | ||
m_async_service_thread->join(); | ||
m_timerService.stop(); | ||
m_timerServiceThread->interrupt(); | ||
m_timerServiceThread->join(); | ||
removeAllTimerCallback(); | ||
|
||
{ | ||
std::lock_guard<std::timed_mutex> lock(m_tcpTableLock); | ||
for (const auto& trans : m_tcpTable) { | ||
trans.second->disconnect(trans.first); | ||
} | ||
m_tcpTable.clear(); | ||
} | ||
|
||
m_ioService.stop(); | ||
m_threadpool.join_all(); | ||
m_handleService.stop(); | ||
m_handleThreadPool.join_all(); | ||
|
||
m_dispatchService.stop(); | ||
m_dispatchThreadPool.join_all(); | ||
|
||
{ | ||
std::lock_guard<std::mutex> lock(m_futureTableLock); | ||
|
@@ -98,7 +114,7 @@ void TcpRemotingClient::stopAllTcpTransportThread() { | |
} | ||
} | ||
|
||
LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread End"); | ||
LOG_ERROR("TcpRemotingClient::stopAllTcpTransportThread End, m_tcpTable:%lu", m_tcpTable.size()); | ||
} | ||
|
||
void TcpRemotingClient::updateNameServerAddressList(const string& addrs) { | ||
|
@@ -226,13 +242,13 @@ bool TcpRemotingClient::invokeAsync(const string& addr, | |
|
||
if (callback) { | ||
boost::asio::deadline_timer* t = | ||
new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(timeoutMillis)); | ||
new boost::asio::deadline_timer(m_timerService, boost::posix_time::milliseconds(timeoutMillis)); | ||
addTimerCallback(t, opaque); | ||
t->async_wait( | ||
boost::bind(&TcpRemotingClient::handleAsyncRequestTimeout, this, boost::asio::placeholders::error, opaque)); | ||
} | ||
|
||
// Even if send failed, asyncTimerThread will trigger next pull request or report send msg failed | ||
// even if send failed, asyncTimerThread will trigger next pull request or report send msg failed | ||
if (SendCommand(pTcp, request)) { | ||
LOG_DEBUG("invokeAsync success, addr:%s, code:%d, opaque:%d", addr.c_str(), code, opaque); | ||
responseFuture->setSendRequestOK(true); | ||
|
@@ -453,7 +469,7 @@ void TcpRemotingClient::static_messageReceived(void* context, const MemoryBlock& | |
} | ||
|
||
void TcpRemotingClient::messageReceived(const MemoryBlock& mem, const string& addr) { | ||
m_ioService.post(boost::bind(&TcpRemotingClient::ProcessData, this, mem, addr)); | ||
m_dispatchService.post(boost::bind(&TcpRemotingClient::ProcessData, this, mem, addr)); | ||
} | ||
|
||
void TcpRemotingClient::ProcessData(const MemoryBlock& mem, const string& addr) { | ||
|
@@ -482,7 +498,7 @@ void TcpRemotingClient::ProcessData(const MemoryBlock& mem, const string& addr) | |
LOG_DEBUG("find_response opaque:%d", opaque); | ||
processResponseCommand(pRespondCmd, pFuture); | ||
} else { | ||
processRequestCommand(pRespondCmd, addr); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i suggest continue to use processRequestCommand, put m_handleService.post() to it, it will more clear to readers There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No. Do it will write another function for call processRequest and invokeOneway, it is unnecessary. |
||
m_handleService.post(boost::bind(&TcpRemotingClient::processRequestCommand, this, pRespondCmd, addr)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. when so many rebalance request come, m_handleService also be holded, so no thread to handle the response message? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. when so many rebalance request come(consumer changed), m_handleService also be holded, so no thread to handle the response message? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. because rebalance is very quick without PullMsgEvent. we can do rebalance in an independent thread in future, after replace boost::asio with new implement. there is a RebalanceService in java, and i have written it in another branch for cpp. |
||
} | ||
} | ||
|
||
|
@@ -503,7 +519,8 @@ void TcpRemotingClient::processResponseCommand(RemotingCommand* pCmd, std::share | |
|
||
if (pFuture->getAsyncFlag()) { | ||
cancelTimerCallback(opaque); | ||
pFuture->invokeCompleteCallback(); | ||
|
||
m_handleService.post(boost::bind(&ResponseFuture::invokeCompleteCallback, pFuture)); | ||
} | ||
} | ||
|
||
|
@@ -520,7 +537,7 @@ void TcpRemotingClient::handleAsyncRequestTimeout(const boost::system::error_cod | |
LOG_ERROR("no response got for opaque:%d", opaque); | ||
eraseTimerCallback(opaque); | ||
if (pFuture->getAsyncCallbackWrap()) { | ||
pFuture->invokeExceptionCallback(); | ||
m_handleService.post(boost::bind(&ResponseFuture::invokeExceptionCallback, pFuture)); | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i < m_dispatchThreadNum is more safety than i!= m_dispatchThreadNum, it can handle the situation when some one change the i in {]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all codes for create threads used unequal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it not a good style of use unequal in for loop