// actor.cpp
#include <asio/any_io_executor.hpp>
#include <asio/defer.hpp>
#include <asio/post.hpp>
#include <asio/strand.hpp>
#include <asio/system_executor.hpp>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <typeinfo>
#include <vector>
using asio::any_io_executor;
using asio::defer;
using asio::post;
using asio::strand;
using asio::system_executor;
//------------------------------------------------------------------------------
// A tiny actor framework
// ~~~~~~~~~~~~~~~~~~~~~~
// Forward declaration so that an address type can be named before the actor
// class itself is defined.
class actor;

// Address type used to identify the sender and recipient of a message.
using actor_address = actor*;
// Common root of every registered message handler. It carries no message
// type of its own, so handlers for different message types can be stored
// together and told apart through message_id().
class message_handler_base
{
public:
  // Identifies the message type accepted by this handler; used to determine
  // which message handlers receive an incoming message.
  virtual const std::type_info& message_id() const = 0;

  // Virtual so that handlers can be destroyed through a base pointer.
  virtual ~message_handler_base() = default;
};
// Interface implemented by every handler of one specific message type.
template <typename Message>
class message_handler : public message_handler_base
{
public:
  // Process the incoming message msg that was sent by the actor at from.
  virtual void handle_message(Message msg, actor_address from) = 0;
};
// Concrete handler for one message type that forwards each message to a
// member function of a particular actor object.
template <typename Actor, typename Message>
class mf_message_handler : public message_handler<Message>
{
private:
  // Signature required of the member functions this handler can wrap.
  using member_function = void (Actor::*)(Message, actor_address);

public:
  // Bind the handler to the member function mf of the actor object a.
  mf_message_handler(member_function mf, Actor* a)
    : function_(mf),
      actor_(a)
  {
  }

  // Report the handled message type; used to determine which message
  // handlers receive an incoming message.
  const std::type_info& message_id() const override
  {
    return typeid(Message);
  }

  // Pass the incoming message on to the bound member function.
  void handle_message(Message msg, actor_address from) override
  {
    (actor_->*function_)(std::move(msg), from);
  }

  // Check whether this handler wraps exactly the member function mf.
  bool is_function(member_function mf) const
  {
    return function_ == mf;
  }

private:
  member_function function_;
  Actor* actor_;
};
// Base class for all actors.
class actor
{
public:
virtual ~actor()
{
}
// Obtain the actor's address for use as a message sender or recipient.
actor_address address()
{
return this;
}
// Send a message from one actor to another.
template <class Message>
friend void send(Message msg, actor_address from, actor_address to)
{
// Execute the message handler in the context of the target's executor.
post(to->executor_,
[=, msg=std::move(msg)]() mutable
{
to->call_handler(std::move(msg), from);
});
}
protected:
// Construct the actor to use the specified executor for all message handlers.
actor(any_io_executor e)
: executor_(std::move(e))
{
}
// Register a handler for a specific message type. Duplicates are permitted.
template <class Actor, class Message>
void register_handler(void (Actor::* mf)(Message, actor_address))
{
handlers_.push_back(
std::make_shared<mf_message_handler<Actor, Message>>(
mf, static_cast<Actor*>(this)));
}
// Deregister a handler. Removes only the first matching handler.
template <class Actor, class Message>
void deregister_handler(void (Actor::* mf)(Message, actor_address))
{
const std::type_info& id = typeid(Message);
for (auto iter = handlers_.begin(); iter != handlers_.end(); ++iter)
{
if ((*iter)->message_id() == id)
{
auto mh = static_cast<mf_message_handler<Actor, Message>*>(iter->get());
if (mh->is_function(mf))
{
handlers_.erase(iter);
return;
}
}
}
}
// Send a message from within a message handler.
template <class Message>
void tail_send(Message msg, actor_address to)
{
// Execute the message handler in the context of the target's executor.
defer(to->executor_,
[=, msg=std::move(msg), from=this]
{
to->call_handler(std::move(msg), from);
});
}
private:
// Find the matching message handlers, if any, and call them.
template <class Message>
void call_handler(Message msg, actor_address from)
{
const std::type_info& message_id = typeid(Message);
for (auto& h: handlers_)
{
if (h->message_id() == message_id)
{
auto mh = static_cast<message_handler<Message>*>(h.get());
mh->handle_message(msg, from);
}
}
}
// All messages associated with a single actor object should be processed
// non-concurrently. We use a strand to ensure non-concurrent execution even
// if the underlying executor may use multiple threads.
strand<any_io_executor> executor_;
std::vector<std::shared_ptr<message_handler_base>> handlers_;
};
// An actor that queues every message of type Message it receives so that
// ordinary (non-actor) code can retrieve them synchronously.
template <typename Message>
class receiver : public actor
{
public:
  // Run on the system executor and start accepting Message messages.
  receiver()
    : actor(system_executor())
  {
    register_handler(&receiver::message_handler);
  }

