1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if _MSC_VER 18 // Suppress "decorated name length exceeded, name was truncated" warning 19 #pragma warning(disable : 4503) 20 #endif 21 22 #include <cstdlib> 23 #include <cstdio> 24 25 #include <iostream> 26 #include <thread> 27 #include <chrono> 28 #include <tuple> 29 30 #include "oneapi/tbb/flow_graph.h" 31 #include "oneapi/tbb/tick_count.h" 32 #include "oneapi/tbb/spin_mutex.h" 33 #include "oneapi/tbb/global_control.h" 34 35 #include "common/utility/utility.hpp" 36 #include "common/utility/get_default_num_threads.hpp" 37 38 // Each philosopher is an object, and is invoked in the think() function_node, the 39 // eat() function_node and forward() multifunction_node. 40 // 41 // The graph is constructed, and each think() function_node is started with a continue_msg. 42 // 43 // The philosopher will think, then gather two chopsticks, eat, place the chopsticks back, 44 // and if they have not completed the required number of cycles, will start to think() again 45 // by sending a continue_msg to their corresponding think() function_node. 46 // 47 // The reserving join has as its inputs the left and right chopstick queues an a queue 48 // that stores the continue_msg emitted by the function_node after think()ing is done. 49 // When all three inputs are available, a tuple of the inputs will be forwarded to the 50 // eat() function_node. The output of the eat() function_node is sent to the forward() 51 // multifunction_node. 52 53 const std::chrono::seconds think_time(1); 54 const std::chrono::seconds eat_time(1); 55 const int num_times = 10; 56 57 oneapi::tbb::tick_count t0; 58 bool verbose = false; 59 60 const char *names[] = { "Archimedes", "Bakunin", "Confucius", "Democritus", "Euclid", 61 "Favorinus", "Geminus", "Heraclitus", "Ichthyas", "Jason of Nysa", 62 "Kant", "Lavrov", "Metrocles", "Nausiphanes", "Onatas", 63 "Phaedrus", "Quillot", "Russell", "Socrates", "Thales", 64 "Udayana", "Vernadsky", "Wittgenstein", "Xenophilus", "Yen Yuan", 65 "Zenodotus" }; 66 const int NumPhilosophers = sizeof(names) / sizeof(char *); 67 68 struct RunOptions { 69 utility::thread_number_range threads; 70 int number_of_philosophers; 71 bool silent; 72 RunOptions(utility::thread_number_range threads_, int number_of_philosophers_, bool silent_) 73 : threads(threads_), 74 number_of_philosophers(number_of_philosophers_), 75 silent(silent_) {} 76 }; 77 78 RunOptions ParseCommandLine(int argc, char *argv[]) { 79 int auto_threads = utility::get_default_num_threads(); 80 utility::thread_number_range threads( 81 utility::get_default_num_threads, auto_threads, auto_threads); 82 int nPhilosophers = 5; 83 bool verbose = false; 84 char charbuf[100]; 85 std::sprintf(charbuf, "%d", NumPhilosophers); 86 std::string pCount = "how many philosophers, from 2-"; 87 pCount += charbuf; 88 89 utility::cli_argument_pack cli_pack; 90 cli_pack.positional_arg(threads, "n-of_threads", utility::thread_number_range_desc) 91 .positional_arg(nPhilosophers, "n-of-philosophers", pCount) 92 .arg(verbose, "verbose", "verbose output"); 93 utility::parse_cli_arguments(argc, argv, cli_pack); 94 if (nPhilosophers < 2 || nPhilosophers > NumPhilosophers) { 95 std::cout << "Number of philosophers (" << nPhilosophers 96 << ") out of range [2:" << NumPhilosophers << "]\n"; 97 std::cout << cli_pack.usage_string(argv[0]) << std::flush; 98 std::exit(-1); 99 } 100 return RunOptions(threads, nPhilosophers, !verbose); 101 } 102 103 oneapi::tbb::spin_mutex my_mutex; 104 105 class chopstick {}; 106 107 typedef std::tuple<oneapi::tbb::flow::continue_msg, chopstick, chopstick> join_output; 108 typedef oneapi::tbb::flow::join_node<join_output, oneapi::tbb::flow::reserving> join_node_type; 109 110 typedef oneapi::tbb::flow::function_node<oneapi::tbb::flow::continue_msg, 111 oneapi::tbb::flow::continue_msg> 112 think_node_type; 113 typedef oneapi::tbb::flow::function_node<join_output, oneapi::tbb::flow::continue_msg> 114 eat_node_type; 115 typedef oneapi::tbb::flow::multifunction_node<oneapi::tbb::flow::continue_msg, join_output> 116 forward_node_type; 117 118 class philosopher { 119 public: 120 philosopher(const char *name) : my_name(name), my_count(num_times) {} 121 122 ~philosopher() {} 123 124 void check(); 125 const char *name() const { 126 return my_name; 127 } 128 129 private: 130 friend std::ostream &operator<<(std::ostream &o, philosopher const &p); 131 132 const char *my_name; 133 int my_count; 134 135 friend class think_node_body; 136 friend class eat_node_body; 137 friend class forward_node_body; 138 139 void think(); 140 void eat(); 141 void forward(const oneapi::tbb::flow::continue_msg &in, 142 forward_node_type::output_ports_type &out_ports); 143 }; 144 145 std::ostream &operator<<(std::ostream &o, philosopher const &p) { 146 o << "< philosopher[" << reinterpret_cast<uintptr_t>(const_cast<philosopher *>(&p)) << "] " 147 << p.name() << ", my_count=" << p.my_count; 148 149 return o; 150 } 151 152 class think_node_body { 153 philosopher &my_philosopher; 154 155 public: 156 think_node_body(philosopher &p) : my_philosopher(p) {} 157 think_node_body(const think_node_body &other) : my_philosopher(other.my_philosopher) {} 158 oneapi::tbb::flow::continue_msg operator()(oneapi::tbb::flow::continue_msg /*m*/) { 159 my_philosopher.think(); 160 return oneapi::tbb::flow::continue_msg(); 161 } 162 }; 163 164 class eat_node_body { 165 philosopher &my_philosopher; 166 167 public: 168 eat_node_body(philosopher &p) : my_philosopher(p) {} 169 eat_node_body(const eat_node_body &other) : my_philosopher(other.my_philosopher) {} 170 oneapi::tbb::flow::continue_msg operator()(const join_output &in) { 171 my_philosopher.eat(); 172 return oneapi::tbb::flow::continue_msg(); 173 } 174 }; 175 176 class forward_node_body { 177 philosopher &my_philosopher; 178 179 public: 180 forward_node_body(philosopher &p) : my_philosopher(p) {} 181 forward_node_body(const forward_node_body &other) : my_philosopher(other.my_philosopher) {} 182 void operator()(const oneapi::tbb::flow::continue_msg &in, 183 forward_node_type::output_ports_type &out) { 184 my_philosopher.forward(in, out); 185 } 186 }; 187 188 void philosopher::check() { 189 if (my_count != 0) { 190 std::printf("ERROR: philosopher %s still had to run %d more times\n", name(), my_count); 191 std::exit(-1); 192 } 193 } 194 195 void philosopher::forward(const oneapi::tbb::flow::continue_msg & /*in*/, 196 forward_node_type::output_ports_type &out_ports) { 197 if (my_count < 0) 198 abort(); 199 --my_count; 200 (void)std::get<1>(out_ports).try_put(chopstick()); 201 (void)std::get<2>(out_ports).try_put(chopstick()); 202 if (my_count > 0) { 203 (void)std::get<0>(out_ports).try_put( 204 oneapi::tbb::flow::continue_msg()); //start thinking again 205 } 206 else { 207 if (verbose) { 208 oneapi::tbb::spin_mutex::scoped_lock lock(my_mutex); 209 std::printf("%s has left the building\n", name()); 210 } 211 } 212 } 213 214 void philosopher::eat() { 215 if (verbose) { 216 oneapi::tbb::spin_mutex::scoped_lock lock(my_mutex); 217 std::printf("%s eating\n", name()); 218 } 219 std::this_thread::sleep_for(eat_time); 220 if (verbose) { 221 oneapi::tbb::spin_mutex::scoped_lock lock(my_mutex); 222 std::printf("%s done eating\n", name()); 223 } 224 } 225 226 void philosopher::think() { 227 if (verbose) { 228 oneapi::tbb::spin_mutex::scoped_lock lock(my_mutex); 229 std::printf("%s thinking\n", name()); 230 } 231 std::this_thread::sleep_for(think_time); 232 if (verbose) { 233 oneapi::tbb::spin_mutex::scoped_lock lock(my_mutex); 234 std::printf("%s done thinking\n", name()); 235 } 236 } 237 238 typedef oneapi::tbb::flow::queue_node<oneapi::tbb::flow::continue_msg> thinking_done_type; 239 240 int main(int argc, char *argv[]) { 241 using oneapi::tbb::flow::make_edge; 242 using oneapi::tbb::flow::input_port; 243 using oneapi::tbb::flow::output_port; 244 245 oneapi::tbb::tick_count main_time = oneapi::tbb::tick_count::now(); 246 int num_threads; 247 int num_philosophers; 248 249 RunOptions options = ParseCommandLine(argc, argv); 250 num_philosophers = options.number_of_philosophers; 251 verbose = !options.silent; 252 253 for (num_threads = options.threads.first; num_threads <= options.threads.last; 254 num_threads = options.threads.step(num_threads)) { 255 oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism, 256 num_threads); 257 258 oneapi::tbb::flow::graph g; 259 260 if (verbose) { 261 std::cout << "\n" 262 << num_philosophers << " philosophers with " << num_threads << " threads" 263 << "\n" 264 << "\n"; 265 } 266 t0 = oneapi::tbb::tick_count::now(); 267 268 std::vector<oneapi::tbb::flow::queue_node<chopstick>> places( 269 num_philosophers, oneapi::tbb::flow::queue_node<chopstick>(g)); 270 std::vector<philosopher> philosophers; 271 philosophers.reserve(num_philosophers); 272 std::vector<think_node_type *> think_nodes; 273 think_nodes.reserve(num_philosophers); 274 std::vector<thinking_done_type> done_vector(num_philosophers, thinking_done_type(g)); 275 std::vector<join_node_type> join_vector(num_philosophers, join_node_type(g)); 276 std::vector<eat_node_type *> eat_nodes; 277 eat_nodes.reserve(num_philosophers); 278 std::vector<forward_node_type *> forward_nodes; 279 forward_nodes.reserve(num_philosophers); 280 for (int i = 0; i < num_philosophers; ++i) { 281 places[i].try_put(chopstick()); 282 philosophers.push_back( 283 philosopher(names[i])); // allowed because of default generated assignment 284 if (verbose) { 285 oneapi::tbb::spin_mutex::scoped_lock lock(my_mutex); 286 std::cout << "Built philosopher " << philosophers[i] << "\n"; 287 } 288 think_nodes.push_back(new think_node_type( 289 g, oneapi::tbb::flow::unlimited, think_node_body(philosophers[i]))); 290 eat_nodes.push_back( 291 new eat_node_type(g, oneapi::tbb::flow::unlimited, eat_node_body(philosophers[i]))); 292 forward_nodes.push_back(new forward_node_type( 293 g, oneapi::tbb::flow::unlimited, forward_node_body(philosophers[i]))); 294 } 295 296 // attach chopstick buffers and think function_nodes to joins 297 for (int i = 0; i < num_philosophers; ++i) { 298 make_edge(*think_nodes[i], done_vector[i]); 299 make_edge(done_vector[i], input_port<0>(join_vector[i])); 300 make_edge(places[i], input_port<1>(join_vector[i])); // left chopstick 301 make_edge(places[(i + 1) % num_philosophers], 302 input_port<2>(join_vector[i])); // right chopstick 303 make_edge(join_vector[i], *eat_nodes[i]); 304 make_edge(*eat_nodes[i], *forward_nodes[i]); 305 make_edge(output_port<0>(*forward_nodes[i]), *think_nodes[i]); 306 make_edge(output_port<1>(*forward_nodes[i]), places[i]); 307 make_edge(output_port<2>(*forward_nodes[i]), places[(i + 1) % num_philosophers]); 308 } 309 310 // start all the philosophers thinking 311 for (int i = 0; i < num_philosophers; ++i) 312 think_nodes[i]->try_put(oneapi::tbb::flow::continue_msg()); 313 314 g.wait_for_all(); 315 316 oneapi::tbb::tick_count t1 = oneapi::tbb::tick_count::now(); 317 if (verbose) 318 std::cout << "\n" 319 << num_philosophers << " philosophers with " << num_threads 320 << " threads have taken " << (t1 - t0).seconds() << "seconds" 321 << "\n"; 322 323 for (int i = 0; i < num_philosophers; ++i) 324 philosophers[i].check(); 325 326 for (int i = 0; i < num_philosophers; ++i) { 327 delete think_nodes[i]; 328 delete eat_nodes[i]; 329 delete forward_nodes[i]; 330 } 331 } 332 333 utility::report_elapsed_time((oneapi::tbb::tick_count::now() - main_time).seconds()); 334 return 0; 335 } 336