Tutorial
delay
Let’s start with the simplest example possible: a simple delay.
async::main co_main(int argc, char * argv[]) (1)
{
asio::steady_timer tim{co_await asio::this_coro::executor, (2)
std::chrono::milliseconds(std::stoi(argv[1]))}; (3)
co_await tim.async_wait(async::use_op); (4)
co_return 0; (5)
}
1 | The co_main function defines an implicit main when defined and is the easiest way to set up an environment to run asynchronous code. |
2 | Take the executor from the current coroutine promise. |
3 | Use an argument to set the timeout |
4 | Perform the wait by using async::use_op. |
5 | Return a value that gets returned from the implicit main. |
In this example we use the async/main.hpp header, which provides us with a main coroutine if co_main
is defined as above. This has a few advantages:
- The environment gets set up correctly (executor & memory)
- asio is signaled that the context is single threaded
- an asio::signal_set with SIGINT & SIGTERM is automatically connected to cancellations (i.e. Ctrl+C causes cancellations); see the sketch below
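For illustration, here is a rough sketch of the kind of signal wiring async/main.hpp performs. This is an assumption about the mechanism, not the library’s actual implementation; the helper name connect_signals_to_cancellation is ours.
#include <csignal>
#include <memory>
#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/cancellation_signal.hpp>
#include <boost/asio/signal_set.hpp>

// Hypothetical helper: connect SIGINT & SIGTERM to a cancellation signal,
// roughly what the implicit main does for the co_main coroutine.
void connect_signals_to_cancellation(boost::asio::any_io_executor exec,
                                     boost::asio::cancellation_signal & cancel)
{
  auto sigs = std::make_shared<boost::asio::signal_set>(exec, SIGINT, SIGTERM);
  sigs->async_wait(
      [sigs, &cancel](boost::system::error_code ec, int /*signo*/)
      {
        if (!ec) // Ctrl+C or SIGTERM: emit a cancellation towards the root coroutine
          cancel.emit(boost::asio::cancellation_type::all);
      });
}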
This coroutine then has an executor in its promise (the promise is the C++ name for the coroutine state; not to be confused with async/promise.hpp), which we can obtain through the dummy-awaitables in the this_coro namespace.
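As a small illustration (a sketch; the coroutine name show_executor is ours), co_awaiting this dummy awaitable performs no I/O; it simply hands back the executor stored in the coroutine promise:
async::promise<void> show_executor()
{
  // this_coro::executor is a dummy awaitable: awaiting it does not wait for anything,
  // it just yields the executor associated with this coroutine's promise.
  auto exec = co_await async::this_coro::executor;
  asio::steady_timer tim{exec, std::chrono::milliseconds(50)};
  co_await tim.async_wait(async::use_op);
}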
echo server
We’ll be using the use_op (asio completion) token everywhere, so we set it up as the default completion token, which lets us skip the last parameter of the asio functions.
namespace async = boost::async;
using boost::asio::ip::tcp;
using boost::asio::detached;
using tcp_acceptor = async::use_op_t::as_default_on_t<tcp::acceptor>;
using tcp_socket = async::use_op_t::as_default_on_t<tcp::socket>;
namespace this_coro = boost::async::this_coro;
We’re writing the echo function as a promise coroutine. It’s an eager coroutine and recommended as the default; in case a lazy coro is needed, task is available.
async::promise<void> echo(tcp_socket socket)
{
try (1)
{
char data[4096];
while (socket.is_open()) (2)
{
std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data)); (3)
co_await async_write(socket, boost::asio::buffer(data, n)); (4)
}
}
catch (std::exception& e)
{
std::printf("echo: exception: %s\n", e.what());
}
}
1 | When using the use_op completion token, I/O errors are translated into C++ exceptions. Additionally, if the coroutine gets cancelled (e.g. because the user hit Ctrl-C), an exception will be raised, too. Under these conditions, we print the error and exit the loop. |
2 | We run the loop until we get cancelled (exception) or the user closes the connection. |
3 | Read as much as is available. |
4 | Write all the read bytes. |
Note that promise is eager. Calling echo
will immediately execute code until async_read_some
and then return control to the caller.
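To make this eagerness concrete, here is a minimal sketch contrasting the eager promise with the lazy task mentioned earlier (the coroutine names are ours; assumes the async promise, task and main headers plus <cstdio> are included):
async::promise<int> eager_coro()
{
  std::printf("eager_coro: started\n"); // runs as soon as eager_coro() is called
  co_return 1;
}

async::task<int> lazy_coro()
{
  std::printf("lazy_coro: started\n"); // runs only once the task is awaited
  co_return 2;
}

async::main co_main(int, char **)
{
  auto p = eager_coro(); // "eager_coro: started" has already been printed here
  auto t = lazy_coro();  // nothing has run yet
  co_await t;            // now "lazy_coro: started" is printed
  co_await p;            // the promise already holds its result; we just collect it
  co_return 0;
}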
Next, we also need an acceptor function. Here, we’re using a generator to manage the acceptor state.
This is a coroutine that can be co_awaited multiple times, until a co_return
expression is reached.
async::generator<tcp_socket> listen()
{
tcp_acceptor acceptor({co_await async::this_coro::executor}, {tcp::v4(), 55555});
for (;;) (1)
{
tcp_socket sock = co_await acceptor.async_accept(); (2)
co_yield std::move(sock); (3)
}
co_return tcp_socket{acceptor.get_executor()}; (4)
}
1 | Cancellation will also lead to an exception being thrown from the co_await here |
2 | Asynchronously accept the connection |
3 | Yield it to the awaiting coroutine |
4 | co_return a value for C++ conformance. |
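As an aside, here is a minimal standalone sketch of a generator being co_awaited repeatedly until its co_return (the coroutine numbers and its values are made up):
async::generator<int> numbers()
{
  co_yield 1;  // each co_await on the generator resumes the coroutine here
  co_yield 2;
  co_return 3; // the final value; afterwards the generator is exhausted
}

async::main co_main(int, char **)
{
  auto g = numbers();
  std::printf("%d\n", co_await g); // 1
  std::printf("%d\n", co_await g); // 2
  std::printf("%d\n", co_await g); // 3 (the co_return value)
  co_return 0;
}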
With those two functions we can now write the server:
async::promise<void> run_server(async::wait_group & workers)
{
auto l = listen(); (1)
while (true)
{
if (workers.size() == 10u)
co_await workers.wait_one(); (2)
else
workers.push_back(echo(co_await l)); (3)
}
}
1 | Construct the listener generator coroutine. When the object is destroyed, the coroutine will be cancelled, performing all required cleanup. |
2 | When we have 10 workers, we wait for one to finish |
3 | Accept a new connection & launch it. |
The wait_group is used to manage the running echo functions. This class will cancel & await the running echo coroutines.
We do not need to do the same for the listener, because it will just stop on its own when l gets destroyed; the destructor of a generator will cancel it.
Since the promise is eager, just calling it is enough to launch. We then put those promises into a wait_group, which will allow us to tear down all the workers on scope exit.
async::main co_main(int argc, char ** argv)
{
co_await async::with(async::wait_group(), &run_server); (1)
co_return 0u;
}
1 | Run run_server with an async scope. |
The with function shown above will run a function with a resource such as wait_group. On scope exit, with will invoke & co_await an asynchronous teardown function. This causes all connections to be properly shut down before co_main exits.
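To illustrate the teardown in isolation, here is a small sketch (the worker coroutine and its timings are made up): when the inner function returns, with cancels & co_awaits whatever is still inside the wait_group before co_main continues.
async::promise<void> worker(int id)
{
  try
  {
    asio::steady_timer tim{co_await async::this_coro::executor, std::chrono::seconds(id)};
    co_await tim.async_wait(async::use_op); // cancelled by the teardown below
    std::printf("worker %d finished\n", id);
  }
  catch (std::exception &)
  {
    std::printf("worker %d cancelled\n", id); // with use_op, cancellation surfaces as an exception
  }
}

async::main co_main(int, char **)
{
  co_await async::with(
      async::wait_group(),
      [](async::wait_group & wg) -> async::promise<void>
      {
        for (int i = 1; i <= 3; i++)
          wg.push_back(worker(i)); // eagerly started, owned by the group
        co_return; // scope exit: with now cancels & co_awaits the remaining workers
      });
  co_return 0;
}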
price ticker
To demonstrate channels and other tools, we need an example with a certain complexity. For that purpose our project is a price ticker that connects to https://blockchain.info. A user can then connect to localhost to query a given currency pair, like this:
wscat -c localhost:8080/btc/usd
First we make the same declarations as in the echo server.
using executor_type = async::use_op_t::executor_with_default<async::executor>;
using socket_type = typename asio::ip::tcp::socket::rebind_executor<executor_type>::other;
using acceptor_type = typename asio::ip::tcp::acceptor::rebind_executor<executor_type>::other;
using websocket_type = beast::websocket::stream<asio::ssl::stream<socket_type>>;
namespace http = beast::http;
The next step is to write a function that connects an ssl-stream to the upstream server:
async::promise<asio::ssl::stream<socket_type>> connect(
std::string host, boost::asio::ssl::context & ctx)
{
asio::ip::tcp::resolver res{async::this_thread::get_executor()};
auto ep = co_await res.async_resolve(host, "https", async::use_op); (1)
asio::ssl::stream<socket_type> sock{async::this_thread::get_executor(), ctx};
co_await sock.next_layer().async_connect(*ep.begin()); (2)
co_await sock.async_handshake(asio::ssl::stream_base::client); (3)
co_return sock; (4)
}
1 | Lookup the host |
2 | Connect to the endpoint |
3 | Do the ssl handshake |
4 | Return the socket to the caller |
Next, we’ll need a function to do the websocket upgrade on an existing ssl-stream.
async::promise<void> connect_to_blockchain_info(websocket_type & ws)
{
ws.set_option(beast::websocket::stream_base::decorator(
[](beast::websocket::request_type& req)
{
req.set(http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) + " async-ticker");
req.set(http::field::origin,
"https://exchange.blockchain.com"); (1)
}));
co_await ws.async_handshake("ws.blockchain.info", "/mercury-gateway/v1/ws"); (2)
}
1 | blockchain.info requires this header to be set. |
2 | Perform the websocket handshake. |
Once the websocket is connected, we want to continuously receive json messages, for which a generator is a good choice.
async::generator<json::object> json_reader(websocket_type & ws)
try
{
beast::flat_buffer buf;
while (ws.is_open()) (1)
{
auto sz = co_await ws.async_read(buf); (2)
json::string_view data{static_cast<const char*>(buf.cdata().data()), sz};
auto obj = json::parse(data);
co_yield obj.as_object(); (3)
buf.consume(sz);
}
co_return {};
}
catch (std::exception & e)
{
std::cerr << "Error reading: " << e.what() << std::endl;
throw;
}
1 | Keep running as long as the socket is open |
2 | Read a frame from the websocket |
3 | Parse & co_yield it as an object. |
This then needs to be connected to the subscribers, for which we’ll utilize channels to pass raw json. To make lifetime management easy, the subscriber will hold a shared_ptr to the channel, and the producer a weak_ptr.
using subscription = std::pair<std::string, std::weak_ptr<async::channel<json::object>>>;
using subscription_channel = std::weak_ptr<async::channel<json::object>>;
using subscription_map = boost::unordered_multimap<std::string, subscription_channel>;
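Before diving into the connector, here is a minimal, self-contained sketch of how a channel behaves (the producer coroutine and the buffer size are made up): write suspends while the buffer is full and read suspends while it is empty.
async::promise<void> producer(async::channel<int> & ch)
{
  for (int i = 0; i < 3; i++)
    co_await ch.write(i); // suspends while the buffer (size 1) is full
  ch.close();             // tells readers that no more data will arrive
}

async::main co_main(int, char **)
{
  async::channel<int> ch{1u}; // buffer size 1, like the per-subscription channels below
  auto p = producer(ch);      // eager promise: the first write has already happened
  for (int i = 0; i < 3; i++)
    std::printf("got %d\n", co_await ch.read());
  co_await p;
  co_return 0;
}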
The main function running the blockchain connector operates on two inputs: data coming from the websocket and a channel that delivers new subscriptions.
async::promise<void> run_blockchain_info(async::channel<subscription> & subc)
try
{
asio::ssl::context ctx{asio::ssl::context_base::tls_client};
websocket_type ws{co_await connect("blockchain.info", ctx)};
co_await connect_to_blockchain_info(ws); (1)
subscription_map subs;
std::list<std::string> unconfirmed;
auto rd = json_reader(ws); (2)
while (ws.is_open()) (3)
{
switch (auto msg = co_await async::select(rd, subc.read()); msg.index()) (4)
{
case 0: (5)
if (auto ms = get<0>(msg);
ms.at("event") == "rejected") // invalid sub, cancel however subbed
co_await handle_rejections(unconfirmed, subs, ms);
else
co_await handle_update(unconfirmed, subs, ms, ws);
break;
case 1: // (6)
co_await handle_new_subscription(
unconfirmed, subs,
std::move(get<1>(msg)), ws);
break;
}
}
for (auto & [k, c] : subs)
{
if (auto ptr = c.lock())
ptr->close();
}
}
catch(std::exception & e)
{
std::cerr << "Exception: " << e.what() << std::endl;
throw;
}
1 | Initialize the connection |
2 | Instantiate the json_reader |
3 | Run as long as the websocket is open |
4 | Select, i.e. wait for either a new json message or subscription |
5 | When it’s a json message, handle an update or a rejection |
6 | Handle new subscription messages |
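If you have not used select before, here is a small standalone sketch (the two coroutines quick and slow are made up): select awaits several awaitables at once and completes with the first one that finishes; the result is variant-like, and index() tells you which awaitable produced it.
async::promise<int> quick()
{
  co_return 42; // eager & immediately done
}

async::promise<std::string> slow()
{
  try
  {
    asio::steady_timer tim{co_await async::this_coro::executor, std::chrono::seconds(10)};
    co_await tim.async_wait(async::use_op); // cancelled once quick() wins the select
  }
  catch (std::exception &) {}
  co_return "never the winner";
}

async::main co_main(int, char **)
{
  auto r = co_await async::select(quick(), slow()); // completes as soon as quick() is done
  if (r.index() == 0)                               // index 0 -> result of quick()
    std::printf("int result: %d\n", get<0>(r));
  else
    std::printf("string result: %s\n", get<1>(r).c_str());
  co_return 0;
}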
The contents of the handle_* functions are not as important for the async functionality, so they are skipped in this tutorial.
The handle_new_subscription function sends a message to blockchain.info, which will send back a confirmation or rejection.
handle_rejection and handle_update take the json values and forward them to the subscription channel.
On the consumer side, our server will just forward data to the client.
If the client inputs data, we’ll close the websocket immediately.
We’re using as_tuple
to ignore potential errors.
async::promise<void> read_and_close(beast::websocket::stream<socket_type> & st, beast::flat_buffer buf)
{
system::error_code ec;
co_await st.async_read(buf, asio::as_tuple(async::use_op));
co_await st.async_close(beast::websocket::close_code::going_away, asio::as_tuple(async::use_op));
st.next_layer().close(ec);
}
Next, we run the session that serves a connected user:
async::promise<void> run_session(beast::websocket::stream<socket_type> st,
async::channel<subscription> & subc)
try
{
http::request<http::empty_body> req;
beast::flat_buffer buf;
co_await http::async_read(st.next_layer(), buf, req); (1)
// check the target
auto r = urls::parse_uri_reference(req.target());
if (r.has_error() || (r->segments().size() != 2u)) (2)
{
http::response<http::string_body> res{http::status::bad_request, 11};
res.body() = r.has_error() ? r.error().message() :
"url needs two segments, e.g. /btc/usd";
co_await http::async_write(st.next_layer(), res);
st.next_layer().close();
co_return ;
}
co_await st.async_accept(req); (3)
auto sym = std::string(r->segments().front()) + "-" +
std::string(r->segments().back());
boost::algorithm::to_upper(sym);
// close when data gets sent
auto p = read_and_close(st, std::move(buf)); (4)
auto ptr = std::make_shared<async::channel<json::object>>(1u); (5)
co_await subc.write(subscription{sym, ptr}); (6)
while (ptr->is_open() && st.is_open()) (7)
{
auto bb = json::serialize(co_await ptr->read());
co_await st.async_write(asio::buffer(bb));
}
co_await st.async_close(beast::websocket::close_code::going_away,
asio::as_tuple(async::use_op)); (8)
st.next_layer().close();
co_await p; (9)
}
catch(std::exception & e)
{
std::cerr << "Session ended with exception: " << e.what() << std::endl;
}
1 | Read the http request, because we want the path |
2 | Check the path, e.g. /btc/usd . |
3 | Accept the websocket |
4 | Start reading & close if the consumer sends something |
5 | Create the channel to receive updates |
6 | Send a subscription request to run_blockchain_info |
7 | While the channel & websocket are open, we’re forwarding data. |
8 | Close the socket & ignore the error |
9 | Since the websocket is surely closed by now, wait for read_and_close to finish. |
With run_session and run_blockchain_info written, we can now move on to main:
async::main co_main(int argc, char * argv[])
{
acceptor_type acc{co_await async::this_coro::executor,
asio::ip::tcp::endpoint (asio::ip::tcp::v4(), 8080)};
std::cout << "Listening on localhost:8080" << std::endl;
constexpr int limit = 10; // allow 10 ongoing sessions
async::channel<subscription> sub_manager; (1)
co_await join( (2)
run_blockchain_info(sub_manager),
async::with( (3)
async::wait_group(
asio::cancellation_type::all,
asio::cancellation_type::all),
[&](async::wait_group & sessions) -> async::promise<void>
{
while (!co_await async::this_coro::cancelled) (4)
{
if (sessions.size() >= limit) (5)
co_await sessions.wait_one();
auto conn = co_await acc.async_accept(); (6)
sessions.push_back( (7)
run_session(
beast::websocket::stream<socket_type>{std::move(conn)},
sub_manager));
}
})
);
co_return 0;
}
1 | Create the channel to manage subscriptions |
2 | Use join to run both tasks in parallel. |
3 | Use an async scope to provide a wait_group . |
4 | Run until cancelled. |
5 | When we’ve reached the limit we wait for one task to complete. |
6 | Wait for a new connection. |
7 | Insert the session into the wait_group . |
Main is using join
because one task failing should cancel the other one.
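Here is a small sketch of join in isolation (the coroutines task_a and task_b are made up): both awaitables run concurrently, the results come back together, and if either throws the other one gets cancelled.
async::promise<int> task_a()
{
  co_return 1;
}

async::promise<int> task_b()
{
  asio::steady_timer tim{co_await async::this_coro::executor, std::chrono::milliseconds(10)};
  co_await tim.async_wait(async::use_op);
  co_return 2;
}

async::main co_main(int, char **)
{
  auto [a, b] = co_await async::join(task_a(), task_b()); // both complete; results come back as a tuple
  std::printf("%d %d\n", a, b);                           // prints "1 2"
  co_return 0;
}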
delay op
We’ve used the use_op token so far to run implicit operations based on asio’s completion token mechanism.
We can however implement our own ops, which can also utilize the await_ready optimization, i.e. skip the suspension entirely when the result is already available.
To leverage this coroutine feature, async provides an easy way to create a skippable operation:
struct wait_op final : async::op<system::error_code> (1)
{
asio::steady_timer & tim;
wait_op(asio::steady_timer & tim) : tim(tim) {}
void ready(async::handler<system::error_code> h) override (2)
{
if (tim.expiry() < std::chrono::steady_clock::now())
h(system::error_code{});
}
void initiate(async::completion_handler<system::error_code> complete) override (3)
{
tim.async_wait(std::move(complete));
}
};
async::main co_main(int argc, char * argv[])
{
asio::steady_timer tim{co_await asio::this_coro::executor,
std::chrono::milliseconds(std::stoi(argv[1]))};
co_await wait_op(tim); (4)
co_return 0;
}
1 | Declare the op. We inherit op to make it awaitable. |
2 | The pre-suspend check is implemented here |
3 | Do the wait if we need to |
4 | Use the op just like any other awaitable. |
This way we can minimize the number of coroutine suspensions. While the above is used with asio, you can also use these handlers with any other callback-based code.
Generator with push value
Coroutines with push values are not as common, but can simplify certain issues significantly.
Since we’ve already got a json_reader in the previous example, here’s how we can write a json_writer that gets values pushed in.
The advantage of using a generator is the internal state management.
async::generator<system::error_code, json::object>
json_writer(websocket_type & ws)
try
{
char buffer[4096];
json::serializer ser;
while (ws.is_open()) (1)
{
auto val = co_yield system::error_code{}; (2)
ser.reset(&val); // point the serializer at the newly pushed value
while (!ser.done())
{
auto sv = ser.read(buffer);
co_await ws.async_write(asio::buffer(sv.data(), sv.size())); (3)
}
}
co_return {};
}
catch (system::system_error& e)
{
co_return e.code();
}
catch (std::exception & e)
{
std::cerr << "Error reading: " << e.what() << std::endl;
throw;
}
1 | Keep running as long as the socket is open |
2 | co_yield the current error and retrieve a new value. |
3 | Write a frame to the websocket |
Now we can use the generator like this:
auto g = json_writer(my_ws);
extern std::vector<json::object> to_write;
for (auto && tw : std::move(to_write))
{
if (auto ec = co_await g(std::move(tw)))
return ec; // yield error
}
Advanced examples
More examples are provided in the repository as code only. All examples are listed below.
- An http client that performs a single http get request.
- Using the
- Using nanobind to integrate async with python. It uses python’s asyncio as executor and allows C++ to co_await python functions and vice versa.
- Adopting
- Creating a
- Using worker threads with
- Using an
- The example used by the delay section
- The example used by the delay op section
- The example used by the echo server section
- The example used by the price ticker section
- The example used by the channel reference