7#ifndef BOOST_REDIS_HEALTH_CHECKER_HPP
8#define BOOST_REDIS_HEALTH_CHECKER_HPP
11#include <boost/redis/request.hpp>
12#include <boost/redis/response.hpp>
13#include <boost/redis/operation.hpp>
14#include <boost/redis/detail/helper.hpp>
15#include <boost/redis/config.hpp>
16#include <boost/asio/steady_timer.hpp>
17#include <boost/asio/compose.hpp>
18#include <boost/asio/consign.hpp>
19#include <boost/asio/coroutine.hpp>
20#include <boost/asio/post.hpp>
21#include <boost/asio/experimental/parallel_group.hpp>
25namespace boost::redis::detail {
27template <
class HealthChecker,
class Connection>
30 HealthChecker* checker_ =
nullptr;
31 Connection* conn_ =
nullptr;
32 asio::coroutine coro_{};
35 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
37 BOOST_ASIO_CORO_REENTER (coro_)
for (;;)
39 if (checker_->checker_has_exited_) {
45 conn_->async_exec(checker_->req_, checker_->resp_, std::move(self));
46 BOOST_REDIS_CHECK_OP0(checker_->wait_timer_.cancel();)
49 checker_->ping_timer_.expires_after(checker_->ping_interval_);
51 checker_->ping_timer_.async_wait(std::move(self));
52 BOOST_REDIS_CHECK_OP0(;)
57template <class HealthChecker, class Connection>
58class check_timeout_op {
60 HealthChecker* checker_ =
nullptr;
61 Connection* conn_ =
nullptr;
62 asio::coroutine coro_{};
65 void operator()(Self& self, system::error_code ec = {})
67 BOOST_ASIO_CORO_REENTER (coro_)
for (;;)
69 checker_->wait_timer_.expires_after(2 * checker_->ping_interval_);
71 checker_->wait_timer_.async_wait(std::move(self));
72 BOOST_REDIS_CHECK_OP0(;)
74 if (checker_->resp_.has_error()) {
79 if (checker_->resp_.value().empty()) {
80 checker_->ping_timer_.cancel();
82 checker_->checker_has_exited_ =
true;
87 if (checker_->resp_.has_value()) {
88 checker_->resp_.value().clear();
94template <
class HealthChecker,
class Connection>
95class check_health_op {
97 HealthChecker* checker_ =
nullptr;
98 Connection* conn_ =
nullptr;
99 asio::coroutine coro_{};
101 template <
class Self>
105 std::array<std::size_t, 2> order = {},
106 system::error_code ec1 = {},
107 system::error_code ec2 = {})
109 BOOST_ASIO_CORO_REENTER (coro_)
111 if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
112 BOOST_ASIO_CORO_YIELD
113 asio::post(std::move(self));
118 BOOST_ASIO_CORO_YIELD
119 asio::experimental::make_parallel_group(
120 [
this](
auto token) {
return checker_->async_ping(*conn_, token); },
121 [
this](
auto token) {
return checker_->async_check_timeout(*conn_, token);}
123 asio::experimental::wait_for_one(),
126 if (is_cancelled(self)) {
127 self.complete(asio::error::operation_aborted);
132 case 0: self.complete(ec1);
return;
133 case 1: self.complete(ec2);
return;
134 default: BOOST_ASSERT(
false);
140template <
class Executor>
141class health_checker {
144 asio::basic_waitable_timer<
145 std::chrono::steady_clock,
146 asio::wait_traits<std::chrono::steady_clock>,
150 health_checker(Executor ex)
154 req_.push(
"PING",
"Boost.Redis");
157 void set_config(config
const& cfg)
160 req_.push(
"PING", cfg.health_check_id);
161 ping_interval_ = cfg.health_check_interval;
166 class CompletionToken = asio::default_completion_token_t<Executor>
168 auto async_check_health(Connection& conn, CompletionToken token = CompletionToken{})
170 checker_has_exited_ =
false;
171 return asio::async_compose
173 , void(system::error_code)
174 >(check_health_op<health_checker, Connection>{
this, &conn}, token, conn);
182 ping_timer_.cancel();
183 wait_timer_.cancel();
192 template <
class Connection,
class CompletionToken>
193 auto async_ping(Connection& conn, CompletionToken token)
195 return asio::async_compose
197 , void(system::error_code)
198 >(ping_op<health_checker, Connection>{
this, &conn}, token, conn, ping_timer_);
201 template <
class Connection,
class CompletionToken>
202 auto async_check_timeout(Connection& conn, CompletionToken token)
204 return asio::async_compose
206 , void(system::error_code)
207 >(check_timeout_op<health_checker, Connection>{
this, &conn}, token, conn, wait_timer_);
210 template <
class,
class>
friend class ping_op;
211 template <
class,
class>
friend class check_timeout_op;
212 template <
class,
class>
friend class check_health_op;
214 timer_type ping_timer_;
215 timer_type wait_timer_;
218 std::chrono::steady_clock::duration ping_interval_ = std::chrono::seconds{5};
219 bool checker_has_exited_ =
false;
adapter::result< std::vector< resp3::node > > generic_response
A generic response to a request.
operation
Connection operations that can be cancelled.
@ pong_timeout
Connect timeout.
@ health_check
Health check operation.
@ all
Refers to all operations.
@ run
Refers to connection::async_run operations.