boost_redis 1.4.2
A redis client library
connection_base.hpp
1/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
2 *
3 * Distributed under the Boost Software License, Version 1.0. (See
4 * accompanying file LICENSE.txt)
5 */
6
7#ifndef BOOST_REDIS_CONNECTION_BASE_HPP
8#define BOOST_REDIS_CONNECTION_BASE_HPP
9
10#include <boost/redis/adapter/adapt.hpp>
11#include <boost/redis/detail/helper.hpp>
12#include <boost/redis/detail/read.hpp>
13#include <boost/redis/error.hpp>
14#include <boost/redis/operation.hpp>
15#include <boost/redis/request.hpp>
16#include <boost/redis/resp3/type.hpp>
17#include <boost/redis/config.hpp>
18#include <boost/redis/detail/runner.hpp>
19
20#include <boost/system.hpp>
21#include <boost/asio/basic_stream_socket.hpp>
22#include <boost/asio/bind_executor.hpp>
23#include <boost/asio/experimental/parallel_group.hpp>
24#include <boost/asio/ip/tcp.hpp>
25#include <boost/asio/steady_timer.hpp>
26#include <boost/asio/write.hpp>
27#include <boost/assert.hpp>
28#include <boost/core/ignore_unused.hpp>
29#include <boost/asio/ssl/stream.hpp>
30#include <boost/asio/read_until.hpp>
31#include <boost/asio/buffer.hpp>
32
33#include <algorithm>
34#include <array>
35#include <chrono>
36#include <deque>
37#include <memory>
38#include <string_view>
39#include <type_traits>
40
41namespace boost::redis::detail {
42
43template <class Conn>
44struct wait_receive_op {
45 Conn* conn_;
46 asio::coroutine coro{};
47
48 template <class Self>
49 void
50 operator()(Self& self , system::error_code ec = {})
51 {
52 BOOST_ASIO_CORO_REENTER (coro)
53 {
54 conn_->read_op_timer_.cancel();
55
56 BOOST_ASIO_CORO_YIELD
57 conn_->read_op_timer_.async_wait(std::move(self));
58 if (!conn_->is_open() || is_cancelled(self)) {
59 self.complete(!!ec ? ec : asio::error::operation_aborted);
60 return;
61 }
62 self.complete({});
63 }
64 }
65};
66
67template <class Conn, class Adapter>
68class read_next_op {
69public:
70 using req_info_type = typename Conn::req_info;
71 using req_info_ptr = typename std::shared_ptr<req_info_type>;
72
73private:
74 Conn* conn_;
75 req_info_ptr info_;
76 Adapter adapter_;
77 std::size_t cmds_ = 0;
78 std::size_t read_size_ = 0;
79 std::size_t index_ = 0;
80 asio::coroutine coro_{};
81
82public:
83 read_next_op(Conn& conn, Adapter adapter, req_info_ptr info)
84 : conn_{&conn}
85 , info_{info}
86 , adapter_{adapter}
87 , cmds_{info->get_number_of_commands()}
88 {}
89
90 auto make_adapter() noexcept
91 {
92 return [i = index_, adpt = adapter_] (resp3::basic_node<std::string_view> const& nd, system::error_code& ec) mutable { adpt(i, nd, ec); };
93 }
94
95 template <class Self>
96 void
97 operator()( Self& self
98 , system::error_code ec = {}
99 , std::size_t n = 0)
100 {
101 BOOST_ASIO_CORO_REENTER (coro_)
102 {
103 // Loop reading the responses to this request.
104 while (cmds_ != 0) {
105 if (info_->stop_requested()) {
106 self.complete(asio::error::operation_aborted, 0);
107 return;
108 }
109
110 //-----------------------------------
111 // If we detect a push in the middle of a request we have
112 // to hand it to the push consumer. To do that we need
113 // some data in the read bufer.
114 if (conn_->read_buffer_.empty()) {
115
116 if (conn_->use_ssl()) {
117 BOOST_ASIO_CORO_YIELD
118 asio::async_read_until(conn_->next_layer(), conn_->dbuf_, resp3::parser::sep, std::move(self));
119 } else {
120 BOOST_ASIO_CORO_YIELD
121 asio::async_read_until(conn_->next_layer().next_layer(), conn_->dbuf_, resp3::parser::sep, std::move(self));
122 }
123
124 BOOST_REDIS_CHECK_OP1(conn_->cancel(operation::run););
125 if (info_->stop_requested()) {
126 self.complete(asio::error::operation_aborted, 0);
127 return;
128 }
129 }
130
131 // If the next request is a push we have to handle it to
132 // the receive_op wait for it to be done and continue.
133 if (resp3::to_type(conn_->read_buffer_.front()) == resp3::type::push) {
134 BOOST_ASIO_CORO_YIELD
135 conn_->async_wait_receive(std::move(self));
136 BOOST_REDIS_CHECK_OP1(conn_->cancel(operation::run););
137 continue;
138 }
139 //-----------------------------------
140
141 if (conn_->use_ssl()) {
142 BOOST_ASIO_CORO_YIELD
143 redis::detail::async_read(conn_->next_layer(), conn_->dbuf_, make_adapter(), std::move(self));
144 } else {
145 BOOST_ASIO_CORO_YIELD
146 redis::detail::async_read(conn_->next_layer().next_layer(), conn_->dbuf_, make_adapter(), std::move(self));
147 }
148
149 ++index_;
150
151 if (ec || redis::detail::is_cancelled(self)) {
152 conn_->cancel(operation::run);
153 self.complete(!!ec ? ec : asio::error::operation_aborted, {});
154 return;
155 }
156
157 conn_->dbuf_.consume(n);
158 read_size_ += n;
159
160 BOOST_ASSERT(cmds_ != 0);
161 --cmds_;
162 }
163
164 self.complete({}, read_size_);
165 }
166 }
167};
168
169template <class Conn, class Adapter>
170struct receive_op {
171 Conn* conn_;
172 Adapter adapter;
173 asio::coroutine coro{};
174
175 template <class Self>
176 void
177 operator()( Self& self
178 , system::error_code ec = {}
179 , std::size_t n = 0)
180 {
181 BOOST_ASIO_CORO_REENTER (coro)
182 {
183 if (!conn_->is_next_push()) {
184 BOOST_ASIO_CORO_YIELD
185 conn_->read_op_timer_.async_wait(std::move(self));
186 if (!conn_->is_open() || is_cancelled(self)) {
187 self.complete(!!ec ? ec : asio::error::operation_aborted, 0);
188 return;
189 }
190 }
191
192 if (conn_->use_ssl()) {
193 BOOST_ASIO_CORO_YIELD
194 redis::detail::async_read(conn_->next_layer(), conn_->dbuf_, adapter, std::move(self));
195 } else {
196 BOOST_ASIO_CORO_YIELD
197 redis::detail::async_read(conn_->next_layer().next_layer(), conn_->dbuf_, adapter, std::move(self));
198 }
199
200 if (ec || is_cancelled(self)) {
201 conn_->cancel(operation::run);
202 conn_->cancel(operation::receive);
203 self.complete(!!ec ? ec : asio::error::operation_aborted, {});
204 return;
205 }
206
207 conn_->dbuf_.consume(n);
208
209 if (!conn_->is_next_push()) {
210 conn_->read_op_timer_.cancel();
211 }
212
213 self.complete({}, n);
214 return;
215 }
216 }
217};
218
219template <class Conn, class Adapter>
220struct exec_op {
221 using req_info_type = typename Conn::req_info;
222
223 Conn* conn = nullptr;
224 request const* req = nullptr;
225 Adapter adapter{};
226 std::shared_ptr<req_info_type> info = nullptr;
227 asio::coroutine coro{};
228
229 template <class Self>
230 void
231 operator()( Self& self
232 , system::error_code ec = {}
233 , std::size_t n = 0)
234 {
235 BOOST_ASIO_CORO_REENTER (coro)
236 {
237 // Check whether the user wants to wait for the connection to
238 // be stablished.
239 if (req->get_config().cancel_if_not_connected && !conn->is_open()) {
240 BOOST_ASIO_CORO_YIELD
241 asio::post(std::move(self));
242 return self.complete(error::not_connected, 0);
243 }
244
245 info = std::allocate_shared<req_info_type>(asio::get_associated_allocator(self), *req, conn->get_executor());
246
247 conn->add_request_info(info);
248EXEC_OP_WAIT:
249 BOOST_ASIO_CORO_YIELD
250 info->async_wait(std::move(self));
251 BOOST_ASSERT(ec == asio::error::operation_aborted);
252
253 if (info->stop_requested()) {
254 // Don't have to call remove_request as it has already
255 // been by cancel(exec).
256 return self.complete(ec, 0);
257 }
258
259 if (is_cancelled(self)) {
260 if (info->is_written()) {
261 using c_t = asio::cancellation_type;
262 auto const c = self.get_cancellation_state().cancelled();
263 if ((c & c_t::terminal) != c_t::none) {
264 // Cancellation requires closing the connection
265 // otherwise it stays in inconsistent state.
266 conn->cancel(operation::run);
267 return self.complete(ec, 0);
268 } else {
269 // Can't implement other cancelation types, ignoring.
270 self.get_cancellation_state().clear();
271 goto EXEC_OP_WAIT;
272 }
273 } else {
274 // Cancelation can be honored.
275 conn->remove_request(info);
276 self.complete(ec, 0);
277 return;
278 }
279 }
280
281 BOOST_ASSERT(conn->is_open());
282
283 if (req->size() == 0) {
284 // Don't have to call remove_request as it has already
285 // been removed.
286 return self.complete({}, 0);
287 }
288
289 BOOST_ASSERT(!conn->reqs_.empty());
290 BOOST_ASSERT(conn->reqs_.front() != nullptr);
291 BOOST_ASIO_CORO_YIELD
292 conn->async_read_next(adapter, std::move(self));
293 BOOST_REDIS_CHECK_OP1(;);
294
295 if (info->stop_requested()) {
296 // Don't have to call remove_request as it has already
297 // been by cancel(exec).
298 return self.complete(ec, 0);
299 }
300
301 BOOST_ASSERT(!conn->reqs_.empty());
302 conn->reqs_.pop_front();
303
304 if (conn->is_waiting_response()) {
305 BOOST_ASSERT(!conn->reqs_.empty());
306 conn->reqs_.front()->proceed();
307 } else {
308 conn->read_timer_.cancel_one();
309 }
310
311 self.complete({}, n);
312 }
313 }
314};
315
316template <class Conn, class Logger>
317struct run_op {
318 Conn* conn = nullptr;
319 Logger logger_;
320 asio::coroutine coro{};
321
322 template <class Self>
323 void operator()( Self& self
324 , std::array<std::size_t, 2> order = {}
325 , system::error_code ec0 = {}
326 , system::error_code ec1 = {})
327 {
328 BOOST_ASIO_CORO_REENTER (coro)
329 {
330 conn->write_buffer_.clear();
331 conn->read_buffer_.clear();
332
333 BOOST_ASIO_CORO_YIELD
334 asio::experimental::make_parallel_group(
335 [this](auto token) { return conn->reader(token);},
336 [this](auto token) { return conn->writer(logger_, token);}
337 ).async_wait(
338 asio::experimental::wait_for_one(),
339 std::move(self));
340
341 if (is_cancelled(self)) {
342 self.complete(asio::error::operation_aborted);
343 return;
344 }
345
346 switch (order[0]) {
347 case 0: self.complete(ec0); break;
348 case 1: self.complete(ec1); break;
349 default: BOOST_ASSERT(false);
350 }
351 }
352 }
353};
354
355template <class Conn, class Logger>
356struct writer_op {
357 Conn* conn_;
358 Logger logger_;
359 asio::coroutine coro{};
360
361 template <class Self>
362 void operator()( Self& self
363 , system::error_code ec = {}
364 , std::size_t n = 0)
365 {
366 ignore_unused(n);
367
368 BOOST_ASIO_CORO_REENTER (coro) for (;;)
369 {
370 while (conn_->coalesce_requests()) {
371 if (conn_->use_ssl())
372 BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
373 else
374 BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer().next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
375
376 logger_.on_write(ec, conn_->write_buffer_);
377 BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run););
378
379 conn_->on_write();
380
381 // A socket.close() may have been called while a
382 // successful write might had already been queued, so we
383 // have to check here before proceeding.
384 if (!conn_->is_open()) {
385 self.complete({});
386 return;
387 }
388 }
389
390 BOOST_ASIO_CORO_YIELD
391 conn_->writer_timer_.async_wait(std::move(self));
392 if (!conn_->is_open() || is_cancelled(self)) {
393 // Notice this is not an error of the op, stoping was
394 // requested from the outside, so we complete with
395 // success.
396 self.complete({});
397 return;
398 }
399 }
400 }
401};
402
403template <class Conn>
404struct reader_op {
405 Conn* conn;
406 asio::coroutine coro{};
407
408 bool as_push() const
409 {
410 return
411 (resp3::to_type(conn->read_buffer_.front()) == resp3::type::push)
412 || conn->reqs_.empty()
413 || (!conn->reqs_.empty() && conn->reqs_.front()->get_number_of_commands() == 0)
414 || !conn->is_waiting_response(); // Added to deal with MONITOR.
415 }
416
417 template <class Self>
418 void operator()( Self& self
419 , system::error_code ec = {}
420 , std::size_t n = 0)
421 {
422 ignore_unused(n);
423
424 BOOST_ASIO_CORO_REENTER (coro) for (;;)
425 {
426 if (conn->use_ssl())
427 BOOST_ASIO_CORO_YIELD asio::async_read_until(conn->next_layer(), conn->dbuf_, "\r\n", std::move(self));
428 else
429 BOOST_ASIO_CORO_YIELD asio::async_read_until(conn->next_layer().next_layer(), conn->dbuf_, "\r\n", std::move(self));
430
431 if (ec == asio::error::eof) {
432 conn->cancel(operation::run);
433 return self.complete({}); // EOFINAE: EOF is not an error.
434 }
435
436 BOOST_REDIS_CHECK_OP0(conn->cancel(operation::run););
437
438 // We handle unsolicited events in the following way
439 //
440 // 1. Its resp3 type is a push.
441 //
442 // 2. A non-push type is received with an empty requests
443 // queue. I have noticed this is possible (e.g. -MISCONF).
444 // I expect them to have type push so we can distinguish
445 // them from responses to commands, but it is a
446 // simple-error. If we are lucky enough to receive them
447 // when the command queue is empty we can treat them as
448 // server pushes, otherwise it is impossible to handle
449 // them properly
450 //
451 // 3. The request does not expect any response but we got
452 // one. This may happen if for example, subscribe with
453 // wrong syntax.
454 //
455 // Useful links:
456 //
457 // - https://github.com/redis/redis/issues/11784
458 // - https://github.com/redis/redis/issues/6426
459 //
460 BOOST_ASSERT(!conn->read_buffer_.empty());
461 if (as_push()) {
462 BOOST_ASIO_CORO_YIELD
463 conn->async_wait_receive(std::move(self));
464 } else {
465 BOOST_ASSERT_MSG(conn->is_waiting_response(), "Not waiting for a response (using MONITOR command perhaps?)");
466 BOOST_ASSERT(!conn->reqs_.empty());
467 BOOST_ASSERT(conn->reqs_.front()->get_number_of_commands() != 0);
468 conn->reqs_.front()->proceed();
469 BOOST_ASIO_CORO_YIELD
470 conn->read_timer_.async_wait(std::move(self));
471 ec = {};
472 }
473
474 if (!conn->is_open() || ec || is_cancelled(self)) {
475 conn->cancel(operation::run);
476 self.complete(asio::error::basic_errors::operation_aborted);
477 return;
478 }
479 }
480 }
481};
482
489template <class Executor>
491public:
493 using executor_type = Executor;
494
496 using next_layer_type = asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>>;
497
499
502 executor_type ex,
503 asio::ssl::context::method method,
504 std::size_t max_read_size)
505 : ctx_{method}
506 , stream_{std::make_unique<next_layer_type>(ex, ctx_)}
507 , writer_timer_{ex}
508 , read_timer_{ex}
509 , read_op_timer_{ex}
510 , runner_{ex, {}}
511 , dbuf_{read_buffer_, max_read_size}
512 {
513 writer_timer_.expires_at(std::chrono::steady_clock::time_point::max());
514 read_timer_.expires_at(std::chrono::steady_clock::time_point::max());
515 read_op_timer_.expires_at(std::chrono::steady_clock::time_point::max());
516 }
517
519 auto const& get_ssl_context() const noexcept
520 { return ctx_;}
521
523 auto& get_ssl_context() noexcept
524 { return ctx_;}
525
528 {
529 stream_ = std::make_unique<next_layer_type>(writer_timer_.get_executor(), ctx_);
530 }
531
533 auto& next_layer() noexcept { return *stream_; }
534
536 auto const& next_layer() const noexcept { return *stream_; }
537
539 auto get_executor() {return writer_timer_.get_executor();}
540
542 virtual void cancel(operation op)
543 {
544 runner_.cancel(op);
545 if (op == operation::all) {
546 cancel_impl(operation::run);
547 cancel_impl(operation::receive);
548 cancel_impl(operation::exec);
549 return;
550 }
551
552 cancel_impl(op);
553 }
554
555 template <class Response, class CompletionToken>
556 auto async_exec(request const& req, Response& resp, CompletionToken token)
557 {
558 using namespace boost::redis::adapter;
559 auto f = boost_redis_adapt(resp);
560 BOOST_ASSERT_MSG(req.size() <= f.get_supported_response_size(), "Request and response have incompatible sizes.");
561
562 return asio::async_compose
563 < CompletionToken
564 , void(system::error_code, std::size_t)
565 >(redis::detail::exec_op<this_type, decltype(f)>{this, &req, f}, token, writer_timer_);
566 }
567
568 template <class Response, class CompletionToken>
569 auto async_receive(Response& response, CompletionToken token)
570 {
571 using namespace boost::redis::adapter;
572 auto g = boost_redis_adapt(response);
573 auto f = adapter::detail::make_adapter_wrapper(g);
574
575 return asio::async_compose
576 < CompletionToken
577 , void(system::error_code, std::size_t)
578 >(redis::detail::receive_op<this_type, decltype(f)>{this, f}, token, read_op_timer_);
579 }
580
581 template <class Logger, class CompletionToken>
582 auto async_run(config const& cfg, Logger l, CompletionToken token)
583 {
584 runner_.set_config(cfg);
585 l.set_prefix(runner_.get_config().log_prefix);
586 return runner_.async_run(*this, l, std::move(token));
587 }
588
589private:
590 using clock_type = std::chrono::steady_clock;
591 using clock_traits_type = asio::wait_traits<clock_type>;
592 using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
593 using runner_type = redis::detail::runner<executor_type>;
594
595 auto use_ssl() const noexcept
596 { return runner_.get_config().use_ssl;}
597
598 auto cancel_on_conn_lost() -> std::size_t
599 {
600 // Must return false if the request should be removed.
601 auto cond = [](auto const& ptr)
602 {
603 BOOST_ASSERT(ptr != nullptr);
604
605 if (ptr->is_written()) {
606 return !ptr->get_request().get_config().cancel_if_unresponded;
607 } else {
608 return !ptr->get_request().get_config().cancel_on_connection_lost;
609 }
610 };
611
612 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond);
613
614 auto const ret = std::distance(point, std::end(reqs_));
615
616 std::for_each(point, std::end(reqs_), [](auto const& ptr) {
617 ptr->stop();
618 });
619
620 reqs_.erase(point, std::end(reqs_));
621 std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
622 return ptr->reset_status();
623 });
624
625 return ret;
626 }
627
628 auto cancel_unwritten_requests() -> std::size_t
629 {
630 auto f = [](auto const& ptr)
631 {
632 BOOST_ASSERT(ptr != nullptr);
633 return ptr->is_written();
634 };
635
636 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f);
637
638 auto const ret = std::distance(point, std::end(reqs_));
639
640 std::for_each(point, std::end(reqs_), [](auto const& ptr) {
641 ptr->stop();
642 });
643
644 reqs_.erase(point, std::end(reqs_));
645 return ret;
646 }
647
648 void cancel_impl(operation op)
649 {
650 switch (op) {
651 case operation::exec:
652 {
653 cancel_unwritten_requests();
654 } break;
655 case operation::run:
656 {
657 close();
658 read_timer_.cancel();
659 writer_timer_.cancel();
660 cancel_on_conn_lost();
661 } break;
663 {
664 read_op_timer_.cancel();
665 } break;
666 default: /* ignore */;
667 }
668 }
669
670 void on_write()
671 {
672 // We have to clear the payload right after writing it to use it
673 // as a flag that informs there is no ongoing write.
674 write_buffer_.clear();
675
676 // Notice this must come before the for-each below.
677 cancel_push_requests();
678
679 // There is small optimization possible here: traverse only the
680 // partition of unwritten requests instead of them all.
681 std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
682 BOOST_ASSERT_MSG(ptr != nullptr, "Expects non-null pointer.");
683 if (ptr->is_staged())
684 ptr->mark_written();
685 });
686 }
687
688 struct req_info {
689 public:
690 enum class action
691 {
692 stop,
693 proceed,
694 none,
695 };
696
697 explicit req_info(request const& req, executor_type ex)
698 : timer_{ex}
699 , action_{action::none}
700 , req_{&req}
701 , cmds_{std::size(req)}
702 , status_{status::none}
703 {
704 timer_.expires_at(std::chrono::steady_clock::time_point::max());
705 }
706
707 auto proceed()
708 {
709 timer_.cancel();
710 action_ = action::proceed;
711 }
712
713 void stop()
714 {
715 timer_.cancel();
716 action_ = action::stop;
717 }
718
719 [[nodiscard]] auto is_waiting_write() const noexcept
720 { return !is_written() && !is_staged(); }
721
722 [[nodiscard]] auto is_written() const noexcept
723 { return status_ == status::written; }
724
725 [[nodiscard]] auto is_staged() const noexcept
726 { return status_ == status::staged; }
727
728 void mark_written() noexcept
729 { status_ = status::written; }
730
731 void mark_staged() noexcept
732 { status_ = status::staged; }
733
734 void reset_status() noexcept
735 { status_ = status::none; }
736
737 [[nodiscard]] auto get_number_of_commands() const noexcept
738 { return cmds_; }
739
740 [[nodiscard]] auto get_request() const noexcept -> auto const&
741 { return *req_; }
742
743 [[nodiscard]] auto stop_requested() const noexcept
744 { return action_ == action::stop;}
745
746 template <class CompletionToken>
747 auto async_wait(CompletionToken token)
748 {
749 return timer_.async_wait(std::move(token));
750 }
751
752 private:
753 enum class status
754 { none
755 , staged
756 , written
757 };
758
759 timer_type timer_;
760 action action_;
761 request const* req_;
762 std::size_t cmds_;
763 status status_;
764 };
765
766 void remove_request(std::shared_ptr<req_info> const& info)
767 {
768 reqs_.erase(std::remove(std::begin(reqs_), std::end(reqs_), info));
769 }
770
771 using reqs_type = std::deque<std::shared_ptr<req_info>>;
772
773 template <class> friend struct redis::detail::reader_op;
774 template <class, class> friend struct redis::detail::writer_op;
775 template <class, class> friend struct redis::detail::run_op;
776 template <class, class> friend struct redis::detail::exec_op;
777 template <class, class> friend class redis::detail::read_next_op;
778 template <class, class> friend struct redis::detail::receive_op;
779 template <class> friend struct redis::detail::wait_receive_op;
780 template <class, class, class> friend struct redis::detail::run_all_op;
781
782 template <class CompletionToken>
783 auto async_wait_receive(CompletionToken token)
784 {
785 return asio::async_compose
786 < CompletionToken
787 , void(system::error_code)
788 >(redis::detail::wait_receive_op<this_type>{this}, token, read_op_timer_);
789 }
790
791 void cancel_push_requests()
792 {
793 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
794 return !(ptr->is_staged() && ptr->get_request().size() == 0);
795 });
796
797 std::for_each(point, std::end(reqs_), [](auto const& ptr) {
798 ptr->proceed();
799 });
800
801 reqs_.erase(point, std::end(reqs_));
802 }
803
804 [[nodiscard]] bool is_writing() const noexcept
805 {
806 return !write_buffer_.empty();
807 }
808
809 void add_request_info(std::shared_ptr<req_info> const& info)
810 {
811 reqs_.push_back(info);
812
813 if (info->get_request().has_hello_priority()) {
814 auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) {
815 return e->is_waiting_write();
816 });
817
818 std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
819 }
820
821 if (is_open() && !is_writing())
822 writer_timer_.cancel();
823 }
824
825 template <class CompletionToken>
826 auto reader(CompletionToken&& token)
827 {
828 return asio::async_compose
829 < CompletionToken
830 , void(system::error_code)
831 >(redis::detail::reader_op<this_type>{this}, token, writer_timer_);
832 }
833
834 template <class CompletionToken, class Logger>
835 auto writer(Logger l, CompletionToken&& token)
836 {
837 return asio::async_compose
838 < CompletionToken
839 , void(system::error_code)
840 >(redis::detail::writer_op<this_type, Logger>{this, l}, token, writer_timer_);
841 }
842
843 template <class Adapter, class CompletionToken>
844 auto async_read_next(Adapter adapter, CompletionToken token)
845 {
846 return asio::async_compose
847 < CompletionToken
848 , void(system::error_code, std::size_t)
849 >(redis::detail::read_next_op<this_type, Adapter>{*this, adapter, reqs_.front()}, token, writer_timer_);
850 }
851
852 template <class Logger, class CompletionToken>
853 auto async_run_lean(config const& cfg, Logger l, CompletionToken token)
854 {
855 runner_.set_config(cfg);
856 l.set_prefix(runner_.get_config().log_prefix);
857 return asio::async_compose
858 < CompletionToken
859 , void(system::error_code)
860 >(redis::detail::run_op<this_type, Logger>{this, l}, token, writer_timer_);
861 }
862
863 [[nodiscard]] bool coalesce_requests()
864 {
865 // Coalesces the requests and marks them staged. After a
866 // successful write staged requests will be marked as written.
867 auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) {
868 return !ri->is_waiting_write();
869 });
870
871 std::for_each(point, std::cend(reqs_), [this](auto const& ri) {
872 // Stage the request.
873 write_buffer_ += ri->get_request().payload();
874 ri->mark_staged();
875 });
876
877 return point != std::cend(reqs_);
878 }
879
880 bool is_waiting_response() const noexcept
881 {
882 return !std::empty(reqs_) && reqs_.front()->is_written();
883 }
884
885 void close()
886 {
887 if (stream_->next_layer().is_open())
888 stream_->next_layer().close();
889 }
890
891 bool is_next_push() const noexcept
892 {
893 return !read_buffer_.empty() && (resp3::to_type(read_buffer_.front()) == resp3::type::push);
894 }
895
896 auto is_open() const noexcept { return stream_->next_layer().is_open(); }
897 auto& lowest_layer() noexcept { return stream_->lowest_layer(); }
898
899 asio::ssl::context ctx_;
900 std::unique_ptr<next_layer_type> stream_;
901
902 // Notice we use a timer to simulate a condition-variable. It is
903 // also more suitable than a channel and the notify operation does
904 // not suspend.
905 timer_type writer_timer_;
906 timer_type read_timer_;
907 timer_type read_op_timer_;
908 runner_type runner_;
909
910 using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;
911
912 std::string read_buffer_;
913 dyn_buffer_type dbuf_;
914 std::string write_buffer_;
915 reqs_type reqs_;
916};
917
918} // boost::redis::detail
919
920#endif // BOOST_REDIS_CONNECTION_BASE_HPP
Base class for high level Redis asynchronous connections.
auto get_executor()
Returns the associated executor.
virtual void cancel(operation op)
Cancels specific operations.
void reset_stream()
Resets the underlying stream.
connection_base(executor_type ex, asio::ssl::context::method method, std::size_t max_read_size)
Constructs from an executor.
asio::ssl::stream< asio::basic_stream_socket< asio::ip::tcp, Executor > > next_layer_type
Type of the next layer.
auto & next_layer() noexcept
Returns a reference to the next layer.
auto & get_ssl_context() noexcept
Returns the ssl context.
auto const & get_ssl_context() const noexcept
Returns the ssl context.
auto const & next_layer() const noexcept
Returns a const reference to the next layer.
Creates Redis requests.
Definition: request.hpp:46
std::tuple< adapter::result< Ts >... > response
Response with compile-time size.
Definition: response.hpp:23
operation
Connection operations that can be cancelled.
Definition: operation.hpp:18
@ not_connected
There is no stablished connection.
@ exec
Refers to connection::async_exec operations.
@ all
Refers to all operations.
@ run
Refers to connection::async_run operations.
@ receive
Refers to connection::async_receive operations.
Configure parameters used by the connection classes.
Definition: config.hpp:30
auto async_write(AsyncWriteStream &stream, request const &req, CompletionToken &&token=asio::default_completion_token_t< typename AsyncWriteStream::executor_type >{})
Writes a request asynchronously.
Definition: write.hpp:44