本文主要描述 Axio 多线程的两种主要策略。
每个线程一个 I/O Service
- 在多核的机器上,这种方案可以充分利用多个 CPU 核心。
- 某个 socket 描述符并不会在多个线程之间共享,所以不需要引入同步机制。
- 在 event handler 中不能执行阻塞的操作,否则将会阻塞掉io_service所在的线程。
- 这种方案效率最高,但是实现起来具有侵略性
上下文线程池的分配
- 使用
asio::executor_work_guard<ExecutorType>
(而不是asio::io_context::work (已弃用)
) 与io_context
绑定使得io_context::run
不会直接返回 - 每次调用
getIOContext
返回相对空闲的上下文 -
stop
时先销毁Work
使得 Context 不被占位的executor_work_guard
占用,之后等待仍然被占用的 Context 终止。
class AsioIOContextPool
{
public:
using Work = asio::executor_work_guard<asio::io_context::executor_type>;
using WorkPtr = std::unique_ptr<Work>;
using IOContext = asio::io_context;
private:
size_t _nextContext;
std::vector<WorkPtr> _works;
std::vector<IOContext> _ioContexts;
std::vector<std::thread> _threads;
public:
explicit AsioIOContextPool(size_t n = std::thread::hardware_concurrency())
: _works(n), _ioContexts(n), _nextContext(0)
{
for (int i = 0; i < n; ++i)
{
_works[i] = std::unique_ptr<Work>(new Work(_ioContexts[i].get_executor()));
}
for (int i = 0; i < n; ++i)
{
_threads.emplace_back([this, i]() { _ioContexts[i].run(); });
}
}
AsioIOContextPool(const AsioIOContextPool &) = delete;
AsioIOContextPool &operator=(const AsioIOContextPool &) = delete;
/**
* @brief Get a reference to the next IOContext in the pool
*/
IOContext &getIOContext()
{
auto &context = _ioContexts[_nextContext++];
if (_nextContext == _ioContexts.size())
_nextContext = 0;
return context;
}
void stop()
{
for (auto &work : _works)
work.reset();
for (auto &thread : _threads)
thread.join();
}
~AsioIOContextPool()
{
stop();
}
};
实现的具体细节
以官方示例的简单聊天室实现为例,要改为多线程异步,具体来说
-
acceptor
与主线程以及其对应的 io_context关联,所有连接都经过它,之后它会将各个会话分配到各个子线程与io_context - 在开始 accept 后,获取新的io_context并将Session关联。
class ChatServer
{
private:
tcp::acceptor _acceptor;
ChatRoom _room;
AsioIOContextPool _contextPool;
public:
ChatServer(asio::io_context &ioContext, const tcp::endpoint &endpoint)
: _acceptor(ioContext, endpoint)
{
startAccept();
}
private:
void startAccept()
{
auto &ioContext = _contextPool.getIOContext();
auto newSession = std::make_shared<ChatSession>(ioContext, _room);
_acceptor.async_accept(
newSession->socket(), [this, session=newSession](std::error_code ec) {
if (!ec)
{
fmt::println(
stderr, "[thread {}] accept socket", std::this_thread::get_id());
session->start();
}
startAccept();
});
}
};
官方的所有 socket 共享主线程的 io_context,但是在这种方案里,每个新会话的 socket 都与独立的 io_context 关联.
class ChatSession : public ChatParticipant, public std::enable_shared_from_this<ChatSession>
{
private:
tcp::socket _socket;
ChatRoom &_room;
ChatMessage _readMessage;
ChatMessageQueue _writeMessageQueue;
void doReadHeader()
{
auto self(this->shared_from_this());
auto asyncReadAction = [this, self](std::error_code ec, std::size_t /*length*/) {
if (!ec && _readMessage.decodeHeader())
{
doReadBody();
}
else
{
_room.leave(shared_from_this());
}
};
asio::async_read(
_socket, asio::buffer(_readMessage.data(), ChatMessage::HEADER_LENGTH),
asyncReadAction);
}
void doReadBody()
{
auto self(shared_from_this());
auto action = [this, self](std::error_code ec, std::size_t length) {
if (!ec)
{
_room.deliver(_readMessage);
doReadHeader();
}
else
{
_room.leave(shared_from_this());
}
};
asio::async_read(
_socket, asio::buffer(_readMessage.body(), _readMessage.bodyLength()), action);
}
void doWrite()
{
auto self(shared_from_this());
auto token = [this, self](std::error_code ec, std::size_t length) {
if (!ec)
{
_writeMessageQueue.pop_front();
if (!_writeMessageQueue.empty())
doWrite();
fmt::println("[thread {}] write message", std::this_thread::get_id());
}
else
{
_room.leave(shared_from_this());
}
};
asio::async_write(
_socket,
asio::buffer(_writeMessageQueue.front().data(), _writeMessageQueue.front().length()),
token);
}
public:
ChatSession(tcp::socket socket, ChatRoom &room)
: _socket(std::move(socket)), _room(room)
{
}
ChatSession(asio::io_context &context, ChatRoom &room)
: _socket(context), _room(room)
{
}
void start()
{
_room.join(this->shared_from_this());
doReadHeader();
}
void deliver(const ChatMessage &msg) override
{
bool writeInProgress = !_writeMessageQueue.empty();
_writeMessageQueue.push_back(msg);
if (!writeInProgress)
doWrite();
}
[[nodiscard]]
tcp::socket &socket() noexcept
{
return _socket;
}
};
所有线程共享一个 io_context
- 全局只分配一个
io_service
,并且让这个io_service
在多个线程之间共享,每个线程都调用全局的io_service
的run()
方法 - 在 event handler 中允许执行阻塞的操作 (例如数据库查询操作)。
- 线程数可以大于 CPU 核心数,譬如说,如果需要在 event handler 中执行阻塞的操作,为了提高程序的响应速度,这时就需要提高线程的数目。
- 由于多个线程同时运行事件循环(event loop),所以会导致一个问题:即一个 socket 描述符可能会在多个线程之间共享,容易出现竞态条件 (race condition)。譬如说,如果某个 socket 的可读事件很快发生了两次,那么就会出现两个线程同时读同一个 socket 的问题 (可以使用
strand
解决这个问题)。
boost::thread_group threads;
void listen_thread() {
service.run();
}
void start_listen(int thread_count) {
for ( int i = 0; i < thread_count; ++i)
threads.create_thread( listen_thread);
}
int main(int argc, char* argv[]) {
talk_to_client::ptr client = talk_to_client::new_();
acc.async_accept(client->sock(), boost::bind(handle_accept,client,_1));
start_listen(100);
threads.join_all();
}