boost_redis 1.4.2
A redis client library
connection.hpp
1/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
2 *
3 * Distributed under the Boost Software License, Version 1.0. (See
4 * accompanying file LICENSE.txt)
5 */
6
7#ifndef BOOST_REDIS_CONNECTION_HPP
8#define BOOST_REDIS_CONNECTION_HPP
9
10#include <boost/redis/detail/connection_base.hpp>
11#include <boost/redis/logger.hpp>
12#include <boost/redis/config.hpp>
13#include <boost/asio/io_context.hpp>
14#include <boost/asio/coroutine.hpp>
15#include <boost/asio/steady_timer.hpp>
16#include <boost/asio/any_io_executor.hpp>
17#include <boost/asio/any_completion_handler.hpp>
18
19#include <chrono>
20#include <memory>
21#include <limits>
22
23namespace boost::redis {
24namespace detail
25{
/// Body of the composed operation that keeps a connection running.
///
/// Each loop iteration runs the connection via `conn_->impl_.async_run`.
/// When that run finishes, the operation either completes or waits
/// `conn_->cfg_.reconnect_wait_interval`, resets the stream and runs again.
///
/// `Connection` must expose the members used below (`impl_`, `cfg_`,
/// `timer_`, `cancel`, `will_reconnect`, `reset_stream`). `Logger` must
/// provide `on_connection_lost(error_code)`.
26template <class Connection, class Logger>
27struct reconnection_op {
// Non-owning pointer to the connection being driven.
28 Connection* conn_ = nullptr;
// Logger passed on to async_run and notified when a run ends.
29 Logger logger_;
// Resume point for the BOOST_ASIO_CORO_* macros in operator().
30 asio::coroutine coro_{};
31
// Start/resume entry point of the operation.
//
// self: handle of the composed operation; it is handed to each awaited
//       async call and used to deliver the final result via complete().
// ec:   error code supplied on resumption (defaulted for the first call).
32 template <class Self>
33 void operator()(Self& self, system::error_code ec = {})
34 {
35 BOOST_ASIO_CORO_REENTER (coro_) for (;;)
36 {
// Run the connection; execution continues below once the run has ended.
37 BOOST_ASIO_CORO_YIELD
38 conn_->impl_.async_run(conn_->cfg_, logger_, std::move(self));
// The run is over: cancel pending receive operations and report the loss.
39 conn_->cancel(operation::receive);
40 logger_.on_connection_lost(ec);
// Stop here if reconnection is disabled or the operation was cancelled.
// NOTE(review): is_cancelled() is defined elsewhere; presumably it checks
// the cancellation state of `self` - confirm.
41 if (!conn_->will_reconnect() || is_cancelled(self)) {
42 conn_->cancel(operation::reconnection);
// Always complete with an error: the one from the run if there is one,
// otherwise operation_aborted.
43 self.complete(!!ec ? ec : asio::error::operation_aborted);
44 return;
45 }
46
// Wait for the configured interval before the next attempt.
47 conn_->timer_.expires_after(conn_->cfg_.reconnect_wait_interval);
48 BOOST_ASIO_CORO_YIELD
49 conn_->timer_.async_wait(std::move(self));
// NOTE(review): BOOST_REDIS_CHECK_OP0 is defined elsewhere; presumably it
// completes the operation and returns when the wait failed or was
// cancelled - confirm against its definition.
50 BOOST_REDIS_CHECK_OP0(;)
// Re-check after the wait: will_reconnect() may have become false in the
// meantime (cancel(operation::all) zeroes the interval).
51 if (!conn_->will_reconnect()) {
52 self.complete(asio::error::operation_aborted);
53 return;
54 }
// Reset the stream before looping back to run the connection again.
55 conn_->reset_stream();
56 }
57 }
58};
59} // detail
60
71template <class Executor>
73public:
75 using executor_type = Executor;
76
79 { return impl_.get_executor(); }
80
82 template <class Executor1>
84 {
87 };
88
90 explicit
93 asio::ssl::context::method method = asio::ssl::context::tls_client,
94 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
95 : impl_{ex, method, max_read_size}
96 , timer_{ex}
97 { }
98
100 explicit
102 asio::io_context& ioc,
103 asio::ssl::context::method method = asio::ssl::context::tls_client,
104 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
105 : basic_connection(ioc.get_executor(), method, max_read_size)
106 { }
107
145 template <
146 class Logger = logger,
147 class CompletionToken = asio::default_completion_token_t<executor_type>>
148 auto
150 config const& cfg = {},
151 Logger l = Logger{},
152 CompletionToken token = CompletionToken{})
153 {
154 using this_type = basic_connection<executor_type>;
155
156 cfg_ = cfg;
157 l.set_prefix(cfg_.log_prefix);
158 return asio::async_compose
159 < CompletionToken
160 , void(system::error_code)
161 >(detail::reconnection_op<this_type, Logger>{this, l}, token, timer_);
162 }
163
187 template <
188 class Response = ignore_t,
189 class CompletionToken = asio::default_completion_token_t<executor_type>
190 >
191 auto
193 Response& response,
194 CompletionToken token = CompletionToken{})
195 {
196 return impl_.async_receive(response, token);
197 }
198
222 template <
223 class Response = ignore_t,
224 class CompletionToken = asio::default_completion_token_t<executor_type>
225 >
226 auto
228 request const& req,
229 Response& resp = ignore,
230 CompletionToken&& token = CompletionToken{})
231 {
232 return impl_.async_exec(req, resp, std::forward<CompletionToken>(token));
233 }
234
248 {
249 switch (op) {
251 case operation::all:
252 cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
253 timer_.cancel();
254 break;
255 default: /* ignore */;
256 }
257
258 impl_.cancel(op);
259 }
260
// Returns true when cfg_.reconnect_wait_interval is non-zero, i.e. when a
// reconnection will be attempted after the connection is lost.
262 bool will_reconnect() const noexcept
263 { return cfg_.reconnect_wait_interval != std::chrono::seconds::zero();}
264
// Returns a const reference to the ssl context held by impl_.
266 auto const& get_ssl_context() const noexcept
267 { return impl_.get_ssl_context();}
268
// Returns a mutable reference to the ssl context held by impl_.
270 auto& get_ssl_context() noexcept
271 { return impl_.get_ssl_context();}
272
275 { impl_.reset_stream(); }
276
// Returns a mutable reference to impl_'s next layer.
278 auto& next_layer() noexcept
279 { return impl_.next_layer(); }
280
// Returns a const reference to impl_'s next layer.
282 auto const& next_layer() const noexcept
283 { return impl_.next_layer(); }
284
285private:
// Steady-clock waitable timer bound to this connection's Executor type.
// It is the type of timer_, which the reconnection operation waits on
// between runs.
286 using timer_type =
287 asio::basic_waitable_timer<
288 std::chrono::steady_clock,
289 asio::wait_traits<std::chrono::steady_clock>,
290 Executor>;
291
292 template <class, class> friend struct detail::reconnection_op;
293
294 config cfg_;
296 timer_type timer_;
297};
298
309public:
311 using executor_type = asio::any_io_executor;
312
314 explicit
316 executor_type ex,
317 asio::ssl::context::method method = asio::ssl::context::tls_client,
318 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());
319
321 explicit
323 asio::io_context& ioc,
324 asio::ssl::context::method method = asio::ssl::context::tls_client,
325 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());
326
329 { return impl_.get_executor(); }
330
// Starts the run operation for an arbitrary completion token.
//
// cfg:   configuration passed on to async_run_impl.
// l:     logger passed on to async_run_impl.
// token: completion token; the completion signature is
//        void(boost::system::error_code).
//
// The initiation lambda only forwards to the non-template async_run_impl,
// which receives the handler as an asio::any_completion_handler.
332 template <class CompletionToken>
333 auto async_run(config const& cfg, logger l, CompletionToken token)
334 {
335 return asio::async_initiate<
336 CompletionToken, void(boost::system::error_code)>(
337 [](auto handler, connection* self, config const* cfg, logger l)
338 {
339 self->async_run_impl(*cfg, l, std::move(handler));
// cfg is handed to the initiation by address, not by copy.
// NOTE(review): with a token that defers initiation, the caller's config
// must still be alive when the lambda runs - confirm with callers.
340 }, token, this, &cfg, l);
341 }
342
// Forwards to impl_.async_receive with the given response object, moving
// the completion token into the call.
344 template <class Response, class CompletionToken>
345 auto async_receive(Response& response, CompletionToken token)
346 {
347 return impl_.async_receive(response, std::move(token));
348 }
349
// Forwards to impl_.async_exec with the given request and response, moving
// the completion token into the call.
351 template <class Response, class CompletionToken>
352 auto async_exec(request const& req, Response& resp, CompletionToken token)
353 {
354 return impl_.async_exec(req, resp, std::move(token));
355 }
356
359
// Forwards to impl_.will_reconnect().
361 bool will_reconnect() const noexcept
362 { return impl_.will_reconnect();}
363
// Forwards to impl_.next_layer() (mutable overload).
365 auto& next_layer() noexcept
366 { return impl_.next_layer(); }
367
// Forwards to impl_.next_layer() (const overload).
369 auto const& next_layer() const noexcept
370 { return impl_.next_layer(); }
371
374 { impl_.reset_stream();}
375
376private:
// Non-template counterpart of async_run, called from its initiation lambda.
// The completion handler arrives type-erased as asio::any_completion_handler.
// Declaration only; the definition is not part of this header listing.
377 void
378 async_run_impl(
379 config const& cfg,
380 logger l,
381 asio::any_completion_handler<void(boost::system::error_code)> token);
382
384};
385
386} // boost::redis
387
388#endif // BOOST_REDIS_CONNECTION_HPP
An SSL connection to the Redis server.
Definition: connection.hpp:72
void reset_stream()
Resets the underlying stream.
Definition: connection.hpp:274
auto async_receive(Response &response, CompletionToken token=CompletionToken{})
Receives server side pushes asynchronously.
Definition: connection.hpp:192
bool will_reconnect() const noexcept
Returns true if the connection was canceled.
Definition: connection.hpp:262
executor_type get_executor() noexcept
Returns the underlying executor.
Definition: connection.hpp:78
auto async_exec(request const &req, Response &resp=ignore, CompletionToken &&token=CompletionToken{})
Executes commands on the Redis server asynchronously.
Definition: connection.hpp:227
basic_connection(executor_type ex, asio::ssl::context::method method=asio::ssl::context::tls_client, std::size_t max_read_size=(std::numeric_limits< std::size_t >::max)())
Constructs from an executor.
Definition: connection.hpp:91
auto & get_ssl_context() noexcept
Returns the ssl context.
Definition: connection.hpp:270
auto const & get_ssl_context() const noexcept
Returns the ssl context.
Definition: connection.hpp:266
auto const & next_layer() const noexcept
Returns a const reference to the next layer.
Definition: connection.hpp:282
basic_connection(asio::io_context &ioc, asio::ssl::context::method method=asio::ssl::context::tls_client, std::size_t max_read_size=(std::numeric_limits< std::size_t >::max)())
Constructs from a context.
Definition: connection.hpp:101
void cancel(operation op=operation::all)
Cancel operations.
Definition: connection.hpp:247
auto & next_layer() noexcept
Returns a reference to the next layer.
Definition: connection.hpp:278
auto async_run(config const &cfg={}, Logger l=Logger{}, CompletionToken token=CompletionToken{})
Starts underlying connection operations.
Definition: connection.hpp:149
Executor executor_type
Executor type.
Definition: connection.hpp:75
Rebinds the socket type to another executor.
Definition: connection.hpp:84
A basic_connection that type erases the executor.
Definition: connection.hpp:308
bool will_reconnect() const noexcept
Calls boost::redis::basic_connection::will_reconnect.
Definition: connection.hpp:361
auto async_exec(request const &req, Response &resp, CompletionToken token)
Calls boost::redis::basic_connection::async_exec.
Definition: connection.hpp:352
auto async_receive(Response &response, CompletionToken token)
Calls boost::redis::basic_connection::async_receive.
Definition: connection.hpp:345
auto async_run(config const &cfg, logger l, CompletionToken token)
Calls boost::redis::basic_connection::async_run.
Definition: connection.hpp:333
asio::any_io_executor executor_type
Executor type.
Definition: connection.hpp:311
connection(executor_type ex, asio::ssl::context::method method=asio::ssl::context::tls_client, std::size_t max_read_size=(std::numeric_limits< std::size_t >::max)())
Constructs from an executor.
auto & next_layer() noexcept
Calls boost::redis::basic_connection::next_layer.
Definition: connection.hpp:365
auto const & next_layer() const noexcept
Calls boost::redis::basic_connection::next_layer.
Definition: connection.hpp:369
void cancel(operation op=operation::all)
Calls boost::redis::basic_connection::cancel.
connection(asio::io_context &ioc, asio::ssl::context::method method=asio::ssl::context::tls_client, std::size_t max_read_size=(std::numeric_limits< std::size_t >::max)())
Constructs from a context.
executor_type get_executor() noexcept
Returns the underlying executor.
Definition: connection.hpp:328
void reset_stream()
Calls boost::redis::basic_connection::reset_stream.
Definition: connection.hpp:373
auto get_executor()
Returns the associated executor.
virtual void cancel(operation op)
Cancels specific operations.
void reset_stream()
Resets the underlying stream.
auto & next_layer() noexcept
Returns a reference to the next layer.
auto const & get_ssl_context() const noexcept
Returns the ssl context.
Logger class.
Definition: logger.hpp:23
Creates Redis requests.
Definition: request.hpp:46
std::chrono::steady_clock::duration reconnect_wait_interval
Time waited before trying a reconnection.
Definition: config.hpp:80
std::string log_prefix
Logger prefix, see boost::redis::logger.
Definition: config.hpp:59
ignore_t ignore
Global ignore object.
std::decay_t< decltype(std::ignore)> ignore_t
Type used to ignore responses.
Definition: ignore.hpp:31
std::tuple< adapter::result< Ts >... > response
Response with compile-time size.
Definition: response.hpp:23
operation
Connection operations that can be cancelled.
Definition: operation.hpp:18
@ reconnection
Cancels reconnection.
@ all
Refers to all operations.
@ receive
Refers to connection::async_receive operations.
Configure parameters used by the connection classes.
Definition: config.hpp:30