掘金 后端 ( ) • 2024-05-11 10:49

本文主要描述 Axio 多线程的两种主要策略。

每个线程一个 I/O Service

  • 在多核的机器上,这种方案可以充分利用多个 CPU 核心。
  • 某个 socket 描述符并不会在多个线程之间共享,所以不需要引入同步机制。
  • 在 event handler 中不能执行阻塞的操作,否则将会阻塞掉io_service所在的线程。
  • 这种方案效率最高,但是实现起来具有侵略性

上下文线程池的分配

  • 使用 asio::executor_work_guard<ExecutorType> (而不是 asio::io_context::work (已弃用)) 与 io_context 绑定使得 io_context::run 不会直接返回
  • 每次调用 getIOContext 返回相对空闲的上下文
  • stop 时先销毁 Work 使得 Context 不被占位的executor_work_guard占用,之后等待仍然被占用的 Context 终止。
class AsioIOContextPool
{
public:
	using Work      = asio::executor_work_guard<asio::io_context::executor_type>;
	using WorkPtr   = std::unique_ptr<Work>;
	using IOContext = asio::io_context;

private:
	size_t _nextContext;
	std::vector<WorkPtr> _works;
	std::vector<IOContext> _ioContexts;
	std::vector<std::thread> _threads;

public:
	explicit AsioIOContextPool(size_t n = std::thread::hardware_concurrency())
		: _works(n), _ioContexts(n), _nextContext(0)
	{
		for (int i = 0; i < n; ++i)
		{
			_works[i] = std::unique_ptr<Work>(new Work(_ioContexts[i].get_executor()));
		}

		for (int i = 0; i < n; ++i)
		{
			_threads.emplace_back([this, i]() { _ioContexts[i].run(); });
		}
	}

	AsioIOContextPool(const AsioIOContextPool &)            = delete;
	AsioIOContextPool &operator=(const AsioIOContextPool &) = delete;

	/**
	 * @brief Get a reference to the next IOContext in the pool
	 */
	IOContext &getIOContext()
	{
		auto &context = _ioContexts[_nextContext++];
		if (_nextContext == _ioContexts.size())
			_nextContext = 0;
		return context;
	}

	void stop()
	{
		for (auto &work : _works)
			work.reset();
		for (auto &thread : _threads)
			thread.join();
	}

	~AsioIOContextPool()
	{
		stop();
	}
};

实现的具体细节

以官方示例的简单聊天室实现为例,要改为多线程异步,具体来说

  • acceptor 与主线程以及其对应的 io_context关联,所有连接都经过它,之后它会将各个会话分配到各个子线程与io_context
  • 在开始 accept 后,获取新的io_context并将Session关联。
class ChatServer
{
private:
	tcp::acceptor _acceptor;
	ChatRoom _room;
	AsioIOContextPool _contextPool;

public:
	ChatServer(asio::io_context &ioContext, const tcp::endpoint &endpoint)
		: _acceptor(ioContext, endpoint)
	{
		startAccept();
	}

private:
	void startAccept()
	{
		auto &ioContext = _contextPool.getIOContext();
		auto newSession = std::make_shared<ChatSession>(ioContext, _room);
		_acceptor.async_accept(
			newSession->socket(), [this, session=newSession](std::error_code ec) {
				if (!ec)
				{
					fmt::println(
					stderr, "[thread {}] accept socket", std::this_thread::get_id());
					session->start();
				}
				startAccept();
			});
	}
};

官方的所有 socket 共享主线程的 io_context,但是在这种方案里,每个新会话的 socket 都与独立的 io_context 关联.

class ChatSession : public ChatParticipant, public std::enable_shared_from_this<ChatSession>
{
private:
	tcp::socket _socket;
	ChatRoom &_room;
	ChatMessage _readMessage;
	ChatMessageQueue _writeMessageQueue;

	void doReadHeader()
	{
		auto self(this->shared_from_this());
		auto asyncReadAction = [this, self](std::error_code ec, std::size_t /*length*/) {
			if (!ec && _readMessage.decodeHeader())
			{
				doReadBody();
			}
			else
			{
				_room.leave(shared_from_this());
			}
		};
		asio::async_read(
			_socket, asio::buffer(_readMessage.data(), ChatMessage::HEADER_LENGTH),
			asyncReadAction);
	}

	void doReadBody()
	{
		auto self(shared_from_this());
		auto action = [this, self](std::error_code ec, std::size_t length) {
			if (!ec)
			{
				_room.deliver(_readMessage);
				doReadHeader();
			}
			else
			{
				_room.leave(shared_from_this());
			}
		};
		asio::async_read(
			_socket, asio::buffer(_readMessage.body(), _readMessage.bodyLength()), action);
	}

	void doWrite()
	{
		auto self(shared_from_this());
		auto token = [this, self](std::error_code ec, std::size_t length) {
			if (!ec)
			{
				_writeMessageQueue.pop_front();
				if (!_writeMessageQueue.empty())
					doWrite();
				fmt::println("[thread {}] write message", std::this_thread::get_id());
			}
			else
			{
				_room.leave(shared_from_this());
			}
		};
		asio::async_write(
			_socket,
			asio::buffer(_writeMessageQueue.front().data(), _writeMessageQueue.front().length()),
			token);
	}

public:
	ChatSession(tcp::socket socket, ChatRoom &room) 
	: _socket(std::move(socket)), _room(room)
	{
	}

	ChatSession(asio::io_context &context, ChatRoom &room) 
	: _socket(context), _room(room)
	{
	}

	void start()
	{
		_room.join(this->shared_from_this());
		doReadHeader();
	}

	void deliver(const ChatMessage &msg) override
	{
		bool writeInProgress = !_writeMessageQueue.empty();
		_writeMessageQueue.push_back(msg);
		if (!writeInProgress)
			doWrite();
	}

	[[nodiscard]]
	tcp::socket &socket() noexcept
	{
		return _socket;
	}
};

所有线程共享一个 io_context

  • 全局只分配一个io_service,并且让这个io_service在多个线程之间共享,每个线程都调用全局的io_servicerun()方法
  • 在 event handler 中允许执行阻塞的操作 (例如数据库查询操作)。
  • 线程数可以大于 CPU 核心数,譬如说,如果需要在 event handler 中执行阻塞的操作,为了提高程序的响应速度,这时就需要提高线程的数目。
  • 由于多个线程同时运行事件循环(event loop),所以会导致一个问题:即一个 socket 描述符可能会在多个线程之间共享,容易出现竞态条件 (race condition)。譬如说,如果某个 socket 的可读事件很快发生了两次,那么就会出现两个线程同时读同一个 socket 的问题 (可以使用strand解决这个问题)。
boost::thread_group threads;
void listen_thread() {
    service.run();
}
void start_listen(int thread_count) {
    for ( int i = 0; i < thread_count; ++i)
        threads.create_thread( listen_thread);
}
int main(int argc, char* argv[]) {
    talk_to_client::ptr client = talk_to_client::new_();
    acc.async_accept(client->sock(), boost::bind(handle_accept,client,_1));
    start_listen(100);
    threads.join_all();
}