7#ifndef BOOST_REDIS_RUNNER_HPP
8#define BOOST_REDIS_RUNNER_HPP
10#include <boost/redis/detail/health_checker.hpp>
11#include <boost/redis/config.hpp>
12#include <boost/redis/response.hpp>
13#include <boost/redis/detail/helper.hpp>
14#include <boost/redis/error.hpp>
15#include <boost/redis/logger.hpp>
16#include <boost/redis/operation.hpp>
17#include <boost/redis/detail/connector.hpp>
18#include <boost/redis/detail/resolver.hpp>
19#include <boost/redis/detail/handshaker.hpp>
20#include <boost/asio/compose.hpp>
21#include <boost/asio/connect.hpp>
22#include <boost/asio/coroutine.hpp>
23#include <boost/asio/experimental/parallel_group.hpp>
24#include <boost/asio/ip/tcp.hpp>
25#include <boost/asio/steady_timer.hpp>
30namespace boost::redis::detail
// Fragment of the hello operation: a coroutine-style handler that resets the
// runner's hello request/response and executes the request on the connection.
// (The struct header and several lines are not visible in this listing.)
33template <
class Runner,
class Connection,
class Logger>
// Pointers to the runner and connection this operation works on; both default to null.
35 Runner* runner_ =
nullptr;
36 Connection* conn_ =
nullptr;
// Coroutine state consumed by BOOST_ASIO_CORO_REENTER below.
38 asio::coroutine coro_{};
// Handler entry point. `ec` is the error code passed on re-entry; the
// unnamed std::size_t argument is ignored.
41 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
43 BOOST_ASIO_CORO_REENTER (coro_)
// Clear the request, and clear the response only when it currently holds a value.
45 runner_->hello_req_.clear();
46 if (runner_->hello_resp_.has_value())
47 runner_->hello_resp_.value().clear();
// Execute hello_req_ on the connection, storing the reply in hello_resp_;
// this operation (self) is moved in as the completion handler.
51 conn_->async_exec(runner_->hello_req_, runner_->hello_resp_, std::move(self));
// Report the resulting error code and the response to the logger.
52 logger_.on_hello(ec, runner_->hello_resp_);
// Fragment of the top-level run operation: starts async_run_all, the health
// check and async_hello as one parallel group and inspects their results.
// (The class header and several lines are not visible in this listing.)
59template <
class Runner,
class Connection,
class Logger>
// Pointers to the runner and connection this operation works on; both default to null.
62 Runner* runner_ =
nullptr;
63 Connection* conn_ =
nullptr;
// Coroutine state consumed by BOOST_ASIO_CORO_REENTER below.
65 asio::coroutine coro_{};
// Constructor taking the runner, the connection and a logger.
68 runner_op(Runner* runner, Connection* conn, Logger l)
// Handler entry point. On re-entry after the parallel group, `order` and
// ec0/ec1/ec2 carry its results; presumably ec0, ec1, ec2 correspond to the
// three operations in the order they are passed below — confirm against the
// Asio parallel_group documentation.
75 void operator()( Self& self
76 , std::array<std::size_t, 3> order = {}
77 , system::error_code ec0 = {}
78 , system::error_code ec1 = {}
79 , system::error_code ec2 = {}
82 BOOST_ASIO_CORO_REENTER (coro_)
// Start the three operations together: async_run_all, the health check and
// async_hello. Each lambda captures `this` and forwards the group's token.
85 asio::experimental::make_parallel_group(
86 [
this](
auto token) {
return runner_->async_run_all(*conn_, logger_, token); },
87 [
this](
auto token) {
return runner_->health_checker_.async_check_health(*conn_, token); },
88 [
this](
auto token) {
return runner_->async_hello(*conn_, logger_, token); }
// wait_for_all: the group is waited on with the "all operations" condition.
90 asio::experimental::wait_for_all(),
// When is_cancelled(self) is true, complete with operation_aborted.
93 if (is_cancelled(self)) {
94 self.complete(asio::error::operation_aborted);
// order[0] == 2 with ec2 set: presumably the third operation (async_hello)
// finished first and reported an error — TODO confirm the meaning of `order`.
103 if (order[0] == 2 && !!ec2) {
// Fragment of the run-all operation: resolve, connect, optional SSL handshake,
// then the connection's async_run_lean, logging after each of the first three steps.
// (The struct header and several lines are not visible in this listing.)
118template <
class Runner,
class Connection,
class Logger>
// Pointers to the runner and connection this operation works on; both default to null.
120 Runner* runner_ =
nullptr;
121 Connection* conn_ =
nullptr;
// Coroutine state consumed by BOOST_ASIO_CORO_REENTER below.
123 asio::coroutine coro_{};
// Handler entry point. `ec` is the error code passed on re-entry; the
// unnamed std::size_t argument is ignored.
125 template <
class Self>
126 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
128 BOOST_ASIO_CORO_REENTER (coro_)
// Step 1: resolve via the runner's resolver, then log the error code and results.
130 BOOST_ASIO_CORO_YIELD
131 runner_->resv_.async_resolve(std::move(self));
132 logger_.on_resolve(ec, runner_->resv_.results());
// Step 2: connect the innermost layer of the connection's stream using the
// resolver results, then log the error code and the connector's endpoint.
135 BOOST_ASIO_CORO_YIELD
136 runner_->ctor_.async_connect(conn_->next_layer().next_layer(), runner_->resv_.results(), std::move(self));
137 logger_.on_connect(ec, runner_->ctor_.endpoint());
// Step 3: only when the connection reports use_ssl(), perform the handshake
// on conn_->next_layer() and log its error code.
140 if (conn_->use_ssl()) {
141 BOOST_ASIO_CORO_YIELD
142 runner_->hsher_.async_handshake(conn_->next_layer(), std::move(self));
143 logger_.on_ssl_handshake(ec);
// Step 4: hand over to the connection's async_run_lean with the runner's config.
147 BOOST_ASIO_CORO_YIELD
148 conn_->async_run_lean(runner_->cfg_, logger_, std::move(self));
// NOTE(review): BOOST_REDIS_CHECK_OP0 is defined elsewhere; presumably it
// checks `ec`/cancellation and completes the operation — confirm.
149 BOOST_REDIS_CHECK_OP0(;)
// Fragment of the runner class template, parameterised on the executor type.
// (The class header and several lines are not visible in this listing.)
155template <
class Executor>
// Constructor taking an executor and a config; the visible initializer
// constructs health_checker_ from the executor.
158 runner(Executor ex, config cfg)
162 , health_checker_{ex}
// Forwards cancellation of `op` to the health checker (the enclosing
// function's header is not visible in this listing).
171 health_checker_.cancel(op);
// Passes `cfg` on to the resolver, connector, handshaker and health checker.
// (Some lines of this function are not visible in this listing.)
175 void set_config(config
const& cfg)
178 resv_.set_config(cfg);
179 ctor_.set_config(cfg);
180 hsher_.set_config(cfg);
181 health_checker_.set_config(cfg);
// Starts a runner_op for `conn` through asio::async_compose. The completion
// signature is void(system::error_code); `conn` is also passed to
// async_compose as its trailing argument.
184 template <
class Connection,
class Logger,
class CompletionToken>
185 auto async_run(Connection& conn, Logger l, CompletionToken token)
187 return asio::async_compose
189 , void(system::error_code)
190 >(runner_op<runner, Connection, Logger>{
this, &conn, l}, token, conn);
// Returns a const reference to the stored configuration (cfg_); never throws.
193 config
const& get_config() const noexcept {
return cfg_;}
// Aliases for the helper components, each instantiated with the runner's
// Executor; timer_type is taken from the connector.
196 using resolver_type = resolver<Executor>;
197 using connector_type = connector<Executor>;
198 using handshaker_type = detail::handshaker<Executor>;
199 using health_checker_type = health_checker<Executor>;
200 using timer_type =
typename connector_type::timer_type;
// Friend declarations for the three operation templates, which reach into
// runner members (e.g. resv_, ctor_, hello_req_) through a Runner pointer.
202 template <
class,
class,
class>
friend struct run_all_op;
203 template <
class,
class,
class>
friend class runner_op;
204 template <
class,
class,
class>
friend struct hello_op;
// Starts a run_all_op for `conn` through asio::async_compose with completion
// signature void(system::error_code).
206 template <
class Connection,
class Logger,
class CompletionToken>
207 auto async_run_all(Connection& conn, Logger l, CompletionToken token)
209 return asio::async_compose
211 , void(system::error_code)
212 >(run_all_op<runner, Connection, Logger>{
this, &conn, l}, token, conn);
// Starts a hello_op for `conn` through asio::async_compose with completion
// signature void(system::error_code).
215 template <
class Connection,
class Logger,
class CompletionToken>
216 auto async_hello(Connection& conn, Logger l, CompletionToken token)
218 return asio::async_compose
220 , void(system::error_code)
221 >(hello_op<runner, Connection, Logger>{
this, &conn, l}, token, conn);
// Builds the hello request from the configuration (the enclosing function's
// header is not visible in this listing). The HELLO 3 command gets:
//   - AUTH and SETNAME when username, password and clientname are all non-empty;
//   - no extra arguments when all three are empty;
//   - AUTH only when clientname is empty;
//   - SETNAME only in the remaining case.
// NOTE(review): with a clientname set and exactly one of username/password
// empty, control reaches the SETNAME-only push and no AUTH is sent — confirm
// this is intended.
226 if (!cfg_.username.empty() && !cfg_.password.empty() && !cfg_.clientname.empty())
227 hello_req_.push(
"HELLO",
"3",
"AUTH", cfg_.username, cfg_.password,
"SETNAME", cfg_.clientname);
228 else if (cfg_.username.empty() && cfg_.password.empty() && cfg_.clientname.empty())
229 hello_req_.push(
"HELLO",
"3");
230 else if (cfg_.clientname.empty())
231 hello_req_.push(
"HELLO",
"3",
"AUTH", cfg_.username, cfg_.password);
// NOTE(review): no `else` is visible before this push (listing line 232 is
// not shown) — presumably this is the final else branch; verify in the full file.
233 hello_req_.push(
"HELLO",
"3",
"SETNAME", cfg_.clientname);
// Append a SELECT command only when a database index is configured.
235 if (cfg_.database_index)
236 hello_req_.push(
"SELECT", cfg_.database_index.value());
// Helper components owned by the runner: connector, handshaker and health checker.
240 connector_type ctor_;
241 handshaker_type hsher_;
242 health_checker_type health_checker_;
adapter::result< std::vector< resp3::node > > generic_response
A generic response to a request.
operation
Connection operations that can be cancelled.
@ resolve_timeout
Resolve timeout.
@ pong_timeout
Pong timeout.
@ connect_timeout
Connect timeout.
@ run
Refers to connection::async_run operations.