boost_redis 1.4.2
A redis client library
read.hpp
1/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
2 *
3 * Distributed under the Boost Software License, Version 1.0. (See
4 * accompanying file LICENSE.txt)
5 */
6
7#ifndef BOOST_REDIS_READ_HPP
8#define BOOST_REDIS_READ_HPP
9
10#include <boost/redis/resp3/type.hpp>
11#include <boost/redis/resp3/parser.hpp>
12#include <boost/redis/adapter/ignore.hpp>
13#include <boost/redis/detail/helper.hpp>
14#include <boost/asio/read.hpp>
15#include <boost/asio/compose.hpp>
16#include <boost/asio/coroutine.hpp>
17#include <boost/asio/post.hpp>
18
19#include <string_view>
20#include <limits>
21
22namespace boost::redis::detail {
23
24template <class DynamicBuffer>
25std::string_view buffer_view(DynamicBuffer buf) noexcept
26{
27 char const* start = static_cast<char const*>(buf.data(0, buf.size()).data());
28 return std::string_view{start, std::size(buf)};
29}
30
31template <class AsyncReadStream, class DynamicBuffer>
32class append_some_op {
33private:
34 AsyncReadStream& stream_;
35 DynamicBuffer buf_;
36 std::size_t size_ = 0;
37 std::size_t tmp_ = 0;
38 asio::coroutine coro_{};
39
40public:
41 append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
42 : stream_ {stream}
43 , buf_ {std::move(buf)}
44 , size_{size}
45 { }
46
47 template <class Self>
48 void operator()( Self& self
49 , system::error_code ec = {}
50 , std::size_t n = 0)
51 {
52 BOOST_ASIO_CORO_REENTER (coro_)
53 {
54 tmp_ = buf_.size();
55 buf_.grow(size_);
56
57 BOOST_ASIO_CORO_YIELD
58 stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
59 if (ec) {
60 self.complete(ec, 0);
61 return;
62 }
63
64 buf_.shrink(buf_.size() - tmp_ - n);
65 self.complete({}, n);
66 }
67 }
68};
69
70template <class AsyncReadStream, class DynamicBuffer, class CompletionToken>
71auto
72async_append_some(
73 AsyncReadStream& stream,
74 DynamicBuffer buffer,
75 std::size_t size,
76 CompletionToken&& token)
77{
78 return asio::async_compose
79 < CompletionToken
80 , void(system::error_code, std::size_t)
81 >(append_some_op<AsyncReadStream, DynamicBuffer> {stream, buffer, size}, token, stream);
82}
83
84template <
85 class AsyncReadStream,
86 class DynamicBuffer,
87 class ResponseAdapter>
88class parse_op {
89private:
90 AsyncReadStream& stream_;
91 DynamicBuffer buf_;
92 resp3::parser parser_;
93 ResponseAdapter adapter_;
94 bool needs_rescheduling_ = true;
95 system::error_code ec_;
96 asio::coroutine coro_{};
97
98 static std::size_t const growth = 1024;
99
100public:
101 parse_op(AsyncReadStream& stream, DynamicBuffer buf, ResponseAdapter adapter)
102 : stream_ {stream}
103 , buf_ {std::move(buf)}
104 , adapter_ {std::move(adapter)}
105 { }
106
107 template <class Self>
108 void operator()( Self& self
109 , system::error_code ec = {}
110 , std::size_t = 0)
111 {
112 BOOST_ASIO_CORO_REENTER (coro_)
113 {
114 while (!resp3::parse(parser_, buffer_view(buf_), adapter_, ec)) {
115 needs_rescheduling_ = false;
116 BOOST_ASIO_CORO_YIELD
117 async_append_some(
118 stream_, buf_, parser_.get_suggested_buffer_growth(growth),
119 std::move(self));
120 if (ec) {
121 self.complete(ec, 0);
122 return;
123 }
124 }
125
126 ec_ = ec;
127 if (needs_rescheduling_) {
128 BOOST_ASIO_CORO_YIELD
129 asio::post(std::move(self));
130 }
131
132 self.complete(ec_, parser_.get_consumed());
133 }
134 }
135};
136
166template <
167 class SyncReadStream,
168 class DynamicBuffer,
169 class ResponseAdapter
170 >
171auto
172read(
173 SyncReadStream& stream,
174 DynamicBuffer buf,
175 ResponseAdapter adapter,
176 system::error_code& ec) -> std::size_t
177{
178 static std::size_t const growth = 1024;
179
180 resp3::parser parser;
181 while (!parser.done()) {
182 auto const res = parser.consume(detail::buffer_view(buf), ec);
183 if (ec)
184 return 0UL;
185
186 if (!res.has_value()) {
187 auto const size_before = buf.size();
188 buf.grow(parser.get_suggested_buffer_growth(growth));
189 auto const n =
190 stream.read_some(
191 buf.data(size_before, parser.get_suggested_buffer_growth(growth)),
192 ec);
193 if (ec)
194 return 0UL;
195
196 buf.shrink(buf.size() - size_before - n);
197 continue;
198 }
199
200 adapter(res.value(), ec);
201 if (ec)
202 return 0UL;
203 }
204
205 return parser.get_consumed();
206}
207
212template<
213 class SyncReadStream,
214 class DynamicBuffer,
215 class ResponseAdapter = adapter::ignore>
216auto
217read(
218 SyncReadStream& stream,
219 DynamicBuffer buf,
220 ResponseAdapter adapter = ResponseAdapter{})
221{
222 system::error_code ec;
223 auto const n = redis::detail::read(stream, buf, adapter, ec);
224
225 if (ec)
226 BOOST_THROW_EXCEPTION(system::system_error{ec});
227
228 return n;
229}
230
268template <
269 class AsyncReadStream,
270 class DynamicBuffer,
271 class ResponseAdapter = adapter::ignore,
272 class CompletionToken = asio::default_completion_token_t<typename AsyncReadStream::executor_type>
273 >
274auto async_read(
275 AsyncReadStream& stream,
276 DynamicBuffer buffer,
277 ResponseAdapter adapter = ResponseAdapter{},
278 CompletionToken&& token =
279 asio::default_completion_token_t<typename AsyncReadStream::executor_type>{})
280{
281 return asio::async_compose
282 < CompletionToken
283 , void(system::error_code, std::size_t)
284 >(parse_op<AsyncReadStream, DynamicBuffer, ResponseAdapter> {stream, buffer, adapter},
285 token,
286 stream);
287}
288
289} // boost::redis::detail
290
291#endif // BOOST_REDIS_READ_HPP
ignore_t ignore
Global ignore object.