1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/config.h" 18 19 #if _MSC_VER 20 #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning 21 #if _MSC_VER==1700 && !defined(__INTEL_COMPILER) 22 // Suppress "unreachable code" warning by VC++ 17.0 (VS 2012) 23 #pragma warning (disable: 4702) 24 #endif 25 #endif 26 27 // need these to get proper external names for private methods in library. 28 #include "tbb/spin_mutex.h" 29 #include "tbb/spin_rw_mutex.h" 30 #include "tbb/task_arena.h" 31 #include "tbb/task_group.h" 32 33 #define private public 34 #define protected public 35 #include "tbb/flow_graph.h" 36 #undef protected 37 #undef private 38 39 #include "common/test.h" 40 #include "common/utils.h" 41 #include "common/spin_barrier.h" 42 #include "common/graph_utils.h" 43 44 #include <string> // merely prevents LNK2001 error to happen (on ICL+VC9 configurations) 45 46 //! \file test_flow_graph_whitebox.cpp 47 //! \brief Test for [flow_graph.broadcast_node flow_graph.priority_queue_node flow_graph.indexer_node flow_graph.sequencer_node flow_graph.remove_edge flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification 48 49 template<typename T> 50 struct receiverBody { 51 tbb::flow::continue_msg operator()(const T &/*in*/) { 52 return tbb::flow::continue_msg(); 53 } 54 }; 55 56 // split_nodes cannot have predecessors 57 // they do not reject messages and always forward. 58 // they reject edge reversals from successors. 59 void TestSplitNode() { 60 typedef tbb::flow::split_node<std::tuple<int> > snode_type; 61 tbb::flow::graph g; 62 snode_type snode(g); 63 tbb::flow::function_node<int> rcvr(g,tbb::flow::unlimited, receiverBody<int>()); 64 INFO("Testing split_node\n"); 65 CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "Constructed split_node has successors"); 66 // tbb::flow::output_port<0>(snode) 67 tbb::flow::make_edge(tbb::flow::output_port<0>(snode), rcvr); 68 CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after make_edge, split_node has no successor."); 69 snode.try_put(std::tuple<int>(1)); 70 g.wait_for_all(); 71 g.reset(); 72 CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after reset(), split_node has no successor."); 73 g.reset(tbb::flow::rf_clear_edges); 74 CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "after reset(rf_clear_edges), split_node has a successor."); 75 } 76 77 // buffering nodes cannot have predecessors 78 // they do not reject messages and always save or forward 79 // they allow edge reversals from successors 80 template< typename B > 81 void TestBufferingNode(const char * name) { 82 tbb::flow::graph g; 83 B bnode(g); 84 tbb::flow::function_node<int,int,tbb::flow::rejecting> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 85 INFO("Testing " << name << ":"); 86 for(int icnt = 0; icnt < 2; icnt++) { 87 bool reverse_edge = (icnt & 0x2) != 0; 88 serial_fn_state0 = 0; // reset to waiting state. 89 INFO(" make_edge"); 90 tbb::flow::make_edge(bnode, fnode); 91 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge"); 92 std::thread t([&] { 93 INFO(" try_put"); 94 bnode.try_put(1); // will forward to the fnode 95 g.wait_for_all(); 96 }); 97 utils::SpinWaitWhileEq(serial_fn_state0, 0); 98 if(reverse_edge) { 99 INFO(" try_put2"); 100 bnode.try_put(2); // should reverse the edge 101 // waiting for the edge to reverse 102 utils::SpinWaitWhile([&] { return !bnode.my_successors.empty(); }); 103 } 104 else { 105 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message"); 106 } 107 serial_fn_state0 = 0; // release the function_node. 108 if(reverse_edge) { 109 // have to do a second release because the function_node will get the 2nd item 110 utils::SpinWaitWhileEq(serial_fn_state0, 0); 111 serial_fn_state0 = 0; // release the function_node. 112 } 113 t.join(); 114 INFO(" remove_edge"); 115 tbb::flow::remove_edge(bnode, fnode); 116 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge"); 117 } 118 tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g); 119 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // will spawn a task 120 g.wait_for_all(); 121 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join"); 122 INFO(" reverse"); 123 bnode.try_put(1); // the edge should reverse 124 g.wait_for_all(); 125 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving"); 126 INFO(" reset()"); 127 g.wait_for_all(); 128 g.reset(); // should be in forward direction again 129 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()"); 130 INFO(" remove_edge"); 131 g.reset(tbb::flow::rf_clear_edges); 132 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)"); 133 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // add edge again 134 // reverse edge by adding to buffer. 135 bnode.try_put(1); // the edge should reverse 136 g.wait_for_all(); 137 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving"); 138 INFO(" remove_edge(reversed)"); 139 g.reset(tbb::flow::rf_clear_edges); 140 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has no successor after reset()"); 141 CHECK_MESSAGE( (tbb::flow::input_port<0>(jnode).my_predecessors.empty()), "predecessor not reset"); 142 INFO(" done\n"); 143 g.wait_for_all(); 144 } 145 146 // continue_node has only predecessor count 147 // they do not have predecessors, only the counts 148 // successor edges cannot be reversed 149 void TestContinueNode() { 150 tbb::flow::graph g; 151 tbb::flow::function_node<int> fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 152 tbb::flow::continue_node<int> cnode(g, /*number_of_predecessors*/ 1, 153 serial_continue_body<int>(serial_continue_state0)); 154 tbb::flow::function_node<int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state1)); 155 tbb::flow::make_edge(fnode0, cnode); 156 tbb::flow::make_edge(cnode, fnode1); 157 INFO("Testing continue_node:"); 158 for( int icnt = 0; icnt < 2; ++icnt ) { 159 INFO( " initial" << icnt); 160 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor addition didn't increment count"); 161 CHECK_MESSAGE( (!cnode.successors().empty()), "successors empty though we added one"); 162 CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect"); 163 serial_continue_state0 = 0; 164 serial_fn_state0 = 0; 165 serial_fn_state1 = 0; 166 167 std::thread t([&] { 168 fnode0.try_put(1); // start the first function node. 169 if(icnt == 0) { // first time through, let the continue_node fire 170 INFO(" firing"); 171 fnode0.try_put(1); // second message 172 g.wait_for_all(); 173 174 // try a try_get() 175 { 176 int i; 177 CHECK_MESSAGE( (!cnode.try_get(i)), "try_get not rejected"); 178 } 179 180 INFO(" reset"); 181 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)"); 182 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)"); 183 g.reset(); // should still be the same 184 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (after reset)" ); 185 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (after reset)"); 186 } 187 else { // we're going to see if the rf_clear_edges resets things. 188 g.wait_for_all(); 189 INFO(" reset(rf_clear_edges)"); 190 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)" ); 191 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)" ); 192 g.reset(tbb::flow::rf_clear_edges); // should be in forward direction again 193 CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect after reset(rf_clear_edges)" ); 194 CHECK_MESSAGE( (cnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)" ); 195 CHECK_MESSAGE( (cnode.my_predecessor_count == cnode.my_initial_predecessor_count), "predecessor count not reset" ); 196 } 197 }); 198 199 utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for the first message to arrive in function_node 200 // Now the body of function_node 0 is executing. 201 serial_fn_state0 = 0; // release the node 202 if (icnt == 0) { 203 // wait for node to count the message (or for the node body to execute, which would be wrong) 204 utils::SpinWaitWhile([&] { 205 return serial_continue_state0 == 0 && cnode.my_current_count == 0; 206 }); 207 CHECK_MESSAGE( (serial_continue_state0 == 0), "Improperly released continue_node"); 208 CHECK_MESSAGE( (cnode.my_current_count == 1), "state of continue_receiver incorrect"); 209 210 utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for the second message to arrive in function_node 211 // Now the body of function_node 0 is executing. 212 serial_fn_state0 = 0; // release the node 213 214 utils::SpinWaitWhileEq(serial_continue_state0, 0); // waiting for continue_node to start 215 CHECK_MESSAGE( (cnode.my_current_count == 0), " my_current_count not reset before body of continue_node started"); 216 serial_continue_state0 = 0; // release the continue_node 217 218 utils::SpinWaitWhileEq(serial_fn_state1, 0); // wait for the successor function_node to enter body 219 serial_fn_state1 = 0; // release successor function_node. 220 } 221 222 t.join(); 223 } 224 225 INFO(" done\n"); 226 227 } 228 229 // function_node has predecessors and successors 230 // try_get() rejects 231 // successor edges cannot be reversed 232 // predecessors will reverse (only rejecting will reverse) 233 void TestFunctionNode() { 234 tbb::flow::graph g; 235 tbb::flow::queue_node<int> qnode0(g); 236 tbb::flow::function_node<int,int, tbb::flow::rejecting > fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 237 tbb::flow::function_node<int,int/*, tbb::flow::queueing*/> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 238 239 tbb::flow::queue_node<int> qnode1(g); 240 241 tbb::flow::make_edge(fnode0, qnode1); 242 tbb::flow::make_edge(qnode0, fnode0); 243 244 serial_fn_state0 = 2; // just let it go 245 qnode0.try_put(1); 246 g.wait_for_all(); 247 int ii; 248 CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed" ); 249 tbb::flow::remove_edge(qnode0, fnode0); 250 tbb::flow::remove_edge(fnode0, qnode1); 251 252 tbb::flow::make_edge(fnode1, qnode1); 253 tbb::flow::make_edge(qnode0, fnode1); 254 255 serial_fn_state0 = 2; // just let it go 256 qnode0.try_put(1); 257 g.wait_for_all(); 258 CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed" ); 259 tbb::flow::remove_edge(qnode0, fnode1); 260 tbb::flow::remove_edge(fnode1, qnode1); 261 262 // rejecting 263 serial_fn_state0 = 0; 264 tbb::flow::make_edge(fnode0, qnode1); 265 tbb::flow::make_edge(qnode0, fnode0); 266 INFO("Testing rejecting function_node:"); 267 CHECK_MESSAGE( (!fnode0.my_queue), "node should have no queue"); 268 CHECK_MESSAGE( (!fnode0.my_successors.empty()), "successor edge not added"); 269 std::thread t([&] { 270 g.reset(); // attach to the current arena 271 qnode0.try_put(1); 272 qnode0.try_put(2); // rejecting node should reject, reverse. 273 g.wait_for_all(); 274 }); 275 utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start 276 utils::SpinWaitWhile([&] { return fnode0.my_predecessors.empty(); }); 277 serial_fn_state0 = 2; // release function_node body. 278 t.join(); 279 INFO(" reset"); 280 g.reset(); // should reverse the edge from the input to the function node. 281 CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()"); 282 CHECK_MESSAGE( (fnode0.my_predecessors.empty()), "predecessor not reversed"); 283 tbb::flow::remove_edge(qnode0, fnode0); 284 tbb::flow::remove_edge(fnode0, qnode1); 285 INFO("\n"); 286 287 // queueing 288 tbb::flow::make_edge(fnode1, qnode1); 289 INFO("Testing queueing function_node:"); 290 CHECK_MESSAGE( (fnode1.my_queue), "node should have no queue"); 291 CHECK_MESSAGE( (!fnode1.my_successors.empty()), "successor edge not added"); 292 INFO(" add_pred"); 293 CHECK_MESSAGE( (fnode1.register_predecessor(qnode0)), "Cannot register as predecessor"); 294 CHECK_MESSAGE( (!fnode1.my_predecessors.empty()), "Missing predecessor"); 295 INFO(" reset"); 296 g.wait_for_all(); 297 g.reset(); // should reverse the edge from the input to the function node. 298 CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()"); 299 CHECK_MESSAGE( (fnode1.my_predecessors.empty()), "predecessor not reversed"); 300 tbb::flow::remove_edge(qnode0, fnode1); 301 tbb::flow::remove_edge(fnode1, qnode1); 302 INFO("\n"); 303 304 serial_fn_state0 = 0; // make the function_node wait 305 tbb::flow::make_edge(qnode0, fnode0); 306 307 std::thread t2([&] { 308 g.reset(); // attach to the current arena 309 INFO(" start_func"); 310 qnode0.try_put(1); 311 // now if we put an item to the queues the edges to the function_node will reverse. 312 INFO(" put_node(2)"); 313 qnode0.try_put(2); // start queue node. 314 g.wait_for_all(); 315 }); 316 utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start 317 // wait for the edges to reverse 318 utils::SpinWaitWhile([&] { return fnode0.my_predecessors.empty(); }); 319 g.my_context->cancel_group_execution(); 320 // release the function_node 321 serial_fn_state0 = 2; 322 t2.join(); 323 CHECK_MESSAGE( (!fnode0.my_predecessors.empty() && qnode0.my_successors.empty()), "function_node edge not reversed"); 324 g.reset(tbb::flow::rf_clear_edges); 325 CHECK_MESSAGE( (fnode0.my_predecessors.empty() && qnode0.my_successors.empty()), "function_node edge not removed"); 326 CHECK_MESSAGE( (fnode0.my_successors.empty()), "successor to fnode not removed"); 327 INFO(" done\n"); 328 } 329 330 template<typename TT> 331 class tag_func { 332 TT my_mult; 333 public: 334 tag_func(TT multiplier) : my_mult(multiplier) { } 335 // operator() will return [0 .. Count) 336 tbb::flow::tag_value operator()( TT v) { 337 tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult); 338 return t; 339 } 340 }; 341 342 template<typename JNODE_TYPE> 343 void 344 TestSimpleSuccessorArc(const char *name) { 345 tbb::flow::graph g; 346 { 347 INFO("Join<" << name << "> successor test "); 348 tbb::flow::join_node<std::tuple<int>, JNODE_TYPE> qj(g); 349 tbb::flow::broadcast_node<std::tuple<int> > bnode(g); 350 tbb::flow::make_edge(qj, bnode); 351 CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking"); 352 g.reset(); 353 CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()"); 354 g.reset(tbb::flow::rf_clear_edges); 355 CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)"); 356 } 357 } 358 359 template<> 360 void 361 TestSimpleSuccessorArc<tbb::flow::tag_matching>(const char *name) { 362 tbb::flow::graph g; 363 { 364 INFO("Join<" << name << "> successor test "); 365 typedef std::tuple<int,int> my_tuple; 366 tbb::flow::join_node<my_tuple, tbb::flow::tag_matching> qj(g, 367 tag_func<int>(1), 368 tag_func<int>(1) 369 ); 370 tbb::flow::broadcast_node<my_tuple > bnode(g); 371 tbb::flow::make_edge(qj, bnode); 372 CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking"); 373 g.reset(); 374 CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()"); 375 g.reset(tbb::flow::rf_clear_edges); 376 CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)"); 377 } 378 } 379 380 void 381 TestJoinNode() { 382 tbb::flow::graph g; 383 384 TestSimpleSuccessorArc<tbb::flow::queueing>("queueing"); 385 TestSimpleSuccessorArc<tbb::flow::reserving>("reserving"); 386 TestSimpleSuccessorArc<tbb::flow::tag_matching>("tag_matching"); 387 388 // queueing and tagging join nodes have input queues, so the input ports do not reverse. 389 INFO(" reserving preds"); 390 { 391 tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> rj(g); 392 tbb::flow::queue_node<int> q0(g); 393 tbb::flow::queue_node<int> q1(g); 394 tbb::flow::make_edge(q0,tbb::flow::input_port<0>(rj)); 395 tbb::flow::make_edge(q1,tbb::flow::input_port<1>(rj)); 396 q0.try_put(1); 397 g.wait_for_all(); // quiesce 398 CHECK_MESSAGE( (!(tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port missing predecessor"); 399 CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred"); 400 g.reset(); 401 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port has pred after reset()"); 402 CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()"); 403 q1.try_put(2); 404 g.wait_for_all(); // quiesce 405 CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor"); 406 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred"); 407 g.reset(); 408 CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()"); 409 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()"); 410 // should reset predecessors just as regular reset. 411 q1.try_put(3); 412 g.wait_for_all(); // quiesce 413 CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor"); 414 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred"); 415 g.reset(tbb::flow::rf_clear_edges); 416 CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()"); 417 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()"); 418 CHECK_MESSAGE( (q0.my_successors.empty()), "edge not removed by reset(rf_clear_edges)"); 419 CHECK_MESSAGE( (q1.my_successors.empty()), "edge not removed by reset(rf_clear_edges)"); 420 } 421 INFO(" done\n"); 422 } 423 424 template <typename DecrementerType> 425 struct limiter_node_type { 426 using type = tbb::flow::limiter_node<int, DecrementerType>; 427 using dtype = DecrementerType; 428 }; 429 430 template <> 431 struct limiter_node_type<void> { 432 using type = tbb::flow::limiter_node<int>; 433 using dtype = tbb::flow::continue_msg; 434 }; 435 436 template <typename DType> 437 struct DecrementerHelper { 438 template <typename Decrementer> 439 static void check(Decrementer&) {} 440 static DType makeDType() { 441 return DType(1); 442 } 443 }; 444 445 template <> 446 struct DecrementerHelper<tbb::flow::continue_msg> { 447 template <typename Decrementer> 448 static void check(Decrementer& decrementer) { 449 auto& d = static_cast<tbb::detail::d1::continue_receiver&>(decrementer); 450 CHECK_MESSAGE(d.my_predecessor_count == 0, "error in pred count"); 451 CHECK_MESSAGE(d.my_initial_predecessor_count == 0, "error in initial pred count"); 452 CHECK_MESSAGE(d.my_current_count == 0, "error in current count"); 453 } 454 static tbb::flow::continue_msg makeDType() { 455 return tbb::flow::continue_msg(); 456 } 457 }; 458 459 template <typename DecrementerType> 460 void TestLimiterNode() { 461 int out_int{}; 462 tbb::flow::graph g; 463 using dtype = typename limiter_node_type<DecrementerType>::dtype; 464 typename limiter_node_type<DecrementerType>::type ln(g,1); 465 INFO("Testing limiter_node: preds and succs"); 466 DecrementerHelper<dtype>::check(ln.decrementer()); 467 CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold"); 468 tbb::flow::queue_node<int> inq(g); 469 tbb::flow::queue_node<int> outq(g); 470 tbb::flow::broadcast_node<dtype> bn(g); 471 472 tbb::flow::make_edge(inq,ln); 473 tbb::flow::make_edge(ln,outq); 474 tbb::flow::make_edge(bn,ln.decrementer()); 475 476 g.wait_for_all(); 477 CHECK_MESSAGE( (!(ln.my_successors.empty())),"successors empty after make_edge"); 478 CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed"); 479 inq.try_put(1); 480 g.wait_for_all(); 481 CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 1), "limiter_node didn't pass first value"); 482 CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed"); 483 inq.try_put(2); 484 g.wait_for_all(); 485 CHECK_MESSAGE( (!outq.try_get(out_int)), "limiter_node incorrectly passed second input"); 486 CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge to limiter_node not reversed"); 487 bn.try_put(DecrementerHelper<dtype>::makeDType()); 488 g.wait_for_all(); 489 CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 2), "limiter_node didn't pass second value"); 490 g.wait_for_all(); 491 CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge was reversed(after try_get())"); 492 g.reset(); 493 CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge not reset"); 494 inq.try_put(3); 495 g.wait_for_all(); 496 CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 3), "limiter_node didn't pass third value"); 497 498 INFO(" rf_clear_edges"); 499 // currently the limiter_node will not pass another message 500 g.reset(tbb::flow::rf_clear_edges); 501 DecrementerHelper<dtype>::check(ln.decrementer()); 502 CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold"); 503 CHECK_MESSAGE( (ln.my_predecessors.empty()), "preds not reset(rf_clear_edges)"); 504 CHECK_MESSAGE( (ln.my_successors.empty()), "preds not reset(rf_clear_edges)"); 505 CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)"); 506 CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)"); 507 CHECK_MESSAGE( (bn.my_successors.empty()), "control edge not removed on reset(rf_clear_edges)"); 508 tbb::flow::make_edge(inq,ln); 509 tbb::flow::make_edge(ln,outq); 510 inq.try_put(4); 511 inq.try_put(5); 512 g.wait_for_all(); 513 CHECK_MESSAGE( (outq.try_get(out_int)),"missing output after reset(rf_clear_edges)"); 514 CHECK_MESSAGE( (out_int == 4), "input incorrect (4)"); 515 bn.try_put(DecrementerHelper<dtype>::makeDType()); 516 g.wait_for_all(); 517 CHECK_MESSAGE( (!outq.try_get(out_int)),"second output incorrectly passed (rf_clear_edges)"); 518 INFO(" done\n"); 519 } 520 521 template<typename MF_TYPE> 522 struct mf_body { 523 std::atomic<int>& my_flag; 524 mf_body(std::atomic<int>& flag) : my_flag(flag) { } 525 void operator()(const int& in, typename MF_TYPE::output_ports_type& outports) { 526 if(my_flag == 0) { 527 my_flag = 1; 528 529 utils::SpinWaitWhileEq(my_flag, 1); 530 } 531 532 if (in & 0x1) 533 std::get<1>(outports).try_put(in); 534 else 535 std::get<0>(outports).try_put(in); 536 } 537 }; 538 539 template<typename P, typename T> 540 struct test_reversal; 541 template<typename T> 542 struct test_reversal<tbb::flow::queueing, T> { 543 test_reversal() { INFO("<queueing>"); } 544 // queueing node will not reverse. 545 bool operator()(T& node) const { return node.my_predecessors.empty(); } 546 }; 547 548 template<typename T> 549 struct test_reversal<tbb::flow::rejecting, T> { 550 test_reversal() { INFO("<rejecting>"); } 551 bool operator()(T& node) const { return !node.my_predecessors.empty(); } 552 }; 553 554 template<typename P> 555 void TestMultifunctionNode() { 556 typedef tbb::flow::multifunction_node<int, std::tuple<int, int>, P> multinode_type; 557 INFO("Testing multifunction_node"); 558 test_reversal<P,multinode_type> my_test; 559 INFO(":"); 560 tbb::flow::graph g; 561 multinode_type mf(g, tbb::flow::serial, mf_body<multinode_type>(serial_fn_state0)); 562 tbb::flow::queue_node<int> qin(g); 563 tbb::flow::queue_node<int> qodd_out(g); 564 tbb::flow::queue_node<int> qeven_out(g); 565 tbb::flow::make_edge(qin,mf); 566 tbb::flow::make_edge(tbb::flow::output_port<0>(mf), qeven_out); 567 tbb::flow::make_edge(tbb::flow::output_port<1>(mf), qodd_out); 568 g.wait_for_all(); 569 for (int ii = 0; ii < 2 ; ++ii) { 570 serial_fn_state0 = 0; 571 /* if(ii == 0) REMARK(" reset preds"); else REMARK(" 2nd");*/ 572 std::thread t([&] { 573 g.reset(); // attach to the current arena 574 qin.try_put(0); 575 qin.try_put(1); 576 g.wait_for_all(); 577 }); 578 // wait for node to be active 579 utils::SpinWaitWhileEq(serial_fn_state0, 0); 580 utils::SpinWaitWhile( [&] { return !my_test(mf); }); 581 g.my_context->cancel_group_execution(); 582 // release node 583 serial_fn_state0 = 2; 584 t.join(); 585 CHECK_MESSAGE( (my_test(mf)), "fail cancel group test"); 586 if( ii == 1) { 587 INFO(" rf_clear_edges"); 588 g.reset(tbb::flow::rf_clear_edges); 589 CHECK_MESSAGE( (tbb::flow::output_port<0>(mf).my_successors.empty()), "output_port<0> not reset (rf_clear_edges)"); 590 CHECK_MESSAGE( (tbb::flow::output_port<1>(mf).my_successors.empty()), "output_port<1> not reset (rf_clear_edges)"); 591 } 592 else 593 { 594 g.reset(); 595 } 596 CHECK_MESSAGE( (mf.my_predecessors.empty()), "edge didn't reset"); 597 CHECK_MESSAGE( ((ii == 0 && !qin.my_successors.empty()) || (ii == 1 && qin.my_successors.empty())), "edge didn't reset"); 598 } 599 INFO(" done\n"); 600 } 601 602 // indexer_node is like a broadcast_node, in that none of its inputs reverse, and it 603 // never allows a successor to reverse its edge, so we only need test the successors. 604 void 605 TestIndexerNode() { 606 tbb::flow::graph g; 607 typedef tbb::flow::indexer_node< int, int > indexernode_type; 608 indexernode_type inode(g); 609 INFO("Testing indexer_node:"); 610 tbb::flow::queue_node<indexernode_type::output_type> qout(g); 611 tbb::flow::make_edge(inode,qout); 612 g.wait_for_all(); 613 CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing"); 614 g.reset(); 615 CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing after reset"); 616 g.reset(tbb::flow::rf_clear_edges); 617 CHECK_MESSAGE( (inode.my_successors.empty()), "successor of indexer_node not removed by reset(rf_clear_edges)"); 618 INFO(" done\n"); 619 } 620 621 template<typename Node> 622 void 623 TestScalarNode(const char *name) { 624 tbb::flow::graph g; 625 Node on(g); 626 tbb::flow::queue_node<int> qout(g); 627 INFO("Testing " << name << ":"); 628 tbb::flow::make_edge(on,qout); 629 g.wait_for_all(); 630 CHECK_MESSAGE( (!on.my_successors.empty()), "edge not added"); 631 g.reset(); 632 CHECK_MESSAGE( (!on.my_successors.empty()), "edge improperly removed"); 633 g.reset(tbb::flow::rf_clear_edges); 634 CHECK_MESSAGE( (on.my_successors.empty()), "edge not removed by reset(rf_clear_edges)"); 635 INFO(" done\n"); 636 } 637 638 struct seq_body { 639 size_t operator()(const int &in) { 640 return size_t(in / 3); 641 } 642 }; 643 644 // sequencer_node behaves like a queueing node, but requires a different constructor. 645 void TestSequencerNode() { 646 tbb::flow::graph g; 647 tbb::flow::sequencer_node<int> bnode(g, seq_body()); 648 INFO("Testing sequencer_node:"); 649 tbb::flow::function_node<int> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 650 INFO("Testing sequencer_node:"); 651 serial_fn_state0 = 0; // reset to waiting state. 652 INFO(" make_edge"); 653 tbb::flow::make_edge(bnode, fnode); 654 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge" ); 655 INFO(" try_put"); 656 std::thread t([&]{ 657 bnode.try_put(0); // will forward to the fnode 658 g.wait_for_all(); 659 }); 660 // wait for the function_node to fire up 661 utils::SpinWaitWhileEq(serial_fn_state0, 0); 662 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message" ); 663 serial_fn_state0 = 0; // release the function node 664 t.join(); 665 666 INFO(" remove_edge"); 667 tbb::flow::remove_edge(bnode, fnode); 668 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge"); 669 tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g); 670 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // will spawn a task 671 g.wait_for_all(); 672 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join"); 673 INFO(" reverse"); 674 bnode.try_put(3); // the edge should reverse 675 g.wait_for_all(); 676 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving"); 677 INFO(" reset()"); 678 g.wait_for_all(); 679 g.reset(); // should be in forward direction again 680 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()"); 681 INFO(" remove_edge"); 682 g.reset(tbb::flow::rf_clear_edges); // should be in forward direction again 683 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)"); 684 CHECK_MESSAGE( (fnode.my_predecessors.empty()), "buffering node reversed after reset(rf_clear_edges)"); 685 INFO(" done\n"); 686 g.wait_for_all(); 687 } 688 689 struct snode_body { 690 int max_cnt; 691 int my_cnt; 692 snode_body(const int& in) : max_cnt(in) { my_cnt = 0; } 693 int operator()(tbb::flow_control& fc) { 694 if (max_cnt <= my_cnt++) { 695 fc.stop(); 696 return int(); 697 } 698 return my_cnt; 699 } 700 }; 701 702 void TestInputNode() { 703 tbb::flow::graph g; 704 tbb::flow::input_node<int> in(g, snode_body(4)); 705 INFO("Testing input_node:"); 706 tbb::flow::queue_node<int> qin(g); 707 tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> jn(g); 708 tbb::flow::queue_node<std::tuple<int,int> > qout(g); 709 710 INFO(" make_edges"); 711 tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn)); 712 tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn)); 713 tbb::flow::make_edge(jn,qout); 714 CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after make_edge"); 715 g.wait_for_all(); 716 g.reset(); 717 CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after reset"); 718 g.wait_for_all(); 719 g.reset(tbb::flow::rf_clear_edges); 720 CHECK_MESSAGE( (in.my_successors.empty()), "input node has successor after reset(rf_clear_edges)"); 721 tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn)); 722 tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn)); 723 tbb::flow::make_edge(jn,qout); 724 g.wait_for_all(); 725 INFO(" activate"); 726 in.activate(); // will forward to the fnode 727 INFO(" wait1"); 728 g.wait_for_all(); 729 CHECK_MESSAGE( (in.my_successors.empty()), "input node has no successor after forwarding message"); 730 g.reset(); 731 CHECK_MESSAGE( (!in.my_successors.empty()), "input_node has no successors after reset"); 732 CHECK_MESSAGE( (tbb::flow::input_port<0>(jn).my_predecessors.empty()), "successor of input_node has pred after reset."); 733 INFO(" done\n"); 734 } 735 736 //! Test buffering nodes 737 //! \brief \ref error_guessing 738 TEST_CASE("Test buffering nodes"){ 739 unsigned int MinThread = utils::MinThread; 740 if(MinThread < 3) MinThread = 3; 741 tbb::task_arena arena(MinThread); 742 arena.execute( 743 [&]() { 744 // tests presume at least three threads 745 TestBufferingNode< tbb::flow::buffer_node<int> >("buffer_node"); 746 TestBufferingNode< tbb::flow::priority_queue_node<int> >("priority_queue_node"); 747 TestBufferingNode< tbb::flow::queue_node<int> >("queue_node"); 748 } 749 ); 750 } 751 752 //! Test sequencer_node 753 //! \brief \ref error_guessing 754 TEST_CASE("Test sequencer node"){ 755 TestSequencerNode(); 756 } 757 758 TEST_SUITE("Test multifunction node") { 759 //! Test multifunction_node with rejecting policy 760 //! \brief \ref error_guessing 761 TEST_CASE("with rejecting policy"){ 762 TestMultifunctionNode<tbb::flow::rejecting>(); 763 } 764 765 //! Test multifunction_node with queueing policy 766 //! \brief \ref error_guessing 767 TEST_CASE("with queueing policy") { 768 TestMultifunctionNode<tbb::flow::queueing>(); 769 } 770 } 771 772 //! Test input_node 773 //! \brief \ref error_guessing 774 TEST_CASE("Test input node"){ 775 TestInputNode(); 776 } 777 778 //! Test continue_node 779 //! \brief \ref error_guessing 780 TEST_CASE("Test continue node"){ 781 TestContinueNode(); 782 } 783 784 //! Test function_node 785 //! \brief \ref error_guessing 786 TEST_CASE("Test function node" * doctest::may_fail()){ 787 TestFunctionNode(); 788 } 789 790 //! Test join_node 791 //! \brief \ref error_guessing 792 TEST_CASE("Test join node"){ 793 TestJoinNode(); 794 } 795 796 //! Test limiter_node 797 //! \brief \ref error_guessing 798 TEST_CASE("Test limiter node"){ 799 TestLimiterNode<void>(); 800 TestLimiterNode<int>(); 801 TestLimiterNode<tbb::flow::continue_msg>(); 802 } 803 804 //! Test indexer_node 805 //! \brief \ref error_guessing 806 TEST_CASE("Test indexer node"){ 807 TestIndexerNode(); 808 } 809 810 //! Test split_node 811 //! \brief \ref error_guessing 812 TEST_CASE("Test split node"){ 813 TestSplitNode(); 814 } 815 816 //! Test broadcast, overwrite, write_once nodes 817 //! \brief \ref error_guessing 818 TEST_CASE("Test scalar node"){ 819 TestScalarNode<tbb::flow::broadcast_node<int> >("broadcast_node"); 820 TestScalarNode<tbb::flow::overwrite_node<int> >("overwrite_node"); 821 TestScalarNode<tbb::flow::write_once_node<int> >("write_once_node"); 822 } 823 824 //! try_get in inactive graph 825 //! \brief \ref error_guessing 826 TEST_CASE("try_get in inactive graph"){ 827 tbb::flow::graph g; 828 829 tbb::flow::input_node<int> src(g, [&](tbb::flow_control& fc) { fc.stop(); return 0;}); 830 deactivate_graph(g); 831 832 int tmp = -1; 833 CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed"); 834 835 src.activate(); 836 tmp = -1; 837 CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed"); 838 } 839 840 //! Test make_edge in inactive graph 841 //! \brief \ref error_guessing 842 TEST_CASE("Test make_edge in inactive graph"){ 843 tbb::flow::graph g; 844 845 tbb::flow::continue_node<int> c(g, [](const tbb::flow::continue_msg&){ return 1; }); 846 847 tbb::flow::function_node<int, int> f(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 848 849 c.try_put(tbb::flow::continue_msg()); 850 g.wait_for_all(); 851 852 deactivate_graph(g); 853 854 make_edge(c, f); 855 } 856 857 //! Test make_edge from overwrite_node in inactive graph 858 //! \brief \ref error_guessing 859 TEST_CASE("Test make_edge from overwrite_node in inactive graph"){ 860 tbb::flow::graph g; 861 862 tbb::flow::queue_node<int> q(g); 863 864 tbb::flow::overwrite_node<int> on(g); 865 866 on.try_put(1); 867 g.wait_for_all(); 868 869 deactivate_graph(g); 870 871 make_edge(on, q); 872 873 int tmp = -1; 874 CHECK_MESSAGE((q.try_get(tmp) == false), "Message should not be passed on"); 875 } 876 877 //! Test iterators directly 878 //! \brief \ref error_guessing 879 TEST_CASE("graph_iterator details"){ 880 tbb::flow::graph g; 881 const tbb::flow::graph cg; 882 883 tbb::flow::graph::iterator b = g.begin(); 884 tbb::flow::graph::iterator b2 = g.begin(); 885 ++b2; 886 // Cast to a volatile pointer to workaround self assignment warnings from some compilers. 887 tbb::flow::graph::iterator* volatile b2_ptr = &b2; 888 b2 = *b2_ptr; 889 b = b2; 890 CHECK_MESSAGE((b == b2), "Assignment should make iterators equal"); 891 } 892 893 //! const graph 894 //! \brief \ref error_guessing 895 TEST_CASE("const graph"){ 896 using namespace tbb::flow; 897 898 const graph g; 899 CHECK_MESSAGE((g.cbegin() == g.cend()), "Starting graph is empty"); 900 CHECK_MESSAGE((g.begin() == g.end()), "Starting graph is empty"); 901 902 graph g2; 903 CHECK_MESSAGE((g2.begin() == g2.end()), "Starting graph is empty"); 904 } 905 906 //! Send message to continue_node while graph is inactive 907 //! \brief \ref error_guessing 908 TEST_CASE("Send message to continue_node while graph is inactive") { 909 using namespace tbb::flow; 910 911 graph g; 912 913 continue_node<int> c(g, [](const continue_msg&){ return 1; }); 914 buffer_node<int> b(g); 915 916 make_edge(c, b); 917 918 deactivate_graph(g); 919 920 c.try_put(continue_msg()); 921 g.wait_for_all(); 922 923 int tmp = -1; 924 CHECK_MESSAGE((b.try_get(tmp) == false), "Message should not arrive"); 925 CHECK_MESSAGE((tmp == -1), "Value should not be altered"); 926 } 927 928 929 //! Bypass of a successor's message in a node with lightweight policy 930 //! \brief \ref error_guessing 931 TEST_CASE("Bypass of a successor's message in a node with lightweight policy") { 932 using namespace tbb::flow; 933 934 graph g; 935 936 auto body = [](const int&v)->int { return v * 2; }; 937 function_node<int, int, lightweight> f1(g, unlimited, body); 938 939 auto body2 = [](const int&v)->int {return v / 2;}; 940 function_node<int, int> f2(g, unlimited, body2); 941 942 buffer_node<int> b(g); 943 944 make_edge(f1, f2); 945 make_edge(f2, b); 946 947 f1.try_put(1); 948 g.wait_for_all(); 949 950 int tmp = -1; 951 CHECK_MESSAGE((b.try_get(tmp) == true), "Functional nodes can work in succession"); 952 CHECK_MESSAGE((tmp == 1), "Value should not be altered"); 953 } 954 955