1 /* 2 Copyright (c) 2005-2020 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/config.h" 18 19 #if _MSC_VER 20 #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning 21 #if _MSC_VER==1700 && !defined(__INTEL_COMPILER) 22 // Suppress "unreachable code" warning by VC++ 17.0 (VS 2012) 23 #pragma warning (disable: 4702) 24 #endif 25 #endif 26 27 // TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these 28 // parts in all of tests might make testing of the product, which is different from what is actually 29 // released. 30 #define __TBB_EXTRA_DEBUG 1 31 32 // need these to get proper external names for private methods in library. 33 #include "tbb/spin_mutex.h" 34 #include "tbb/spin_rw_mutex.h" 35 #include "tbb/task_arena.h" 36 #include "tbb/task_group.h" 37 38 #define private public 39 #define protected public 40 #include "tbb/flow_graph.h" 41 #undef protected 42 #undef private 43 44 #include "common/test.h" 45 #include "common/utils.h" 46 #include "common/graph_utils.h" 47 48 #include <string> // merely prevents LNK2001 error to happen (on ICL+VC9 configurations) 49 50 51 //! \file test_flow_graph_whitebox.cpp 52 //! \brief Test for [flow_graph.broadcast_node flow_graph.priority_queue_node flow_graph.indexer_node flow_graph.sequencer_node flow_graph.remove_edge flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification 53 54 template<typename T> 55 struct receiverBody { 56 tbb::flow::continue_msg operator()(const T &/*in*/) { 57 return tbb::flow::continue_msg(); 58 } 59 }; 60 61 // split_nodes cannot have predecessors 62 // they do not reject messages and always forward. 63 // they reject edge reversals from successors. 64 void TestSplitNode() { 65 typedef tbb::flow::split_node<std::tuple<int> > snode_type; 66 tbb::flow::graph g; 67 snode_type snode(g); 68 tbb::flow::function_node<int> rcvr(g,tbb::flow::unlimited, receiverBody<int>()); 69 INFO("Testing split_node\n"); 70 CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "Constructed split_node has successors"); 71 // tbb::flow::output_port<0>(snode) 72 tbb::flow::make_edge(tbb::flow::output_port<0>(snode), rcvr); 73 CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after make_edge, split_node has no successor."); 74 snode.try_put(std::tuple<int>(1)); 75 g.wait_for_all(); 76 g.reset(); 77 CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after reset(), split_node has no successor."); 78 g.reset(tbb::flow::rf_clear_edges); 79 CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "after reset(rf_clear_edges), split_node has a successor."); 80 } 81 82 // buffering nodes cannot have predecessors 83 // they do not reject messages and always save or forward 84 // they allow edge reversals from successors 85 template< typename B > 86 void TestBufferingNode(const char * name) { 87 tbb::flow::graph g; 88 B bnode(g); 89 tbb::flow::function_node<int,int,tbb::flow::rejecting> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 90 INFO("Testing " << name << ":"); 91 for(int icnt = 0; icnt < 2; icnt++) { 92 bool reverse_edge = (icnt & 0x2) != 0; 93 serial_fn_state0 = 0; // reset to waiting state. 94 INFO(" make_edge"); 95 tbb::flow::make_edge(bnode, fnode); 96 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge"); 97 INFO(" try_put"); 98 bnode.try_put(1); // will forward to the fnode 99 BACKOFF_WAIT(serial_fn_state0 == 0, "Timed out waiting for first put"); 100 if(reverse_edge) { 101 INFO(" try_put2"); 102 bnode.try_put(2); // will reverse the edge 103 // cannot do a wait_for_all here; the function_node is still executing 104 BACKOFF_WAIT(!bnode.my_successors.empty(), "Timed out waiting after 2nd put"); 105 // at this point the only task running is the one for the function_node. 106 CHECK_MESSAGE( (bnode.my_successors.empty()), "successor not removed"); 107 } 108 else { 109 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message"); 110 } 111 serial_fn_state0 = 0; // release the function_node. 112 if(reverse_edge) { 113 // have to do a second release because the function_node will get the 2nd item 114 BACKOFF_WAIT( serial_fn_state0 == 0, "Timed out waiting after 2nd put"); 115 serial_fn_state0 = 0; // release the function_node. 116 } 117 g.wait_for_all(); 118 INFO(" remove_edge"); 119 tbb::flow::remove_edge(bnode, fnode); 120 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge"); 121 } 122 tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g); 123 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // will spawn a task 124 g.wait_for_all(); 125 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join"); 126 INFO(" reverse"); 127 bnode.try_put(1); // the edge should reverse 128 g.wait_for_all(); 129 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving"); 130 INFO(" reset()"); 131 g.wait_for_all(); 132 g.reset(); // should be in forward direction again 133 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()"); 134 INFO(" remove_edge"); 135 g.reset(tbb::flow::rf_clear_edges); 136 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)"); 137 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // add edge again 138 // reverse edge by adding to buffer. 139 bnode.try_put(1); // the edge should reverse 140 g.wait_for_all(); 141 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving"); 142 INFO(" remove_edge(reversed)"); 143 g.reset(tbb::flow::rf_clear_edges); 144 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has no successor after reset()"); 145 CHECK_MESSAGE( (tbb::flow::input_port<0>(jnode).my_predecessors.empty()), "predecessor not reset"); 146 INFO(" done\n"); 147 g.wait_for_all(); 148 } 149 150 // continue_node has only predecessor count 151 // they do not have predecessors, only the counts 152 // successor edges cannot be reversed 153 void TestContinueNode() { 154 tbb::flow::graph g; 155 tbb::flow::function_node<int> fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 156 tbb::flow::continue_node<int> cnode(g, 1, serial_continue_body<int>(serial_continue_state0)); 157 tbb::flow::function_node<int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state1)); 158 tbb::flow::make_edge(fnode0, cnode); 159 tbb::flow::make_edge(cnode, fnode1); 160 INFO("Testing continue_node:"); 161 for( int icnt = 0; icnt < 2; ++icnt ) { 162 INFO( " initial" << icnt); 163 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor addition didn't increment count"); 164 CHECK_MESSAGE( (!cnode.successors().empty()), "successors empty though we added one"); 165 CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect"); 166 serial_continue_state0 = 0; 167 serial_fn_state0 = 0; 168 serial_fn_state1 = 0; 169 170 fnode0.try_put(1); // start the first function node. 171 BACKOFF_WAIT(!serial_fn_state0, "Timed out waiting for function_node to start"); 172 // Now the body of function_node 0 is executing. 173 serial_fn_state0 = 0; // release the node 174 // wait for node to count the message (or for the node body to execute, which would be wrong) 175 BACKOFF_WAIT(serial_continue_state0 == 0 && cnode.my_current_count == 0, "Timed out waiting for continue_state0 to change"); 176 CHECK_MESSAGE( (serial_continue_state0 == 0), "Improperly released continue_node"); 177 CHECK_MESSAGE( (cnode.my_current_count == 1), "state of continue_receiver incorrect"); 178 if(icnt == 0) { // first time through, let the continue_node fire 179 INFO(" firing"); 180 fnode0.try_put(1); // second message 181 BACKOFF_WAIT(serial_fn_state0 == 0, "timeout waiting for continue_body to execute"); 182 // Now the body of function_node 0 is executing. 183 serial_fn_state0 = 0; // release the node 184 185 BACKOFF_WAIT(!serial_continue_state0,"continue_node didn't start"); // now we wait for the continue_node. 186 CHECK_MESSAGE( (cnode.my_current_count == 0), " my_current_count not reset before body of continue_node started"); 187 serial_continue_state0 = 0; // release the continue_node 188 BACKOFF_WAIT(!serial_fn_state1,"successor function_node didn't start"); // wait for the successor function_node to enter body 189 serial_fn_state1 = 0; // release successor function_node. 190 g.wait_for_all(); 191 192 // try a try_get() 193 { 194 int i; 195 CHECK_MESSAGE( (!cnode.try_get(i)), "try_get not rejected"); 196 } 197 198 INFO(" reset"); 199 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)"); 200 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)"); 201 g.reset(); // should still be the same 202 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (after reset)" ); 203 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (after reset)"); 204 } 205 else { // we're going to see if the rf_clear_edges resets things. 206 g.wait_for_all(); 207 INFO(" reset(rf_clear_edges)"); 208 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)"); 209 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)"); 210 g.reset(tbb::flow::rf_clear_edges); // should be in forward direction again 211 CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect after reset(rf_clear_edges)"); 212 CHECK_MESSAGE( (cnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)"); 213 CHECK_MESSAGE( (cnode.my_predecessor_count == cnode.my_initial_predecessor_count), "predecessor count not reset"); 214 } 215 } 216 217 INFO(" done\n"); 218 219 } 220 221 // function_node has predecessors and successors 222 // try_get() rejects 223 // successor edges cannot be reversed 224 // predecessors will reverse (only rejecting will reverse) 225 void TestFunctionNode() { 226 tbb::flow::graph g; 227 tbb::flow::queue_node<int> qnode0(g); 228 tbb::flow::function_node<int,int, tbb::flow::rejecting > fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 229 // queueing function node 230 tbb::flow::function_node<int,int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 231 232 tbb::flow::queue_node<int> qnode1(g); 233 234 tbb::flow::make_edge(fnode0, qnode1); 235 tbb::flow::make_edge(qnode0, fnode0); 236 237 serial_fn_state0 = 2; // just let it go 238 // see if the darned thing will work.... 239 qnode0.try_put(1); 240 g.wait_for_all(); 241 int ii; 242 CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed"); 243 tbb::flow::remove_edge(qnode0, fnode0); 244 tbb::flow::remove_edge(fnode0, qnode1); 245 246 tbb::flow::make_edge(fnode1, qnode1); 247 tbb::flow::make_edge(qnode0, fnode1); 248 249 serial_fn_state0 = 2; // just let it go 250 // see if the darned thing will work.... 251 qnode0.try_put(1); 252 g.wait_for_all(); 253 CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed"); 254 tbb::flow::remove_edge(qnode0, fnode1); 255 tbb::flow::remove_edge(fnode1, qnode1); 256 257 // rejecting 258 serial_fn_state0 = 0; 259 tbb::flow::make_edge(fnode0, qnode1); 260 tbb::flow::make_edge(qnode0, fnode0); 261 INFO("Testing rejecting function_node:"); 262 CHECK_MESSAGE( (!fnode0.my_queue), "node should have no queue"); 263 CHECK_MESSAGE( (!fnode0.my_successors.empty()), "successor edge not added"); 264 qnode0.try_put(1); 265 BACKOFF_WAIT(!serial_fn_state0,"rejecting function_node didn't start"); 266 qnode0.try_put(2); // rejecting node should reject, reverse. 267 BACKOFF_WAIT(fnode0.my_predecessors.empty(), "Missing predecessor ---"); 268 serial_fn_state0 = 2; // release function_node body. 269 g.wait_for_all(); 270 INFO(" reset"); 271 g.reset(); // should reverse the edge from the input to the function node. 272 CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()"); 273 CHECK_MESSAGE( (fnode0.my_predecessors.empty()), "predecessor not reversed"); 274 tbb::flow::remove_edge(qnode0, fnode0); 275 tbb::flow::remove_edge(fnode0, qnode1); 276 INFO("\n"); 277 278 // queueing 279 tbb::flow::make_edge(fnode1, qnode1); 280 INFO("Testing queueing function_node:"); 281 CHECK_MESSAGE( (fnode1.my_queue), "node should have no queue"); 282 CHECK_MESSAGE( (!fnode1.my_successors.empty()), "successor edge not added"); 283 INFO(" add_pred"); 284 CHECK_MESSAGE( (fnode1.register_predecessor(qnode0)), "Cannot register as predecessor"); 285 CHECK_MESSAGE( (!fnode1.my_predecessors.empty()), "Missing predecessor"); 286 INFO(" reset"); 287 g.wait_for_all(); 288 g.reset(); // should reverse the edge from the input to the function node. 289 CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()"); 290 CHECK_MESSAGE( (fnode1.my_predecessors.empty()), "predecessor not reversed"); 291 tbb::flow::remove_edge(qnode0, fnode1); 292 tbb::flow::remove_edge(fnode1, qnode1); 293 INFO("\n"); 294 295 serial_fn_state0 = 0; // make the function_node wait 296 tbb::flow::make_edge(qnode0, fnode0); 297 INFO(" start_func"); 298 qnode0.try_put(1); 299 BACKOFF_WAIT(serial_fn_state0 == 0, "Timed out waiting after 1st put"); 300 // now if we put an item to the queues the edges to the function_node will reverse. 301 INFO(" put_node(2)"); 302 qnode0.try_put(2); // start queue node. 303 // wait for the edges to reverse 304 BACKOFF_WAIT(fnode0.my_predecessors.empty(), "Timed out waiting"); 305 CHECK_MESSAGE( (!fnode0.my_predecessors.empty()), "function_node edge not reversed"); 306 g.my_context->cancel_group_execution(); 307 // release the function_node 308 serial_fn_state0 = 2; 309 g.wait_for_all(); 310 CHECK_MESSAGE( (!fnode0.my_predecessors.empty() && qnode0.my_successors.empty()), "function_node edge not reversed"); 311 g.reset(tbb::flow::rf_clear_edges); 312 CHECK_MESSAGE( (fnode0.my_predecessors.empty() && qnode0.my_successors.empty()), "function_node edge not removed"); 313 CHECK_MESSAGE( (fnode0.my_successors.empty()), "successor to fnode not removed"); 314 INFO(" done\n"); 315 } 316 317 template<typename TT> 318 class tag_func { 319 TT my_mult; 320 public: 321 tag_func(TT multiplier) : my_mult(multiplier) { } 322 // operator() will return [0 .. Count) 323 tbb::flow::tag_value operator()( TT v) { 324 tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult); 325 return t; 326 } 327 }; 328 329 template<typename JNODE_TYPE> 330 void 331 TestSimpleSuccessorArc(const char *name) { 332 tbb::flow::graph g; 333 { 334 INFO("Join<" << name << "> successor test "); 335 tbb::flow::join_node<std::tuple<int>, JNODE_TYPE> qj(g); 336 tbb::flow::broadcast_node<std::tuple<int> > bnode(g); 337 tbb::flow::make_edge(qj, bnode); 338 CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking"); 339 g.reset(); 340 CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()"); 341 g.reset(tbb::flow::rf_clear_edges); 342 CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)"); 343 } 344 } 345 346 template<> 347 void 348 TestSimpleSuccessorArc<tbb::flow::tag_matching>(const char *name) { 349 tbb::flow::graph g; 350 { 351 INFO("Join<" << name << "> successor test "); 352 typedef std::tuple<int,int> my_tuple; 353 tbb::flow::join_node<my_tuple, tbb::flow::tag_matching> qj(g, 354 tag_func<int>(1), 355 tag_func<int>(1) 356 ); 357 tbb::flow::broadcast_node<my_tuple > bnode(g); 358 tbb::flow::make_edge(qj, bnode); 359 CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking"); 360 g.reset(); 361 CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()"); 362 g.reset(tbb::flow::rf_clear_edges); 363 CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)"); 364 } 365 } 366 367 void 368 TestJoinNode() { 369 tbb::flow::graph g; 370 371 TestSimpleSuccessorArc<tbb::flow::queueing>("queueing"); 372 TestSimpleSuccessorArc<tbb::flow::reserving>("reserving"); 373 TestSimpleSuccessorArc<tbb::flow::tag_matching>("tag_matching"); 374 375 // queueing and tagging join nodes have input queues, so the input ports do not reverse. 376 INFO(" reserving preds"); 377 { 378 tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> rj(g); 379 tbb::flow::queue_node<int> q0(g); 380 tbb::flow::queue_node<int> q1(g); 381 tbb::flow::make_edge(q0,tbb::flow::input_port<0>(rj)); 382 tbb::flow::make_edge(q1,tbb::flow::input_port<1>(rj)); 383 q0.try_put(1); 384 g.wait_for_all(); // quiesce 385 CHECK_MESSAGE( (!(tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port missing predecessor"); 386 CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred"); 387 g.reset(); 388 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port has pred after reset()"); 389 CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()"); 390 q1.try_put(2); 391 g.wait_for_all(); // quiesce 392 CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor"); 393 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred"); 394 g.reset(); 395 CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()"); 396 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()"); 397 // should reset predecessors just as regular reset. 398 q1.try_put(3); 399 g.wait_for_all(); // quiesce 400 CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor"); 401 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred"); 402 g.reset(tbb::flow::rf_clear_edges); 403 CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()"); 404 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()"); 405 CHECK_MESSAGE( (q0.my_successors.empty()), "edge not removed by reset(rf_clear_edges)"); 406 CHECK_MESSAGE( (q1.my_successors.empty()), "edge not removed by reset(rf_clear_edges)"); 407 } 408 INFO(" done\n"); 409 } 410 411 template <typename DecrementerType> 412 struct limiter_node_type { 413 using type = tbb::flow::limiter_node<int, DecrementerType>; 414 using dtype = DecrementerType; 415 }; 416 417 template <> 418 struct limiter_node_type<void> { 419 using type = tbb::flow::limiter_node<int>; 420 using dtype = tbb::flow::continue_msg; 421 }; 422 423 template <typename DType> 424 struct DecrementerHelper { 425 template <typename Decrementer> 426 static void check(Decrementer&) {} 427 static DType makeDType() { 428 return DType(1); 429 } 430 }; 431 432 template <> 433 struct DecrementerHelper<tbb::flow::continue_msg> { 434 template <typename Decrementer> 435 static void check(Decrementer& decrementer) { 436 auto& d = static_cast<tbb::detail::d1::continue_receiver&>(decrementer); 437 CHECK_MESSAGE(d.my_predecessor_count == 0, "error in pred count"); 438 CHECK_MESSAGE(d.my_initial_predecessor_count == 0, "error in initial pred count"); 439 CHECK_MESSAGE(d.my_current_count == 0, "error in current count"); 440 } 441 static tbb::flow::continue_msg makeDType() { 442 return tbb::flow::continue_msg(); 443 } 444 }; 445 446 template <typename DecrementerType> 447 void TestLimiterNode() { 448 int out_int{}; 449 tbb::flow::graph g; 450 using dtype = typename limiter_node_type<DecrementerType>::dtype; 451 typename limiter_node_type<DecrementerType>::type ln(g,1); 452 INFO("Testing limiter_node: preds and succs"); 453 DecrementerHelper<dtype>::check(ln.decrementer()); 454 CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold"); 455 tbb::flow::queue_node<int> inq(g); 456 tbb::flow::queue_node<int> outq(g); 457 tbb::flow::broadcast_node<dtype> bn(g); 458 459 tbb::flow::make_edge(inq,ln); 460 tbb::flow::make_edge(ln,outq); 461 tbb::flow::make_edge(bn,ln.decrementer()); 462 463 g.wait_for_all(); 464 CHECK_MESSAGE( (!(ln.my_successors.empty())),"successors empty after make_edge"); 465 CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed"); 466 inq.try_put(1); 467 g.wait_for_all(); 468 CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 1), "limiter_node didn't pass first value"); 469 CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed"); 470 inq.try_put(2); 471 g.wait_for_all(); 472 CHECK_MESSAGE( (!outq.try_get(out_int)), "limiter_node incorrectly passed second input"); 473 CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge to limiter_node not reversed"); 474 bn.try_put(DecrementerHelper<dtype>::makeDType()); 475 g.wait_for_all(); 476 CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 2), "limiter_node didn't pass second value"); 477 g.wait_for_all(); 478 CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge was reversed(after try_get())"); 479 g.reset(); 480 CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge not reset"); 481 inq.try_put(3); 482 g.wait_for_all(); 483 CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 3), "limiter_node didn't pass third value"); 484 485 INFO(" rf_clear_edges"); 486 // currently the limiter_node will not pass another message 487 g.reset(tbb::flow::rf_clear_edges); 488 DecrementerHelper<dtype>::check(ln.decrementer()); 489 CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold"); 490 CHECK_MESSAGE( (ln.my_predecessors.empty()), "preds not reset(rf_clear_edges)"); 491 CHECK_MESSAGE( (ln.my_successors.empty()), "preds not reset(rf_clear_edges)"); 492 CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)"); 493 CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)"); 494 CHECK_MESSAGE( (bn.my_successors.empty()), "control edge not removed on reset(rf_clear_edges)"); 495 tbb::flow::make_edge(inq,ln); 496 tbb::flow::make_edge(ln,outq); 497 inq.try_put(4); 498 inq.try_put(5); 499 g.wait_for_all(); 500 CHECK_MESSAGE( (outq.try_get(out_int)),"missing output after reset(rf_clear_edges)"); 501 CHECK_MESSAGE( (out_int == 4), "input incorrect (4)"); 502 bn.try_put(DecrementerHelper<dtype>::makeDType()); 503 g.wait_for_all(); 504 CHECK_MESSAGE( (!outq.try_get(out_int)),"second output incorrectly passed (rf_clear_edges)"); 505 INFO(" done\n"); 506 } 507 508 template<typename MF_TYPE> 509 struct mf_body { 510 std::atomic<int> *_flag; 511 mf_body( std::atomic<int> &myatomic) : _flag(&myatomic) { } 512 void operator()( const int& in, typename MF_TYPE::output_ports_type &outports) { 513 if(_flag->load(std::memory_order_acquire) == 0) { 514 _flag->store(1, std::memory_order_release); 515 BACKOFF_WAIT(_flag->load(std::memory_order_acquire) == 1, "multifunction_node not released"); 516 } 517 518 if(in & 0x1) std::get<1>(outports).try_put(in); 519 else std::get<0>(outports).try_put(in); 520 } 521 }; 522 523 template<typename P, typename T> 524 struct test_reversal; 525 template<typename T> 526 struct test_reversal<tbb::flow::queueing, T> { 527 test_reversal() { INFO("<queueing>"); } 528 // queueing node will not reverse. 529 bool operator()( T &node) { return node.my_predecessors.empty(); } 530 }; 531 532 template<typename T> 533 struct test_reversal<tbb::flow::rejecting, T> { 534 test_reversal() { INFO("<rejecting>"); } 535 bool operator()( T &node) { return !node.my_predecessors.empty(); } 536 }; 537 538 template<typename P> 539 void 540 TestMultifunctionNode() { 541 typedef tbb::flow::multifunction_node<int, std::tuple<int, int>, P> multinode_type; 542 INFO("Testing multifunction_node"); 543 test_reversal<P,multinode_type> my_test; 544 INFO(":"); 545 tbb::flow::graph g; 546 multinode_type mf(g, tbb::flow::serial, mf_body<multinode_type>(serial_fn_state0)); 547 tbb::flow::queue_node<int> qin(g); 548 tbb::flow::queue_node<int> qodd_out(g); 549 tbb::flow::queue_node<int> qeven_out(g); 550 tbb::flow::make_edge(qin,mf); 551 tbb::flow::make_edge(tbb::flow::output_port<0>(mf), qeven_out); 552 tbb::flow::make_edge(tbb::flow::output_port<1>(mf), qodd_out); 553 g.wait_for_all(); 554 for( int ii = 0; ii < 2 ; ++ii) { 555 serial_fn_state0 = 0; 556 /* if(ii == 0) REMARK(" reset preds"); else REMARK(" 2nd");*/ 557 qin.try_put(0); 558 // wait for node to be active 559 BACKOFF_WAIT(serial_fn_state0 == 0, "timed out waiting for first put"); 560 qin.try_put(1); 561 BACKOFF_WAIT((!my_test(mf)), "Timed out waiting"); 562 CHECK_MESSAGE( (my_test(mf)), "fail second put test"); 563 g.my_context->cancel_group_execution(); 564 // release node 565 serial_fn_state0 = 2; 566 g.wait_for_all(); 567 CHECK_MESSAGE( (my_test(mf)), "fail cancel group test"); 568 if( ii == 1) { 569 INFO(" rf_clear_edges"); 570 g.reset(tbb::flow::rf_clear_edges); 571 CHECK_MESSAGE( (tbb::flow::output_port<0>(mf).my_successors.empty()), "output_port<0> not reset (rf_clear_edges)"); 572 CHECK_MESSAGE( (tbb::flow::output_port<1>(mf).my_successors.empty()), "output_port<1> not reset (rf_clear_edges)"); 573 } 574 else 575 { 576 g.reset(); 577 } 578 CHECK_MESSAGE( (mf.my_predecessors.empty()), "edge didn't reset"); 579 CHECK_MESSAGE( ((ii == 0 && !qin.my_successors.empty()) || (ii == 1 && qin.my_successors.empty())), "edge didn't reset"); 580 } 581 INFO(" done\n"); 582 } 583 584 // indexer_node is like a broadcast_node, in that none of its inputs reverse, and it 585 // never allows a successor to reverse its edge, so we only need test the successors. 586 void 587 TestIndexerNode() { 588 tbb::flow::graph g; 589 typedef tbb::flow::indexer_node< int, int > indexernode_type; 590 indexernode_type inode(g); 591 INFO("Testing indexer_node:"); 592 tbb::flow::queue_node<indexernode_type::output_type> qout(g); 593 tbb::flow::make_edge(inode,qout); 594 g.wait_for_all(); 595 CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing"); 596 g.reset(); 597 CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing after reset"); 598 g.reset(tbb::flow::rf_clear_edges); 599 CHECK_MESSAGE( (inode.my_successors.empty()), "successor of indexer_node not removed by reset(rf_clear_edges)"); 600 INFO(" done\n"); 601 } 602 603 template<typename Node> 604 void 605 TestScalarNode(const char *name) { 606 tbb::flow::graph g; 607 Node on(g); 608 tbb::flow::queue_node<int> qout(g); 609 INFO("Testing " << name << ":"); 610 tbb::flow::make_edge(on,qout); 611 g.wait_for_all(); 612 CHECK_MESSAGE( (!on.my_successors.empty()), "edge not added"); 613 g.reset(); 614 CHECK_MESSAGE( (!on.my_successors.empty()), "edge improperly removed"); 615 g.reset(tbb::flow::rf_clear_edges); 616 CHECK_MESSAGE( (on.my_successors.empty()), "edge not removed by reset(rf_clear_edges)"); 617 INFO(" done\n"); 618 } 619 620 struct seq_body { 621 size_t operator()(const int &in) { 622 return size_t(in / 3); 623 } 624 }; 625 626 // sequencer_node behaves like a queueing node, but requires a different constructor. 627 void 628 TestSequencerNode() { 629 tbb::flow::graph g; 630 tbb::flow::sequencer_node<int> bnode(g, seq_body()); 631 INFO("Testing sequencer_node:"); 632 tbb::flow::function_node<int> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 633 INFO("Testing sequencer_node:"); 634 serial_fn_state0 = 0; // reset to waiting state. 635 INFO(" make_edge"); 636 tbb::flow::make_edge(bnode, fnode); 637 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge"); 638 INFO(" try_put"); 639 bnode.try_put(0); // will forward to the fnode 640 BACKOFF_WAIT( serial_fn_state0 == 0, "timeout waiting for function_node"); // wait for the function_node to fire up 641 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message"); 642 serial_fn_state0 = 0; 643 g.wait_for_all(); 644 INFO(" remove_edge"); 645 tbb::flow::remove_edge(bnode, fnode); 646 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge"); 647 tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g); 648 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // will spawn a task 649 g.wait_for_all(); 650 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join"); 651 INFO(" reverse"); 652 bnode.try_put(3); // the edge should reverse 653 g.wait_for_all(); 654 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving"); 655 INFO(" reset()"); 656 g.wait_for_all(); 657 g.reset(); // should be in forward direction again 658 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()"); 659 INFO(" remove_edge"); 660 g.reset(tbb::flow::rf_clear_edges); // should be in forward direction again 661 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)"); 662 CHECK_MESSAGE( (fnode.my_predecessors.empty()), "buffering node reversed after reset(rf_clear_edges)"); 663 INFO(" done\n"); 664 g.wait_for_all(); 665 } 666 667 struct snode_body { 668 int max_cnt; 669 int my_cnt; 670 snode_body( const int &in) : max_cnt(in) { my_cnt = 0; } 671 int operator()(tbb::flow_control &fc) { 672 if(max_cnt <= my_cnt++) { 673 fc.stop(); 674 return int(); 675 } 676 return my_cnt; 677 } 678 }; 679 680 void 681 TestInputNode() { 682 tbb::flow::graph g; 683 tbb::flow::input_node<int> in(g, snode_body(4)); 684 INFO("Testing input_node:"); 685 tbb::flow::queue_node<int> qin(g); 686 tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> jn(g); 687 tbb::flow::queue_node<std::tuple<int,int> > qout(g); 688 689 INFO(" make_edges"); 690 tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn)); 691 tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn)); 692 tbb::flow::make_edge(jn,qout); 693 CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after make_edge"); 694 g.wait_for_all(); 695 g.reset(); 696 CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after reset"); 697 g.wait_for_all(); 698 g.reset(tbb::flow::rf_clear_edges); 699 CHECK_MESSAGE( (in.my_successors.empty()), "input node has successor after reset(rf_clear_edges)"); 700 tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn)); 701 tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn)); 702 tbb::flow::make_edge(jn,qout); 703 g.wait_for_all(); 704 INFO(" activate"); 705 in.activate(); // will forward to the fnode 706 INFO(" wait1"); 707 BACKOFF_WAIT( !in.my_successors.empty(), "Timed out waiting for edge to reverse"); 708 CHECK_MESSAGE( (in.my_successors.empty()), "input node has no successor after forwarding message"); 709 710 g.wait_for_all(); 711 g.reset(); 712 CHECK_MESSAGE( (!in.my_successors.empty()), "input_node has no successors after reset"); 713 CHECK_MESSAGE( (tbb::flow::input_port<0>(jn).my_predecessors.empty()), "successor of input_node has pred after reset."); 714 INFO(" done\n"); 715 } 716 717 //! Test buffering nodes 718 //! \brief \ref error_guessing 719 TEST_CASE("Test buffering nodes"){ 720 unsigned int MinThread = utils::MinThread; 721 if(MinThread < 3) MinThread = 3; 722 tbb::task_arena arena(MinThread); 723 arena.execute( 724 [&]() { 725 // tests presume at least three threads 726 727 TestBufferingNode< tbb::flow::buffer_node<int> >("buffer_node"); 728 TestBufferingNode< tbb::flow::priority_queue_node<int> >("priority_queue_node"); 729 TestBufferingNode< tbb::flow::queue_node<int> >("queue_node"); 730 731 } 732 ); 733 } 734 735 //! Test sequencer_node 736 //! \brief \ref error_guessing 737 TEST_CASE("Test sequencer node"){ 738 TestSequencerNode(); 739 } 740 741 TEST_SUITE("Test multifunction node") { 742 //! Test multifunction_node with rejecting policy 743 //! \brief \ref error_guessing 744 TEST_CASE("with rejecting policy"){ 745 TestMultifunctionNode<tbb::flow::rejecting>(); 746 } 747 748 //! Test multifunction_node with queueing policy 749 //! \brief \ref error_guessing 750 TEST_CASE("with queueing policy") { 751 TestMultifunctionNode<tbb::flow::queueing>(); 752 } 753 } 754 755 //! Test input_node 756 //! \brief \ref error_guessing 757 TEST_CASE("Test input node"){ 758 TestInputNode(); 759 } 760 761 //! Test continue_node 762 //! \brief \ref error_guessing 763 TEST_CASE("Test continue node"){ 764 TestContinueNode(); 765 } 766 767 //! Test function_node 768 //! \brief \ref error_guessing 769 TEST_CASE("Test function node" * doctest::may_fail()){ 770 TestFunctionNode(); 771 } 772 773 //! Test join_node 774 //! \brief \ref error_guessing 775 TEST_CASE("Test join node"){ 776 TestJoinNode(); 777 } 778 779 //! Test limiter_node 780 //! \brief \ref error_guessing 781 TEST_CASE("Test limiter node"){ 782 TestLimiterNode<void>(); 783 TestLimiterNode<int>(); 784 TestLimiterNode<tbb::flow::continue_msg>(); 785 } 786 787 //! Test indexer_node 788 //! \brief \ref error_guessing 789 TEST_CASE("Test indexer node"){ 790 TestIndexerNode(); 791 } 792 793 //! Test split_node 794 //! \brief \ref error_guessing 795 TEST_CASE("Test split node"){ 796 TestSplitNode(); 797 } 798 799 //! Test broadcast, overwrite, write_once nodes 800 //! \brief \ref error_guessing 801 TEST_CASE("Test scalar node"){ 802 TestScalarNode<tbb::flow::broadcast_node<int> >("broadcast_node"); 803 TestScalarNode<tbb::flow::overwrite_node<int> >("overwrite_node"); 804 TestScalarNode<tbb::flow::write_once_node<int> >("write_once_node"); 805 } 806 807 //! try_get in inactive graph 808 //! \brief \ref error_guessing 809 TEST_CASE("try_get in inactive graph"){ 810 tbb::flow::graph g; 811 812 tbb::flow::input_node<int> src(g, [&](tbb::flow_control& fc) -> bool { fc.stop(); return 0;}); 813 deactivate_graph(g); 814 815 int tmp = -1; 816 CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed"); 817 818 src.activate(); 819 tmp = -1; 820 CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed"); 821 } 822 823 //! Test make_edge in inactive graph 824 //! \brief \ref error_guessing 825 TEST_CASE("Test make_edge in inactive graph"){ 826 tbb::flow::graph g; 827 828 tbb::flow::continue_node<int> c(g, [](const tbb::flow::continue_msg&){ return 1; }); 829 830 tbb::flow::function_node<int, int> f(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); 831 832 c.try_put(tbb::flow::continue_msg()); 833 g.wait_for_all(); 834 835 deactivate_graph(g); 836 837 make_edge(c, f); 838 } 839 840 //! Test make_edge from overwrite_node in inactive graph 841 //! \brief \ref error_guessing 842 TEST_CASE("Test make_edge from overwrite_node in inactive graph"){ 843 tbb::flow::graph g; 844 845 tbb::flow::queue_node<int> q(g); 846 847 tbb::flow::overwrite_node<int> on(g); 848 849 on.try_put(1); 850 g.wait_for_all(); 851 852 deactivate_graph(g); 853 854 make_edge(on, q); 855 856 int tmp = -1; 857 CHECK_MESSAGE((q.try_get(tmp) == false), "Message should not be passed on"); 858 } 859 860 //! Test iterators directly 861 //! \brief \ref error_guessing 862 TEST_CASE("graph_iterator details"){ 863 tbb::flow::graph g; 864 const tbb::flow::graph cg; 865 866 tbb::flow::graph::iterator b = g.begin(); 867 tbb::flow::graph::iterator b2 = g.begin(); 868 ++b2; 869 // Cast to a volatile pointer to workaround self assignment warnings from some compilers. 870 tbb::flow::graph::iterator* volatile b2_ptr = &b2; 871 b2 = *b2_ptr; 872 b = b2; 873 CHECK_MESSAGE((b == b2), "Assignment should make iterators equal"); 874 } 875 876 //! const graph 877 //! \brief \ref error_guessing 878 TEST_CASE("const graph"){ 879 using namespace tbb::flow; 880 881 const graph g; 882 CHECK_MESSAGE((g.cbegin() == g.cend()), "Starting graph is empty"); 883 CHECK_MESSAGE((g.begin() == g.end()), "Starting graph is empty"); 884 885 graph g2; 886 CHECK_MESSAGE((g2.begin() == g2.end()), "Starting graph is empty"); 887 } 888 889 //! Send message to continue_node while graph is inactive 890 //! \brief \ref error_guessing 891 TEST_CASE("Send message to continue_node while graph is inactive") { 892 using namespace tbb::flow; 893 894 graph g; 895 896 continue_node<int> c(g, [](const continue_msg&){ return 1; }); 897 buffer_node<int> b(g); 898 899 make_edge(c, b); 900 901 deactivate_graph(g); 902 903 c.try_put(continue_msg()); 904 g.wait_for_all(); 905 906 int tmp = -1; 907 CHECK_MESSAGE((b.try_get(tmp) == false), "Message should not arrive"); 908 CHECK_MESSAGE((tmp == -1), "Value should not be altered"); 909 } 910 911 912 //! Bypass of a successor's message in a node with lightweight policy 913 //! \brief \ref error_guessing 914 TEST_CASE("Bypass of a successor's message in a node with lightweight policy") { 915 using namespace tbb::flow; 916 917 graph g; 918 919 auto body = [](const int&v)->int { return v * 2; }; 920 function_node<int, int, lightweight> f1(g, unlimited, body); 921 922 auto body2 = [](const int&v)->int {return v / 2;}; 923 function_node<int, int> f2(g, unlimited, body2); 924 925 buffer_node<int> b(g); 926 927 make_edge(f1, f2); 928 make_edge(f2, b); 929 930 f1.try_put(1); 931 g.wait_for_all(); 932 933 int tmp = -1; 934 CHECK_MESSAGE((b.try_get(tmp) == true), "Functional nodes can work in succession"); 935 CHECK_MESSAGE((tmp == 1), "Value should not be altered"); 936 } 937 938