diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp index 2f2ab465c..5353b059e 100755 --- a/src/transport/TcpRemotingClient.cpp +++ b/src/transport/TcpRemotingClient.cpp @@ -29,17 +29,29 @@ namespace rocketmq { //interrupt(); - m_async_service_thread->join(); + m_timerService.stop(); + m_timerServiceThread->interrupt(); + m_timerServiceThread->join(); removeAllTimerCallback(); { + std::lock_guard lock(m_tcpTableLock); for (const auto& trans : m_tcpTable) { trans.second->disconnect(trans.first); } m_tcpTable.clear(); } - m_ioService.stop(); - m_threadpool.join_all(); + m_handleService.stop(); + m_handleThreadPool.join_all(); + + m_dispatchService.stop(); + m_dispatchThreadPool.join_all(); { std::lock_guard lock(m_futureTableLock); @@ -98,7 +114,7 @@ void TcpRemotingClient::stopAllTcpTransportThread() { } } - LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread End"); + LOG_ERROR("TcpRemotingClient::stopAllTcpTransportThread End, m_tcpTable:%lu", m_tcpTable.size()); } void TcpRemotingClient::updateNameServerAddressList(const string& addrs) { @@ -226,13 +242,13 @@ bool TcpRemotingClient::invokeAsync(const string& addr, if (callback) { boost::asio::deadline_timer* t = - new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(timeoutMillis)); + new boost::asio::deadline_timer(m_timerService, boost::posix_time::milliseconds(timeoutMillis)); addTimerCallback(t, opaque); t->async_wait( boost::bind(&TcpRemotingClient::handleAsyncRequestTimeout, this, boost::asio::placeholders::error, opaque)); } - // Even if send failed, asyncTimerThread will trigger next pull request or report send msg failed + // even if send failed, asyncTimerThread will trigger next pull request or report send msg failed if (SendCommand(pTcp, request)) { LOG_DEBUG("invokeAsync success, addr:%s, code:%d, opaque:%d", addr.c_str(), code, opaque); responseFuture->setSendRequestOK(true); @@ -453,7 +469,7 @@ void TcpRemotingClient::static_messageReceived(void* context, const MemoryBlock& } void TcpRemotingClient::messageReceived(const MemoryBlock& mem, const string& addr) { - m_ioService.post(boost::bind(&TcpRemotingClient::ProcessData, this, mem, addr)); + m_dispatchService.post(boost::bind(&TcpRemotingClient::ProcessData, this, mem, addr)); } void TcpRemotingClient::ProcessData(const MemoryBlock& mem, const string& addr) { @@ -482,7 +498,7 @@ void TcpRemotingClient::ProcessData(const MemoryBlock& mem, const string& addr) LOG_DEBUG("find_response opaque:%d", opaque); processResponseCommand(pRespondCmd, pFuture); } else { - processRequestCommand(pRespondCmd, addr); + m_handleService.post(boost::bind(&TcpRemotingClient::processRequestCommand, this, pRespondCmd, addr)); } } @@ -503,7 +519,8 @@ void TcpRemotingClient::processResponseCommand(RemotingCommand* pCmd, std::share if (pFuture->getAsyncFlag()) { cancelTimerCallback(opaque); - pFuture->invokeCompleteCallback(); + + m_handleService.post(boost::bind(&ResponseFuture::invokeCompleteCallback, pFuture)); } } @@ -520,7 +537,7 @@ void TcpRemotingClient::handleAsyncRequestTimeout(const boost::system::error_cod LOG_ERROR("no response got for opaque:%d", opaque); eraseTimerCallback(opaque); if (pFuture->getAsyncCallbackWrap()) { - pFuture->invokeExceptionCallback(); + m_handleService.post(boost::bind(&ResponseFuture::invokeExceptionCallback, pFuture)); } } } diff --git a/src/transport/TcpRemotingClient.h b/src/transport/TcpRemotingClient.h index 82f1155ac..dfacd7611 100755 --- a/src/transport/TcpRemotingClient.h +++ b/src/transport/TcpRemotingClient.h @@ -111,6 +111,7 @@ class TcpRemotingClient { AsyncTimerMap m_asyncTimerTable; std::mutex m_asyncTimerTableLock; + int m_dispatchThreadNum; int m_pullThreadNum; uint64_t m_tcpConnectTimeout; // ms uint64_t m_tcpTransportTryLockTimeout; // s @@ -121,12 +122,16 @@ class TcpRemotingClient { string m_namesrvAddrChoosed; unsigned int m_namesrvIndex; - boost::asio::io_service m_ioService; - boost::asio::io_service::work m_ioServiceWork; - boost::thread_group m_threadpool; + boost::asio::io_service m_dispatchService; + boost::asio::io_service::work m_dispatchServiceWork; + boost::thread_group m_dispatchThreadPool; - boost::asio::io_service m_async_ioService; - unique_ptr m_async_service_thread; + boost::asio::io_service m_handleService; + boost::asio::io_service::work m_handleServiceWork; + boost::thread_group m_handleThreadPool; + + boost::asio::io_service m_timerService; + unique_ptr m_timerServiceThread; }; //