boost_redis 1.4.2
A redis client library
cpp20_subscriber.cpp
1/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
2 *
3 * Distributed under the Boost Software License, Version 1.0. (See
4 * accompanying file LICENSE.txt)
5 */
6
7#include <boost/redis/connection.hpp>
8#include <boost/redis/logger.hpp>
9#include <boost/asio/awaitable.hpp>
10#include <boost/asio/use_awaitable.hpp>
11#include <boost/asio/deferred.hpp>
12#include <boost/asio/co_spawn.hpp>
13#include <boost/asio/detached.hpp>
14#include <boost/asio/consign.hpp>
15#include <boost/asio/redirect_error.hpp>
16#include <boost/asio/signal_set.hpp>
17#include <iostream>
18
19#if defined(BOOST_ASIO_HAS_CO_AWAIT)
20
21namespace net = boost::asio;
22using namespace std::chrono_literals;
28using boost::system::error_code;
30using signal_set = net::deferred_t::as_default_on_t<net::signal_set>;
31
32/* This example will subscribe and read pushes indefinitely.
33 *
34 * To test send messages with redis-cli
35 *
36 * $ redis-cli -3
37 * 127.0.0.1:6379> PUBLISH channel some-message
38 * (integer) 3
39 * 127.0.0.1:6379>
40 *
41 * To test reconnection try, for example, to close all clients currently
42 * connected to the Redis instance
43 *
44 * $ redis-cli
45 * > CLIENT kill TYPE pubsub
46 */
47
48// Receives server pushes.
49auto
50receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
51{
52 request req;
53 req.push("SUBSCRIBE", "channel");
54
55 // Loop while reconnection is enabled
56 while (conn->will_reconnect()) {
57
58 // Reconnect to channels.
59 co_await conn->async_exec(req, ignore, net::deferred);
60
61 // Loop reading Redis pushs messages.
62 for (generic_response resp;;) {
63 error_code ec;
64 co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec));
65 if (ec)
66 break; // Connection lost, break so we can reconnect to channels.
67 std::cout
68 << resp.value().at(1).value
69 << " " << resp.value().at(2).value
70 << " " << resp.value().at(3).value
71 << std::endl;
72 resp.value().clear();
73 }
74 }
75}
76
77auto co_main(config cfg) -> net::awaitable<void>
78{
79 auto ex = co_await net::this_coro::executor;
80 auto conn = std::make_shared<connection>(ex);
81 net::co_spawn(ex, receiver(conn), net::detached);
82 conn->async_run(cfg, {}, net::consign(net::detached, conn));
83
84 signal_set sig_set(ex, SIGINT, SIGTERM);
85 co_await sig_set.async_wait();
86
87 conn->cancel();
88}
89
90#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)
A basic_connection that type erases the executor.
Definition: connection.hpp:308
auto async_exec(request const &req, Response &resp, CompletionToken token)
Calls boost::redis::basic_connection::async_exec.
Definition: connection.hpp:352
Logger class.
Definition: logger.hpp:23
Creates Redis requests.
Definition: request.hpp:46
ignore_t ignore
Global ignore object.
adapter::result< std::vector< resp3::node > > generic_response
A generic response to a request.
Definition: response.hpp:33
Configure parameters used by the connection classes.
Definition: config.hpp:30