asio 有兩種方式來支持多線程,本文在於探討其差別和在實際項目中如何簡單切換這兩種模式的方案。本文的完整示例在gitlab上可查看到。
首先來看看要如何使用這兩種方式:
- 方案A -> 分配一個全局的 io_context,然後在多個線程中調用 io_context 的 run 方法
- 方案B -> 爲每個線程創建單獨的 io_context,然後線程調用自己 io_context 的 run 方法
方案A 的優點在於代碼簡單,問題在於效率略遜方案B,且因爲所有線程共享同個 io_context 所以如果某個socket 快速讀取兩次,這兩次的處理程序將運行在不同的線程上,如不注意這可能會引發奇怪的 bug。
方案B 的優點在於效率更好,因爲每個線程擁有自己獨立的 io_context ,io_context 不需要在多線程間共享以及同步事件。缺點在於 方案A 中事件處理會被 asio 自動派發到空閒的 線程中,方案B 則需要我們自己實現算法將任務合理的分配到不同 io_context(即不同的線程中),存在缺陷的算法可能會導致某些線程過於忙碌另外一些線程卻無事可幹。
在兩種模式間切換
兩種模式各有特色,並且都需要一點額外的代碼來啓動線程之類的操作,既然如此何不將其代碼封裝下,以讓使用更簡單並提供一個公共接口來適配兩種模式,使其使用趨於一致。對於 c++ 來說可以定義一個 純虛類 來作爲公共接口,然後爲兩種模式實現子類算法即可,本喵如下定義此類:
namespace king011::context
{
class basic_context_t
{
public:
typedef boost::asio::io_context context_type;
typedef std::pair<std::size_t, context_type &> pair_type;
basic_context_t() = default;
virtual ~basic_context_t() = default;
// 獲取一個就緒的 context 和其索引 並爲其增加任務計數
virtual pair_type get_context() = 0;
// 返回所有獨立的 context 並爲其增加任務計數
virtual std::vector<pair_type> get_contexts() = 0;
// 減少 context 上的任務計數
virtual void put_context(std::size_t) = 0;
// 運行 context
virtual void run() = 0;
// 停止 context
virtual void stop() = 0;
};
};
basic_context_t 接口會在內部保存 io_context,並且安排線程去調用其 run 方法,其提供的公共函數有:
- get_context -> 返回一個std::pair 其中包含 io_context 的引用 以及與 io_context 關聯的一個標識(對於方案A 固定返回共享的 io_context 以及 0 作爲標識即可,方案 B 則使用自己的任務分配算法 返回一個 io_context 以及其關聯的 標識即可)
- put_context -> 傳入 io_context 關聯的標識,以通知任務分配算法此 io_context 減少一個任務計數
- run -> 在內部線程中調用 io_context 的 run 並等待其返回
- stop -> 爲所有 io_context 調用 stop
如此我們便可以實現各種 basic_context_t 的子類以讓 asio 來使用不同模式和算法來運行,而只需要簡單的 切換實例化的 子類即可簡單的切換算法和模式。
在創建異步對象比如 boost::asio::ip::tcp::socket 時,我們需要調用 get_context 來獲取一個關聯的 io_context 分配算法會返回合適的 io_context 並知道她被關聯到一組異步任務,當此 socket 銷毀時我們需要調用 put_context 來通知任務分配算法關聯的一組任務已經完成。所以 get_context 與 put_context 的調用應該是一一對應的,爲了避免 put_context 未被調用,我們可以模仿 std::unique_lock 和 std::lock_guard 來實現 RAII 方式的調用:
namespace king011::context
{
class context_guard_t final
{
public:
typedef std::pair<std::size_t, basic_context_t::context_type &> value_type;
private:
value_type value_;
basic_context_t &context_;
public:
explicit context_guard_t(basic_context_t &context)
: context_(context),
value_(context.get_context())
{
}
explicit context_guard_t(const context_guard_t &) = delete;
context_guard_t &operator=(const context_guard_t &) = delete;
~context_guard_t()
{
context_.put_context(value_.first);
}
basic_context_t::context_type &get_context() noexcept
{
return value_.second;
}
};
class unique_context_t final
{
public:
private:
std::size_t id_ = 0;
basic_context_t::context_type *type_ = nullptr;
basic_context_t *context_ = nullptr;
public:
explicit unique_context_t(basic_context_t &context)
: context_(&context)
{
auto elem = context.get_context();
id_ = elem.first;
type_ = &elem.second;
}
unique_context_t(unique_context_t &&o) noexcept : context_(o.context_)
{
id_ = o.id_;
type_ = o.type_;
o.context_ = nullptr;
}
explicit unique_context_t(const unique_context_t &) = delete;
unique_context_t &operator=(const unique_context_t &) = delete;
unique_context_t &operator=(unique_context_t &&o) noexcept
{
if (context_)
{
context_->put_context(id_);
context_ = nullptr;
}
context_ = o.context_;
id_ = o.id_;
type_ = o.type_;
if (o.context_)
{
o.context_ = nullptr;
}
return *this;
}
~unique_context_t()
{
put_context();
}
void put_context()
{
if (context_)
{
context_->put_context(id_);
context_ = nullptr;
}
}
basic_context_t::context_type &get_context() noexcept
{
return *type_;
}
operator bool() const noexcept
{
return context_;
}
};
}
快速開始
本喵在示例中創建了三個 basic_context_t 的子類來實現使用不同模式和算法
- threads_context_t -> 方案A的 多線程共享 io_context 模式
- round_robin_context_t -> 方案B的 線程獨享 io_context 模式,在分配任務時 使用依次循環的方式
- shared_work_context_t -> 方案B的 線程獨享 io_context 模式,在分配任務時 使用最小堆將任務分配給關聯異步對象最少的 io_context
有了這三個對象,依據用戶傳入不同參數實例化不同的 basic_context_t 子類即可,下面是一個 echo 服務器的例子:
#include <iostream>
#include "CLI11.hpp"
#include "version.h"
// #define __CONTEXT__CONTEXT_SHARED_WORK_TEST__
// #define __CONTEXT__CONTEXT_SHARED_WORK_TRACE__
#include "context/round_robin.hpp"
#include "context/threads.hpp"
#include "context/shared_work.hpp"
#include <boost/lexical_cast.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#define BUFFER_SIZE (1024 * 32)
typedef boost::asio::io_context io_context_t;
typedef boost::asio::ip::tcp::acceptor acceptor_t;
typedef boost::asio::ip::tcp::socket socket_t;
typedef boost::asio::ip::tcp::endpoint endpoint_t;
endpoint_t get_endpoint(const std::string &addr)
{
auto find = addr.find_last_of(":");
if (find == std::string::npos)
{
throw std::domain_error("bad addr");
}
auto port = boost::lexical_cast<boost::asio::ip::port_type>(addr.substr(find + 1));
auto domain = addr.substr(0, find);
if (domain.empty())
{
return endpoint_t(boost::asio::ip::tcp::v6(), port);
}
else
{
return endpoint_t(boost::asio::ip::address::from_string(domain), port);
}
}
void accept_sleep(const std::exception &e, std::size_t &temp_delay, acceptor_t &acceptor, boost::asio::yield_context yield)
{
if (temp_delay)
{
temp_delay *= 2;
if (temp_delay > 1000)
{
temp_delay = 1000;
}
}
else
{
temp_delay = 5;
}
boost::asio::deadline_timer sleep(acceptor.get_executor(), boost::posix_time::milliseconds(temp_delay));
std::cerr << "Accept error: " << e.what()
<< "; retrying in " << temp_delay << "ms"
<< std::endl;
sleep.async_wait(yield);
}
void accept_routine(king011::context::basic_context_t &pool, acceptor_t &acceptor, boost::asio::yield_context yield)
{
std::size_t temp_delay = 0;
for (;;)
{
try
{
// 接受一個新連接
king011::context::unique_context_t unique_context(pool);
auto &context = unique_context.get_context();
socket_t s(context);
acceptor.async_accept(s, yield);
temp_delay = 0;
// 啓動 socket 協程
boost::asio::spawn(context, [s = boost::move(s), unique_context = std::move(unique_context)](boost::asio::yield_context yield) mutable
{
auto remote = s.remote_endpoint();
try
{
char buffer[BUFFER_SIZE];
for (;;)
{
// 讀取
size_t length = s.async_read_some(
boost::asio::buffer(buffer, BUFFER_SIZE),
yield);
// 寫入
s.async_write_some(
boost::asio::buffer(buffer, length),
yield);
}
}
catch (const boost::system::system_error &e)
{
if (e.code().value() != boost::asio::error::eof)
{
std::cout << boost::diagnostic_information(e) << std::endl;
}
}
catch (const std::exception &e)
{
std::cout << e.what() << std::endl;
}
});
}
catch (const std::exception &e)
{
accept_sleep(e, temp_delay, acceptor, yield);
}
}
}
void run(const std::string &mode, king011::context::basic_context_t &pool, const std::string &addr)
{
// 創建監聽器
auto endpoint = get_endpoint(addr);
auto context = pool.get_context();
acceptor_t acceptor(context.second, endpoint);
std::cout << mode << " work at " << addr << std::endl;
auto contexts = pool.get_contexts();
for (auto &context : contexts)
{
// 運行監聽器協程
boost::asio::spawn(
context.second,
std::bind(accept_routine,
std::ref(pool),
std::ref(acceptor),
std::placeholders::_1));
}
// 運行服務
pool.run();
}
int main(int argc, char *argv[])
{
CLI::App app("context");
app.callback(
[&cmd = app]
{
const CLI::Option &v = *cmd.get_option("--version");
auto addr = cmd.get_option("--addr")->as<std::string>();
if (v)
{
std::cout << CONTEXT_VERSION
<< std::endl;
}
else if (!addr.empty())
{
auto mode = cmd.get_option("--mode")->as<std::string>();
if ("threads" == mode)
{
king011::context::threads_context_t context;
run(mode, context, addr);
}
else if ("round" == mode)
{
king011::context::round_robin_context_t context;
run(mode, context, addr);
}
else if ("shared" == mode)
{
king011::context::shared_work_context_t context;
run(mode, context, addr);
}
else
{
throw std::runtime_error("unknow mode");
}
}
else
{
throw CLI::CallForHelp();
}
});
app.add_flag("-v,--version", "display version");
std::string addr = ":9000";
app.add_option("-a,--addr",
addr,
"listen addr",
true);
std::string mode = "shared";
app.add_option("-m,--mode",
mode,
"work mode [threads round shared]",
true);
CLI11_PARSE(app, argc, argv);
return 0;
}
核心代碼只有 accept_routine 函數就是通常的 asio 協程寫法,以及 run 函數調用 basic_context_t 的 run 方法啓動工作線程,其它代碼則用於解析命令行參數。
後面我們來看看 threads_context_t round_robin_context_t shared_work_context_t 要如何實現
threads_context_t
threads_context_t 需要保存一個 io_context 以及一個 std::size_t 記錄需要啓動的線程數即可
- get_contexts 函數返回 io_context
- put_context 函數空實現
- run 函數啓動各種線程並調用 io_context 的 run
- stop 則調用 io_context 的 stop 即可
注意 run 函數還應該創建一個 io_context::work 以避免工作線程因爲 asio 沒有任何異步任務而退出,而 stop 函數應該銷毀 run 中創建的 io_context::work。
完整的實現代碼如下:
#ifndef __CONTEXT__CONTEXT_THREAD_HPP__
#define __CONTEXT__CONTEXT_THREAD_HPP__
#include "basic.hpp"
#include <thread>
namespace king011::context
{
template <typename Mutex = std::mutex>
class threads_context_t final : public basic_context_t
{
public:
typedef Mutex mutex_type;
private:
typedef std::promise<boost::system::error_code> promise_type;
typedef std::shared_future<boost::system::error_code> future_type;
typedef std::pair<std::shared_ptr<promise_type>, future_type> completer_type;
// 執行上下文
context_type context_;
// 防止 io_context 空閒退出
std::vector<context_type::work> works_;
completer_type completer_;
// 同步對象
mutex_type mutex_;
// 併發線程數
std::size_t concurrency_;
public:
explicit threads_context_t(std::size_t concurrency = std::thread::hardware_concurrency())
{
if (concurrency < 1)
{
throw std::length_error("concurrency should be > 0");
}
concurrency_ = concurrency;
}
threads_context_t(const threads_context_t &) = delete;
threads_context_t *operator=(const threads_context_t &) = delete;
virtual ~threads_context_t()
{
stop();
}
virtual pair_type get_context() override
{
return pair_type(0, context_);
}
virtual std::vector<pair_type> get_contexts() override
{
return std::vector<pair_type>(1, pair_type(0, context_));
}
virtual void put_context(std::size_t) override {}
virtual void run() override
{
completer_type completer;
{
std::lock_guard<mutex_type> lock(mutex_);
if (completer_.first)
{
completer = completer_;
}
else
{
auto promise = std::make_shared<promise_type>();
completer = std::make_pair(promise, promise->get_future().share());
completer_ = completer;
works_.emplace_back(context_);
std::thread([promise, &context = context_, concurrency = concurrency_]
{
std::vector<std::thread> threads;
for (size_t i = 1; i < concurrency; i++)
{
threads.emplace_back([&context]
{
boost::system::error_code ec;
context.run(ec);
});
}
boost::system::error_code ec;
context.run(ec);
for (auto &thread : threads)
{
thread.join();
}
promise->set_value(ec);
})
.detach();
}
}
// 等待完成
auto ec = completer.second.get();
{
std::lock_guard<mutex_type> lock(mutex_);
if (completer.first == completer_.first)
{
completer_.first.reset();
}
}
if (ec)
{
throw boost::system::system_error(ec);
}
}
virtual void stop() override
{
std::lock_guard<mutex_type> lock(mutex_);
context_.stop();
works_.clear();
}
};
}
#endif // __CONTEXT__CONTEXT_THREAD_HPP__
round_robin_context_t
round_robin_context_t 採用方案 B 的工作模式,爲每個併發的線程,創建一個獨立的 io_context,get_context 將循環從 io_context 數組中依次返回 一個 io_context,所有 io_context 都會被公平的分配,這樣的實現優點在於代碼簡單,分配算法高效,但如果 io_context 關聯的異步對象生命期很長而另外一些 io_context 關聯的 異步對象生命期很短,可能會導致 關聯長生命期的 io_context 很忙碌,關聯短生命期的 io_context 很悠閒。
但通常情況下 round_robin_context_t 都可以高效的正常工作,還是看下 如何實現 round_robin_context_t。
round_robin_context_t 需要保存一個 io_context 數組以及一個 next 遊標指示下次 get_context 應該返回數組中的哪個 io_context,同時當調用過 get_context 後將遊標向後移動,完整代碼如下:
#ifndef __CONTEXT__CONTEXT_ROUND_ROBIN_HPP__
#define __CONTEXT__CONTEXT_ROUND_ROBIN_HPP__
#include "basic.hpp"
#include <thread>
namespace king011::context
{
template <typename Mutex = std::mutex>
class round_robin_context_t final : public basic_context_t
{
public:
typedef Mutex mutex_type;
private:
typedef std::promise<boost::system::error_code> promise_type;
typedef std::shared_future<boost::system::error_code> future_type;
typedef std::pair<std::shared_ptr<promise_type>, future_type> completer_type;
// 執行上下文
std::vector<context_type> contexts_;
// 防止 io_context 空閒退出
std::vector<context_type::work> works_;
completer_type completer_;
// 同步對象
mutex_type mutex_;
// 循環 索引
std::size_t next_;
public:
explicit round_robin_context_t(std::size_t concurrency = std::thread::hardware_concurrency()) : next_(0)
{
if (concurrency < 1)
{
throw std::length_error("concurrency should be > 0");
}
contexts_ = std::move(std::vector<context_type>(concurrency));
}
round_robin_context_t(const round_robin_context_t &) = delete;
round_robin_context_t *operator=(const round_robin_context_t &) = delete;
virtual ~round_robin_context_t()
{
stop();
}
// 返回下個 context 並移動索引
virtual pair_type get_context() override
{
std::lock_guard<mutex_type> lock(mutex_);
auto i = next_++;
if (next_ == contexts_.size())
{
next_ = 0;
}
return pair_type(i, contexts_[i]);
}
virtual std::vector<pair_type> get_contexts() override
{
std::vector<pair_type> result;
std::size_t i;
for (auto &context : contexts_)
{
result.emplace_back(i++, context);
}
return std::move(result);
}
virtual void put_context(std::size_t) override {}
virtual void run() override
{
completer_type completer;
{
std::lock_guard<mutex_type> lock(mutex_);
if (completer_.first)
{
completer = completer_;
}
else
{
auto promise = std::make_shared<promise_type>();
completer = std::make_pair(promise, promise->get_future().share());
completer_ = completer;
auto count = contexts_.size();
for (size_t i = 0; i < count; i++)
{
works_.emplace_back(contexts_[i]);
}
std::thread([promise, &contexts = contexts_]
{
std::vector<std::thread> threads;
auto count = contexts.size();
for (size_t i = 1; i < count; i++)
{
threads.emplace_back([&context = contexts[i]]
{
boost::system::error_code ec;
context.run(ec);
});
}
boost::system::error_code ec;
contexts[0].run(ec);
for (auto &thread : threads)
{
thread.join();
}
promise->set_value(ec);
})
.detach();
}
}
// 等待完成
auto ec = completer.second.get();
{
std::lock_guard<mutex_type> lock(mutex_);
if (completer.first == completer_.first)
{
completer_.first.reset();
}
}
if (ec)
{
throw boost::system::system_error(ec);
}
}
virtual void stop() override
{
std::lock_guard<mutex_type> lock(mutex_);
for (auto &context : contexts_)
{
context.stop();
}
works_.clear();
}
};
};
#endif // __CONTEXT__CONTEXT_ROUND_ROBIN_HPP__
注意現在 run 函數中應該爲每個 io_context 都創建一個 io_context::work
shared_work_context_t
shared_work_context_t 同樣是採用方案 B 的工作模式,爲每個併發的線程,創建一個獨立的 io_context。相對 round_robin_context_t 的優勢在於使用了一個最小堆記錄 io_context 關聯的異步對象數量,get_context 每次返回關聯對象最少的 io_context ,以儘量讓每個線程當前的繁忙度儘量趨於一致。
在有了 round_robin_context_t 的實現後 很容易修改爲 shared_work_context_t 只需要多保存一個最小堆用於排序,在 get_context 和 put_context 後重寫調整最小堆即可,然而 c++ 標準庫的最小堆當元素修改時無法調整只能重新初始化堆,爲此我們先實現一個自己的堆模板,代碼如下:
#ifndef __CONTEXT__CONTAINER_HEAP_HPP__
#define __CONTEXT__CONTAINER_HEAP_HPP__
#include <vector>
#include <functional>
namespace king011::container
{
template <typename Type, typename Compare = std::less<Type>>
class heap_t final
{
private:
Compare less_;
public:
typedef Type value_type;
std::vector<value_type> array;
explicit heap_t() = default;
~heap_t() = default;
heap_t(const heap_t &o) = delete;
heap_t &operator=(const heap_t &o) = delete;
explicit heap_t(heap_t &&o) : less_()
{
this->array = std::move(o.array);
}
explicit heap_t(std::vector<value_type> &&arr) : less_()
{
array = std::move(arr);
}
heap_t &operator=(heap_t &&o)
{
array = std::move(o.array);
return *this;
}
heap_t &operator=(std::vector<value_type> &&arr)
{
array = std::move(arr);
return *this;
}
inline void swap(heap_t &o)
{
array.swap(o.array);
}
// 初始化堆
void init()
{
// heapify
int n = array.size();
for (int i = n / 2 - 1; i >= 0; i--)
{
_down(i, n);
}
}
// 返回堆頂元素
inline value_type &top() noexcept
{
return array.front();
}
inline const value_type &top() const noexcept
{
return array.front();
}
// 將堆頂元素彈出
void pop()
{
int n = array.size();
--n;
_swap(0, n);
_down(0, n);
array.pop_back();
}
// 向堆中添加元素
inline void push(value_type &v)
{
array.push_back(v);
_up(array.size() - 1);
}
// 向堆中添加元素
inline void push(value_type &&v)
{
array.push_back(std::move(v));
_up(array.size() - 1);
}
// 當堆中元素變化 修復堆性質
inline void fix(int i)
{
if (!_down(i, array.size()))
{
_up(i);
}
}
// 移除堆中元素
void remove(int i)
{
int n = array.size();
--n;
if (n != i)
{
_swap(i, n);
if (!_down(i, n))
{
_up(i);
}
}
array.pop_back();
}
inline bool empty() const noexcept
{
return array.empty();
}
inline std::size_t size() const noexcept
{
return array.size();
}
inline value_type &operator[](std::size_t i) noexcept
{
return array[i];
}
inline const value_type &operator[](std::size_t i) const noexcept
{
return array[i];
}
private:
inline bool _less(int i, int j)
{
return less_(array[i], array[j]);
}
inline void _swap(int i, int j)
{
return std::swap(array[i], array[j]);
}
bool _down(int i0, int n)
{
auto i = i0;
for (;;)
{
auto j1 = 2 * i + 1;
if (j1 >= n || j1 < 0)
{ // j1 < 0 after int overflow
break;
}
auto j = j1; // left child
if (auto j2 = j1 + 1; j2 < n && _less(j2, j1))
{
j = j2; // = 2*i + 2 // right child
}
if (!_less(j, i))
{
break;
}
_swap(i, j);
i = j;
}
return i > i0;
}
void _up(int j)
{
for (;;)
{
auto i = (j - 1) / 2; // parent
if (i == j || !_less(j, i))
{
break;
}
_swap(i, j);
j = i;
}
}
};
};
#endif // __CONTEXT__CONTAINER_HEAP_HPP__
有了最小堆 將 round_robin_context_t 複製一份爲 shared_work_context_t 並加入最小堆屬性並且修改 get_context 和 put_context 的實現即可,爲了避免每次 get_context 和 put_context 都會調整堆,可以多定義一個 adjust 屬性只有當 io_context 變化數超過 adjust 時才重新調整堆,完整代碼如下:
#ifndef __CONTEXT__CONTEXT_SHARED_WORK_HPP__
#define __CONTEXT__CONTEXT_SHARED_WORK_HPP__
#include "basic.hpp"
#include <thread>
#include <container/heap.hpp>
namespace king011::context
{
class priority_type
{
public:
std::size_t id;
std::size_t index;
std::size_t work;
priority_type(std::size_t id = 0, std::size_t index = 0, std::size_t work = 0)
: id(id),
index(index),
work(work) {}
};
class priority_wrapper
{
public:
priority_type *impl;
priority_wrapper(priority_type *impl = nullptr)
: impl(impl) {}
// 重載比較
bool operator<(const priority_wrapper &o) const noexcept
{
return impl->work < o.impl->work;
}
void swap(priority_wrapper &o) noexcept
{
std::swap(impl, o.impl);
std::swap(impl->index, o.impl->index);
}
};
template <typename Mutex = std::mutex>
class shared_work_context_t final : public basic_context_t
{
public:
typedef Mutex mutex_type;
private:
typedef std::promise<boost::system::error_code> promise_type;
typedef std::shared_future<boost::system::error_code> future_type;
typedef std::pair<std::shared_ptr<promise_type>, future_type> completer_type;
// 執行上下文
std::vector<context_type> contexts_;
// 防止 io_context 空閒退出
std::vector<context_type::work> works_;
completer_type completer_;
// 同步對象
mutex_type mutex_;
// 隊列索引
std::vector<priority_type> keys_;
// 優先隊列
king011::container::heap_t<priority_wrapper> priority_;
// 儘量避免 隊列排序的 調整值
std::size_t adjust_;
public:
explicit shared_work_context_t(std::size_t concurrency = std::thread::hardware_concurrency(), std::size_t adjust = 50)
: adjust_(adjust)
{
if (concurrency < 1)
{
throw std::length_error("concurrency should be > 0");
}
contexts_ = std::move(std::vector<context_type>(concurrency));
for (std::size_t i = 0; i < concurrency; i++)
{
keys_.emplace_back(i, i);
}
auto data = keys_.data();
for (size_t i = 0; i < concurrency; i++)
{
priority_.array.emplace_back(data + i);
}
}
shared_work_context_t(const shared_work_context_t &) = delete;
shared_work_context_t *operator=(const shared_work_context_t &) = delete;
virtual ~shared_work_context_t()
{
stop();
}
virtual pair_type get_context() override
{
std::lock_guard<mutex_type> lock(mutex_);
auto elem = priority_.top().impl;
auto id = elem->id;
elem->work++;
if (adjust_ < 2 || elem->work % adjust_ == 0)
{
auto index = elem->index;
priority_.fix(index);
}
return pair_type(id, contexts_[id]);
}
virtual std::vector<pair_type> get_contexts() override
{
std::lock_guard<mutex_type> lock(mutex_);
std::vector<pair_type> result;
for (auto &node : keys_)
{
node.work++;
result.emplace_back(node.id, contexts_[node.id]);
}
if (adjust_ > 2)
{
priority_.init();
}
return std::move(result);
}
virtual void put_context(std::size_t id) override
{
std::lock_guard<mutex_type> lock(mutex_);
auto &elem = keys_[id];
#ifdef __CONTEXT__CONTEXT_SHARED_WORK_TEST__
if (elem.id != id)
{
throw std::runtime_error("put_context error: id not match");
}
auto index = elem.index;
auto impl = (priority_[index]).impl;
if (impl->index != index)
{
throw std::runtime_error("put_context error: index not match");
}
else if (impl->id != id)
{
throw std::runtime_error("put_context error: id2 not match");
}
#endif // __CONTEXT__CONTEXT_SHARED_WORK_TEST__
if (elem.work > 0)
{
elem.work--;
if (adjust_ < 2 || elem.work % adjust_ == 0)
{
auto index = elem.index;
priority_.fix(index);
}
}
else
{
throw std::runtime_error("put_context error: elem's work is 0");
}
}
virtual void run() override
{
completer_type completer;
{
std::lock_guard<mutex_type> lock(mutex_);
if (completer_.first)
{
completer = completer_;
}
else
{
auto promise = std::make_shared<promise_type>();
completer = std::make_pair(promise, promise->get_future().share());
completer_ = completer;
auto count = contexts_.size();
for (size_t i = 0; i < count; i++)
{
works_.emplace_back(contexts_[i]);
}
std::thread([
#ifdef __CONTEXT__CONTEXT_SHARED_WORK_TRACE__
self = this,
#endif //__CONTEXT__CONTEXT_SHARED_WORK_TRACE__
promise, &contexts = contexts_]
{
std::vector<std::thread> threads;
auto count = contexts.size();
for (size_t i = 1; i < count; i++)
{
threads.emplace_back([&context = contexts[i]]
{
boost::system::error_code ec;
context.run(ec);
});
}
#ifdef __CONTEXT__CONTEXT_SHARED_WORK_TRACE__
std::atomic_bool trace = true;
std::thread thread([&trace, &keys = self->keys_, &mutex = self->mutex_]
{
while (trace)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
mutex.lock();
for (auto &node : keys)
{
std::cout << "id=" << node.id << " index=" << node.index << " work=" << node.work << "\n";
}
std::cout << std::endl;
mutex.unlock();
}
});
#endif // __CONTEXT__CONTEXT_SHARED_WORK_TRACE__
boost::system::error_code ec;
contexts[0].run(ec);
for (auto &thread : threads)
{
thread.join();
}
#ifdef __CONTEXT__CONTEXT_SHARED_WORK_TRACE__
trace = false;
thread.join();
#endif // __CONTEXT__CONTEXT_SHARED_WORK_TRACE__
promise->set_value(ec);
})
.detach();
}
}
// 等待完成
auto ec = completer.second.get();
{
std::lock_guard<mutex_type> lock(mutex_);
if (completer.first == completer_.first)
{
completer_.first.reset();
}
}
if (ec)
{
throw boost::system::system_error(ec);
}
}
virtual void stop() override
{
std::lock_guard<mutex_type> lock(mutex_);
for (auto &context : contexts_)
{
context.stop();
}
works_.clear();
}
};
};
namespace std
{
// 特化 swap
template <>
void swap<king011::context::priority_wrapper>(king011::context::priority_wrapper &a, king011::context::priority_wrapper &b)
{
a.swap(b);
}
};
#endif // __CONTEXT__CONTEXT_SHARED_WORK_HPP__
然而 shared_work_context_t 只是平衡了每個 io_context 關聯的異步對象數量,即平衡了每個工作線程關聯的異步對象數量,如果某些異步對象的異步事件很頻繁而另外一些對象長期閒置依然會導致某些線程繁忙另外一些空閒,但這不是通常情況,大部分需要高效的繁忙服務器異步事件和異步對象是成爲正比的,如果需要平衡異步事件而非平衡異步對象可以使用 threads_context_t 的實現。
總結
通常上述三種實現都可以很好並且高效的工作,除非有特殊需求否則任選一個即可
- 如果需要工作線程間的 異步事件 能夠平衡 應該選用 threads_context_t
- 異步對象生命期 差不多 需要儘可能高效利用 asio 應該選用 round_robin_context_t
- 異步對象生命期長短不一 同時需要儘可能高效利用 asio 應該選用 shared_work_context_t