7#ifndef BOOST_REDIS_READ_HPP
8#define BOOST_REDIS_READ_HPP
10#include <boost/redis/resp3/type.hpp>
11#include <boost/redis/resp3/parser.hpp>
12#include <boost/redis/adapter/ignore.hpp>
13#include <boost/redis/detail/helper.hpp>
14#include <boost/asio/read.hpp>
15#include <boost/asio/compose.hpp>
16#include <boost/asio/coroutine.hpp>
17#include <boost/asio/post.hpp>
22namespace boost::redis::detail {
24template <
class DynamicBuffer>
25std::string_view buffer_view(DynamicBuffer buf)
noexcept
27 char const* start =
static_cast<char const*
>(buf.data(0, buf.size()).data());
28 return std::string_view{start, std::size(buf)};
31template <
class AsyncReadStream,
class DynamicBuffer>
34 AsyncReadStream& stream_;
36 std::size_t size_ = 0;
38 asio::coroutine coro_{};
41 append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
43 , buf_ {std::move(buf)}
48 void operator()( Self& self
49 , system::error_code ec = {}
52 BOOST_ASIO_CORO_REENTER (coro_)
58 stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
64 buf_.shrink(buf_.size() - tmp_ - n);
70template <
class AsyncReadStream,
class DynamicBuffer,
class CompletionToken>
73 AsyncReadStream& stream,
76 CompletionToken&& token)
78 return asio::async_compose
80 , void(system::error_code, std::size_t)
81 >(append_some_op<AsyncReadStream, DynamicBuffer> {stream, buffer, size}, token, stream);
85 class AsyncReadStream,
87 class ResponseAdapter>
90 AsyncReadStream& stream_;
92 resp3::parser parser_;
93 ResponseAdapter adapter_;
94 bool needs_rescheduling_ =
true;
95 system::error_code ec_;
96 asio::coroutine coro_{};
98 static std::size_t
const growth = 1024;
101 parse_op(AsyncReadStream& stream, DynamicBuffer buf, ResponseAdapter adapter)
103 , buf_ {std::move(buf)}
104 , adapter_ {std::move(adapter)}
107 template <
class Self>
108 void operator()( Self& self
109 , system::error_code ec = {}
112 BOOST_ASIO_CORO_REENTER (coro_)
114 while (!resp3::parse(parser_, buffer_view(buf_), adapter_, ec)) {
115 needs_rescheduling_ =
false;
116 BOOST_ASIO_CORO_YIELD
118 stream_, buf_, parser_.get_suggested_buffer_growth(growth),
121 self.complete(ec, 0);
127 if (needs_rescheduling_) {
128 BOOST_ASIO_CORO_YIELD
129 asio::post(std::move(self));
132 self.complete(ec_, parser_.get_consumed());
167 class SyncReadStream,
169 class ResponseAdapter
173 SyncReadStream& stream,
175 ResponseAdapter adapter,
176 system::error_code& ec) -> std::size_t
178 static std::size_t
const growth = 1024;
180 resp3::parser parser;
181 while (!parser.done()) {
182 auto const res = parser.consume(detail::buffer_view(buf), ec);
186 if (!res.has_value()) {
187 auto const size_before = buf.size();
188 buf.grow(parser.get_suggested_buffer_growth(growth));
191 buf.data(size_before, parser.get_suggested_buffer_growth(growth)),
196 buf.shrink(buf.size() - size_before - n);
200 adapter(res.value(), ec);
205 return parser.get_consumed();
213 class SyncReadStream,
218 SyncReadStream& stream,
220 ResponseAdapter adapter = ResponseAdapter{})
222 system::error_code ec;
223 auto const n = redis::detail::read(stream, buf, adapter, ec);
226 BOOST_THROW_EXCEPTION(system::system_error{ec});
269 class AsyncReadStream,
272 class CompletionToken = asio::default_completion_token_t<typename AsyncReadStream::executor_type>
275 AsyncReadStream& stream,
276 DynamicBuffer buffer,
277 ResponseAdapter adapter = ResponseAdapter{},
278 CompletionToken&& token =
279 asio::default_completion_token_t<typename AsyncReadStream::executor_type>{})
281 return asio::async_compose
283 , void(system::error_code, std::size_t)
284 >(parse_op<AsyncReadStream, DynamicBuffer, ResponseAdapter> {stream, buffer, adapter},
ignore_t ignore
Global ignore object.