7#ifndef BOOST_REDIS_CONNECTION_BASE_HPP
8#define BOOST_REDIS_CONNECTION_BASE_HPP
10#include <boost/redis/adapter/adapt.hpp>
11#include <boost/redis/detail/helper.hpp>
12#include <boost/redis/detail/read.hpp>
13#include <boost/redis/error.hpp>
14#include <boost/redis/operation.hpp>
15#include <boost/redis/request.hpp>
16#include <boost/redis/resp3/type.hpp>
17#include <boost/redis/config.hpp>
18#include <boost/redis/detail/runner.hpp>
20#include <boost/system.hpp>
21#include <boost/asio/basic_stream_socket.hpp>
22#include <boost/asio/bind_executor.hpp>
23#include <boost/asio/experimental/parallel_group.hpp>
24#include <boost/asio/ip/tcp.hpp>
25#include <boost/asio/steady_timer.hpp>
26#include <boost/asio/write.hpp>
27#include <boost/assert.hpp>
28#include <boost/core/ignore_unused.hpp>
29#include <boost/asio/ssl/stream.hpp>
30#include <boost/asio/read_until.hpp>
31#include <boost/asio/buffer.hpp>
41namespace boost::redis::detail {
// Coroutine-style operation that parks the caller on the connection's
// read_op_timer_ and reports an error if the connection is closed or the
// operation was cancelled by the time it resumes.
44struct wait_receive_op {
46 asio::coroutine coro{};
50 operator()(Self& self , system::error_code ec = {})
52 BOOST_ASIO_CORO_REENTER (coro)
// Cancel whatever wait is currently pending on read_op_timer_ before
// starting our own wait on the same timer.
54 conn_->read_op_timer_.cancel();
57 conn_->read_op_timer_.async_wait(std::move(self));
// After resuming: prefer the error code received, fall back to
// operation_aborted when `ec` is not set.
58 if (!conn_->is_open() || is_cancelled(self)) {
59 self.complete(!!ec ? ec : asio::error::operation_aborted);
// Coroutine-style operation built from a connection, a response adapter and a
// shared req_info. It reads data from the connection, feeds the parsed nodes
// to the adapter and finally completes with the accumulated read_size_.
67template <
class Conn,
class Adapter>
70 using req_info_type =
typename Conn::req_info;
71 using req_info_ptr =
typename std::shared_ptr<req_info_type>;
// Counters; cmds_ is seeded from the request's command count in the constructor.
77 std::size_t cmds_ = 0;
78 std::size_t read_size_ = 0;
79 std::size_t index_ = 0;
80 asio::coroutine coro_{};
83 read_next_op(Conn& conn, Adapter adapter, req_info_ptr info)
87 , cmds_{info->get_number_of_commands()}
// Builds the per-read callback: it captures the current index_ and a copy of
// the adapter by value and forwards (index, node, ec) to that copy.
90 auto make_adapter() noexcept
92 return [i = index_, adpt = adapter_] (resp3::basic_node<std::string_view>
const& nd, system::error_code& ec)
mutable { adpt(i, nd, ec); };
97 operator()( Self& self
98 , system::error_code ec = {}
101 BOOST_ASIO_CORO_REENTER (coro_)
// A stop request ends the operation with operation_aborted and zero bytes.
105 if (info_->stop_requested()) {
106 self.complete(asio::error::operation_aborted, 0);
// Nothing buffered yet: read up to the RESP3 separator. With SSL the read goes
// through next_layer(); otherwise through the layer beneath it.
114 if (conn_->read_buffer_.empty()) {
116 if (conn_->use_ssl()) {
117 BOOST_ASIO_CORO_YIELD
118 asio::async_read_until(conn_->next_layer(), conn_->dbuf_, resp3::parser::sep, std::move(self));
120 BOOST_ASIO_CORO_YIELD
121 asio::async_read_until(conn_->next_layer().next_layer(), conn_->dbuf_, resp3::parser::sep, std::move(self));
// The stop flag is checked again after the read resumed.
125 if (info_->stop_requested()) {
126 self.complete(asio::error::operation_aborted, 0);
134 BOOST_ASIO_CORO_YIELD
135 conn_->async_wait_receive(std::move(self));
// Read one response, handing the parsed nodes to the adapter from make_adapter().
141 if (conn_->use_ssl()) {
142 BOOST_ASIO_CORO_YIELD
143 redis::detail::async_read(conn_->next_layer(), conn_->dbuf_, make_adapter(), std::move(self));
145 BOOST_ASIO_CORO_YIELD
146 redis::detail::async_read(conn_->next_layer().next_layer(), conn_->dbuf_, make_adapter(), std::move(self));
// Failure or cancellation: report `ec` if set, operation_aborted otherwise.
151 if (ec || redis::detail::is_cancelled(self)) {
153 self.complete(!!ec ? ec : asio::error::operation_aborted, {});
// Drop the n bytes just processed from the dynamic buffer.
157 conn_->dbuf_.consume(n);
160 BOOST_ASSERT(cmds_ != 0);
164 self.complete({}, read_size_);
// Coroutine-style operation used by async_receive(): waits until the next
// buffered message is a server push, reads it through the adapter and
// completes with the number of bytes consumed.
169template <
class Conn,
class Adapter>
173 asio::coroutine coro{};
175 template <
class Self>
177 operator()( Self& self
178 , system::error_code ec = {}
181 BOOST_ASIO_CORO_REENTER (coro)
// No push at the front of the read buffer yet: wait on read_op_timer_.
183 if (!conn_->is_next_push()) {
184 BOOST_ASIO_CORO_YIELD
185 conn_->read_op_timer_.async_wait(std::move(self));
// Resumed with the connection closed or the operation cancelled: fail with
// `ec` if set, operation_aborted otherwise, and zero bytes.
186 if (!conn_->is_open() || is_cancelled(self)) {
187 self.complete(!!ec ? ec : asio::error::operation_aborted, 0);
// With SSL the read goes through next_layer(); otherwise through the layer beneath it.
192 if (conn_->use_ssl()) {
193 BOOST_ASIO_CORO_YIELD
194 redis::detail::async_read(conn_->next_layer(), conn_->dbuf_, adapter, std::move(self));
196 BOOST_ASIO_CORO_YIELD
197 redis::detail::async_read(conn_->next_layer().next_layer(), conn_->dbuf_, adapter, std::move(self));
200 if (ec || is_cancelled(self)) {
203 self.complete(!!ec ? ec : asio::error::operation_aborted, {});
// Drop the n bytes just processed from the dynamic buffer.
207 conn_->dbuf_.consume(n);
// What follows in the buffer is not a push: cancel the wait pending on
// read_op_timer_ so that whoever is parked on it resumes.
209 if (!conn_->is_next_push()) {
210 conn_->read_op_timer_.cancel();
213 self.complete({}, n);
// Coroutine-style operation used by async_exec(): registers the request with
// the connection, waits for its turn, reads the response through the adapter
// and completes with the number of bytes read.
219template <
class Conn,
class Adapter>
221 using req_info_type =
typename Conn::req_info;
223 Conn* conn =
nullptr;
// Non-owning pointer to the caller's request.
224 request
const* req =
nullptr;
226 std::shared_ptr<req_info_type> info =
nullptr;
227 asio::coroutine coro{};
229 template <
class Self>
231 operator()( Self& self
232 , system::error_code ec = {}
235 BOOST_ASIO_CORO_REENTER (coro)
// The request opted out of queueing while disconnected; the completion is
// deferred through asio::post rather than invoked inline.
239 if (req->get_config().cancel_if_not_connected && !conn->is_open()) {
240 BOOST_ASIO_CORO_YIELD
241 asio::post(std::move(self));
// The bookkeeping object is allocated with the handler's associated allocator.
245 info = std::allocate_shared<req_info_type>(asio::get_associated_allocator(self), *req, conn->get_executor());
247 conn->add_request_info(info);
// Park until the request is signalled. The assertion below documents that
// this wait is expected to end with operation_aborted.
249 BOOST_ASIO_CORO_YIELD
250 info->async_wait(std::move(self));
251 BOOST_ASSERT(ec == asio::error::operation_aborted);
253 if (info->stop_requested()) {
256 return self.complete(ec, 0);
// Cancellation requested by the caller. For a request that has already been
// written only terminal cancellation appears to be honoured (NOTE(review):
// the branch structure is partly outside this listing - confirm).
259 if (is_cancelled(self)) {
260 if (info->is_written()) {
261 using c_t = asio::cancellation_type;
262 auto const c = self.get_cancellation_state().cancelled();
263 if ((c & c_t::terminal) != c_t::none) {
267 return self.complete(ec, 0);
270 self.get_cancellation_state().clear();
// The request is taken out of the connection's queue before completing.
275 conn->remove_request(info);
276 self.complete(ec, 0);
281 BOOST_ASSERT(conn->is_open());
// A request of size zero completes immediately without reading anything.
283 if (req->size() == 0) {
286 return self.complete({}, 0);
289 BOOST_ASSERT(!conn->reqs_.empty());
290 BOOST_ASSERT(conn->reqs_.front() !=
nullptr);
291 BOOST_ASIO_CORO_YIELD
292 conn->async_read_next(adapter, std::move(self));
293 BOOST_REDIS_CHECK_OP1(;);
295 if (info->stop_requested()) {
298 return self.complete(ec, 0);
// Response consumed: retire the front request.
301 BOOST_ASSERT(!conn->reqs_.empty());
302 conn->reqs_.pop_front();
// If the new front request has already been written, let it proceed.
304 if (conn->is_waiting_response()) {
305 BOOST_ASSERT(!conn->reqs_.empty());
306 conn->reqs_.front()->proceed();
308 conn->read_timer_.cancel_one();
311 self.complete({}, n);
// Coroutine-style operation that clears both buffers, launches the
// connection's reader and writer together in a parallel group and completes
// with one of the two resulting error codes.
316template <
class Conn,
class Logger>
318 Conn* conn =
nullptr;
320 asio::coroutine coro{};
322 template <
class Self>
323 void operator()( Self& self
324 , std::array<std::size_t, 2> order = {}
325 , system::error_code ec0 = {}
326 , system::error_code ec1 = {})
328 BOOST_ASIO_CORO_REENTER (coro)
// Start from empty buffers.
330 conn->write_buffer_.clear();
331 conn->read_buffer_.clear();
// First group member is the reader, second the writer; the group is created
// with the wait_for_one() completion condition.
333 BOOST_ASIO_CORO_YIELD
334 asio::experimental::make_parallel_group(
335 [
this](
auto token) {
return conn->reader(token);},
336 [
this](
auto token) {
return conn->writer(logger_, token);}
338 asio::experimental::wait_for_one(),
341 if (is_cancelled(self)) {
342 self.complete(asio::error::operation_aborted);
// Case 0 completes with ec0, case 1 with ec1 (presumably the reader's and the
// writer's result respectively - confirm against the switch expression,
// which is outside this listing). Any other value is a logic error.
347 case 0: self.complete(ec0);
break;
348 case 1: self.complete(ec1);
break;
349 default: BOOST_ASSERT(
false);
// Coroutine-style write loop: while coalesce_requests() reports pending
// requests their payload (gathered in write_buffer_) is written to the
// stream; afterwards the operation waits on writer_timer_.
355template <
class Conn,
class Logger>
359 asio::coroutine coro{};
361 template <
class Self>
362 void operator()( Self& self
363 , system::error_code ec = {}
368 BOOST_ASIO_CORO_REENTER (coro)
for (;;)
370 while (conn_->coalesce_requests()) {
371 if (conn_->use_ssl())
372 BOOST_ASIO_CORO_YIELD
asio::async_write(conn_->next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
374 BOOST_ASIO_CORO_YIELD
asio::async_write(conn_->next_layer().next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
// The logger is told about the write result together with the bytes written.
376 logger_.on_write(ec, conn_->write_buffer_);
384 if (!conn_->is_open()) {
// Nothing left to write: wait on writer_timer_. add_request_info() cancels
// this timer when a new request arrives on an open, idle connection.
390 BOOST_ASIO_CORO_YIELD
391 conn_->writer_timer_.async_wait(std::move(self));
392 if (!conn_->is_open() || is_cancelled(self)) {
// Coroutine-style read loop (NOTE(review): the struct header and the start of
// the helper expression below are outside this listing). It reads from the
// stream up to "\r\n" and then either lets a receive operation or the front
// request consume the buffered data.
406 asio::coroutine coro{};
// Tail of a boolean expression: true when the request queue is empty, when
// the front request carries zero commands, or when no written request is
// awaiting a response.
412 || conn->reqs_.empty()
413 || (!conn->reqs_.empty() && conn->reqs_.front()->get_number_of_commands() == 0)
414 || !conn->is_waiting_response();
417 template <
class Self>
418 void operator()( Self& self
419 , system::error_code ec = {}
424 BOOST_ASIO_CORO_REENTER (coro)
for (;;)
// First read variant goes through next_layer(), the second through the layer
// beneath it.
427 BOOST_ASIO_CORO_YIELD asio::async_read_until(conn->next_layer(), conn->dbuf_,
"\r\n", std::move(self));
429 BOOST_ASIO_CORO_YIELD asio::async_read_until(conn->next_layer().next_layer(), conn->dbuf_,
"\r\n", std::move(self));
// End of stream completes the operation without an error code.
431 if (ec == asio::error::eof) {
433 return self.complete({});
460 BOOST_ASSERT(!conn->read_buffer_.empty());
462 BOOST_ASIO_CORO_YIELD
463 conn->async_wait_receive(std::move(self));
// Response path: the front request must have been written and must expect at
// least one command; it is told to proceed and this operation then waits on
// read_timer_.
465 BOOST_ASSERT_MSG(conn->is_waiting_response(),
"Not waiting for a response (using MONITOR command perhaps?)");
466 BOOST_ASSERT(!conn->reqs_.empty());
467 BOOST_ASSERT(conn->reqs_.front()->get_number_of_commands() != 0);
468 conn->reqs_.front()->proceed();
469 BOOST_ASIO_CORO_YIELD
470 conn->read_timer_.async_wait(std::move(self));
// Any error, a closed connection or a cancellation is reported uniformly as
// operation_aborted.
474 if (!conn->is_open() || ec || is_cancelled(self)) {
476 self.complete(asio::error::basic_errors::operation_aborted);
// Connection base template, parameterised on the executor type.
489template <
class Executor>
// The stream is an SSL stream wrapped around a TCP stream socket.
496 using next_layer_type = asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>>;
// Constructor parameters: the SSL context method and the size limit handed
// to the dynamic read buffer.
503 asio::ssl::context::method method,
504 std::size_t max_read_size)
// dbuf_ is a dynamic-buffer view over read_buffer_, limited to max_read_size.
511 , dbuf_{read_buffer_, max_read_size}
// All three timers get the maximum representable expiry time, so waits on
// them are meant to end through cancel() rather than through expiry.
513 writer_timer_.expires_at(std::chrono::steady_clock::time_point::max());
514 read_timer_.expires_at(std::chrono::steady_clock::time_point::max());
515 read_op_timer_.expires_at(std::chrono::steady_clock::time_point::max());
// Replaces the stream with a freshly constructed one that uses the executor
// of writer_timer_ and the SSL context ctx_.
529 stream_ = std::make_unique<next_layer_type>(writer_timer_.get_executor(), ctx_);
// Starts an exec_op for `req`, adapting `resp` as the destination of the
// response. Completion signature: void(system::error_code, std::size_t).
// exec_op is handed the address of `req`, so the request object has to stay
// alive until the operation completes.
555 template <
class Response,
class CompletionToken>
556 auto async_exec(
request const& req, Response& resp, CompletionToken token)
558 using namespace boost::redis::adapter;
559 auto f = boost_redis_adapt(resp);
// The adapter must be able to hold at least as many responses as the request has.
560 BOOST_ASSERT_MSG(req.size() <= f.get_supported_response_size(),
"Request and response have incompatible sizes.");
562 return asio::async_compose
564 , void(system::error_code, std::size_t)
565 >(redis::detail::exec_op<this_type,
decltype(f)>{
this, &req, f}, token, writer_timer_);
// Starts a receive_op that stores what it reads into `response`.
// Completion signature: void(system::error_code, std::size_t).
568 template <
class Response,
class CompletionToken>
569 auto async_receive(Response&
response, CompletionToken token)
571 using namespace boost::redis::adapter;
// The response adapter is wrapped before being handed to the operation.
572 auto g = boost_redis_adapt(
response);
573 auto f = adapter::detail::make_adapter_wrapper(g);
575 return asio::async_compose
577 , void(system::error_code, std::size_t)
578 >(redis::detail::receive_op<this_type,
decltype(f)>{
this, f}, token, read_op_timer_);
// Stores `cfg` in the runner, applies the configured log prefix to the logger
// and delegates the run operation to the runner.
581 template <
class Logger,
class CompletionToken>
582 auto async_run(
config const& cfg, Logger l, CompletionToken token)
584 runner_.set_config(cfg);
// The prefix is read back from the runner's stored configuration.
585 l.set_prefix(runner_.get_config().log_prefix);
586 return runner_.async_run(*
this, l, std::move(token));
// Timer-related aliases: timers are waitable timers over the steady clock,
// bound to the connection's executor type.
590 using clock_type = std::chrono::steady_clock;
591 using clock_traits_type = asio::wait_traits<clock_type>;
592 using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
// Runner specialised for the connection's executor type.
593 using runner_type = redis::detail::runner<executor_type>;
595 auto use_ssl() const noexcept
596 {
return runner_.get_config().use_ssl;}
// Drops from the queue the requests that are configured not to survive a
// lost connection and resets the status of the ones that stay.
598 auto cancel_on_conn_lost() -> std::size_t
// Predicate "keep this request": a written request is kept unless it has
// cancel_if_unresponded set, any other request is kept unless it has
// cancel_on_connection_lost set.
601 auto cond = [](
auto const& ptr)
603 BOOST_ASSERT(ptr !=
nullptr);
605 if (ptr->is_written()) {
606 return !ptr->get_request().get_config().cancel_if_unresponded;
608 return !ptr->get_request().get_config().cancel_on_connection_lost;
// Kept requests move to the front with their relative order preserved;
// [point, end) holds the ones to drop.
612 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond);
// Number of requests about to be dropped.
614 auto const ret = std::distance(point, std::end(reqs_));
616 std::for_each(point, std::end(reqs_), [](
auto const& ptr) {
620 reqs_.erase(point, std::end(reqs_));
// Every surviving request goes back to the initial status.
621 std::for_each(std::begin(reqs_), std::end(reqs_), [](
auto const& ptr) {
622 return ptr->reset_status();
// Drops from the queue every request that has not been written yet.
628 auto cancel_unwritten_requests() -> std::size_t
// Predicate "keep this request": only written requests stay.
630 auto f = [](
auto const& ptr)
632 BOOST_ASSERT(ptr !=
nullptr);
633 return ptr->is_written();
// Written requests move to the front with their relative order preserved;
// [point, end) holds the ones to drop.
636 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f);
// Number of requests about to be dropped.
638 auto const ret = std::distance(point, std::end(reqs_));
640 std::for_each(point, std::end(reqs_), [](
auto const& ptr) {
644 reqs_.erase(point, std::end(reqs_));
// Cancellation actions (NOTE(review): the enclosing function header and the
// branching between these statements are outside this listing - confirm
// which operation triggers which action).
// Unwritten requests are dropped from the queue.
653 cancel_unwritten_requests();
// Waits on the read and writer timers are cancelled and requests that must
// not survive a lost connection are dropped.
658 read_timer_.cancel();
659 writer_timer_.cancel();
660 cancel_on_conn_lost();
// Waits on read_op_timer_ (used by the receive operations) are cancelled.
664 read_op_timer_.cancel();
// Post-write bookkeeping (NOTE(review): the enclosing function header and the
// statement guarded by the final `if` are outside this listing).
// is_writing() treats an empty write_buffer_ as "no write in progress", so the
// buffer is cleared first.
674 write_buffer_.clear();
// Staged requests of size zero are dropped before the loop below runs.
677 cancel_push_requests();
// Visit every queued request; staged ones receive further handling.
681 std::for_each(std::begin(reqs_), std::end(reqs_), [](
auto const& ptr) {
682 BOOST_ASSERT_MSG(ptr !=
nullptr,
"Expects non-null pointer.");
683 if (ptr->is_staged())
// Per-request bookkeeping object: constructed from the request and an
// executor, it starts with no action, no status and the command count taken
// from the size of the request.
697 explicit req_info(
request const& req, executor_type ex)
699 , action_{action::none}
701 , cmds_{std::size(req)}
702 , status_{status::none}
// The timer is given the maximum representable expiry time, so a wait on it
// is meant to end through cancellation rather than expiry.
704 timer_.expires_at(std::chrono::steady_clock::time_point::max());
// Fragments of two separate member functions (headers outside this listing):
// one records the "proceed" action, the other the "stop" action.
710 action_ = action::proceed;
716 action_ = action::stop;
719 [[nodiscard]]
auto is_waiting_write() const noexcept
720 {
return !is_written() && !is_staged(); }
722 [[nodiscard]]
auto is_written() const noexcept
723 {
return status_ == status::written; }
725 [[nodiscard]]
auto is_staged() const noexcept
726 {
return status_ == status::staged; }
728 void mark_written() noexcept
729 { status_ = status::written; }
731 void mark_staged() noexcept
732 { status_ = status::staged; }
734 void reset_status() noexcept
735 { status_ = status::none; }
// Accessor for the request's command count (presumably cmds_, which the
// constructor fills from the request size - body outside this listing).
737 [[nodiscard]]
auto get_number_of_commands() const noexcept
// Accessor returning a const reference to the associated request
// (body outside this listing).
740 [[nodiscard]]
auto get_request() const noexcept -> auto const&
743 [[nodiscard]]
auto stop_requested() const noexcept
744 {
return action_ == action::stop;}
746 template <
class CompletionToken>
747 auto async_wait(CompletionToken token)
749 return timer_.async_wait(std::move(token));
766 void remove_request(std::shared_ptr<req_info>
const& info)
768 reqs_.erase(std::remove(std::begin(reqs_), std::end(reqs_), info));
// The request queue is a deque of shared req_info objects.
771 using reqs_type = std::deque<std::shared_ptr<req_info>>;
// The composed operations in redis::detail are friends so that they can reach
// the connection's non-public members.
773 template <
class>
friend struct redis::detail::reader_op;
774 template <
class,
class>
friend struct redis::detail::writer_op;
775 template <
class,
class>
friend struct redis::detail::run_op;
776 template <
class,
class>
friend struct redis::detail::exec_op;
777 template <
class,
class>
friend class redis::detail::read_next_op;
778 template <
class,
class>
friend struct redis::detail::receive_op;
779 template <
class>
friend struct redis::detail::wait_receive_op;
780 template <
class,
class,
class>
friend struct redis::detail::run_all_op;
// Starts a wait_receive_op on this connection.
// Completion signature: void(system::error_code).
782 template <
class CompletionToken>
783 auto async_wait_receive(CompletionToken token)
785 return asio::async_compose
787 , void(system::error_code)
788 >(redis::detail::wait_receive_op<this_type>{
this}, token, read_op_timer_);
// Drops from the queue every request that is staged and has size zero.
791 void cancel_push_requests()
// Requests to keep move to the front with their relative order preserved;
// [point, end) holds the staged, zero-sized ones.
793 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](
auto const& ptr) {
794 return !(ptr->is_staged() && ptr->get_request().size() == 0);
797 std::for_each(point, std::end(reqs_), [](
auto const& ptr) {
801 reqs_.erase(point, std::end(reqs_));
804 [[nodiscard]]
bool is_writing() const noexcept
806 return !write_buffer_.empty();
// Appends `info` to the request queue, moves it ahead of the other
// not-yet-written requests when its request has hello priority, and cancels
// the wait on writer_timer_ when the connection is open and idle.
809 void add_request_info(std::shared_ptr<req_info>
const& info)
811 reqs_.push_back(info);
813 if (info->get_request().has_hello_priority()) {
// Scanning from the back, find where the trailing run of requests still
// waiting to be written ends.
814 auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](
auto const& e) {
815 return e->is_waiting_write();
// Rotate the just-appended element to the front of that trailing run.
818 std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
// writer_op waits on writer_timer_ between writes; cancelling it resumes that wait.
821 if (is_open() && !is_writing())
822 writer_timer_.cancel();
// Starts a reader_op on this connection.
// Completion signature: void(system::error_code).
825 template <
class CompletionToken>
826 auto reader(CompletionToken&& token)
828 return asio::async_compose
830 , void(system::error_code)
831 >(redis::detail::reader_op<this_type>{
this}, token, writer_timer_);
// Starts a writer_op on this connection, handing it the logger `l`.
// Completion signature: void(system::error_code).
834 template <
class CompletionToken,
class Logger>
835 auto writer(Logger l, CompletionToken&& token)
837 return asio::async_compose
839 , void(system::error_code)
840 >(redis::detail::writer_op<this_type, Logger>{
this, l}, token, writer_timer_);
// Starts a read_next_op for the request at the front of the queue.
// Completion signature: void(system::error_code, std::size_t).
// reqs_.front() is evaluated here, so the queue must not be empty when this
// is called.
843 template <
class Adapter,
class CompletionToken>
844 auto async_read_next(Adapter adapter, CompletionToken token)
846 return asio::async_compose
848 , void(system::error_code, std::size_t)
849 >(redis::detail::read_next_op<this_type, Adapter>{*
this, adapter, reqs_.front()}, token, writer_timer_);
// Stores `cfg` in the runner, applies the configured log prefix to the logger
// and starts a run_op on this connection.
// Completion signature: void(system::error_code).
852 template <
class Logger,
class CompletionToken>
853 auto async_run_lean(
config const& cfg, Logger l, CompletionToken token)
855 runner_.set_config(cfg);
// The prefix is read back from the runner's stored configuration.
856 l.set_prefix(runner_.get_config().log_prefix);
857 return asio::async_compose
859 , void(system::error_code)
860 >(redis::detail::run_op<this_type, Logger>{
this, l}, token, writer_timer_);
// Appends the payload of every request still waiting to be written to
// write_buffer_. Returns true when at least one such request was found.
863 [[nodiscard]]
bool coalesce_requests()
// partition_point relies on the queue being ordered with the requests that
// are no longer waiting to be written first.
867 auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](
auto const& ri) {
868 return !ri->is_waiting_write();
871 std::for_each(point, std::cend(reqs_), [
this](
auto const& ri) {
873 write_buffer_ += ri->get_request().payload();
877 return point != std::cend(reqs_);
880 bool is_waiting_response() const noexcept
882 return !std::empty(reqs_) && reqs_.front()->is_written();
// Closes the layer beneath the SSL stream, but only if it is currently open.
887 if (stream_->next_layer().is_open())
888 stream_->next_layer().close();
891 bool is_next_push() const noexcept
893 return !read_buffer_.empty() && (resp3::to_type(read_buffer_.front()) ==
resp3::type::push);
896 auto is_open() const noexcept {
return stream_->next_layer().is_open(); }
897 auto& lowest_layer() noexcept {
return stream_->lowest_layer(); }
// SSL context and the stream created from it; the stream is heap-allocated so
// that it can be replaced by a fresh instance.
899 asio::ssl::context ctx_;
900 std::unique_ptr<next_layer_type> stream_;
// Timers used as wake-up signals: writer_timer_ for the write loop,
// read_timer_ for the read loop, read_op_timer_ for receive operations.
905 timer_type writer_timer_;
906 timer_type read_timer_;
907 timer_type read_op_timer_;
910 using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;
// read_buffer_ owns the bytes read from the stream; dbuf_ is the dynamic
// buffer view over it and therefore has to be declared after it.
912 std::string read_buffer_;
913 dyn_buffer_type dbuf_;
// Payload waiting to be written; a non-empty buffer marks a write in progress.
914 std::string write_buffer_;
Base class for high level Redis asynchronous connections.
auto get_executor()
Returns the associated executor.
virtual void cancel(operation op)
Cancels specific operations.
void reset_stream()
Resets the underlying stream.
connection_base(executor_type ex, asio::ssl::context::method method, std::size_t max_read_size)
Constructs from an executor.
asio::ssl::stream< asio::basic_stream_socket< asio::ip::tcp, Executor > > next_layer_type
Type of the next layer.
auto & next_layer() noexcept
Returns a reference to the next layer.
auto & get_ssl_context() noexcept
Returns the ssl context.
Executor executor_type
Executor type.
auto const & get_ssl_context() const noexcept
Returns the ssl context.
auto const & next_layer() const noexcept
Returns a const reference to the next layer.
std::tuple< adapter::result< Ts >... > response
Response with compile-time size.
operation
Connection operations that can be cancelled.
@ not_connected
There is no established connection.
@ exec
Refers to connection::async_exec operations.
@ all
Refers to all operations.
@ run
Refers to connection::async_run operations.
@ receive
Refers to connection::async_receive operations.
Configuration parameters used by the connection classes.
auto async_write(AsyncWriteStream &stream, request const &req, CompletionToken &&token=asio::default_completion_token_t< typename AsyncWriteStream::executor_type >{})
Writes a request asynchronously.