Tutorial

delay

Let’s start with the simplest example possible: a simple delay.

example/delay.cpp
async::main co_main(int argc, char * argv[]) (1)
{
  asio::steady_timer tim{co_await asio::this_coro::executor, (2)
                         std::chrono::milliseconds(std::stoi(argv[1]))}; (3)
  co_await tim.async_wait(async::use_op); (4)
  co_return 0; (5)
}
1 Defining the co_main function provides an implicit main and is the easiest way to set up an environment to run asynchronous code.
2 Take the executor from the current coroutine promise.
3 Use an argument to set the timeout
4 Perform the wait by using async::use_op.
5 Return a value that gets returned from the implicit main.

In this example we use the async/main.hpp header, which provides us with a main coroutine if co_main is defined as above. This has a few advantages:

  • The environment gets set up correctly (executor & memory)

  • asio is signaled that the context is single threaded

  • an asio::signal_set with SIGINT & SIGTERM is automatically connected to cancellations (i.e. Ctrl+C causes cancellations)

This coroutine then has an executor in its promise (the promise is the C++ name for a coroutine’s state, not to be confused with async/promise.hpp), which we can obtain through the dummy awaitables in the this_coro namespace.

We can then construct a timer and initiate the async_wait with use_op. async provides multiple ways to co_await asio operations, of which use_op is the easiest.
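
The delay example passes argv[1] straight to std::stoi, which throws if the argument is missing or not a number. As a minimal sketch of how the argument could be validated first, the helper below uses only the standard library. The name parse_delay is hypothetical and is not part of async or asio.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <optional>
#include <stdexcept>
#include <string>

// Hypothetical helper (not part of async or asio): turn argv[1] into a delay.
// Returns std::nullopt if the argument is missing, not a number, or negative.
std::optional<std::chrono::milliseconds> parse_delay(int argc, const char * const argv[])
{
  assert(argv != nullptr);
  if (argc < 2)
    return std::nullopt;
  try
  {
    std::size_t pos = 0;
    const std::string arg = argv[1];
    const int ms = std::stoi(arg, &pos);
    if (pos != arg.size() || ms < 0) // reject trailing garbage and negative values
      return std::nullopt;
    return std::chrono::milliseconds(ms);
  }
  catch (const std::exception &) // std::invalid_argument or std::out_of_range
  {
    return std::nullopt;
  }
}
```

Inside co_main, the result could be checked before constructing the timer, for example by returning a non-zero value with co_return when no valid delay was given.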

echo server

We’ll be using the use_op token (an asio completion token) everywhere, so we make it the default completion token. This lets us omit the last argument of every asynchronous call.

example/echo_server.cpp declarations
namespace async = boost::async;
using boost::asio::ip::tcp;
using boost::asio::detached;
using tcp_acceptor = async::use_op_t::as_default_on_t<tcp::acceptor>;
using tcp_socket   = async::use_op_t::as_default_on_t<tcp::socket>;
namespace this_coro = boost::async::this_coro;

We’re writing the echo function as a promise coroutine. It’s an eager coroutine and recommended as the default; in case a lazy coro is needed, task is available.

example/echo_server.cpp echo function
async::promise<void> echo(tcp_socket socket)
{
  try (1)
  {
    char data[4096];
    while (socket.is_open()) (2)
    {
      std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data)); (3)
      co_await async_write(socket, boost::asio::buffer(data, n)); (4)
    }
  }
  catch (std::exception& e)
  {
    std::printf("echo: exception: %s\n", e.what());
  }
}
1 When using the use_op completion token, I/O errors are translated into C++ exceptions. Additionally, if the coroutine gets cancelled (e.g. because the user hit Ctrl-C), an exception will be raised, too. Under these conditions, we print the error and exit the loop.
2 We run the loop until we get cancelled (exception) or the user closes the connection.
3 Read as much as is available.
4 Write all the read bytes.
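
The translation of I/O errors into exceptions mentioned in callout 1 can be pictured with standard-library types alone. The sketch below is only a model of the idea, not async's or asio's implementation; the function name value_or_throw is hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <system_error>

// Model of a throwing completion: an operation finished with an error code
// and a byte count. A non-zero error code becomes a std::system_error,
// otherwise the byte count is handed back to the caller.
std::size_t value_or_throw(std::error_code ec, std::size_t bytes)
{
  if (ec)
    throw std::system_error(ec);
  return bytes;
}
```

With this shape, the happy path reads like synchronous code, and a single try/catch around the loop handles every failure, which is what the echo function relies on.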

Note that promise is eager. Calling echo will immediately execute code until async_read_some and then return control to the caller.

Next, we also need an acceptor function. Here, we’re using a generator to manage the acceptor state. This is a coroutine that can be co_awaited multiple times, until a co_return expression is reached.

example/echo_server.cpp listen function
async::generator<tcp_socket> listen()
{
  tcp_acceptor acceptor({co_await async::this_coro::executor}, {tcp::v4(), 55555});
  for (;;) (1)
  {
    tcp_socket sock = co_await acceptor.async_accept(); (2)
    co_yield std::move(sock); (3)
  }
  co_return tcp_socket{acceptor.get_executor()}; (4)
}
1 Cancellation will also cause an exception to be thrown from the co_await here.
2 Asynchronously accept the connection
3 Yield it to the awaiting coroutine
4 co_return a value for C++ conformance.

With those two functions we can now write the server:

example/echo_server.cpp run_server function
async::promise<void> run_server(async::wait_group & workers)
{
  auto l = listen(); (1)
  while (true)
  {
    if (workers.size() == 10u)
      co_await workers.wait_one();  (2)
    else
      workers.push_back(echo(co_await l)); (3)
  }
}
1 Construct the listener generator coroutine. When the object is destroyed, the coroutine will be cancelled, performing all required cleanup.
2 When we have 10 workers, we wait for one to finish
3 Accept a new connection & launch it.

The wait_group is used to manage the running echo functions. This class will cancel & await the running echo coroutines.

We do not need to do the same for the listener, because it will just stop on its own, when l gets destroyed. The destructor of a generator will cancel it.

Since the promise is eager, just calling it is enough to launch. We then put those promises into a wait_group which will allow us to tear down all the workers on scope exit.

example/echo_server.cpp co_main function
async::main co_main(int argc, char ** argv)
{
  co_await async::with(async::wait_group(), &run_server); (1)
  co_return 0u;
}
1 Run run_server with an async scope.

The with function shown above runs a function with a resource such as a wait_group. On scope exit, with will invoke & co_await an asynchronous teardown function. This causes all connections to be properly shut down before co_main exits.

price ticker

To demonstrate channels and other tools, we need a certain complexity. For that purpose our project is a price ticker that connects to https://blockchain.info. A user can then connect to localhost to query a given currency pair, like this:

wscat -c localhost:8080/btc/usd

First we do the same declarations as echo-server.

example/ticker.cpp declarations
using executor_type = async::use_op_t::executor_with_default<async::executor>;
using socket_type   = typename asio::ip::tcp::socket::rebind_executor<executor_type>::other;
using acceptor_type = typename asio::ip::tcp::acceptor::rebind_executor<executor_type>::other;
using websocket_type = beast::websocket::stream<asio::ssl::stream<socket_type>>;
namespace http = beast::http;

The next step is to write a function that connects an ssl-stream to the upstream server:

example/ticker.cpp connect
async::promise<asio::ssl::stream<socket_type>> connect(
        std::string host, boost::asio::ssl::context & ctx)
{
    asio::ip::tcp::resolver res{async::this_thread::get_executor()};
    auto ep = co_await res.async_resolve(host, "https", async::use_op); (1)

    asio::ssl::stream<socket_type> sock{async::this_thread::get_executor(), ctx};
    co_await sock.next_layer().async_connect(*ep.begin()); (2)
    co_await sock.async_handshake(asio::ssl::stream_base::client); (3)

    co_return sock; (4)
}
1 Lookup the host
2 Connect to the endpoint
3 Do the ssl handshake
4 Return the socket to the caller

Next, we’ll need a function to do the websocket upgrade on an existing ssl-stream.

example/ticker.cpp connect_to_blockchain_info
async::promise<void> connect_to_blockchain_info(websocket_type & ws)
{
 ws.set_option(beast::websocket::stream_base::decorator(
     [](beast::websocket::request_type& req)
     {
       req.set(http::field::user_agent,
               std::string(BOOST_BEAST_VERSION_STRING) + " async-ticker");
       req.set(http::field::origin,
               "https://exchange.blockchain.com"); (1)
     }));

 co_await ws.async_handshake("ws.blockchain.info", "/mercury-gateway/v1/ws"); (2)
}
1 blockchain.info requires this header to be set.
2 Perform the websocket handshake.

Once the websocket is connected, we want to continuously receive json messages, for which a generator is a good choice.

example/ticker.cpp json_read
async::generator<json::object> json_reader(websocket_type & ws)
try
{
    beast::flat_buffer buf;
    while (ws.is_open()) (1)
    {
        auto sz = co_await ws.async_read(buf); (2)
        json::string_view data{static_cast<const char*>(buf.cdata().data()), sz};
        auto obj = json::parse(data);
        co_yield obj.as_object(); (3)
        buf.consume(sz);
    }
    co_return {};
}
catch (std::exception & e)
{
  std::cerr << "Error reading: " << e.what() << std::endl;
  throw;
}
1 Keep running as long as the socket is open
2 Read a frame from the websocket
3 Parse & co_yield it as an object.

This then needs to be connected to the subscribers, for which we’ll utilize channels to pass raw json. To make lifetime management easy, the subscriber will hold a shared_ptr, and the producer a weak_ptr.

example/ticker.cpp subscription types
using subscription = std::pair<std::string, std::weak_ptr<async::channel<json::object>>>;
using subscription_channel = std::weak_ptr<async::channel<json::object>>;
using subscription_map = boost::unordered_multimap<std::string, subscription_channel>;
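
The shared_ptr/weak_ptr split means the producer never keeps a subscriber alive: once a session drops its shared_ptr, the producer's weak_ptr expires and the entry can be discarded. The sketch below illustrates this with standard containers only. fake_channel is a stand-in for async::channel<json::object>, std::unordered_multimap replaces the boost container, and publish is a hypothetical function, not one of the handle_* functions of the example.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

// Stand-in for async::channel<json::object>; it only records what was sent.
struct fake_channel
{
  std::vector<std::string> received;
};

using subscription_channel = std::weak_ptr<fake_channel>;
using subscription_map = std::unordered_multimap<std::string, subscription_channel>;

// Forward `message` to every live subscriber of `symbol` and drop expired ones.
// Returns the number of subscribers that received the message.
std::size_t publish(subscription_map & subs, const std::string & symbol,
                    const std::string & message)
{
  assert(!symbol.empty()); // subscriptions are keyed by a non-empty symbol
  std::size_t delivered = 0;
  auto [it, end] = subs.equal_range(symbol);
  while (it != end)
  {
    if (auto ch = it->second.lock()) // subscriber is still alive
    {
      ch->received.push_back(message);
      ++delivered;
      ++it;
    }
    else // subscriber is gone, forget the entry
      it = subs.erase(it);
  }
  return delivered;
}
```

The producer side needs no explicit unsubscribe message in this model; expired entries are cleaned up lazily the next time a message for that symbol arrives.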

The main function running the blockchain connector operates on two inputs: data coming from the websocket and a channel to handle new subscriptions.

example/ticker.cpp run blockchain_info
async::promise<void> run_blockchain_info(async::channel<subscription> & subc)
try
{
    asio::ssl::context ctx{asio::ssl::context_base::tls_client};
    websocket_type ws{co_await connect("blockchain.info", ctx)};
    co_await connect_to_blockchain_info(ws); (1)

    subscription_map subs;
    std::list<std::string> unconfirmed;

    auto rd = json_reader(ws); (2)
    while (ws.is_open()) (3)
    {
      switch (auto msg = co_await async::select(rd, subc.read()); msg.index()) (4)
      {
        case 0: (5)
          if (auto ms = get<0>(msg);
              ms.at("event") == "rejected") // invalid sub, cancel whoever subscribed
            co_await handle_rejections(unconfirmed, subs, ms);
          else
            co_await handle_update(unconfirmed, subs, ms, ws);
        break;
        case 1: // (6)
            co_await handle_new_subscription(
                unconfirmed, subs,
                std::move(get<1>(msg)), ws);
        break;
      }
    }

    for (auto & [k ,c] : subs)
    {
        if (auto ptr = c.lock())
            ptr->close();
    }
}
catch(std::exception & e)
{
  std::cerr << "Exception: " << e.what() << std::endl;
  throw;
}
1 Initialize the connection
2 Instantiate the json_reader
3 Run as long as the websocket is open
4 Select, i.e. wait for either a new json message or subscription
5 When it’s a json message, handle an update or a rejection
6 Handle new subscription messages

The contents of the handle_* functions are not important for the async functionality, so they are skipped in this tutorial.

The handle_new_subscription function sends a message to blockchain.info, which will send a confirmation or rejection back. The handle_rejections and handle_update functions take the json values and forward them to the subscription channel.

On the consumer side, our server will just forward data to the client. If the client inputs data, we’ll close the websocket immediately. We’re using as_tuple to ignore potential errors.

example/ticker.cpp read and close
async::promise<void> read_and_close(beast::websocket::stream<socket_type> & st, beast::flat_buffer buf)
{
    system::error_code ec;
    co_await st.async_read(buf, asio::as_tuple(async::use_op));
    co_await st.async_close(beast::websocket::close_code::going_away, asio::as_tuple(async::use_op));
    st.next_layer().close(ec);
}

Next, we write the session that serves a connected user:

example/ticker.cpp run_session
async::promise<void> run_session(beast::websocket::stream<socket_type> st,
                                 async::channel<subscription> & subc)
try
{
    http::request<http::empty_body> req;
    beast::flat_buffer buf;
    co_await http::async_read(st.next_layer(), buf, req); (1)
    // check the target
    auto r = urls::parse_uri_reference(req.target());
    if (r.has_error() || (r->segments().size() != 2u)) (2)
    {
        http::response<http::string_body> res{http::status::bad_request, 11};
        res.body() = r.has_error() ? r.error().message() :
                    "url needs two segments, e.g. /btc/usd";
        co_await http::async_write(st.next_layer(), res);
        st.next_layer().close();
        co_return ;
    }

    co_await st.async_accept(req); (3)

    auto sym = std::string(r->segments().front()) + "-" +
               std::string(r->segments().back());
    boost::algorithm::to_upper(sym);
    // close when data gets sent
    auto p = read_and_close(st, std::move(buf)); (4)

    auto ptr = std::make_shared<async::channel<json::object>>(1u); (5)
    co_await subc.write(subscription{sym, ptr}); (6)

    while (ptr->is_open() && st.is_open()) (7)
    {
      auto bb = json::serialize(co_await ptr->read());
      co_await st.async_write(asio::buffer(bb));
    }

    co_await st.async_close(beast::websocket::close_code::going_away,
                            asio::as_tuple(async::use_op)); (8)
    st.next_layer().close();
    co_await p; (9)

}
catch(std::exception & e)
{
    std::cerr << "Session ended with exception: " << e.what() << std::endl;
}
1 Read the http request, because we want the path
2 Check the path, e.g. /btc/usd.
3 Accept the websocket
4 Start reading & close if the consumer sends something
5 Create the channel to receive updates
6 Send a subscription request to run_blockchain_info
7 While the channel & websocket are open, we’re forwarding data.
8 Close the socket & ignore the error
9 Since the websocket is surely closed by now, wait for read_and_close to complete.
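
The path check and symbol construction in run_session (callout 2 and the lines building sym) can be tried out in isolation. The sketch below mirrors that logic with the standard library only. make_symbol is a hypothetical name, and unlike boost.url it does no percent-decoding and does not handle query strings; it also rejects empty segments, which is an assumption beyond the segment-count check in the example.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <optional>
#include <string>
#include <string_view>

// Turn a request target such as "/btc/usd" into the symbol "BTC-USD".
// Returns std::nullopt unless the path has exactly two non-empty segments.
std::optional<std::string> make_symbol(std::string_view path)
{
  if (path.empty() || path.front() != '/')
    return std::nullopt;
  path.remove_prefix(1); // drop the leading slash

  const auto slash = path.find('/');
  if (slash == std::string_view::npos)
    return std::nullopt; // only one segment

  const auto base = path.substr(0, slash);
  const auto quote = path.substr(slash + 1);
  if (base.empty() || quote.empty() || quote.find('/') != std::string_view::npos)
    return std::nullopt; // empty segment or more than two segments

  std::string sym = std::string(base) + "-" + std::string(quote);
  std::transform(sym.begin(), sym.end(), sym.begin(),
                 [](unsigned char c) { return static_cast<char>(std::toupper(c)); });
  return sym;
}
```

A target that yields std::nullopt corresponds to the bad_request branch of run_session.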

With run_session and run_blockchain_info written, we can now move on to main:

example/ticker.cpp main
async::main co_main(int argc, char * argv[])
{
    acceptor_type acc{co_await async::this_coro::executor,
                      asio::ip::tcp::endpoint (asio::ip::tcp::v4(), 8080)};
    std::cout << "Listening on localhost:8080" << std::endl;

    constexpr int limit = 10; // allow 10 ongoing sessions
    async::channel<subscription> sub_manager; (1)

    co_await join( (2)
      run_blockchain_info(sub_manager),
      async::with( (3)
        async::wait_group(
            asio::cancellation_type::all,
            asio::cancellation_type::all),
        [&](async::wait_group & sessions) -> async::promise<void>
        {
          while (!co_await async::this_coro::cancelled) (4)
          {
            if (sessions.size() >= limit) (5)
              co_await sessions.wait_one();

            auto conn = co_await acc.async_accept(); (6)
            sessions.push_back( (7)
                run_session(
                    beast::websocket::stream<socket_type>{std::move(conn)},
                    sub_manager));
          }
        })
      );

    co_return 0;
}
1 Create the channel to manage subscriptions
2 Use join to run both tasks in parallel.
3 Use an async scope to provide a wait_group.
4 Run until cancelled.
5 When we’ve reached the limit we wait for one task to complete.
6 Wait for a new connection.
7 Insert the session into the wait_group.

Main is using join because one task failing should cancel the other one.

delay op

So far we’ve relied on use_op, which creates an implicit operation through asio’s completion token mechanism.

We can, however, implement our own ops, which can also utilize the async_ready optimization. To leverage this coroutine feature, async provides an easy way to create a skippable operation:

example/delay_op.cpp
struct wait_op final : async::op<system::error_code> (1)
{
  asio::steady_timer & tim;
  wait_op(asio::steady_timer & tim) : tim(tim) {}
  void ready(async::handler<system::error_code> h ) override (2)
  {
    if (tim.expiry() < std::chrono::steady_clock::now())
      h(system::error_code{});
  }
  void initiate(async::completion_handler<system::error_code> complete) override (3)
  {
    tim.async_wait(std::move(complete));
  }
};


async::main co_main(int argc, char * argv[])
{
  asio::steady_timer tim{co_await asio::this_coro::executor,
                         std::chrono::milliseconds(std::stoi(argv[1]))};
  co_await wait_op(tim); (4)
  co_return 0;
}
1 Declare the op. We inherit op to make it awaitable.
2 The pre-suspend check is implemented here
3 Do the wait if we need to
4 Use the op just like any other awaitable.

This way we can minimize the number of coroutine suspensions.

While the above is used with asio, you can also use these handlers with any other callback-based code.
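
To make the two-step protocol concrete without asio, here is a minimal sketch that models it with plain callbacks. It is not async's op interface: fake_timer and wait_with_ready_check are hypothetical names, and the current time is passed in explicitly so the behaviour is deterministic.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <utility>
#include <vector>

using clock_type = std::chrono::steady_clock;

// Hypothetical stand-in for a callback-based timer API: it only queues callbacks.
struct fake_timer
{
  clock_type::time_point expiry;
  std::vector<std::function<void()>> pending;

  void async_wait(std::function<void()> cb) { pending.push_back(std::move(cb)); }
};

// Model of the two-step protocol: first check whether the result is already
// available, and only start the asynchronous operation if it is not.
// Returns true if the fast path was taken (no asynchronous wait was started).
bool wait_with_ready_check(fake_timer & tim, clock_type::time_point now,
                           std::function<void()> on_done)
{
  assert(on_done); // a callback is required
  if (tim.expiry < now) // "ready" step: the deadline has already passed
  {
    on_done();
    return true;
  }
  tim.async_wait(std::move(on_done)); // "initiate" step: defer to the timer
  return false;
}
```

In the coroutine setting, taking the fast path corresponds to not suspending at all, which is the saving the ready function of wait_op aims for.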

Generator with push value

Coroutines with push values are not as common, but can simplify certain issues significantly.

Since we’ve already got a json_reader in the previous example, here’s how we can write a json_writer that gets values pushed in.

The advantage of using a generator is the internal state management.

async::generator<system::error_code, json::object>
    json_writer(websocket_type & ws)
try
{
    char buffer[4096];
    json::serializer ser;

    while (ws.is_open()) (1)
    {
        auto val = co_yield system::error_code{}; (2)
        ser.reset(&val); // serialize the value that was pushed in

        while (!ser.done())
        {
            auto sv = ser.read(buffer);
            co_await ws.async_write(asio::buffer(sv.data(), sv.size())); (3)
        }

    }
    co_return {};
}
catch (system::system_error& e)
{
    co_return e.code();
}
catch (std::exception & e)
{
    std::cerr << "Error writing: " << e.what() << std::endl;
    throw;
}
1 Keep running as long as the socket is open
2 co_yield the current error and retrieve a new value.
3 Write a frame to the websocket

Now we can use the generator like this:

auto g = json_writer(my_ws);

extern std::vector<json::object> to_write;

for (auto && tw : std::move(to_write))
{
    if (auto ec = co_await g(std::move(tw)))
        co_return ec; // propagate the error
}

Advanced examples

More examples are provided in the repository as code only. All examples are listed below.

Table 5. All examples

example/http.cpp

An http client that performs a single http get request.

example/outcome.cpp

Using the boost.outcome coroutine types.

example/python.cpp & example/python.py

Using nanobind to integrate async with python. It uses python’s asyncio as executor and allows C++ to co_await python functions and vice versa.

example/signals.cpp

Adopting boost.signals2 into an awaitable type (single threaded).

example/spsc.cpp

Creating a boost.lockfree based & awaitable spsc_queue (multi threaded).

example/thread.cpp

Using worker threads with asio’s concurrent_channel.

example/thread_pool.cpp

Using an asio::thread_pool and spawning tasks onto them.

example/delay.cpp

The example used by the delay section

example/delay_op.cpp

The example used by the delay op section

example/echo_server.cpp

The example used by the echo server section

example/ticker.cpp

The example used by the price ticker section

example/channel.cpp

The example used by the channel reference