boost_redis 1.4.2
A redis client library
health_checker.hpp
1/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
2 *
3 * Distributed under the Boost Software License, Version 1.0. (See
4 * accompanying file LICENSE.txt)
5 */
6
7#ifndef BOOST_REDIS_HEALTH_CHECKER_HPP
8#define BOOST_REDIS_HEALTH_CHECKER_HPP
9
10// Has to be included before promise.hpp to build on msvc.
11#include <boost/redis/request.hpp>
12#include <boost/redis/response.hpp>
13#include <boost/redis/operation.hpp>
14#include <boost/redis/detail/helper.hpp>
15#include <boost/redis/config.hpp>
16#include <boost/asio/steady_timer.hpp>
17#include <boost/asio/compose.hpp>
18#include <boost/asio/consign.hpp>
19#include <boost/asio/coroutine.hpp>
20#include <boost/asio/post.hpp>
21#include <boost/asio/experimental/parallel_group.hpp>
22#include <memory>
23#include <chrono>
24
25namespace boost::redis::detail {
26
27template <class HealthChecker, class Connection>
28class ping_op {
29public:
30 HealthChecker* checker_ = nullptr;
31 Connection* conn_ = nullptr;
32 asio::coroutine coro_{};
33
34 template <class Self>
35 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
36 {
37 BOOST_ASIO_CORO_REENTER (coro_) for (;;)
38 {
39 if (checker_->checker_has_exited_) {
40 self.complete({});
41 return;
42 }
43
44 BOOST_ASIO_CORO_YIELD
45 conn_->async_exec(checker_->req_, checker_->resp_, std::move(self));
46 BOOST_REDIS_CHECK_OP0(checker_->wait_timer_.cancel();)
47
48 // Wait before pinging again.
49 checker_->ping_timer_.expires_after(checker_->ping_interval_);
50 BOOST_ASIO_CORO_YIELD
51 checker_->ping_timer_.async_wait(std::move(self));
52 BOOST_REDIS_CHECK_OP0(;)
53 }
54 }
55};
56
57template <class HealthChecker, class Connection>
58class check_timeout_op {
59public:
60 HealthChecker* checker_ = nullptr;
61 Connection* conn_ = nullptr;
62 asio::coroutine coro_{};
63
64 template <class Self>
65 void operator()(Self& self, system::error_code ec = {})
66 {
67 BOOST_ASIO_CORO_REENTER (coro_) for (;;)
68 {
69 checker_->wait_timer_.expires_after(2 * checker_->ping_interval_);
70 BOOST_ASIO_CORO_YIELD
71 checker_->wait_timer_.async_wait(std::move(self));
72 BOOST_REDIS_CHECK_OP0(;)
73
74 if (checker_->resp_.has_error()) {
75 self.complete({});
76 return;
77 }
78
79 if (checker_->resp_.value().empty()) {
80 checker_->ping_timer_.cancel();
81 conn_->cancel(operation::run);
82 checker_->checker_has_exited_ = true;
83 self.complete(error::pong_timeout);
84 return;
85 }
86
87 if (checker_->resp_.has_value()) {
88 checker_->resp_.value().clear();
89 }
90 }
91 }
92};
93
94template <class HealthChecker, class Connection>
95class check_health_op {
96public:
97 HealthChecker* checker_ = nullptr;
98 Connection* conn_ = nullptr;
99 asio::coroutine coro_{};
100
101 template <class Self>
102 void
103 operator()(
104 Self& self,
105 std::array<std::size_t, 2> order = {},
106 system::error_code ec1 = {},
107 system::error_code ec2 = {})
108 {
109 BOOST_ASIO_CORO_REENTER (coro_)
110 {
111 if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
112 BOOST_ASIO_CORO_YIELD
113 asio::post(std::move(self));
114 self.complete({});
115 return;
116 }
117
118 BOOST_ASIO_CORO_YIELD
119 asio::experimental::make_parallel_group(
120 [this](auto token) { return checker_->async_ping(*conn_, token); },
121 [this](auto token) { return checker_->async_check_timeout(*conn_, token);}
122 ).async_wait(
123 asio::experimental::wait_for_one(),
124 std::move(self));
125
126 if (is_cancelled(self)) {
127 self.complete(asio::error::operation_aborted);
128 return;
129 }
130
131 switch (order[0]) {
132 case 0: self.complete(ec1); return;
133 case 1: self.complete(ec2); return;
134 default: BOOST_ASSERT(false);
135 }
136 }
137 }
138};
139
140template <class Executor>
141class health_checker {
142private:
143 using timer_type =
144 asio::basic_waitable_timer<
145 std::chrono::steady_clock,
146 asio::wait_traits<std::chrono::steady_clock>,
147 Executor>;
148
149public:
150 health_checker(Executor ex)
151 : ping_timer_{ex}
152 , wait_timer_{ex}
153 {
154 req_.push("PING", "Boost.Redis");
155 }
156
157 void set_config(config const& cfg)
158 {
159 req_.clear();
160 req_.push("PING", cfg.health_check_id);
161 ping_interval_ = cfg.health_check_interval;
162 }
163
164 template <
165 class Connection,
166 class CompletionToken = asio::default_completion_token_t<Executor>
167 >
168 auto async_check_health(Connection& conn, CompletionToken token = CompletionToken{})
169 {
170 checker_has_exited_ = false;
171 return asio::async_compose
172 < CompletionToken
173 , void(system::error_code)
174 >(check_health_op<health_checker, Connection>{this, &conn}, token, conn);
175 }
176
177 std::size_t cancel(operation op)
178 {
179 switch (op) {
181 case operation::all:
182 ping_timer_.cancel();
183 wait_timer_.cancel();
184 break;
185 default: /* ignore */;
186 }
187
188 return 0;
189 }
190
191private:
192 template <class Connection, class CompletionToken>
193 auto async_ping(Connection& conn, CompletionToken token)
194 {
195 return asio::async_compose
196 < CompletionToken
197 , void(system::error_code)
198 >(ping_op<health_checker, Connection>{this, &conn}, token, conn, ping_timer_);
199 }
200
201 template <class Connection, class CompletionToken>
202 auto async_check_timeout(Connection& conn, CompletionToken token)
203 {
204 return asio::async_compose
205 < CompletionToken
206 , void(system::error_code)
207 >(check_timeout_op<health_checker, Connection>{this, &conn}, token, conn, wait_timer_);
208 }
209
210 template <class, class> friend class ping_op;
211 template <class, class> friend class check_timeout_op;
212 template <class, class> friend class check_health_op;
213
214 timer_type ping_timer_;
215 timer_type wait_timer_;
216 redis::request req_;
218 std::chrono::steady_clock::duration ping_interval_ = std::chrono::seconds{5};
219 bool checker_has_exited_ = false;
220};
221
222} // boost::redis::detail
223
224#endif // BOOST_REDIS_HEALTH_CHECKER_HPP
adapter::result< std::vector< resp3::node > > generic_response
A generic response to a request.
Definition: response.hpp:33
operation
Connection operations that can be cancelled.
Definition: operation.hpp:18
@ pong_timeout
Timed out waiting for the response to a health-check PING.
@ health_check
Health check operation.
@ all
Refers to all operations.
@ run
Refers to connection::async_run operations.