1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if _MSC_VER
18 // Suppress "decorated name length exceeded, name was truncated" warning
19 #pragma warning(disable : 4503)
20 #endif
21 
#include <cstdint>
#include <cstdio>
#include <cstdlib>

#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <tuple>
#include <vector>
29 
30 #include "oneapi/tbb/flow_graph.h"
31 #include "oneapi/tbb/tick_count.h"
32 #include "oneapi/tbb/spin_mutex.h"
33 #include "oneapi/tbb/global_control.h"
34 
35 #include "common/utility/utility.hpp"
36 #include "common/utility/get_default_num_threads.hpp"
37 
38 // Each philosopher is an object, and is invoked in the think() function_node, the
39 // eat() function_node and forward() multifunction_node.
40 //
41 // The graph is constructed, and each think() function_node is started with a continue_msg.
42 //
43 // The philosopher will think, then gather two chopsticks, eat, place the chopsticks back,
44 // and if they have not completed the required number of cycles, will start to think() again
45 // by sending a continue_msg to their corresponding think() function_node.
46 //
// The reserving join has as its inputs the left and right chopstick queues and a queue
48 // that stores the continue_msg emitted by the function_node after think()ing is done.
49 // When all three inputs are available, a tuple of the inputs will be forwarded to the
50 // eat() function_node.  The output of the eat() function_node is sent to the forward()
51 // multifunction_node.
52 
// How long philosopher::think() and philosopher::eat() sleep to simulate one phase.
const std::chrono::seconds think_time{ 1 };
const std::chrono::seconds eat_time{ 1 };
// Initial value of philosopher::my_count: the number of cycles each philosopher runs.
const int num_times{ 10 };
56 
57 oneapi::tbb::tick_count t0;
58 bool verbose = false;
59 
// Pool of philosopher names: philosopher i is constructed from names[i], so the length of
// this array is the largest philosopher count the program accepts.
const char *names[] = { "Archimedes", "Bakunin",   "Confucius",    "Democritus",  "Euclid",
                        "Favorinus",  "Geminus",   "Heraclitus",   "Ichthyas",    "Jason of Nysa",
                        "Kant",       "Lavrov",    "Metrocles",    "Nausiphanes", "Onatas",
                        "Phaedrus",   "Quillot",   "Russell",      "Socrates",    "Thales",
                        "Udayana",    "Vernadsky", "Wittgenstein", "Xenophilus",  "Yen Yuan",
                        "Zenodotus" };