  // Block until a message has been received, then remove it from the queue
  // and return it. Messages are returned in the order they were queued.
  Message wait()
  {
    std::unique_lock<std::mutex> guard(mutex_);
    while (message_queue_.empty())
      condition_.wait(guard);

    Message received(std::move(message_queue_.front()));
    message_queue_.pop_front();
    return received;
  }

private:
  // Handle a new message by adding it to the queue and waking a waiter.
  void message_handler(Message msg, actor_address /* from */)
  {
    std::lock_guard<std::mutex> guard(mutex_);
    message_queue_.push_back(std::move(msg));
    condition_.notify_one();
  }

  // Protects message_queue_.
  std::mutex mutex_;

  // Signalled each time a message is appended to message_queue_.
  std::condition_variable condition_;

  // Messages received but not yet returned by wait().
  std::deque<Message> message_queue_;
};
//------------------------------------------------------------------------------
#include <asio/thread_pool.hpp>
#include <iostream>
using asio::thread_pool;
// One member of the ring of actors used by main().
//
// A member first waits for an initialisation message carrying the address of
// the next actor in the ring. After that it handles integer tokens: a positive
// token is decremented and passed to the next actor, and any other token is
// forwarded unchanged to whoever sent the initialisation message.
class member : public actor
{
public:
  // Construct a member whose handlers run on the executor e. Only the
  // initialisation handler is registered at this point.
  explicit member(any_io_executor e)
    : actor(std::move(e))
  {
    register_handler(&member::init_handler);
  }

private:
  // Handle the initialisation message: remember the next actor in the ring
  // and the sender, then swap this handler for the token handler.
  void init_handler(actor_address next, actor_address from)
  {
    next_ = next;
    caller_ = from;
    register_handler(&member::token_handler);
    deregister_handler(&member::init_handler);
  }

  // Handle a token: pass token - 1 to the next actor while the token is
  // positive, otherwise report the token to the caller.
  void token_handler(int token, actor_address /*from*/)
  {
    if (token > 0)
      tail_send(token - 1, next_);
    else
      tail_send(token, caller_);
  }

  // Both addresses are assigned by init_handler; they start out null rather
  // than indeterminate.
  actor_address next_ = nullptr;
  actor_address caller_ = nullptr;
};
int main()
{
const std::size_t num_threads = 16;
const int num_hops = 50000000;
const std::size_t num_actors = 503;
const int token_value = (num_hops + num_actors - 1) / num_actors;
const std::size_t actors_per_thread = num_actors / num_threads;
struct single_thread_pool : thread_pool { single_thread_pool() : thread_pool(1) {} };
single_thread_pool pools[num_threads];
std::vector<std::shared_ptr<member>> members(num_actors);
receiver<int> rcvr;
// Create the member actors.
for (std::size_t i = 0; i < num_actors; ++i)
members[i] = std::make_shared<member>(pools[(i / actors_per_thread) % num_threads].get_executor());
// Initialise the actors by passing each one the address of the next actor in the ring.
for (std::size_t i = num_actors, next_i = 0; i > 0; next_i = --i)
send(members[next_i]->address(), rcvr.address(), members[i - 1]->address());
// Send exactly one token to each actor, all with the same initial value, rounding up if required.
for (std::size_t i = 0; i < num_actors; ++i)
send(token_value, rcvr.address(), members[i]->address());
// Wait for all signal messages, indicating the tokens have all reached zero.
for (std::size_t i = 0; i < num_actors; ++i)
rcvr.wait();
}