1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/config.h" 18 19 #if _MSC_VER 20 #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning 21 #if _MSC_VER==1700 && !defined(__INTEL_COMPILER) 22 // Suppress "unreachable code" warning by VC++ 17.0 (VS 2012) 23 #pragma warning (disable: 4702) 24 #endif 25 #endif 26 27 // need these to get proper external names for private methods in library. 28 #include "tbb/spin_mutex.h" 29 #include "tbb/spin_rw_mutex.h" 30 #include "tbb/task_arena.h" 31 #include "tbb/task_group.h" 32 33 #define private public 34 #define protected public 35 #include "tbb/flow_graph.h" 36 #undef protected 37 #undef private 38 39 #include "common/test.h" 40 #include "common/utils.h" 41 #include "common/spin_barrier.h" 42 #include "common/graph_utils.h" 43 44 #include <string> // merely prevents LNK2001 error to happen (on ICL+VC9 configurations) 45 46 //! \file test_flow_graph_whitebox.cpp 47 //! \brief Test for [flow_graph.broadcast_node flow_graph.priority_queue_node flow_graph.indexer_node flow_graph.sequencer_node flow_graph.remove_edge flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification 48 49 template<typename T> 50 struct receiverBody { 51 tbb::flow::continue_msg operator()(const T &/*in*/) { 52 return tbb::flow::continue_msg(); 53 } 54 }; 55 56 // split_nodes cannot have predecessors 57 // they do not reject messages and always forward. 58 // they reject edge reversals from successors. 59 void TestSplitNode() { 60 typedef tbb::flow::split_node<std::tuple<int> > snode_type; 61 tbb::flow::graph g; 62 snode_type snode(g); 63 tbb::flow::function_node<int> rcvr(g,tbb::flow::unlimited, receiverBody<int>()); 64 INFO("Testing split_node\n"); 65 CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "Constructed split_node has successors"); 66 // tbb::flow::output_port<0>(snode) 67 tbb::flow::make_edge(tbb::flow::output_port<0>(snode), rcvr); 68 CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after make_edge, split_node has no successor."); 69 snode.try_put(std::tuple<int>(1)); 70 g.wait_for_all(); 71 g.reset(); 72 CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after reset(), split_node has no successor."); 73 g.reset(tbb::flow::rf_clear_edges); 74 CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "after reset(rf_clear_edges), split_node has a successor."); 75 } 76 77 // buffering nodes cannot have predecessors 78 // they do not reject messages and always save or forward 79 // they allow edge reversals from successors 80 template< typename B > 81 void TestBufferingNode(const char * name) { 82 tbb::flow::graph g; 83 B bnode(g); 84 tbb::flow::function_node<int,int,tbb::flow::rejecting> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 85 INFO("Testing " << name << ":"); 86 for(int icnt = 0; icnt < 2; icnt++) { 87 bool reverse_edge = (icnt & 0x2) != 0; 88 serial_fn_state0 = 0; // reset to waiting state. 89 INFO(" make_edge"); 90 tbb::flow::make_edge(bnode, fnode); 91 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge"); 92 std::thread t([&] { 93 INFO(" try_put"); 94 bnode.try_put(1); // will forward to the fnode 95 g.wait_for_all(); 96 }); 97 utils::SpinWaitWhileEq(serial_fn_state0, 0); 98 if(reverse_edge) { 99 INFO(" try_put2"); 100 bnode.try_put(2); // should reverse the edge 101 // waiting for the edge to reverse 102 utils::SpinWaitWhile([&] { return !bnode.my_successors.empty(); }); 103 } 104 else { 105 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message"); 106 } 107 serial_fn_state0 = 0; // release the function_node. 108 if(reverse_edge) { 109 // have to do a second release because the function_node will get the 2nd item 110 utils::SpinWaitWhileEq(serial_fn_state0, 0); 111 serial_fn_state0 = 0; // release the function_node. 112 } 113 t.join(); 114 INFO(" remove_edge"); 115 tbb::flow::remove_edge(bnode, fnode); 116 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge"); 117 } 118 tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g); 119 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // will spawn a task 120 g.wait_for_all(); 121 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join"); 122 INFO(" reverse"); 123 bnode.try_put(1); // the edge should reverse 124 g.wait_for_all(); 125 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving"); 126 INFO(" reset()"); 127 g.wait_for_all(); 128 g.reset(); // should be in forward direction again 129 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()"); 130 INFO(" remove_edge"); 131 g.reset(tbb::flow::rf_clear_edges); 132 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)"); 133 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // add edge again 134 // reverse edge by adding to buffer. 135 bnode.try_put(1); // the edge should reverse 136 g.wait_for_all(); 137 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving"); 138 INFO(" remove_edge(reversed)"); 139 g.reset(tbb::flow::rf_clear_edges); 140 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has no successor after reset()"); 141 CHECK_MESSAGE( (tbb::flow::input_port<0>(jnode).my_predecessors.empty()), "predecessor not reset"); 142 INFO(" done\n"); 143 g.wait_for_all(); 144 } 145 146 // continue_node has only predecessor count 147 // they do not have predecessors, only the counts 148 // successor edges cannot be reversed 149 void TestContinueNode() { 150 tbb::flow::graph g; 151 tbb::flow::function_node<int> fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 152 tbb::flow::continue_node<int> cnode(g, /*number_of_predecessors*/ 1, 153 serial_continue_body<int>(serial_continue_state0)); 154 tbb::flow::function_node<int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state1)); 155 tbb::flow::make_edge(fnode0, cnode); 156 tbb::flow::make_edge(cnode, fnode1); 157 INFO("Testing continue_node:"); 158 for( int icnt = 0; icnt < 2; ++icnt ) { 159 INFO( " initial" << icnt); 160 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor addition didn't increment count"); 161 CHECK_MESSAGE( (!cnode.successors().empty()), "successors empty though we added one"); 162 CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect"); 163 serial_continue_state0 = 0; 164 serial_fn_state0 = 0; 165 serial_fn_state1 = 0; 166 167 std::thread t([&] { 168 fnode0.try_put(1); // start the first function node. 169 if(icnt == 0) { // first time through, let the continue_node fire 170 INFO(" firing"); 171 fnode0.try_put(1); // second message 172 g.wait_for_all(); 173 174 // try a try_get() 175 { 176 int i; 177 CHECK_MESSAGE( (!cnode.try_get(i)), "try_get not rejected"); 178 } 179 180 INFO(" reset"); 181 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)"); 182 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)"); 183 g.reset(); // should still be the same 184 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (after reset)" ); 185 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (after reset)"); 186 } 187 else { // we're going to see if the rf_clear_edges resets things. 188 g.wait_for_all(); 189 INFO(" reset(rf_clear_edges)"); 190 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)" ); 191 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)" ); 192 g.reset(tbb::flow::rf_clear_edges); // should be in forward direction again 193 CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect after reset(rf_clear_edges)" ); 194 CHECK_MESSAGE( (cnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)" ); 195 CHECK_MESSAGE( (cnode.my_predecessor_count == cnode.my_initial_predecessor_count), "predecessor count not reset" ); 196 } 197 }); 198 199 utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for the first message to arrive in function_node 200 // Now the body of function_node 0 is executing. 201 serial_fn_state0 = 0; // release the node 202 if (icnt == 0) { 203 // wait for node to count the message (or for the node body to execute, which would be wrong) 204 utils::SpinWaitWhile([&] { 205 return serial_continue_state0 == 0 && cnode.my_current_count == 0; 206 }); 207 CHECK_MESSAGE( (serial_continue_state0 == 0), "Improperly released continue_node"); 208 CHECK_MESSAGE( (cnode.my_current_count == 1), "state of continue_receiver incorrect"); 209 210 utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for the second message to arrive in function_node 211 // Now the body of function_node 0 is executing. 212 serial_fn_state0 = 0; // release the node 213 214 utils::SpinWaitWhileEq(serial_continue_state0, 0); // waiting for continue_node to start 215 CHECK_MESSAGE( (cnode.my_current_count == 0), " my_current_count not reset before body of continue_node started"); 216 serial_continue_state0 = 0; // release the continue_node 217 218 utils::SpinWaitWhileEq(serial_fn_state1, 0); // wait for the successor function_node to enter body 219 serial_fn_state1 = 0; // release successor function_node. 220 } 221 222 t.join(); 223 } 224 225 INFO(" done\n"); 226 227 } 228 229 // function_node has predecessors and successors 230 // try_get() rejects 231 // successor edges cannot be reversed 232 // predecessors will reverse (only rejecting will reverse) 233 void TestFunctionNode() { 234 tbb::flow::graph g; 235 tbb::flow::queue_node<int> qnode0(g); 236 tbb::flow::function_node<int,int, tbb::flow::rejecting > fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 237 tbb::flow::function_node<int,int/*, tbb::flow::queueing*/> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 238 239 tbb::flow::queue_node<int> qnode1(g); 240 241 tbb::flow::make_edge(fnode0, qnode1); 242 tbb::flow::make_edge(qnode0, fnode0); 243 244 serial_fn_state0 = 2; // just let it go 245 qnode0.try_put(1); 246 g.wait_for_all(); 247 int ii; 248 CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed" ); 249 tbb::flow::remove_edge(qnode0, fnode0); 250 tbb::flow::remove_edge(fnode0, qnode1); 251 252 tbb::flow::make_edge(fnode1, qnode1); 253 tbb::flow::make_edge(qnode0, fnode1); 254 255 serial_fn_state0 = 2; // just let it go 256 qnode0.try_put(1); 257 g.wait_for_all(); 258 CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed" ); 259 tbb::flow::remove_edge(qnode0, fnode1); 260 tbb::flow::remove_edge(fnode1, qnode1); 261 262 // rejecting 263 serial_fn_state0 = 0; 264 std::thread t([&] { 265 g.reset(); // attach to the current arena 266 tbb::flow::make_edge(fnode0, qnode1); 267 tbb::flow::make_edge(qnode0, fnode0); // TODO: invesigate why it always creates a forwarding task 268 INFO("Testing rejecting function_node:"); 269 CHECK_MESSAGE( (!fnode0.my_queue), "node should have no queue"); 270 CHECK_MESSAGE( (!fnode0.my_successors.empty()), "successor edge not added"); 271 qnode0.try_put(1); 272 qnode0.try_put(2); // rejecting node should reject, reverse. 273 g.wait_for_all(); 274 }); 275 utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start 276 utils::SpinWaitWhile([&] { return fnode0.my_predecessors.empty(); }); 277 serial_fn_state0 = 2; // release function_node body. 278 t.join(); 279 INFO(" reset"); 280 g.reset(); // should reverse the edge from the input to the function node. 281 CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()"); 282 CHECK_MESSAGE( (fnode0.my_predecessors.empty()), "predecessor not reversed"); 283 tbb::flow::remove_edge(qnode0, fnode0); 284 tbb::flow::remove_edge(fnode0, qnode1); 285 INFO("\n"); 286 287 // queueing 288 tbb::flow::make_edge(fnode1, qnode1); 289 INFO("Testing queueing function_node:"); 290 CHECK_MESSAGE( (fnode1.my_queue), "node should have no queue"); 291 CHECK_MESSAGE( (!fnode1.my_successors.empty()), "successor edge not added"); 292 INFO(" add_pred"); 293 CHECK_MESSAGE( (fnode1.register_predecessor(qnode0)), "Cannot register as predecessor"); 294 CHECK_MESSAGE( (!fnode1.my_predecessors.empty()), "Missing predecessor"); 295 INFO(" reset"); 296 g.wait_for_all(); 297 g.reset(); // should reverse the edge from the input to the function node. 298 CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()"); 299 CHECK_MESSAGE( (fnode1.my_predecessors.empty()), "predecessor not reversed"); 300 tbb::flow::remove_edge(qnode0, fnode1); 301 tbb::flow::remove_edge(fnode1, qnode1); 302 INFO("\n"); 303 304 serial_fn_state0 = 0; // make the function_node wait 305 std::thread t2([&] { 306 g.reset(); // attach to the current arena 307 308 tbb::flow::make_edge(qnode0, fnode0); // TODO: invesigate why it always creates a forwarding task 309 310 INFO(" start_func"); 311 qnode0.try_put(1); 312 // now if we put an item to the queues the edges to the function_node will reverse. 313 INFO(" put_node(2)"); 314 qnode0.try_put(2); // start queue node. 315 g.wait_for_all(); 316 }); 317 utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start 318 // wait for the edges to reverse 319 utils::SpinWaitWhile([&] { return fnode0.my_predecessors.empty(); }); 320 g.my_context->cancel_group_execution(); 321 // release the function_node 322 serial_fn_state0 = 2; 323 t2.join(); 324 CHECK_MESSAGE( (!fnode0.my_predecessors.empty() && qnode0.my_successors.empty()), "function_node edge not reversed"); 325 g.reset(tbb::flow::rf_clear_edges); 326 CHECK_MESSAGE( (fnode0.my_predecessors.empty() && qnode0.my_successors.empty()), "function_node edge not removed"); 327 CHECK_MESSAGE( (fnode0.my_successors.empty()), "successor to fnode not removed"); 328 INFO(" done\n"); 329 } 330 331 template<typename TT> 332 class tag_func { 333 TT my_mult; 334 public: 335 tag_func(TT multiplier) : my_mult(multiplier) { } 336 // operator() will return [0 .. Count) 337 tbb::flow::tag_value operator()( TT v) { 338 tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult); 339 return t; 340 } 341 }; 342 343 template<typename JNODE_TYPE> 344 void 345 TestSimpleSuccessorArc(const char *name) { 346 tbb::flow::graph g; 347 { 348 INFO("Join<" << name << "> successor test "); 349 tbb::flow::join_node<std::tuple<int>, JNODE_TYPE> qj(g); 350 tbb::flow::broadcast_node<std::tuple<int> > bnode(g); 351 tbb::flow::make_edge(qj, bnode); 352 CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking"); 353 g.reset(); 354 CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()"); 355 g.reset(tbb::flow::rf_clear_edges); 356 CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)"); 357 } 358 } 359 360 template<> 361 void 362 TestSimpleSuccessorArc<tbb::flow::tag_matching>(const char *name) { 363 tbb::flow::graph g; 364 { 365 INFO("Join<" << name << "> successor test "); 366 typedef std::tuple<int,int> my_tuple; 367 tbb::flow::join_node<my_tuple, tbb::flow::tag_matching> qj(g, 368 tag_func<int>(1), 369 tag_func<int>(1) 370 ); 371 tbb::flow::broadcast_node<my_tuple > bnode(g); 372 tbb::flow::make_edge(qj, bnode); 373 CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking"); 374 g.reset(); 375 CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()"); 376 g.reset(tbb::flow::rf_clear_edges); 377 CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)"); 378 } 379 } 380 381 void 382 TestJoinNode() { 383 tbb::flow::graph g; 384 385 TestSimpleSuccessorArc<tbb::flow::queueing>("queueing"); 386 TestSimpleSuccessorArc<tbb::flow::reserving>("reserving"); 387 TestSimpleSuccessorArc<tbb::flow::tag_matching>("tag_matching"); 388 389 // queueing and tagging join nodes have input queues, so the input ports do not reverse. 390 INFO(" reserving preds"); 391 { 392 tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> rj(g); 393 tbb::flow::queue_node<int> q0(g); 394 tbb::flow::queue_node<int> q1(g); 395 tbb::flow::make_edge(q0,tbb::flow::input_port<0>(rj)); 396 tbb::flow::make_edge(q1,tbb::flow::input_port<1>(rj)); 397 q0.try_put(1); 398 g.wait_for_all(); // quiesce 399 CHECK_MESSAGE( (!(tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port missing predecessor"); 400 CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred"); 401 g.reset(); 402 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port has pred after reset()"); 403 CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()"); 404 q1.try_put(2); 405 g.wait_for_all(); // quiesce 406 CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor"); 407 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred"); 408 g.reset(); 409 CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()"); 410 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()"); 411 // should reset predecessors just as regular reset. 412 q1.try_put(3); 413 g.wait_for_all(); // quiesce 414 CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor"); 415 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred"); 416 g.reset(tbb::flow::rf_clear_edges); 417 CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()"); 418 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()"); 419 CHECK_MESSAGE( (q0.my_successors.empty()), "edge not removed by reset(rf_clear_edges)"); 420 CHECK_MESSAGE( (q1.my_successors.empty()), "edge not removed by reset(rf_clear_edges)"); 421 } 422 INFO(" done\n"); 423 } 424 425 template <typename DecrementerType> 426 struct limiter_node_type { 427 using type = tbb::flow::limiter_node<int, DecrementerType>; 428 using dtype = DecrementerType; 429 }; 430 431 template <> 432 struct limiter_node_type<void> { 433 using type = tbb::flow::limiter_node<int>; 434 using dtype = tbb::flow::continue_msg; 435 }; 436 437 template <typename DType> 438 struct DecrementerHelper { 439 template <typename Decrementer> 440 static void check(Decrementer&) {} 441 static DType makeDType() { 442 return DType(1); 443 } 444 }; 445 446 template <> 447 struct DecrementerHelper<tbb::flow::continue_msg> { 448 template <typename Decrementer> 449 static void check(Decrementer& decrementer) { 450 auto& d = static_cast<tbb::detail::d1::continue_receiver&>(decrementer); 451 CHECK_MESSAGE(d.my_predecessor_count == 0, "error in pred count"); 452 CHECK_MESSAGE(d.my_initial_predecessor_count == 0, "error in initial pred count"); 453 CHECK_MESSAGE(d.my_current_count == 0, "error in current count"); 454 } 455 static tbb::flow::continue_msg makeDType() { 456 return tbb::flow::continue_msg(); 457 } 458 }; 459 460 template <typename DecrementerType> 461 void TestLimiterNode() { 462 int out_int{}; 463 tbb::flow::graph g; 464 using dtype = typename limiter_node_type<DecrementerType>::dtype; 465 typename limiter_node_type<DecrementerType>::type ln(g,1); 466 INFO("Testing limiter_node: preds and succs"); 467 DecrementerHelper<dtype>::check(ln.decrementer()); 468 CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold"); 469 tbb::flow::queue_node<int> inq(g); 470 tbb::flow::queue_node<int> outq(g); 471 tbb::flow::broadcast_node<dtype> bn(g); 472 473 tbb::flow::make_edge(inq,ln); 474 tbb::flow::make_edge(ln,outq); 475 tbb::flow::make_edge(bn,ln.decrementer()); 476 477 g.wait_for_all(); 478 CHECK_MESSAGE( (!(ln.my_successors.empty())),"successors empty after make_edge"); 479 CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed"); 480 inq.try_put(1); 481 g.wait_for_all(); 482 CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 1), "limiter_node didn't pass first value"); 483 CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed"); 484 inq.try_put(2); 485 g.wait_for_all(); 486 CHECK_MESSAGE( (!outq.try_get(out_int)), "limiter_node incorrectly passed second input"); 487 CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge to limiter_node not reversed"); 488 bn.try_put(DecrementerHelper<dtype>::makeDType()); 489 g.wait_for_all(); 490 CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 2), "limiter_node didn't pass second value"); 491 g.wait_for_all(); 492 CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge was reversed(after try_get())"); 493 g.reset(); 494 CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge not reset"); 495 inq.try_put(3); 496 g.wait_for_all(); 497 CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 3), "limiter_node didn't pass third value"); 498 499 INFO(" rf_clear_edges"); 500 // currently the limiter_node will not pass another message 501 g.reset(tbb::flow::rf_clear_edges); 502 DecrementerHelper<dtype>::check(ln.decrementer()); 503 CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold"); 504 CHECK_MESSAGE( (ln.my_predecessors.empty()), "preds not reset(rf_clear_edges)"); 505 CHECK_MESSAGE( (ln.my_successors.empty()), "preds not reset(rf_clear_edges)"); 506 CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)"); 507 CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)"); 508 CHECK_MESSAGE( (bn.my_successors.empty()), "control edge not removed on reset(rf_clear_edges)"); 509 tbb::flow::make_edge(inq,ln); 510 tbb::flow::make_edge(ln,outq); 511 inq.try_put(4); 512 inq.try_put(5); 513 g.wait_for_all(); 514 CHECK_MESSAGE( (outq.try_get(out_int)),"missing output after reset(rf_clear_edges)"); 515 CHECK_MESSAGE( (out_int == 4), "input incorrect (4)"); 516 bn.try_put(DecrementerHelper<dtype>::makeDType()); 517 g.wait_for_all(); 518 CHECK_MESSAGE( (!outq.try_get(out_int)),"second output incorrectly passed (rf_clear_edges)"); 519 INFO(" done\n"); 520 } 521 522 template<typename MF_TYPE> 523 struct mf_body { 524 std::atomic<int>& my_flag; 525 mf_body(std::atomic<int>& flag) : my_flag(flag) { } 526 void operator()(const int& in, typename MF_TYPE::output_ports_type& outports) { 527 if(my_flag == 0) { 528 my_flag = 1; 529 530 utils::SpinWaitWhileEq(my_flag, 1); 531 } 532 533 if (in & 0x1) 534 std::get<1>(outports).try_put(in); 535 else 536 std::get<0>(outports).try_put(in); 537 } 538 }; 539 540 template<typename P, typename T> 541 struct test_reversal; 542 template<typename T> 543 struct test_reversal<tbb::flow::queueing, T> { 544 test_reversal() { INFO("<queueing>"); } 545 // queueing node will not reverse. 546 bool operator()(T& node) const { return node.my_predecessors.empty(); } 547 }; 548 549 template<typename T> 550 struct test_reversal<tbb::flow::rejecting, T> { 551 test_reversal() { INFO("<rejecting>"); } 552 bool operator()(T& node) const { return !node.my_predecessors.empty(); } 553 }; 554 555 template<typename P> 556 void TestMultifunctionNode() { 557 typedef tbb::flow::multifunction_node<int, std::tuple<int, int>, P> multinode_type; 558 INFO("Testing multifunction_node"); 559 test_reversal<P,multinode_type> my_test; 560 INFO(":"); 561 tbb::flow::graph g; 562 multinode_type mf(g, tbb::flow::serial, mf_body<multinode_type>(serial_fn_state0)); 563 tbb::flow::queue_node<int> qin(g); 564 tbb::flow::queue_node<int> qodd_out(g); 565 tbb::flow::queue_node<int> qeven_out(g); 566 tbb::flow::make_edge(qin,mf); 567 tbb::flow::make_edge(tbb::flow::output_port<0>(mf), qeven_out); 568 tbb::flow::make_edge(tbb::flow::output_port<1>(mf), qodd_out); 569 g.wait_for_all(); 570 for (int ii = 0; ii < 2 ; ++ii) { 571 serial_fn_state0 = 0; 572 /* if(ii == 0) REMARK(" reset preds"); else REMARK(" 2nd");*/ 573 std::thread t([&] { 574 g.reset(); // attach to the current arena 575 qin.try_put(0); 576 qin.try_put(1); 577 g.wait_for_all(); 578 }); 579 // wait for node to be active 580 utils::SpinWaitWhileEq(serial_fn_state0, 0); 581 utils::SpinWaitWhile( [&] { return !my_test(mf); }); 582 g.my_context->cancel_group_execution(); 583 // release node 584 serial_fn_state0 = 2; 585 t.join(); 586 CHECK_MESSAGE( (my_test(mf)), "fail cancel group test"); 587 if( ii == 1) { 588 INFO(" rf_clear_edges"); 589 g.reset(tbb::flow::rf_clear_edges); 590 CHECK_MESSAGE( (tbb::flow::output_port<0>(mf).my_successors.empty()), "output_port<0> not reset (rf_clear_edges)"); 591 CHECK_MESSAGE( (tbb::flow::output_port<1>(mf).my_successors.empty()), "output_port<1> not reset (rf_clear_edges)"); 592 } 593 else 594 { 595 g.reset(); 596 } 597 CHECK_MESSAGE( (mf.my_predecessors.empty()), "edge didn't reset"); 598 CHECK_MESSAGE( ((ii == 0 && !qin.my_successors.empty()) || (ii == 1 && qin.my_successors.empty())), "edge didn't reset"); 599 } 600 INFO(" done\n"); 601 } 602 603 // indexer_node is like a broadcast_node, in that none of its inputs reverse, and it 604 // never allows a successor to reverse its edge, so we only need test the successors. 605 void 606 TestIndexerNode() { 607 tbb::flow::graph g; 608 typedef tbb::flow::indexer_node< int, int > indexernode_type; 609 indexernode_type inode(g); 610 INFO("Testing indexer_node:"); 611 tbb::flow::queue_node<indexernode_type::output_type> qout(g); 612 tbb::flow::make_edge(inode,qout); 613 g.wait_for_all(); 614 CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing"); 615 g.reset(); 616 CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing after reset"); 617 g.reset(tbb::flow::rf_clear_edges); 618 CHECK_MESSAGE( (inode.my_successors.empty()), "successor of indexer_node not removed by reset(rf_clear_edges)"); 619 INFO(" done\n"); 620 } 621 622 template<typename Node> 623 void 624 TestScalarNode(const char *name) { 625 tbb::flow::graph g; 626 Node on(g); 627 tbb::flow::queue_node<int> qout(g); 628 INFO("Testing " << name << ":"); 629 tbb::flow::make_edge(on,qout); 630 g.wait_for_all(); 631 CHECK_MESSAGE( (!on.my_successors.empty()), "edge not added"); 632 g.reset(); 633 CHECK_MESSAGE( (!on.my_successors.empty()), "edge improperly removed"); 634 g.reset(tbb::flow::rf_clear_edges); 635 CHECK_MESSAGE( (on.my_successors.empty()), "edge not removed by reset(rf_clear_edges)"); 636 INFO(" done\n"); 637 } 638 639 struct seq_body { 640 size_t operator()(const int &in) { 641 return size_t(in / 3); 642 } 643 }; 644 645 // sequencer_node behaves like a queueing node, but requires a different constructor. 646 void TestSequencerNode() { 647 tbb::flow::graph g; 648 tbb::flow::sequencer_node<int> bnode(g, seq_body()); 649 INFO("Testing sequencer_node:"); 650 tbb::flow::function_node<int> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 651 INFO("Testing sequencer_node:"); 652 serial_fn_state0 = 0; // reset to waiting state. 653 INFO(" make_edge"); 654 tbb::flow::make_edge(bnode, fnode); 655 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge" ); 656 INFO(" try_put"); 657 std::thread t([&]{ 658 bnode.try_put(0); // will forward to the fnode 659 g.wait_for_all(); 660 }); 661 // wait for the function_node to fire up 662 utils::SpinWaitWhileEq(serial_fn_state0, 0); 663 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message" ); 664 serial_fn_state0 = 0; // release the function node 665 t.join(); 666 667 INFO(" remove_edge"); 668 tbb::flow::remove_edge(bnode, fnode); 669 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge"); 670 tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g); 671 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // will spawn a task 672 g.wait_for_all(); 673 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join"); 674 INFO(" reverse"); 675 bnode.try_put(3); // the edge should reverse 676 g.wait_for_all(); 677 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving"); 678 INFO(" reset()"); 679 g.wait_for_all(); 680 g.reset(); // should be in forward direction again 681 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()"); 682 INFO(" remove_edge"); 683 g.reset(tbb::flow::rf_clear_edges); // should be in forward direction again 684 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)"); 685 CHECK_MESSAGE( (fnode.my_predecessors.empty()), "buffering node reversed after reset(rf_clear_edges)"); 686 INFO(" done\n"); 687 g.wait_for_all(); 688 } 689 690 struct snode_body { 691 int max_cnt; 692 int my_cnt; 693 snode_body(const int& in) : max_cnt(in) { my_cnt = 0; } 694 int operator()(tbb::flow_control& fc) { 695 if (max_cnt <= my_cnt++) { 696 fc.stop(); 697 return int(); 698 } 699 return my_cnt; 700 } 701 }; 702 703 void TestInputNode() { 704 tbb::flow::graph g; 705 tbb::flow::input_node<int> in(g, snode_body(4)); 706 INFO("Testing input_node:"); 707 tbb::flow::queue_node<int> qin(g); 708 tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> jn(g); 709 tbb::flow::queue_node<std::tuple<int,int> > qout(g); 710 711 INFO(" make_edges"); 712 tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn)); 713 tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn)); 714 tbb::flow::make_edge(jn,qout); 715 CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after make_edge"); 716 g.wait_for_all(); 717 g.reset(); 718 CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after reset"); 719 g.wait_for_all(); 720 g.reset(tbb::flow::rf_clear_edges); 721 CHECK_MESSAGE( (in.my_successors.empty()), "input node has successor after reset(rf_clear_edges)"); 722 tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn)); 723 tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn)); 724 tbb::flow::make_edge(jn,qout); 725 g.wait_for_all(); 726 INFO(" activate"); 727 in.activate(); // will forward to the fnode 728 INFO(" wait1"); 729 g.wait_for_all(); 730 CHECK_MESSAGE( (in.my_successors.empty()), "input node has no successor after forwarding message"); 731 g.reset(); 732 CHECK_MESSAGE( (!in.my_successors.empty()), "input_node has no successors after reset"); 733 CHECK_MESSAGE( (tbb::flow::input_port<0>(jn).my_predecessors.empty()), "successor of input_node has pred after reset."); 734 INFO(" done\n"); 735 } 736 737 //! Test buffering nodes 738 //! \brief \ref error_guessing 739 TEST_CASE("Test buffering nodes"){ 740 unsigned int MinThread = utils::MinThread; 741 if(MinThread < 3) MinThread = 3; 742 tbb::task_arena arena(MinThread); 743 arena.execute( 744 [&]() { 745 // tests presume at least three threads 746 TestBufferingNode< tbb::flow::buffer_node<int> >("buffer_node"); 747 TestBufferingNode< tbb::flow::priority_queue_node<int> >("priority_queue_node"); 748 TestBufferingNode< tbb::flow::queue_node<int> >("queue_node"); 749 } 750 ); 751 } 752 753 //! Test sequencer_node 754 //! \brief \ref error_guessing 755 TEST_CASE("Test sequencer node"){ 756 TestSequencerNode(); 757 } 758 759 TEST_SUITE("Test multifunction node") { 760 //! Test multifunction_node with rejecting policy 761 //! \brief \ref error_guessing 762 TEST_CASE("with rejecting policy"){ 763 TestMultifunctionNode<tbb::flow::rejecting>(); 764 } 765 766 //! Test multifunction_node with queueing policy 767 //! \brief \ref error_guessing 768 TEST_CASE("with queueing policy") { 769 TestMultifunctionNode<tbb::flow::queueing>(); 770 } 771 } 772 773 //! Test input_node 774 //! \brief \ref error_guessing 775 TEST_CASE("Test input node"){ 776 TestInputNode(); 777 } 778 779 //! Test continue_node 780 //! \brief \ref error_guessing 781 TEST_CASE("Test continue node"){ 782 TestContinueNode(); 783 } 784 785 //! Test function_node 786 //! \brief \ref error_guessing 787 TEST_CASE("Test function node" * doctest::may_fail()){ 788 TestFunctionNode(); 789 } 790 791 //! Test join_node 792 //! \brief \ref error_guessing 793 TEST_CASE("Test join node"){ 794 TestJoinNode(); 795 } 796 797 //! Test limiter_node 798 //! \brief \ref error_guessing 799 TEST_CASE("Test limiter node"){ 800 TestLimiterNode<void>(); 801 TestLimiterNode<int>(); 802 TestLimiterNode<tbb::flow::continue_msg>(); 803 } 804 805 //! Test indexer_node 806 //! \brief \ref error_guessing 807 TEST_CASE("Test indexer node"){ 808 TestIndexerNode(); 809 } 810 811 //! Test split_node 812 //! \brief \ref error_guessing 813 TEST_CASE("Test split node"){ 814 TestSplitNode(); 815 } 816 817 //! Test broadcast, overwrite, write_once nodes 818 //! \brief \ref error_guessing 819 TEST_CASE("Test scalar node"){ 820 TestScalarNode<tbb::flow::broadcast_node<int> >("broadcast_node"); 821 TestScalarNode<tbb::flow::overwrite_node<int> >("overwrite_node"); 822 TestScalarNode<tbb::flow::write_once_node<int> >("write_once_node"); 823 } 824 825 //! try_get in inactive graph 826 //! \brief \ref error_guessing 827 TEST_CASE("try_get in inactive graph"){ 828 tbb::flow::graph g; 829 830 tbb::flow::input_node<int> src(g, [&](tbb::flow_control& fc) { fc.stop(); return 0;}); 831 deactivate_graph(g); 832 833 int tmp = -1; 834 CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed"); 835 836 src.activate(); 837 tmp = -1; 838 CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed"); 839 } 840 841 //! Test make_edge in inactive graph 842 //! \brief \ref error_guessing 843 TEST_CASE("Test make_edge in inactive graph"){ 844 tbb::flow::graph g; 845 846 tbb::flow::continue_node<int> c(g, [](const tbb::flow::continue_msg&){ return 1; }); 847 848 tbb::flow::function_node<int, int> f(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 849 850 c.try_put(tbb::flow::continue_msg()); 851 g.wait_for_all(); 852 853 deactivate_graph(g); 854 855 make_edge(c, f); 856 } 857 858 //! Test make_edge from overwrite_node in inactive graph 859 //! \brief \ref error_guessing 860 TEST_CASE("Test make_edge from overwrite_node in inactive graph"){ 861 tbb::flow::graph g; 862 863 tbb::flow::queue_node<int> q(g); 864 865 tbb::flow::overwrite_node<int> on(g); 866 867 on.try_put(1); 868 g.wait_for_all(); 869 870 deactivate_graph(g); 871 872 make_edge(on, q); 873 874 int tmp = -1; 875 CHECK_MESSAGE((q.try_get(tmp) == false), "Message should not be passed on"); 876 } 877 878 //! Test iterators directly 879 //! \brief \ref error_guessing 880 TEST_CASE("graph_iterator details"){ 881 tbb::flow::graph g; 882 const tbb::flow::graph cg; 883 884 tbb::flow::graph::iterator b = g.begin(); 885 tbb::flow::graph::iterator b2 = g.begin(); 886 ++b2; 887 // Cast to a volatile pointer to workaround self assignment warnings from some compilers. 888 tbb::flow::graph::iterator* volatile b2_ptr = &b2; 889 b2 = *b2_ptr; 890 b = b2; 891 CHECK_MESSAGE((b == b2), "Assignment should make iterators equal"); 892 } 893 894 //! const graph 895 //! \brief \ref error_guessing 896 TEST_CASE("const graph"){ 897 using namespace tbb::flow; 898 899 const graph g; 900 CHECK_MESSAGE((g.cbegin() == g.cend()), "Starting graph is empty"); 901 CHECK_MESSAGE((g.begin() == g.end()), "Starting graph is empty"); 902 903 graph g2; 904 CHECK_MESSAGE((g2.begin() == g2.end()), "Starting graph is empty"); 905 } 906 907 //! Send message to continue_node while graph is inactive 908 //! \brief \ref error_guessing 909 TEST_CASE("Send message to continue_node while graph is inactive") { 910 using namespace tbb::flow; 911 912 graph g; 913 914 continue_node<int> c(g, [](const continue_msg&){ return 1; }); 915 buffer_node<int> b(g); 916 917 make_edge(c, b); 918 919 deactivate_graph(g); 920 921 c.try_put(continue_msg()); 922 g.wait_for_all(); 923 924 int tmp = -1; 925 CHECK_MESSAGE((b.try_get(tmp) == false), "Message should not arrive"); 926 CHECK_MESSAGE((tmp == -1), "Value should not be altered"); 927 } 928 929 930 //! Bypass of a successor's message in a node with lightweight policy 931 //! \brief \ref error_guessing 932 TEST_CASE("Bypass of a successor's message in a node with lightweight policy") { 933 using namespace tbb::flow; 934 935 graph g; 936 937 auto body = [](const int&v)->int { return v * 2; }; 938 function_node<int, int, lightweight> f1(g, unlimited, body); 939 940 auto body2 = [](const int&v)->int {return v / 2;}; 941 function_node<int, int> f2(g, unlimited, body2); 942 943 buffer_node<int> b(g); 944 945 make_edge(f1, f2); 946 make_edge(f2, b); 947 948 f1.try_put(1); 949 g.wait_for_all(); 950 951 int tmp = -1; 952 CHECK_MESSAGE((b.try_get(tmp) == true), "Functional nodes can work in succession"); 953 CHECK_MESSAGE((tmp == 1), "Value should not be altered"); 954 } 955