// Number of entries in names. Dividing by the size of the array's own first element keeps
// the count correct even if the element type of names is ever changed.
const int NumPhilosophers = sizeof(names) / sizeof(names[0]);
// The command line only accepts counts in [2, NumPhilosophers]; fewer than two names would
// leave no valid count at all.
static_assert(NumPhilosophers >= 2, "at least two philosopher names are required");
67 
68 struct RunOptions {
69     utility::thread_number_range threads;
70     int number_of_philosophers;
71     bool silent;
RunOptionsRunOptions72     RunOptions(utility::thread_number_range threads_, int number_of_philosophers_, bool silent_)
73             : threads(threads_),
74               number_of_philosophers(number_of_philosophers_),
75               silent(silent_) {}
76 };
77 
ParseCommandLine(int argc,char * argv[])78 RunOptions ParseCommandLine(int argc, char *argv[]) {
79     int auto_threads = utility::get_default_num_threads();
80     utility::thread_number_range threads(
81         utility::get_default_num_threads, auto_threads, auto_threads);
82     int nPhilosophers = 5;
83     bool verbose = false;
84     char charbuf[100];
85     std::sprintf(charbuf, "%d", NumPhilosophers);
86     std::string pCount = "how many philosophers, from 2-";
87     pCount += charbuf;
88 
89     utility::cli_argument_pack cli_pack;
90     cli_pack.positional_arg(threads, "n-of_threads", utility::thread_number_range_desc)
91         .positional_arg(nPhilosophers, "n-of-philosophers", pCount)
92         .arg(verbose, "verbose", "verbose output");
93     utility::parse_cli_arguments(argc, argv, cli_pack);
94     if (nPhilosophers < 2 || nPhilosophers > NumPhilosophers) {
95         std::cout << "Number of philosophers (" << nPhilosophers
96                   << ") out of range [2:" << NumPhilosophers << "]\n";
97         std::cout << cli_pack.usage_string(argv[0]) << std::flush;
98         std::exit(-1);
99     }
100     return RunOptions(threads, nPhilosophers, !verbose);
101 }
102 
103 oneapi::tbb::spin_mutex my_mutex;
104 
105 class chopstick {};
106 
107 typedef std::tuple<oneapi::tbb::flow::continue_msg, chopstick, chopstick> join_output;
108 typedef oneapi::tbb::flow::join_node<join_output, oneapi::tbb::flow::reserving> join_node_type;
109 
110 typedef oneapi::tbb::flow::function_node<oneapi::tbb::flow::continue_msg,
111                                          oneapi::tbb::flow::continue_msg>
112     think_node_type;
113 typedef oneapi::tbb::flow::function_node<join_output, oneapi::tbb::flow::continue_msg>
114     eat_node_type;
115 typedef oneapi::tbb::flow::multifunction_node<oneapi::tbb::flow::continue_msg, join_output>
116     forward_node_type;
117 
118 class philosopher {
119 public:
philosopher(const char * name)120     philosopher(const char *name) : my_name(name), my_count(num_times) {}
121 
~philosopher()122     ~philosopher() {}
123 
124     void check();
name() const125     const char *name() const {
126         return my_name;
127     }
128 
129 private:
130     friend std::ostream &operator<<(std::ostream &o, philosopher const &p);
131 
132     const char *my_name;
133     int my_count;
134 
135     friend class think_node_body;
136     friend class eat_node_body;
137     friend class forward_node_body;
138 
139     void think();
140     void eat();
141     void forward(const oneapi::tbb::flow::continue_msg &in,
142                  forward_node_type::output_ports_type &out_ports);
143 };
144 
operator <<(std::ostream & o,philosopher const & p)145 std::ostream &operator<<(std::ostream &o, philosopher const &p) {
146     o << "< philosopher[" << reinterpret_cast<uintptr_t>(const_cast<philosopher *>(&p)) << "] "
147       << p.name() << ", my_count=" << p.my_count;
148 
149     return o;
150 }
151 
152 class think_node_body {
153     philosopher &my_philosopher;
154 
155 public:
think_node_body(philosopher & p)156     think_node_body(philosopher &p) : my_philosopher(p) {}
think_node_body(const think_node_body & other)157     think_node_body(const think_node_body &other) : my_philosopher(other.my_philosopher) {}
operator ()(oneapi::tbb::flow::continue_msg)158     oneapi::tbb::flow::continue_msg operator()(oneapi::tbb::flow::continue_msg /*m*/) {
159         my_philosopher.think();
160         return oneapi::tbb::flow::continue_msg();
161     }
162 };
163 
164 class eat_node_body {
165     philosopher &my_philosopher;
166 
167 public:
eat_node_body(philosopher & p)168     eat_node_body(philosopher &p) : my_philosopher(p) {}
eat_node_body(const eat_node_body & other)169     eat_node_body(const eat_node_body &other) : my_philosopher(other.my_philosopher) {}
operator ()(const join_output & in)170     oneapi::tbb::flow::continue_msg operator()(const join_output &in) {
171         my_philosopher.eat();
172         return oneapi::tbb::flow::continue_msg();
173     }
174 };
175 
176 class forward_node_body {
177     philosopher &my_philosopher;
178 
179 public:
forward_node_body(philosopher & p)180     forward_node_body(philosopher &p) : my_philosopher(p) {}
forward_node_body(const forward_node_body & other)181     forward_node_body(const forward_node_body &other) : my_philosopher(other.my_philosopher) {}
operator ()(const oneapi::tbb::flow::continue_msg & in,forward_node_type::output_ports_type & out)182     void operator()(const oneapi::tbb::flow::continue_msg &in,
183                     forward_node_type::output_ports_type &out) {
184         my_philosopher.forward(in, out);
185     }
186 };
187 
check()188 void philosopher::check() {
189     if (my_count != 0) {
190         std::printf("ERROR: philosopher %s still had to run %d more times\n", name(), my_count);
191         std::exit(-1);
192     }
193 }
194 
forward(const oneapi::tbb::flow::continue_msg &,forward_node_type::output_ports_type & out_ports)195 void philosopher::forward(const oneapi::tbb::flow::continue_msg & /*in*/,
196                           forward_node_type::output_ports_type &out_ports) {
197     if (my_count < 0)
198         abort();
199     --my_count;
200     (void)std::get<1>(out_ports).try_put(chopstick());
201     (void)std::get<2>(out_ports).try_put(chopstick());
202     if (my_count > 0) {
203         (void)std::get<0>(out_ports).try_put(
204             oneapi::tbb::flow::continue_msg()); //start thinking again
205     }
206     else {
207         if (verbose) {
208             oneapi::tbb::spin_mutex::scoped_lock lock(my_mutex);
209             std::printf("%s has left the building\n", name());
210         }
211     }
212 }
213 
eat()214 void philosopher::eat() {
215     if (verbose) {
216         oneapi::tbb::spin_mutex::scoped_lock lock(my_mutex);
217         std::printf("%s eating\n", name());
218     }
219     std::this_thread::sleep_for(eat_time);
220     if (verbose) {
221         oneapi::tbb::spin_mutex::scoped_lock lock(my_mutex);
222         std::printf("%s done eating\n", name());
223     }
224 }
225 
think()226 void philosopher::think() {
227     if (verbose) {
228         oneapi::tbb::spin_mutex::scoped_lock lock(my_mutex);
229         std::printf("%s thinking\n", name());
230     }
231     std::this_thread::sleep_for(think_time);
232     if (verbose) {
233         oneapi::tbb::spin_mutex::scoped_lock lock(my_mutex);
234         std::printf("%s done thinking\n", name());
235     }
236 }
237 
238 typedef oneapi::tbb::flow::queue_node<oneapi::tbb::flow::continue_msg> thinking_done_type;
239 
main(int argc,char * argv[])240 int main(int argc, char *argv[]) {
241     using oneapi::tbb::flow::make_edge;
242     using oneapi::tbb::flow::input_port;
243     using oneapi::tbb::flow::output_port;
244 
245     oneapi::tbb::tick_count main_time = oneapi::tbb::tick_count::now();
246     int num_threads;
247     int num_philosophers;
248 
249     RunOptions options = ParseCommandLine(argc, argv);
250     num_philosophers = options.number_of_philosophers;
251     verbose = !options.silent;
252 
253     for (num_threads = options.threads.first; num_threads <= options.threads.last;
254          num_threads = options.threads.step(num_threads)) {
255         oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism,
256                                       num_threads);
257 
258         oneapi::tbb::flow::graph g;
259 
260         if (verbose) {
261             std::cout << "\n"
262                       << num_philosophers << " philosophers with " << num_threads << " threads"
263                       << "\n"
264                       << "\n";
265         }
266         t0 = oneapi::tbb::tick_count::now();
267 
268         std::vector<oneapi::tbb::flow::queue_node<chopstick>> places(
269             num_philosophers, oneapi::tbb::flow::queue_node<chopstick>(g));
270         std::vector<philosopher> philosophers;
271         philosophers.reserve(num_philosophers);
272         std::vector<think_node_type *> think_nodes;
273         think_nodes.reserve(num_philosophers);
274         std::vector<thinking_done_type> done_vector(num_philosophers, thinking_done_type(g));
275         std::vector<join_node_type> join_vector(num_philosophers, join_node_type(g));
276         std::vector<eat_node_type *> eat_nodes;
277         eat_nodes.reserve(num_philosophers);
278         std::vector<forward_node_type *> forward_nodes;
279         forward_nodes.reserve(num_philosophers);
280         for (int i = 0; i < num_philosophers; ++i) {
281             places[i].try_put(chopstick());
282             philosophers.push_back(
283                 philosopher(names[i])); // allowed because of default generated assignment
284             if (verbose) {
285                 oneapi::tbb::spin_mutex::scoped_lock lock(my_mutex);
286                 std::cout << "Built philosopher " << philosophers[i] << "\n";
287             }
288             think_nodes.push_back(new think_node_type(
289                 g, oneapi::tbb::flow::unlimited, think_node_body(philosophers[i])));
290             eat_nodes.push_back(
291                 new eat_node_type(g, oneapi::tbb::flow::unlimited, eat_node_body(philosophers[i])));
292             forward_nodes.push_back(new forward_node_type(
293                 g, oneapi::tbb::flow::unlimited, forward_node_body(philosophers[i])));
294         }
295 
296         // attach chopstick buffers and think function_nodes to joins
297         for (int i = 0; i < num_philosophers; ++i) {
298             make_edge(*think_nodes[i], done_vector[i]);
299             make_edge(done_vector[i], input_port<0>(join_vector[i]));
300             make_edge(places[i], input_port<1>(join_vector[i])); // left chopstick
301             make_edge(places[(i + 1) % num_philosophers],
302                       input_port<2>(join_vector[i])); // right chopstick
303             make_edge(join_vector[i], *eat_nodes[i]);
304             make_edge(*eat_nodes[i], *forward_nodes[i]);
305             make_edge(output_port<0>(*forward_nodes[i]), *think_nodes[i]);
306             make_edge(output_port<1>(*forward_nodes[i]), places[i]);
307             make_edge(output_port<2>(*forward_nodes[i]), places[(i + 1) % num_philosophers]);
308         }
309 
310         // start all the philosophers thinking
311         for (int i = 0; i < num_philosophers; ++i)
312             think_nodes[i]->try_put(oneapi::tbb::flow::continue_msg());
313 
314         g.wait_for_all();
315 
316         oneapi::tbb::tick_count t1 = oneapi::tbb::tick_count::now();
317         if (verbose)
318             std::cout << "\n"
319                       << num_philosophers << " philosophers with " << num_threads
320                       << " threads have taken " << (t1 - t0).seconds() << "seconds"
321                       << "\n";
322 
323         for (int i = 0; i < num_philosophers; ++i)
324             philosophers[i].check();
325 
326         for (int i = 0; i < num_philosophers; ++i) {
327             delete think_nodes[i];
328             delete eat_nodes[i];
329             delete forward_nodes[i];
330         }
331     }
332 
333     utility::report_elapsed_time((oneapi::tbb::tick_count::now() - main_time).seconds());
334     return 0;
335 }
336