1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/config.h" 18 19 #if _MSC_VER 20 #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning 21 #if _MSC_VER==1700 && !defined(__INTEL_COMPILER) 22 // Suppress "unreachable code" warning by VC++ 17.0 (VS 2012) 23 #pragma warning (disable: 4702) 24 #endif 25 #endif 26 27 // need these to get proper external names for private methods in library. 28 #include "tbb/spin_mutex.h" 29 #include "tbb/spin_rw_mutex.h" 30 #include "tbb/task_arena.h" 31 #include "tbb/task_group.h" 32 33 #define private public 34 #define protected public 35 #include "tbb/flow_graph.h" 36 #undef protected 37 #undef private 38 39 #include "common/test.h" 40 #include "common/utils.h" 41 #include "common/spin_barrier.h" 42 #include "common/graph_utils.h" 43 44 #include <string> // merely prevents LNK2001 error to happen (on ICL+VC9 configurations) 45 46 //! \file test_flow_graph_whitebox.cpp 47 //! \brief Test for [flow_graph.broadcast_node flow_graph.priority_queue_node flow_graph.indexer_node flow_graph.sequencer_node flow_graph.remove_edge flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification 48 49 template<typename T> 50 struct receiverBody { 51 tbb::flow::continue_msg operator()(const T &/*in*/) { 52 return tbb::flow::continue_msg(); 53 } 54 }; 55 56 // split_nodes cannot have predecessors 57 // they do not reject messages and always forward. 58 // they reject edge reversals from successors. 59 void TestSplitNode() { 60 typedef tbb::flow::split_node<std::tuple<int> > snode_type; 61 tbb::flow::graph g; 62 snode_type snode(g); 63 tbb::flow::function_node<int> rcvr(g,tbb::flow::unlimited, receiverBody<int>()); 64 INFO("Testing split_node\n"); 65 CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "Constructed split_node has successors"); 66 // tbb::flow::output_port<0>(snode) 67 tbb::flow::make_edge(tbb::flow::output_port<0>(snode), rcvr); 68 CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after make_edge, split_node has no successor."); 69 snode.try_put(std::tuple<int>(1)); 70 g.wait_for_all(); 71 g.reset(); 72 CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after reset(), split_node has no successor."); 73 g.reset(tbb::flow::rf_clear_edges); 74 CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "after reset(rf_clear_edges), split_node has a successor."); 75 } 76 77 // buffering nodes cannot have predecessors 78 // they do not reject messages and always save or forward 79 // they allow edge reversals from successors 80 template< typename B > 81 void TestBufferingNode(const char * name) { 82 tbb::flow::graph g; 83 B bnode(g); 84 tbb::flow::function_node<int,int,tbb::flow::rejecting> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 85 INFO("Testing " << name << ":"); 86 for(int icnt = 0; icnt < 2; icnt++) { 87 bool reverse_edge = (icnt & 0x2) != 0; 88 serial_fn_state0 = 0; // reset to waiting state. 89 INFO(" make_edge"); 90 tbb::flow::make_edge(bnode, fnode); 91 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge"); 92 std::thread t([&] { 93 INFO(" try_put"); 94 bnode.try_put(1); // will forward to the fnode 95 g.wait_for_all(); 96 }); 97 utils::SpinWaitWhileEq(serial_fn_state0, 0); 98 if(reverse_edge) { 99 INFO(" try_put2"); 100 bnode.try_put(2); // should reverse the edge 101 // waiting for the edge to reverse 102 utils::SpinWaitWhile([&] { return !bnode.my_successors.empty(); }); 103 } 104 else { 105 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message"); 106 } 107 serial_fn_state0 = 0; // release the function_node. 108 if(reverse_edge) { 109 // have to do a second release because the function_node will get the 2nd item 110 utils::SpinWaitWhileEq(serial_fn_state0, 0); 111 serial_fn_state0 = 0; // release the function_node. 112 } 113 t.join(); 114 INFO(" remove_edge"); 115 tbb::flow::remove_edge(bnode, fnode); 116 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge"); 117 } 118 tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g); 119 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // will spawn a task 120 g.wait_for_all(); 121 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join"); 122 INFO(" reverse"); 123 bnode.try_put(1); // the edge should reverse 124 g.wait_for_all(); 125 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving"); 126 INFO(" reset()"); 127 g.wait_for_all(); 128 g.reset(); // should be in forward direction again 129 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()"); 130 INFO(" remove_edge"); 131 g.reset(tbb::flow::rf_clear_edges); 132 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)"); 133 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // add edge again 134 // reverse edge by adding to buffer. 135 bnode.try_put(1); // the edge should reverse 136 g.wait_for_all(); 137 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving"); 138 INFO(" remove_edge(reversed)"); 139 g.reset(tbb::flow::rf_clear_edges); 140 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has no successor after reset()"); 141 CHECK_MESSAGE( (tbb::flow::input_port<0>(jnode).my_predecessors.empty()), "predecessor not reset"); 142 INFO(" done\n"); 143 g.wait_for_all(); 144 } 145 146 // continue_node has only predecessor count 147 // they do not have predecessors, only the counts 148 // successor edges cannot be reversed 149 void TestContinueNode() { 150 tbb::flow::graph g; 151 tbb::flow::function_node<int> fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 152 tbb::flow::continue_node<int> cnode(g, /*number_of_predecessors*/ 1, 153 serial_continue_body<int>(serial_continue_state0)); 154 tbb::flow::function_node<int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state1)); 155 tbb::flow::make_edge(fnode0, cnode); 156 tbb::flow::make_edge(cnode, fnode1); 157 INFO("Testing continue_node:"); 158 for( int icnt = 0; icnt < 2; ++icnt ) { 159 INFO( " initial" << icnt); 160 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor addition didn't increment count"); 161 CHECK_MESSAGE( (!cnode.successors().empty()), "successors empty though we added one"); 162 CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect"); 163 serial_continue_state0 = 0; 164 serial_fn_state0 = 0; 165 serial_fn_state1 = 0; 166 167 std::thread t([&] { 168 fnode0.try_put(1); // start the first function node. 169 if(icnt == 0) { // first time through, let the continue_node fire 170 INFO(" firing"); 171 fnode0.try_put(1); // second message 172 g.wait_for_all(); 173 174 // try a try_get() 175 { 176 int i; 177 CHECK_MESSAGE( (!cnode.try_get(i)), "try_get not rejected"); 178 } 179 180 INFO(" reset"); 181 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)"); 182 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)"); 183 g.reset(); // should still be the same 184 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (after reset)" ); 185 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (after reset)"); 186 } 187 else { // we're going to see if the rf_clear_edges resets things. 188 g.wait_for_all(); 189 INFO(" reset(rf_clear_edges)"); 190 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)" ); 191 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)" ); 192 g.reset(tbb::flow::rf_clear_edges); // should be in forward direction again 193 CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect after reset(rf_clear_edges)" ); 194 CHECK_MESSAGE( (cnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)" ); 195 CHECK_MESSAGE( (cnode.my_predecessor_count == cnode.my_initial_predecessor_count), "predecessor count not reset" ); 196 } 197 }); 198 199 utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for the first message to arrive in function_node 200 // Now the body of function_node 0 is executing. 201 serial_fn_state0 = 0; // release the node 202 if (icnt == 0) { 203 // wait for node to count the message (or for the node body to execute, which would be wrong) 204 utils::SpinWaitWhile([&] { 205 tbb::spin_mutex::scoped_lock l(cnode.my_mutex); 206 return serial_continue_state0 == 0 && cnode.my_current_count == 0; 207 }); 208 CHECK_MESSAGE( (serial_continue_state0 == 0), "Improperly released continue_node"); 209 CHECK_MESSAGE( (cnode.my_current_count == 1), "state of continue_receiver incorrect"); 210 211 utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for the second message to arrive in function_node 212 // Now the body of function_node 0 is executing. 213 serial_fn_state0 = 0; // release the node 214 215 utils::SpinWaitWhileEq(serial_continue_state0, 0); // waiting for continue_node to start 216 CHECK_MESSAGE( (cnode.my_current_count == 0), " my_current_count not reset before body of continue_node started"); 217 serial_continue_state0 = 0; // release the continue_node 218 219 utils::SpinWaitWhileEq(serial_fn_state1, 0); // wait for the successor function_node to enter body 220 serial_fn_state1 = 0; // release successor function_node. 221 } 222 223 t.join(); 224 } 225 226 INFO(" done\n"); 227 228 } 229 230 // function_node has predecessors and successors 231 // try_get() rejects 232 // successor edges cannot be reversed 233 // predecessors will reverse (only rejecting will reverse) 234 void TestFunctionNode() { 235 tbb::flow::graph g; 236 tbb::flow::queue_node<int> qnode0(g); 237 tbb::flow::function_node<int,int, tbb::flow::rejecting > fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 238 tbb::flow::function_node<int,int/*, tbb::flow::queueing*/> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 239 240 tbb::flow::queue_node<int> qnode1(g); 241 242 tbb::flow::make_edge(fnode0, qnode1); 243 tbb::flow::make_edge(qnode0, fnode0); 244 245 serial_fn_state0 = 2; // just let it go 246 qnode0.try_put(1); 247 g.wait_for_all(); 248 int ii; 249 CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed" ); 250 tbb::flow::remove_edge(qnode0, fnode0); 251 tbb::flow::remove_edge(fnode0, qnode1); 252 253 tbb::flow::make_edge(fnode1, qnode1); 254 tbb::flow::make_edge(qnode0, fnode1); 255 256 serial_fn_state0 = 2; // just let it go 257 qnode0.try_put(1); 258 g.wait_for_all(); 259 CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed" ); 260 tbb::flow::remove_edge(qnode0, fnode1); 261 tbb::flow::remove_edge(fnode1, qnode1); 262 263 // rejecting 264 serial_fn_state0 = 0; 265 std::atomic<bool> rejected{ false }; 266 std::thread t([&] { 267 g.reset(); // attach to the current arena 268 tbb::flow::make_edge(fnode0, qnode1); 269 tbb::flow::make_edge(qnode0, fnode0); // TODO: invesigate why it always creates a forwarding task 270 INFO("Testing rejecting function_node:"); 271 CHECK_MESSAGE( (!fnode0.my_queue), "node should have no queue"); 272 CHECK_MESSAGE( (!fnode0.my_successors.empty()), "successor edge not added"); 273 qnode0.try_put(1); 274 qnode0.try_put(2); // rejecting node should reject, reverse. 275 rejected = true; 276 g.wait_for_all(); 277 }); 278 utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start 279 utils::SpinWaitWhileEq(rejected, false); 280 CHECK(fnode0.my_predecessors.empty() == false); 281 serial_fn_state0 = 2; // release function_node body. 282 t.join(); 283 INFO(" reset"); 284 g.reset(); // should reverse the edge from the input to the function node. 285 CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()"); 286 CHECK_MESSAGE( (fnode0.my_predecessors.empty()), "predecessor not reversed"); 287 tbb::flow::remove_edge(qnode0, fnode0); 288 tbb::flow::remove_edge(fnode0, qnode1); 289 INFO("\n"); 290 291 // queueing 292 tbb::flow::make_edge(fnode1, qnode1); 293 INFO("Testing queueing function_node:"); 294 CHECK_MESSAGE( (fnode1.my_queue), "node should have no queue"); 295 CHECK_MESSAGE( (!fnode1.my_successors.empty()), "successor edge not added"); 296 INFO(" add_pred"); 297 CHECK_MESSAGE( (fnode1.register_predecessor(qnode0)), "Cannot register as predecessor"); 298 CHECK_MESSAGE( (!fnode1.my_predecessors.empty()), "Missing predecessor"); 299 INFO(" reset"); 300 g.wait_for_all(); 301 g.reset(); // should reverse the edge from the input to the function node. 302 CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()"); 303 CHECK_MESSAGE( (fnode1.my_predecessors.empty()), "predecessor not reversed"); 304 tbb::flow::remove_edge(qnode0, fnode1); 305 tbb::flow::remove_edge(fnode1, qnode1); 306 INFO("\n"); 307 308 serial_fn_state0 = 0; // make the function_node wait 309 rejected = false; 310 std::thread t2([&] { 311 g.reset(); // attach to the current arena 312 313 tbb::flow::make_edge(qnode0, fnode0); // TODO: invesigate why it always creates a forwarding task 314 315 INFO(" start_func"); 316 qnode0.try_put(1); 317 // now if we put an item to the queues the edges to the function_node will reverse. 318 INFO(" put_node(2)"); 319 qnode0.try_put(2); // start queue node. 320 rejected = true; 321 g.wait_for_all(); 322 }); 323 utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start 324 // wait for the edges to reverse 325 utils::SpinWaitWhileEq(rejected, false); 326 CHECK(fnode0.my_predecessors.empty() == false); 327 g.my_context->cancel_group_execution(); 328 // release the function_node 329 serial_fn_state0 = 2; 330 t2.join(); 331 CHECK_MESSAGE( (!fnode0.my_predecessors.empty() && qnode0.my_successors.empty()), "function_node edge not reversed"); 332 g.reset(tbb::flow::rf_clear_edges); 333 CHECK_MESSAGE( (fnode0.my_predecessors.empty() && qnode0.my_successors.empty()), "function_node edge not removed"); 334 CHECK_MESSAGE( (fnode0.my_successors.empty()), "successor to fnode not removed"); 335 INFO(" done\n"); 336 } 337 338 template<typename TT> 339 class tag_func { 340 TT my_mult; 341 public: 342 tag_func(TT multiplier) : my_mult(multiplier) { } 343 // operator() will return [0 .. Count) 344 tbb::flow::tag_value operator()( TT v) { 345 tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult); 346 return t; 347 } 348 }; 349 350 template<typename JNODE_TYPE> 351 void 352 TestSimpleSuccessorArc(const char *name) { 353 tbb::flow::graph g; 354 { 355 INFO("Join<" << name << "> successor test "); 356 tbb::flow::join_node<std::tuple<int>, JNODE_TYPE> qj(g); 357 tbb::flow::broadcast_node<std::tuple<int> > bnode(g); 358 tbb::flow::make_edge(qj, bnode); 359 CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking"); 360 g.reset(); 361 CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()"); 362 g.reset(tbb::flow::rf_clear_edges); 363 CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)"); 364 } 365 } 366 367 template<> 368 void 369 TestSimpleSuccessorArc<tbb::flow::tag_matching>(const char *name) { 370 tbb::flow::graph g; 371 { 372 INFO("Join<" << name << "> successor test "); 373 typedef std::tuple<int,int> my_tuple; 374 tbb::flow::join_node<my_tuple, tbb::flow::tag_matching> qj(g, 375 tag_func<int>(1), 376 tag_func<int>(1) 377 ); 378 tbb::flow::broadcast_node<my_tuple > bnode(g); 379 tbb::flow::make_edge(qj, bnode); 380 CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking"); 381 g.reset(); 382 CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()"); 383 g.reset(tbb::flow::rf_clear_edges); 384 CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)"); 385 } 386 } 387 388 void 389 TestJoinNode() { 390 tbb::flow::graph g; 391 392 TestSimpleSuccessorArc<tbb::flow::queueing>("queueing"); 393 TestSimpleSuccessorArc<tbb::flow::reserving>("reserving"); 394 TestSimpleSuccessorArc<tbb::flow::tag_matching>("tag_matching"); 395 396 // queueing and tagging join nodes have input queues, so the input ports do not reverse. 397 INFO(" reserving preds"); 398 { 399 tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> rj(g); 400 tbb::flow::queue_node<int> q0(g); 401 tbb::flow::queue_node<int> q1(g); 402 tbb::flow::make_edge(q0,tbb::flow::input_port<0>(rj)); 403 tbb::flow::make_edge(q1,tbb::flow::input_port<1>(rj)); 404 q0.try_put(1); 405 g.wait_for_all(); // quiesce 406 CHECK_MESSAGE( (!(tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port missing predecessor"); 407 CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred"); 408 g.reset(); 409 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port has pred after reset()"); 410 CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()"); 411 q1.try_put(2); 412 g.wait_for_all(); // quiesce 413 CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor"); 414 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred"); 415 g.reset(); 416 CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()"); 417 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()"); 418 // should reset predecessors just as regular reset. 419 q1.try_put(3); 420 g.wait_for_all(); // quiesce 421 CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor"); 422 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred"); 423 g.reset(tbb::flow::rf_clear_edges); 424 CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()"); 425 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()"); 426 CHECK_MESSAGE( (q0.my_successors.empty()), "edge not removed by reset(rf_clear_edges)"); 427 CHECK_MESSAGE( (q1.my_successors.empty()), "edge not removed by reset(rf_clear_edges)"); 428 } 429 INFO(" done\n"); 430 } 431 432 template <typename DecrementerType> 433 struct limiter_node_type { 434 using type = tbb::flow::limiter_node<int, DecrementerType>; 435 using dtype = DecrementerType; 436 }; 437 438 template <> 439 struct limiter_node_type<void> { 440 using type = tbb::flow::limiter_node<int>; 441 using dtype = tbb::flow::continue_msg; 442 }; 443 444 template <typename DType> 445 struct DecrementerHelper { 446 template <typename Decrementer> 447 static void check(Decrementer&) {} 448 static DType makeDType() { 449 return DType(1); 450 } 451 }; 452 453 template <> 454 struct DecrementerHelper<tbb::flow::continue_msg> { 455 template <typename Decrementer> 456 static void check(Decrementer& decrementer) { 457 auto& d = static_cast<tbb::detail::d1::continue_receiver&>(decrementer); 458 CHECK_MESSAGE(d.my_predecessor_count == 0, "error in pred count"); 459 CHECK_MESSAGE(d.my_initial_predecessor_count == 0, "error in initial pred count"); 460 CHECK_MESSAGE(d.my_current_count == 0, "error in current count"); 461 } 462 static tbb::flow::continue_msg makeDType() { 463 return tbb::flow::continue_msg(); 464 } 465 }; 466 467 template <typename DecrementerType> 468 void TestLimiterNode() { 469 int out_int{}; 470 tbb::flow::graph g; 471 using dtype = typename limiter_node_type<DecrementerType>::dtype; 472 typename limiter_node_type<DecrementerType>::type ln(g,1); 473 INFO("Testing limiter_node: preds and succs"); 474 DecrementerHelper<dtype>::check(ln.decrementer()); 475 CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold"); 476 tbb::flow::queue_node<int> inq(g); 477 tbb::flow::queue_node<int> outq(g); 478 tbb::flow::broadcast_node<dtype> bn(g); 479 480 tbb::flow::make_edge(inq,ln); 481 tbb::flow::make_edge(ln,outq); 482 tbb::flow::make_edge(bn,ln.decrementer()); 483 484 g.wait_for_all(); 485 CHECK_MESSAGE( (!(ln.my_successors.empty())),"successors empty after make_edge"); 486 CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed"); 487 inq.try_put(1); 488 g.wait_for_all(); 489 CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 1), "limiter_node didn't pass first value"); 490 CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed"); 491 inq.try_put(2); 492 g.wait_for_all(); 493 CHECK_MESSAGE( (!outq.try_get(out_int)), "limiter_node incorrectly passed second input"); 494 CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge to limiter_node not reversed"); 495 bn.try_put(DecrementerHelper<dtype>::makeDType()); 496 g.wait_for_all(); 497 CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 2), "limiter_node didn't pass second value"); 498 g.wait_for_all(); 499 CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge was reversed(after try_get())"); 500 g.reset(); 501 CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge not reset"); 502 inq.try_put(3); 503 g.wait_for_all(); 504 CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 3), "limiter_node didn't pass third value"); 505 506 INFO(" rf_clear_edges"); 507 // currently the limiter_node will not pass another message 508 g.reset(tbb::flow::rf_clear_edges); 509 DecrementerHelper<dtype>::check(ln.decrementer()); 510 CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold"); 511 CHECK_MESSAGE( (ln.my_predecessors.empty()), "preds not reset(rf_clear_edges)"); 512 CHECK_MESSAGE( (ln.my_successors.empty()), "preds not reset(rf_clear_edges)"); 513 CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)"); 514 CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)"); 515 CHECK_MESSAGE( (bn.my_successors.empty()), "control edge not removed on reset(rf_clear_edges)"); 516 tbb::flow::make_edge(inq,ln); 517 tbb::flow::make_edge(ln,outq); 518 inq.try_put(4); 519 inq.try_put(5); 520 g.wait_for_all(); 521 CHECK_MESSAGE( (outq.try_get(out_int)),"missing output after reset(rf_clear_edges)"); 522 CHECK_MESSAGE( (out_int == 4), "input incorrect (4)"); 523 bn.try_put(DecrementerHelper<dtype>::makeDType()); 524 g.wait_for_all(); 525 CHECK_MESSAGE( (!outq.try_get(out_int)),"second output incorrectly passed (rf_clear_edges)"); 526 INFO(" done\n"); 527 } 528 529 template<typename MF_TYPE> 530 struct mf_body { 531 std::atomic<int>& my_flag; 532 mf_body(std::atomic<int>& flag) : my_flag(flag) { } 533 void operator()(const int& in, typename MF_TYPE::output_ports_type& outports) { 534 if(my_flag == 0) { 535 my_flag = 1; 536 537 utils::SpinWaitWhileEq(my_flag, 1); 538 } 539 540 if (in & 0x1) 541 std::get<1>(outports).try_put(in); 542 else 543 std::get<0>(outports).try_put(in); 544 } 545 }; 546 547 template<typename P, typename T> 548 struct test_reversal; 549 template<typename T> 550 struct test_reversal<tbb::flow::queueing, T> { 551 test_reversal() { INFO("<queueing>"); } 552 // queueing node will not reverse. 553 bool operator()(T& node) const { return node.my_predecessors.empty(); } 554 }; 555 556 template<typename T> 557 struct test_reversal<tbb::flow::rejecting, T> { 558 test_reversal() { INFO("<rejecting>"); } 559 bool operator()(T& node) const { return !node.my_predecessors.empty(); } 560 }; 561 562 template<typename P> 563 void TestMultifunctionNode() { 564 typedef tbb::flow::multifunction_node<int, std::tuple<int, int>, P> multinode_type; 565 INFO("Testing multifunction_node"); 566 test_reversal<P,multinode_type> my_test; 567 INFO(":"); 568 tbb::flow::graph g; 569 multinode_type mf(g, tbb::flow::serial, mf_body<multinode_type>(serial_fn_state0)); 570 tbb::flow::queue_node<int> qin(g); 571 tbb::flow::queue_node<int> qodd_out(g); 572 tbb::flow::queue_node<int> qeven_out(g); 573 tbb::flow::make_edge(qin,mf); 574 tbb::flow::make_edge(tbb::flow::output_port<0>(mf), qeven_out); 575 tbb::flow::make_edge(tbb::flow::output_port<1>(mf), qodd_out); 576 g.wait_for_all(); 577 for (int ii = 0; ii < 2 ; ++ii) { 578 serial_fn_state0 = 0; 579 /* if(ii == 0) REMARK(" reset preds"); else REMARK(" 2nd");*/ 580 std::thread t([&] { 581 g.reset(); // attach to the current arena 582 qin.try_put(0); 583 qin.try_put(1); 584 g.wait_for_all(); 585 }); 586 // wait for node to be active 587 utils::SpinWaitWhileEq(serial_fn_state0, 0); 588 utils::SpinWaitWhile( [&] { return !my_test(mf); }); 589 g.my_context->cancel_group_execution(); 590 // release node 591 serial_fn_state0 = 2; 592 t.join(); 593 CHECK_MESSAGE( (my_test(mf)), "fail cancel group test"); 594 if( ii == 1) { 595 INFO(" rf_clear_edges"); 596 g.reset(tbb::flow::rf_clear_edges); 597 CHECK_MESSAGE( (tbb::flow::output_port<0>(mf).my_successors.empty()), "output_port<0> not reset (rf_clear_edges)"); 598 CHECK_MESSAGE( (tbb::flow::output_port<1>(mf).my_successors.empty()), "output_port<1> not reset (rf_clear_edges)"); 599 } 600 else 601 { 602 g.reset(); 603 } 604 CHECK_MESSAGE( (mf.my_predecessors.empty()), "edge didn't reset"); 605 CHECK_MESSAGE( ((ii == 0 && !qin.my_successors.empty()) || (ii == 1 && qin.my_successors.empty())), "edge didn't reset"); 606 } 607 INFO(" done\n"); 608 } 609 610 // indexer_node is like a broadcast_node, in that none of its inputs reverse, and it 611 // never allows a successor to reverse its edge, so we only need test the successors. 612 void 613 TestIndexerNode() { 614 tbb::flow::graph g; 615 typedef tbb::flow::indexer_node< int, int > indexernode_type; 616 indexernode_type inode(g); 617 INFO("Testing indexer_node:"); 618 tbb::flow::queue_node<indexernode_type::output_type> qout(g); 619 tbb::flow::make_edge(inode,qout); 620 g.wait_for_all(); 621 CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing"); 622 g.reset(); 623 CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing after reset"); 624 g.reset(tbb::flow::rf_clear_edges); 625 CHECK_MESSAGE( (inode.my_successors.empty()), "successor of indexer_node not removed by reset(rf_clear_edges)"); 626 INFO(" done\n"); 627 } 628 629 template<typename Node> 630 void 631 TestScalarNode(const char *name) { 632 tbb::flow::graph g; 633 Node on(g); 634 tbb::flow::queue_node<int> qout(g); 635 INFO("Testing " << name << ":"); 636 tbb::flow::make_edge(on,qout); 637 g.wait_for_all(); 638 CHECK_MESSAGE( (!on.my_successors.empty()), "edge not added"); 639 g.reset(); 640 CHECK_MESSAGE( (!on.my_successors.empty()), "edge improperly removed"); 641 g.reset(tbb::flow::rf_clear_edges); 642 CHECK_MESSAGE( (on.my_successors.empty()), "edge not removed by reset(rf_clear_edges)"); 643 INFO(" done\n"); 644 } 645 646 struct seq_body { 647 size_t operator()(const int &in) { 648 return size_t(in / 3); 649 } 650 }; 651 652 // sequencer_node behaves like a queueing node, but requires a different constructor. 653 void TestSequencerNode() { 654 tbb::flow::graph g; 655 tbb::flow::sequencer_node<int> bnode(g, seq_body()); 656 INFO("Testing sequencer_node:"); 657 tbb::flow::function_node<int> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 658 INFO("Testing sequencer_node:"); 659 serial_fn_state0 = 0; // reset to waiting state. 660 INFO(" make_edge"); 661 tbb::flow::make_edge(bnode, fnode); 662 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge" ); 663 INFO(" try_put"); 664 std::thread t([&]{ 665 bnode.try_put(0); // will forward to the fnode 666 g.wait_for_all(); 667 }); 668 // wait for the function_node to fire up 669 utils::SpinWaitWhileEq(serial_fn_state0, 0); 670 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message" ); 671 serial_fn_state0 = 0; // release the function node 672 t.join(); 673 674 INFO(" remove_edge"); 675 tbb::flow::remove_edge(bnode, fnode); 676 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge"); 677 tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g); 678 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // will spawn a task 679 g.wait_for_all(); 680 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join"); 681 INFO(" reverse"); 682 bnode.try_put(3); // the edge should reverse 683 g.wait_for_all(); 684 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving"); 685 INFO(" reset()"); 686 g.wait_for_all(); 687 g.reset(); // should be in forward direction again 688 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()"); 689 INFO(" remove_edge"); 690 g.reset(tbb::flow::rf_clear_edges); // should be in forward direction again 691 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)"); 692 CHECK_MESSAGE( (fnode.my_predecessors.empty()), "buffering node reversed after reset(rf_clear_edges)"); 693 INFO(" done\n"); 694 g.wait_for_all(); 695 } 696 697 struct snode_body { 698 int max_cnt; 699 int my_cnt; 700 snode_body(const int& in) : max_cnt(in) { my_cnt = 0; } 701 int operator()(tbb::flow_control& fc) { 702 if (max_cnt <= my_cnt++) { 703 fc.stop(); 704 return int(); 705 } 706 return my_cnt; 707 } 708 }; 709 710 void TestInputNode() { 711 tbb::flow::graph g; 712 tbb::flow::input_node<int> in(g, snode_body(4)); 713 INFO("Testing input_node:"); 714 tbb::flow::queue_node<int> qin(g); 715 tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> jn(g); 716 tbb::flow::queue_node<std::tuple<int,int> > qout(g); 717 718 INFO(" make_edges"); 719 tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn)); 720 tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn)); 721 tbb::flow::make_edge(jn,qout); 722 CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after make_edge"); 723 g.wait_for_all(); 724 g.reset(); 725 CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after reset"); 726 g.wait_for_all(); 727 g.reset(tbb::flow::rf_clear_edges); 728 CHECK_MESSAGE( (in.my_successors.empty()), "input node has successor after reset(rf_clear_edges)"); 729 tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn)); 730 tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn)); 731 tbb::flow::make_edge(jn,qout); 732 g.wait_for_all(); 733 INFO(" activate"); 734 in.activate(); // will forward to the fnode 735 INFO(" wait1"); 736 g.wait_for_all(); 737 CHECK_MESSAGE( (in.my_successors.empty()), "input node has no successor after forwarding message"); 738 g.reset(); 739 CHECK_MESSAGE( (!in.my_successors.empty()), "input_node has no successors after reset"); 740 CHECK_MESSAGE( (tbb::flow::input_port<0>(jn).my_predecessors.empty()), "successor of input_node has pred after reset."); 741 INFO(" done\n"); 742 } 743 744 //! Test buffering nodes 745 //! \brief \ref error_guessing 746 TEST_CASE("Test buffering nodes"){ 747 unsigned int MinThread = utils::MinThread; 748 if(MinThread < 3) MinThread = 3; 749 tbb::task_arena arena(MinThread); 750 arena.execute( 751 [&]() { 752 // tests presume at least three threads 753 TestBufferingNode< tbb::flow::buffer_node<int> >("buffer_node"); 754 TestBufferingNode< tbb::flow::priority_queue_node<int> >("priority_queue_node"); 755 TestBufferingNode< tbb::flow::queue_node<int> >("queue_node"); 756 } 757 ); 758 } 759 760 //! Test sequencer_node 761 //! \brief \ref error_guessing 762 TEST_CASE("Test sequencer node"){ 763 TestSequencerNode(); 764 } 765 766 TEST_SUITE("Test multifunction node") { 767 //! Test multifunction_node with rejecting policy 768 //! \brief \ref error_guessing 769 TEST_CASE("with rejecting policy"){ 770 TestMultifunctionNode<tbb::flow::rejecting>(); 771 } 772 773 //! Test multifunction_node with queueing policy 774 //! \brief \ref error_guessing 775 TEST_CASE("with queueing policy") { 776 TestMultifunctionNode<tbb::flow::queueing>(); 777 } 778 } 779 780 //! Test input_node 781 //! \brief \ref error_guessing 782 TEST_CASE("Test input node"){ 783 TestInputNode(); 784 } 785 786 //! Test continue_node 787 //! \brief \ref error_guessing 788 TEST_CASE("Test continue node"){ 789 TestContinueNode(); 790 } 791 792 //! Test function_node 793 //! \brief \ref error_guessing 794 TEST_CASE("Test function node" * doctest::may_fail()){ 795 TestFunctionNode(); 796 } 797 798 //! Test join_node 799 //! \brief \ref error_guessing 800 TEST_CASE("Test join node"){ 801 TestJoinNode(); 802 } 803 804 //! Test limiter_node 805 //! \brief \ref error_guessing 806 TEST_CASE("Test limiter node"){ 807 TestLimiterNode<void>(); 808 TestLimiterNode<int>(); 809 TestLimiterNode<tbb::flow::continue_msg>(); 810 } 811 812 //! Test indexer_node 813 //! \brief \ref error_guessing 814 TEST_CASE("Test indexer node"){ 815 TestIndexerNode(); 816 } 817 818 //! Test split_node 819 //! \brief \ref error_guessing 820 TEST_CASE("Test split node"){ 821 TestSplitNode(); 822 } 823 824 //! Test broadcast, overwrite, write_once nodes 825 //! \brief \ref error_guessing 826 TEST_CASE("Test scalar node"){ 827 TestScalarNode<tbb::flow::broadcast_node<int> >("broadcast_node"); 828 TestScalarNode<tbb::flow::overwrite_node<int> >("overwrite_node"); 829 TestScalarNode<tbb::flow::write_once_node<int> >("write_once_node"); 830 } 831 832 //! try_get in inactive graph 833 //! \brief \ref error_guessing 834 TEST_CASE("try_get in inactive graph"){ 835 tbb::flow::graph g; 836 837 tbb::flow::input_node<int> src(g, [&](tbb::flow_control& fc) { fc.stop(); return 0;}); 838 deactivate_graph(g); 839 840 int tmp = -1; 841 CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed"); 842 843 src.activate(); 844 tmp = -1; 845 CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed"); 846 } 847 848 //! Test make_edge in inactive graph 849 //! \brief \ref error_guessing 850 TEST_CASE("Test make_edge in inactive graph"){ 851 tbb::flow::graph g; 852 853 tbb::flow::continue_node<int> c(g, [](const tbb::flow::continue_msg&){ return 1; }); 854 855 tbb::flow::function_node<int, int> f(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 856 857 c.try_put(tbb::flow::continue_msg()); 858 g.wait_for_all(); 859 860 deactivate_graph(g); 861 862 make_edge(c, f); 863 } 864 865 //! Test make_edge from overwrite_node in inactive graph 866 //! \brief \ref error_guessing 867 TEST_CASE("Test make_edge from overwrite_node in inactive graph"){ 868 tbb::flow::graph g; 869 870 tbb::flow::queue_node<int> q(g); 871 872 tbb::flow::overwrite_node<int> on(g); 873 874 on.try_put(1); 875 g.wait_for_all(); 876 877 deactivate_graph(g); 878 879 make_edge(on, q); 880 881 int tmp = -1; 882 CHECK_MESSAGE((q.try_get(tmp) == false), "Message should not be passed on"); 883 } 884 885 //! Test iterators directly 886 //! \brief \ref error_guessing 887 TEST_CASE("graph_iterator details"){ 888 tbb::flow::graph g; 889 const tbb::flow::graph cg; 890 891 tbb::flow::graph::iterator b = g.begin(); 892 tbb::flow::graph::iterator b2 = g.begin(); 893 ++b2; 894 // Cast to a volatile pointer to workaround self assignment warnings from some compilers. 895 tbb::flow::graph::iterator* volatile b2_ptr = &b2; 896 b2 = *b2_ptr; 897 b = b2; 898 CHECK_MESSAGE((b == b2), "Assignment should make iterators equal"); 899 } 900 901 //! const graph 902 //! \brief \ref error_guessing 903 TEST_CASE("const graph"){ 904 using namespace tbb::flow; 905 906 const graph g; 907 CHECK_MESSAGE((g.cbegin() == g.cend()), "Starting graph is empty"); 908 CHECK_MESSAGE((g.begin() == g.end()), "Starting graph is empty"); 909 910 graph g2; 911 CHECK_MESSAGE((g2.begin() == g2.end()), "Starting graph is empty"); 912 } 913 914 //! Send message to continue_node while graph is inactive 915 //! \brief \ref error_guessing 916 TEST_CASE("Send message to continue_node while graph is inactive") { 917 using namespace tbb::flow; 918 919 graph g; 920 921 continue_node<int> c(g, [](const continue_msg&){ return 1; }); 922 buffer_node<int> b(g); 923 924 make_edge(c, b); 925 926 deactivate_graph(g); 927 928 c.try_put(continue_msg()); 929 g.wait_for_all(); 930 931 int tmp = -1; 932 CHECK_MESSAGE((b.try_get(tmp) == false), "Message should not arrive"); 933 CHECK_MESSAGE((tmp == -1), "Value should not be altered"); 934 } 935 936 937 //! Bypass of a successor's message in a node with lightweight policy 938 //! \brief \ref error_guessing 939 TEST_CASE("Bypass of a successor's message in a node with lightweight policy") { 940 using namespace tbb::flow; 941 942 graph g; 943 944 auto body = [](const int&v)->int { return v * 2; }; 945 function_node<int, int, lightweight> f1(g, unlimited, body); 946 947 auto body2 = [](const int&v)->int {return v / 2;}; 948 function_node<int, int> f2(g, unlimited, body2); 949 950 buffer_node<int> b(g); 951 952 make_edge(f1, f2); 953 make_edge(f2, b); 954 955 f1.try_put(1); 956 g.wait_for_all(); 957 958 int tmp = -1; 959 CHECK_MESSAGE((b.try_get(tmp) == true), "Functional nodes can work in succession"); 960 CHECK_MESSAGE((tmp == 1), "Value should not be altered"); 961 } 962