composed_8.cpp
7.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
//
// composed_8.cpp
// ~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <asio/compose.hpp>
#include <asio/io_context.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/steady_timer.hpp>
#include <asio/use_future.hpp>
#include <asio/write.hpp>
#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <type_traits>
#include <utility>
using asio::ip::tcp;
// NOTE: This example requires the new asio::async_compose function. For
// an example that works with the Networking TS style of completion tokens,
// please see an older version of asio.
//------------------------------------------------------------------------------
// This composed operation shows composition of multiple underlying operations,
// using asio's stackless coroutines support to express the flow of control. It
// automatically serialises a message, using its I/O streams insertion
// operator, before sending it N times on the socket. To do this, it must
// allocate a buffer for the encoded message and ensure this buffer's validity
// until all underlying async_write operations complete. A one second delay is
// inserted prior to each write operation, using a steady_timer.
#include <asio/yield.hpp>
// In this example, the composed operation's logic is implemented as a state
// machine within a hand-crafted function object.
//
// The function object repeatedly waits on a timer and then writes the encoded
// message to the socket, until the repeat count is exhausted or one of the
// underlying operations reports an error. It then releases its resources and
// reports the final error code through self.complete().
struct async_write_messages_implementation
{
// The implementation holds a reference to the socket as it is used for
// multiple async_write operations. Only a reference is stored here, so the
// socket itself is owned elsewhere.
tcp::socket& socket_;
// The allocated buffer for the encoded message. The std::unique_ptr smart
// pointer is move-only, and as a consequence our implementation is also
// move-only.
std::unique_ptr<std::string> encoded_message_;
// The repeat count remaining. It is decremented at the start of each loop
// iteration, before the delay and the write are initiated.
std::size_t repeat_count_;
// A steady timer used for introducing a delay. It is held through a
// std::unique_ptr so that it can be released explicitly, via reset(), before
// the completion handler is called.
std::unique_ptr<asio::steady_timer> delay_timer_;
// The coroutine state. This records where the reenter block below resumes
// the next time the call operator is invoked.
asio::coroutine coro_;
// The first argument to our function object's call operator is a reference
// to the enclosing intermediate completion handler. This intermediate
// completion handler is provided for us by the asio::async_compose
// function, and takes care of all the details required to implement a
// conforming asynchronous operation. When calling an underlying asynchronous
// operation, we pass it this enclosing intermediate completion handler
// as the completion token.
//
// All arguments after the first must be defaulted to allow the state machine
// to be started, as well as to allow the completion handler to match the
// completion signature of both the async_write and steady_timer::async_wait
// operations.
//
// The unnamed std::size_t parameter is never read in the body; it exists only
// so that the signature also accepts a second argument.
template <typename Self>
void operator()(Self& self,
const std::error_code& error = std::error_code(),
std::size_t = 0)
{
// Resume execution at the point recorded in coro_.
reenter (coro_)
{
while (repeat_count_ > 0)
{
// Consume one repetition before starting the delay and the write.
--repeat_count_;
// Set the timer to expire one second from now, then suspend until the wait
// completes. The intermediate handler is moved into the wait operation.
delay_timer_->expires_after(std::chrono::seconds(1));
yield delay_timer_->async_wait(std::move(self));
// After resumption, error refers to the argument of the current invocation.
// Leave the loop so that the error is reported below.
if (error)
break;
// Write the whole encoded message, suspending until the write completes.
// The buffer refers to the string owned by encoded_message_.
yield asio::async_write(socket_,
asio::buffer(*encoded_message_), std::move(self));
// As above: stop repeating on failure and report the error below.
if (error)
break;
}
// Deallocate the encoded message and delay timer before calling the
// user-supplied completion handler.
encoded_message_.reset();
delay_timer_.reset();
// Call the user-supplied handler with the result of the operation.
// If the loop body never ran (repeat_count_ was initially 0), error is still
// the default-constructed std::error_code from the first invocation.
// NOTE(review): in that case completion appears to happen during the very
// first invocation of this function object -- confirm this is acceptable.
self.complete(error);
}
}
};
#include <asio/unyield.hpp>
// Initiating function for the composed "write message N times" operation.
//
// The message is serialised with its I/O streams insertion operator and then
// sent repeat_count times on the socket, with a timer wait preceding each
// write. The outcome is delivered through the completion token using the
// signature void(std::error_code).
//
// The return type is deduced from the combination of the CompletionToken type
// and the completion handler's signature. When the completion token is a
// simple callback, the return type is always void. When the completion token
// is asio::yield_context (used for stackful coroutines) the return type would
// also be void, as there is no non-error argument to the completion handler.
// When the completion token is asio::use_future it would be
// std::future<void>.
template <typename T, typename CompletionToken>
auto async_write_messages(tcp::socket& socket,
    const T& message, std::size_t repeat_count,
    CompletionToken&& token)
  -> typename asio::async_result<
    typename std::decay<CompletionToken>::type,
    void(std::error_code)>::return_type
{
  // Serialise the message into a heap-allocated string. The string is handed
  // to the implementation object so that it stays valid for the lifetime of
  // the composed asynchronous operation.
  std::ostringstream encoder;
  encoder << message;
  std::unique_ptr<std::string> payload(new std::string(encoder.str()));

  // The timer that provides the delay between messages. It is constructed
  // from the socket's executor.
  std::unique_ptr<asio::steady_timer> timer(
      new asio::steady_timer(socket.get_executor()));

  // Assemble the state machine that drives the operation: the socket, the
  // encoded payload, the number of repetitions, the timer, and a fresh
  // coroutine state.
  async_write_messages_implementation implementation{socket,
      std::move(payload), repeat_count,
      std::move(timer), asio::coroutine()};

  // Hand everything to asio::async_compose, which takes:
  //
  // - our asynchronous operation implementation,
  // - the completion token,
  // - the completion handler signature, and
  // - any I/O objects (or executors) used by the operation
  //
  // It wraps the implementation in an intermediate completion handler that
  // meets the requirements of a conforming asynchronous operation, including
  // tracking outstanding work against the I/O executors associated with the
  // operation (here, the socket's executor).
  return asio::async_compose<
    CompletionToken, void(std::error_code)>(
      std::move(implementation), token, socket);
}
//------------------------------------------------------------------------------
// Exercises async_write_messages with a plain callback as the completion
// token.
void test_callback()
{
  asio::io_context io_context;

  // Listen on IPv4 port 55555 and block until one peer has connected.
  tcp::acceptor acceptor(io_context, {tcp::v4(), 55555});
  tcp::socket socket = acceptor.accept();

  // Completion handler: prints the outcome of the composed operation.
  auto report_outcome = [](const std::error_code& error)
  {
    if (error)
    {
      std::cout << "Error: " << error.message() << "\n";
      return;
    }

    std::cout << "Messages sent\n";
  };

  // Start the operation, sending the message five times.
  async_write_messages(socket, "Testing callback\r\n", 5,
      std::move(report_outcome));

  // Run the I/O context so the operation can make progress and the handler
  // can be invoked.
  io_context.run();
}
//------------------------------------------------------------------------------
// Exercises async_write_messages with the asio::use_future completion token.
void test_future()
{
  asio::io_context io_context;

  // Listen on IPv4 port 55555 and block until one peer has connected.
  tcp::acceptor acceptor(io_context, {tcp::v4(), 55555});
  tcp::socket socket = acceptor.accept();

  // With the use_future token the initiating function returns a future,
  // which may be used to synchronously wait for the result of the operation.
  auto pending_result = async_write_messages(
      socket, "Testing future\r\n", 5, asio::use_future);

  // Drive the operation before the future is queried.
  io_context.run();

  try
  {
    // Retrieve the result; a failure is reported as an exception.
    pending_result.get();
    std::cout << "Messages sent\n";
  }
  catch (const std::exception& failure)
  {
    std::cout << "Error: " << failure.what() << "\n";
  }
}
//------------------------------------------------------------------------------
// Runs the callback-based test followed by the future-based test.
int main()
{
  test_callback();
  test_future();
  return 0;
}