1 /*
2 Copyright (c) 2005-2021 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #if _MSC_VER
18 // Suppress "decorated name length exceeded, name was truncated" warning
19 #pragma warning(disable : 4503)
20 #endif
21
#include <cstdio>
#include <cstdlib>

#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <tuple>
#include <vector>

#include "oneapi/tbb/flow_graph.h"
#include "oneapi/tbb/tick_count.h"
#include "oneapi/tbb/spin_mutex.h"
#include "oneapi/tbb/global_control.h"

#include "common/utility/utility.hpp"
#include "common/utility/get_default_num_threads.hpp"
37
38 // Each philosopher is an object, and is invoked in the think() function_node, the
39 // eat() function_node and forward() multifunction_node.
40 //
41 // The graph is constructed, and each think() function_node is started with a continue_msg.
42 //
43 // The philosopher will think, then gather two chopsticks, eat, place the chopsticks back,
44 // and if they have not completed the required number of cycles, will start to think() again
45 // by sending a continue_msg to their corresponding think() function_node.
46 //
// The reserving join has as its inputs the left and right chopstick queues and a queue
48 // that stores the continue_msg emitted by the function_node after think()ing is done.
49 // When all three inputs are available, a tuple of the inputs will be forwarded to the
50 // eat() function_node. The output of the eat() function_node is sent to the forward()
51 // multifunction_node.
52
// Simulation parameters: the duration of one thinking phase, the duration of one
// eating phase, and the number of think/eat cycles each philosopher starts with.
const std::chrono::seconds think_time{ 1 };
const std::chrono::seconds eat_time{ 1 };
const int num_times{ 10 };
56
57 oneapi::tbb::tick_count t0;
58 bool verbose = false;
59
// Names given to the philosophers; philosopher i is constructed with names[i].
const char *names[] = {
    "Archimedes", "Bakunin",   "Confucius",    "Democritus",  "Euclid",
    "Favorinus",  "Geminus",   "Heraclitus",   "Ichthyas",    "Jason of Nysa",
    "Kant",       "Lavrov",    "Metrocles",    "Nausiphanes", "Onatas",
    "Phaedrus",   "Quillot",   "Russell",      "Socrates",    "Thales",
    "Udayana",    "Vernadsky", "Wittgenstein", "Xenophilus",  "Yen Yuan",
    "Zenodotus"
};
// Largest supported number of philosophers: one per entry in names.
const int NumPhilosophers = sizeof(names) / sizeof(names[0]);
67
68 struct RunOptions {
69 utility::thread_number_range threads;
70 int number_of_philosophers;
71 bool silent;
RunOptionsRunOptions72 RunOptions(utility::thread_number_range threads_, int number_of_philosophers_, bool silent_)
73 : threads(threads_),
74 number_of_philosophers(number_of_philosophers_),
75 silent(silent_) {}
76 };
77
ParseCommandLine(int argc,char * argv[])78 RunOptions ParseCommandLine(int argc, char *argv[]) {
79 int auto_threads = utility::get_default_num_threads();
80 utility::thread_number_range threads(
81 utility::get_default_num_threads, auto_threads, auto_threads);
82 int nPhilosophers = 5;
83 bool verbose = false;
84 char charbuf[100];
85 std::sprintf(charbuf, "%d", NumPhilosophers);
86 std::string pCount = "how many philosophers, from 2-";
87 pCount += charbuf;
88
89 utility::cli_argument_pack cli_pack;
90 cli_pack.positional_arg(threads, "n-of_threads", utility::thread_number_range_desc)
91 .positional_arg(nPhilosophers, "n-of-philosophers", pCount)
92 .arg(verbose, "verbose", "verbose output");
93 utility::parse_cli_arguments(argc, argv, cli_pack);
94 if (nPhilosophers < 2 || nPhilosophers > NumPhilosophers) {
95 std::cout << "Number of philosophers (" << nPhilosophers
96 << ") out of range [2:" << NumPhilosophers << "]\n";
97 std::cout << cli_pack.usage_string(argv[0]) << std::flush;
98 std::exit(-1);
99 }
100 return RunOptions(threads, nPhilosophers, !verbose);
101 }
102
103 oneapi::tbb::spin_mutex my_mutex;
104
105 class chopstick {};
106
107 typedef std::tuple<oneapi::tbb::flow::continue_msg, chopstick, chopstick> join_output;
108 typedef oneapi::tbb::flow::join_node<join_output, oneapi::tbb::flow::reserving> join_node_type;
109
110 typedef oneapi::tbb::flow::function_node<oneapi::tbb::flow::continue_msg,
111 oneapi::tbb::flow::continue_msg>
112 think_node_type;
113 typedef oneapi::tbb::flow::function_node<join_output, oneapi::tbb::flow::continue_msg>
114 eat_node_type;
115 typedef oneapi::tbb::flow::multifunction_node<oneapi::tbb::flow::continue_msg, join_output>
116 forward_node_type;
117
118 class philosopher {
119 public:
philosopher(const char * name)120 philosopher(const char *name) : my_name(name), my_count(num_times) {}
121
~philosopher()122 ~philosopher() {}
123
124 void check();
name() const125 const char *name() const {
126 return my_name;
127 }
128
129 private:
130 friend std::ostream &operator<<(std::ostream &o, philosopher const &p);
131
132 const char *my_name;
133 int my_count;
134
135 friend class think_node_body;
136 friend class eat_node_body;
137 friend class forward_node_body;
138
139 void think();
140 void eat();
141 void forward(const oneapi::tbb::flow::continue_msg &in,
142 forward_node_type::output_ports_type &out_ports);
143 };
144
operator <<(std::ostream & o,philosopher const & p)145 std::ostream &operator<<(std::ostream &o, philosopher const &p) {
146 o << "< philosopher[" << reinterpret_cast<uintptr_t>(const_cast<philosopher *>(&p)) << "] "
147 << p.name() << ", my_count=" << p.my_count;
148
149 return o;
150 }
151
152 class think_node_body {
153 philosopher &my_philosopher;
154
155 public:
think_node_body(philosopher & p)156 think_node_body(philosopher &p) : my_philosopher(p) {}
think_node_body(const think_node_body & other)157 think_node_body(const think_node_body &other) : my_philosopher(other.my_philosopher) {}
operator ()(oneapi::tbb::flow::continue_msg)158 oneapi::tbb::flow::continue_msg operator()(oneapi::tbb::flow::continue_msg /*m*/) {
159 my_philosopher.think();
160 return oneapi::tbb::flow::continue_msg();
161 }
162 };
163
164 class eat_node_body {
165 philosopher &my_philosopher;
166
167 public:
eat_node_body(philosopher & p)168 eat_node_body(philosopher &p) : my_philosopher(p) {}
eat_node_body(const eat_node_body & other)169 eat_node_body(const eat_node_body &other) : my_philosopher(other.my_philosopher) {}
operator ()(const join_output & in)170 oneapi::tbb::flow::continue_msg operator()(const join_output &in) {
171 my_philosopher.eat();
172 return oneapi::tbb::flow::continue_msg();
173 }
174 };
175
176 class forward_node_body {
177 philosopher &my_philosopher;
178
179 public:
forward_node_body(philosopher & p)180 forward_node_body(philosopher &p) : my_philosopher(p) {}
forward_node_body(const forward_node_body & other)181 forward_node_body(const forward_node_body &other) : my_philosopher(other.my_philosopher) {}
operator ()(const oneapi::tbb::flow::continue_msg & in,forward_node_type::output_ports_type & out)182 void operator()(const oneapi::tbb::flow::continue_msg &in,
183 forward_node_type::output_ports_type &out) {
184 my_philosopher.forward(in, out);
185 }
186 };
187
check()188 void philosopher::check() {
189 if (my_count != 0) {
190 std::printf("ERROR: philosopher %s still had to run %d more times\n", name(), my_count);
191 std::exit(-1);
192 }
193 }
194
forward(const oneapi::tbb::flow::continue_msg &,forward_node_type::output_ports_type & out_ports)195 void philosopher::forward(const oneapi::tbb::flow::continue_msg & /*in*/,
196 forward_node_type::output_ports_type &out_ports) {
197 if (my_count < 0)
198 abort();
199 --my_count;
200 (void)std::get<1>(out_ports).try_put(chopstick());
201 (void)std::get<2>(out_ports).try_put(chopstick());
202 if (my_count > 0) {
203 (void)std::get<0>(out_ports).try_put(
204 oneapi::tbb::flow::continue_msg()); //start thinking again
205 }
206 else {
207 if (verbose) {
208 oneapi::tbb::spin_mutex::scoped_lock lock(my_mutex);
209 std::printf("%s has left the building\n", name());
210 }
211 }
212 }
213
eat()214 void philosopher::eat() {
215 if (verbose) {
216 oneapi::tbb::spin_mutex::scoped_lock lock(my_mutex);
217 std::printf("%s eating\n", name());
218 }
219 std::this_thread::sleep_for(eat_time);
220 if (verbose) {
221 oneapi::tbb::spin_mutex::scoped_lock lock(my_mutex);
222 std::printf("%s done eating\n", name());
223 }
224 }
225
think()226 void philosopher::think() {
227 if (verbose) {
228 oneapi::tbb::spin_mutex::scoped_lock lock(my_mutex);
229 std::printf("%s thinking\n", name());
230 }
231 std::this_thread::sleep_for(think_time);
232 if (verbose) {
233 oneapi::tbb::spin_mutex::scoped_lock lock(my_mutex);
234 std::printf("%s done thinking\n", name());
235 }
236 }
237
238 typedef oneapi::tbb::flow::queue_node<oneapi::tbb::flow::continue_msg> thinking_done_type;
239
main(int argc,char * argv[])240 int main(int argc, char *argv[]) {
241 using oneapi::tbb::flow::make_edge;
242 using oneapi::tbb::flow::input_port;
243 using oneapi::tbb::flow::output_port;
244
245 oneapi::tbb::tick_count main_time = oneapi::tbb::tick_count::now();
246 int num_threads;
247 int num_philosophers;
248
249 RunOptions options = ParseCommandLine(argc, argv);
250 num_philosophers = options.number_of_philosophers;
251 verbose = !options.silent;
252
253 for (num_threads = options.threads.first; num_threads <= options.threads.last;
254 num_threads = options.threads.step(num_threads)) {
255 oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism,
256 num_threads);
257
258 oneapi::tbb::flow::graph g;
259
260 if (verbose) {
261 std::cout << "\n"
262 << num_philosophers << " philosophers with " << num_threads << " threads"
263 << "\n"
264 << "\n";
265 }
266 t0 = oneapi::tbb::tick_count::now();
267
268 std::vector<oneapi::tbb::flow::queue_node<chopstick>> places(
269 num_philosophers, oneapi::tbb::flow::queue_node<chopstick>(g));
270 std::vector<philosopher> philosophers;
271 philosophers.reserve(num_philosophers);
272 std::vector<think_node_type *> think_nodes;
273 think_nodes.reserve(num_philosophers);
274 std::vector<thinking_done_type> done_vector(num_philosophers, thinking_done_type(g));
275 std::vector<join_node_type> join_vector(num_philosophers, join_node_type(g));
276 std::vector<eat_node_type *> eat_nodes;
277 eat_nodes.reserve(num_philosophers);
278 std::vector<forward_node_type *> forward_nodes;
279 forward_nodes.reserve(num_philosophers);
280 for (int i = 0; i < num_philosophers; ++i) {
281 places[i].try_put(chopstick());
282 philosophers.push_back(
283 philosopher(names[i])); // allowed because of default generated assignment
284 if (verbose) {
285 oneapi::tbb::spin_mutex::scoped_lock lock(my_mutex);
286 std::cout << "Built philosopher " << philosophers[i] << "\n";
287 }
288 think_nodes.push_back(new think_node_type(
289 g, oneapi::tbb::flow::unlimited, think_node_body(philosophers[i])));
290 eat_nodes.push_back(
291 new eat_node_type(g, oneapi::tbb::flow::unlimited, eat_node_body(philosophers[i])));
292 forward_nodes.push_back(new forward_node_type(
293 g, oneapi::tbb::flow::unlimited, forward_node_body(philosophers[i])));
294 }
295
296 // attach chopstick buffers and think function_nodes to joins
297 for (int i = 0; i < num_philosophers; ++i) {
298 make_edge(*think_nodes[i], done_vector[i]);
299 make_edge(done_vector[i], input_port<0>(join_vector[i]));
300 make_edge(places[i], input_port<1>(join_vector[i])); // left chopstick
301 make_edge(places[(i + 1) % num_philosophers],
302 input_port<2>(join_vector[i])); // right chopstick
303 make_edge(join_vector[i], *eat_nodes[i]);
304 make_edge(*eat_nodes[i], *forward_nodes[i]);
305 make_edge(output_port<0>(*forward_nodes[i]), *think_nodes[i]);
306 make_edge(output_port<1>(*forward_nodes[i]), places[i]);
307 make_edge(output_port<2>(*forward_nodes[i]), places[(i + 1) % num_philosophers]);
308 }
309
310 // start all the philosophers thinking
311 for (int i = 0; i < num_philosophers; ++i)
312 think_nodes[i]->try_put(oneapi::tbb::flow::continue_msg());
313
314 g.wait_for_all();
315
316 oneapi::tbb::tick_count t1 = oneapi::tbb::tick_count::now();
317 if (verbose)
318 std::cout << "\n"
319 << num_philosophers << " philosophers with " << num_threads
320 << " threads have taken " << (t1 - t0).seconds() << "seconds"
321 << "\n";
322
323 for (int i = 0; i < num_philosophers; ++i)
324 philosophers[i].check();
325
326 for (int i = 0; i < num_philosophers; ++i) {
327 delete think_nodes[i];
328 delete eat_nodes[i];
329 delete forward_nodes[i];
330 }
331 }
332
333 utility::report_elapsed_time((oneapi::tbb::tick_count::now() - main_time).seconds());
334 return 0;
335 }
336