1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 //! \file test_eh_flow_graph.cpp 18 //! \brief Test for [flow_graph.copy_body flow_graph.broadcast_node flow_graph.priority_queue_node flow_graph.indexer_node flow_graph.sequencer_node flow_graph.remove_edge flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification 19 20 #include "common/config.h" 21 22 #if USE_TASK_SCHEDULER_OBSERVER 23 #include "tbb/task_scheduler_observer.h" 24 #endif 25 #include "tbb/flow_graph.h" 26 #include "tbb/global_control.h" 27 28 #include "common/test.h" 29 30 #if TBB_USE_EXCEPTIONS 31 32 #include "common/utils.h" 33 #include "common/checktype.h" 34 #include "common/concurrency_tracker.h" 35 36 #if _MSC_VER 37 #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning 38 #endif 39 40 #if __TBB_MSVC_UNREACHABLE_CODE_IGNORED 41 // Suppress "unreachable code" warning by VC++ 17.0-18.0 (VS 2012 or newer) 42 #pragma warning (disable: 4702) 43 #endif 44 45 // global task_scheduler_observer is an imperfect tool to find how many threads are really 46 // participating. That was the hope, but it counts the entries into the marketplace, 47 // not the arena. 48 // TODO: Consider using local task scheduler observer 49 // #define USE_TASK_SCHEDULER_OBSERVER 1 50 51 #include <iostream> 52 #include <sstream> 53 #include <vector> 54 55 #include "common/exception_handling.h" 56 57 #include <stdexcept> 58 59 #define NUM_ITEMS 15 60 int g_NumItems; 61 62 std::atomic<unsigned> nExceptions; 63 std::atomic<intptr_t> g_TGCCancelled; 64 65 enum TestNodeTypeEnum { nonThrowing, isThrowing }; 66 67 static const size_t unlimited_type = 0; 68 static const size_t serial_type = 1; 69 static const size_t limited_type = 4; 70 71 template<TestNodeTypeEnum T> struct TestNodeTypeName; 72 template<> struct TestNodeTypeName<nonThrowing> { static const char *name() { return "nonThrowing"; } }; 73 template<> struct TestNodeTypeName<isThrowing> { static const char *name() { return "isThrowing"; } }; 74 75 template<size_t Conc> struct concurrencyName; 76 template<> struct concurrencyName<serial_type>{ static const char *name() { return "serial"; } }; 77 template<> struct concurrencyName<unlimited_type>{ static const char *name() { return "unlimited"; } }; 78 template<> struct concurrencyName<limited_type>{ static const char *name() { return "limited"; } }; 79 80 // Class that provides waiting and throwing behavior. If we are not throwing, do nothing 81 // If serial, we can't wait for concurrency to peak; we may be the bottleneck and will 82 // stop further processing. We will execute g_NumThreads + 10 times (the "10" is somewhat 83 // arbitrary, and just makes sure there are enough items in the graph to keep it flowing), 84 // If parallel or serial and throwing, use utils::ConcurrencyTracker to wait. 85 86 template<size_t Conc, TestNodeTypeEnum t = nonThrowing> 87 class WaitThrow; 88 89 template<> 90 class WaitThrow<serial_type,nonThrowing> { 91 protected: 92 void WaitAndThrow(int cnt, const char * /*name*/) { 93 if(cnt > g_NumThreads + 10) { 94 utils::ConcurrencyTracker ct; 95 WaitUntilConcurrencyPeaks(); 96 } 97 } 98 }; 99 100 template<> 101 class WaitThrow<serial_type,isThrowing> { 102 protected: 103 void WaitAndThrow(int cnt, const char * /*name*/) { 104 if(cnt > g_NumThreads + 10) { 105 utils::ConcurrencyTracker ct; 106 WaitUntilConcurrencyPeaks(); 107 ThrowTestException(1); 108 } 109 } 110 }; 111 112 // for nodes with limited concurrency, if that concurrency is < g_NumThreads, we need 113 // to make sure enough other nodes wait for concurrency to peak. If we are attached to 114 // N successors, for each item we pass to a successor, we will get N executions of the 115 // "absorbers" (because we broadcast to successors.) for an odd number of threads we 116 // need (g_NumThreads - limited + 1) / 2 items (that will give us one extra execution 117 // of an "absorber", but we can't change that without changing the behavior of the node.) 118 template<> 119 class WaitThrow<limited_type,nonThrowing> { 120 protected: 121 void WaitAndThrow(int cnt, const char * /*name*/) { 122 if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) { 123 return; 124 } 125 utils::ConcurrencyTracker ct; 126 WaitUntilConcurrencyPeaks(); 127 } 128 }; 129 130 template<> 131 class WaitThrow<limited_type,isThrowing> { 132 protected: 133 void WaitAndThrow(int cnt, const char * /*name*/) { 134 utils::ConcurrencyTracker ct; 135 if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) { 136 return; 137 } 138 WaitUntilConcurrencyPeaks(); 139 ThrowTestException(1); 140 } 141 }; 142 143 template<> 144 class WaitThrow<unlimited_type,nonThrowing> { 145 protected: 146 void WaitAndThrow(int /*cnt*/, const char * /*name*/) { 147 utils::ConcurrencyTracker ct; 148 WaitUntilConcurrencyPeaks(); 149 } 150 }; 151 152 template<> 153 class WaitThrow<unlimited_type,isThrowing> { 154 protected: 155 void WaitAndThrow(int /*cnt*/, const char * /*name*/) { 156 utils::ConcurrencyTracker ct; 157 WaitUntilConcurrencyPeaks(); 158 ThrowTestException(1); 159 } 160 }; 161 162 void 163 ResetGlobals(bool throwException = true, bool flog = false) { 164 nExceptions = 0; 165 g_TGCCancelled = 0; 166 ResetEhGlobals(throwException, flog); 167 } 168 169 // -------input_node body ------------------ 170 template <class OutputType, TestNodeTypeEnum TType> 171 class test_input_body : WaitThrow<serial_type, TType> { 172 using WaitThrow<serial_type, TType>::WaitAndThrow; 173 std::atomic<int> *my_current_val; 174 int my_mult; 175 public: 176 test_input_body(std::atomic<int> &my_cnt, int multiplier = 1) : my_current_val(&my_cnt), my_mult(multiplier) { 177 // INFO("- --------- - - - constructed " << (size_t)(my_current_val) << "\n"); 178 } 179 180 OutputType operator()(tbb::flow_control& fc) { 181 UPDATE_COUNTS(); 182 OutputType ret = OutputType(my_mult * ++(*my_current_val)); 183 // TODO revamp: reconsider logging for the tests. 184 185 // The following line is known to cause double frees. Therefore, commenting out frequent 186 // calls to INFO() macro. 187 188 // INFO("xx(" << (size_t)(my_current_val) << ") ret == " << (int)ret << "\n"); 189 if(*my_current_val > g_NumItems) { 190 // INFO(" ------ End of the line!\n"); 191 *my_current_val = g_NumItems; 192 fc.stop(); 193 return OutputType(); 194 } 195 WaitAndThrow((int)ret,"test_input_body"); 196 return ret; 197 } 198 199 int count_value() { return (int)*my_current_val; } 200 }; 201 202 template <TestNodeTypeEnum TType> 203 class test_input_body<tbb::flow::continue_msg, TType> : WaitThrow<serial_type, TType> { 204 using WaitThrow<serial_type, TType>::WaitAndThrow; 205 std::atomic<int> *my_current_val; 206 public: 207 test_input_body(std::atomic<int> &my_cnt) : my_current_val(&my_cnt) { } 208 209 tbb::flow::continue_msg operator()( tbb::flow_control & fc) { 210 UPDATE_COUNTS(); 211 int outint = ++(*my_current_val); 212 if(*my_current_val > g_NumItems) { 213 *my_current_val = g_NumItems; 214 fc.stop(); 215 return tbb::flow::continue_msg(); 216 } 217 WaitAndThrow(outint,"test_input_body"); 218 return tbb::flow::continue_msg(); 219 } 220 221 int count_value() { return (int)*my_current_val; } 222 }; 223 224 // -------{function/continue}_node body ------------------ 225 template<class InputType, class OutputType, TestNodeTypeEnum T, size_t Conc> 226 class absorber_body : WaitThrow<Conc,T> { 227 using WaitThrow<Conc,T>::WaitAndThrow; 228 std::atomic<int> *my_count; 229 public: 230 absorber_body(std::atomic<int> &my_cnt) : my_count(&my_cnt) { } 231 OutputType operator()(const InputType &/*p_in*/) { 232 UPDATE_COUNTS(); 233 int out = ++(*my_count); 234 WaitAndThrow(out,"absorber_body"); 235 return OutputType(); 236 } 237 int count_value() { return *my_count; } 238 }; 239 240 // -------multifunction_node body ------------------ 241 242 // helper classes 243 template<int N,class PortsType> 244 struct IssueOutput { 245 typedef typename std::tuple_element<N-1,PortsType>::type::output_type my_type; 246 247 static void issue_tuple_element( PortsType &my_ports) { 248 CHECK_MESSAGE( (std::get<N-1>(my_ports).try_put(my_type())), "Error putting to successor"); 249 IssueOutput<N-1,PortsType>::issue_tuple_element(my_ports); 250 } 251 }; 252 253 template<class PortsType> 254 struct IssueOutput<1,PortsType> { 255 typedef typename std::tuple_element<0,PortsType>::type::output_type my_type; 256 257 static void issue_tuple_element( PortsType &my_ports) { 258 CHECK_MESSAGE( (std::get<0>(my_ports).try_put(my_type())), "Error putting to successor"); 259 } 260 }; 261 262 template<class InputType, class OutputTupleType, TestNodeTypeEnum T, size_t Conc> 263 class multifunction_node_body : WaitThrow<Conc,T> { 264 using WaitThrow<Conc,T>::WaitAndThrow; 265 static const int N = std::tuple_size<OutputTupleType>::value; 266 typedef typename tbb::flow::multifunction_node<InputType,OutputTupleType> NodeType; 267 typedef typename NodeType::output_ports_type PortsType; 268 std::atomic<int> *my_count; 269 public: 270 multifunction_node_body(std::atomic<int> &my_cnt) : my_count(&my_cnt) { } 271 void operator()(const InputType& /*in*/, PortsType &my_ports) { 272 UPDATE_COUNTS(); 273 int out = ++(*my_count); 274 WaitAndThrow(out,"multifunction_node_body"); 275 // issue an item to each output port. 276 IssueOutput<N,PortsType>::issue_tuple_element(my_ports); 277 } 278 279 int count_value() { return *my_count; } 280 }; 281 282 // --------- body to sort items in sequencer_node 283 template<class BufferItemType> 284 struct sequencer_body { 285 size_t operator()(const BufferItemType &s) { 286 CHECK_MESSAGE( (s), "sequencer item out of range (== 0)"); 287 return size_t(s) - 1; 288 } 289 }; 290 291 // --------- body to compare the "priorities" of objects for priority_queue_node five priority levels 0-4. 292 template<class T> 293 struct myLess { 294 bool operator()(const T &t1, const T &t2) { 295 return (int(t1) % 5) < (int(t2) % 5); 296 } 297 }; 298 299 // --------- type for < comparison in priority_queue_node. 300 template<class ItemType> 301 struct less_body { 302 bool operator()(const ItemType &lhs, const ItemType &rhs) { 303 return (int(lhs) % 3) < (int(rhs) % 3); 304 } 305 }; 306 307 // --------- tag methods for tag_matching join_node 308 template<typename TT> 309 class tag_func { 310 TT my_mult; 311 public: 312 tag_func(TT multiplier) : my_mult(multiplier) { } 313 // operator() will return [0 .. Count) 314 tbb::flow::tag_value operator()( TT v) { 315 tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult); 316 return t; 317 } 318 }; 319 320 // --------- Input body for split_node test. 321 template <class OutputTuple, TestNodeTypeEnum TType> 322 class tuple_test_input_body : WaitThrow<serial_type, TType> { 323 typedef typename std::tuple_element<0,OutputTuple>::type ItemType0; 324 typedef typename std::tuple_element<1,OutputTuple>::type ItemType1; 325 using WaitThrow<serial_type, TType>::WaitAndThrow; 326 std::atomic<int> *my_current_val; 327 public: 328 tuple_test_input_body(std::atomic<int> &my_cnt) : my_current_val(&my_cnt) { } 329 330 OutputTuple operator()(tbb::flow_control& fc) { 331 UPDATE_COUNTS(); 332 int ival = ++(*my_current_val); 333 if(*my_current_val > g_NumItems) { 334 *my_current_val = g_NumItems; // jam the final value; we assert on it later. 335 fc.stop(); 336 return OutputTuple(); 337 } 338 WaitAndThrow(ival,"tuple_test_input_body"); 339 return OutputTuple(ItemType0(ival),ItemType1(ival)); 340 } 341 342 int count_value() { return (int)*my_current_val; } 343 }; 344 345 // ------- end of node bodies 346 347 // input_node is only-serial. input_node can throw, or the function_node can throw. 348 // graph being tested is 349 // 350 // input_node+---+parallel function_node 351 // 352 // After each run the graph is reset(), to test the reset functionality. 353 // 354 355 356 template<class ItemType, TestNodeTypeEnum inpThrowType, TestNodeTypeEnum absorbThrowType> 357 void run_one_input_node_test(bool throwException, bool flog) { 358 typedef test_input_body<ItemType,inpThrowType> src_body_type; 359 typedef absorber_body<ItemType, tbb::flow::continue_msg, absorbThrowType, unlimited_type> parallel_absorb_body_type; 360 std::atomic<int> input_body_count; 361 std::atomic<int> absorber_body_count; 362 input_body_count = 0; 363 absorber_body_count = 0; 364 365 tbb::flow::graph g; 366 367 g_Master = std::this_thread::get_id(); 368 369 #if USE_TASK_SCHEDULER_OBSERVER 370 eh_test_observer o; 371 o.observe(true); 372 #endif 373 374 tbb::flow::input_node<ItemType> sn(g, src_body_type(input_body_count)); 375 parallel_absorb_body_type ab2(absorber_body_count); 376 tbb::flow::function_node<ItemType> parallel_fn(g,tbb::flow::unlimited,ab2); 377 make_edge(sn, parallel_fn); 378 for(int runcnt = 0; runcnt < 2; ++runcnt) { 379 ResetGlobals(throwException,flog); 380 if(throwException) { 381 TRY(); 382 sn.activate(); 383 g.wait_for_all(); 384 CATCH_AND_ASSERT(); 385 } 386 else { 387 TRY(); 388 sn.activate(); 389 g.wait_for_all(); 390 CATCH_AND_FAIL(); 391 } 392 393 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 394 int src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value(); 395 int sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value(); 396 if(throwException) { 397 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception flag in flow::graph not set"); 398 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "canceled flag not set"); 399 CHECK_MESSAGE( (src_cnt <= g_NumItems), "Too many input_node items emitted"); 400 CHECK_MESSAGE( (sink_cnt <= src_cnt), "Too many input_node items received"); 401 } 402 else { 403 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 404 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 405 CHECK_MESSAGE( (src_cnt == g_NumItems), "Incorrect # input_node items emitted"); 406 CHECK_MESSAGE( (sink_cnt == src_cnt), "Incorrect # input_node items received"); 407 } 408 g.reset(); // resets the body of the input_node and the absorb_nodes. 409 input_body_count = 0; 410 absorber_body_count = 0; 411 CHECK_MESSAGE( (!g.exception_thrown()), "Reset didn't clear exception_thrown()"); 412 CHECK_MESSAGE( (!g.is_cancelled()), "Reset didn't clear is_cancelled()"); 413 src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value(); 414 sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value(); 415 CHECK_MESSAGE( (src_cnt == 0), "input_node count not reset"); 416 CHECK_MESSAGE( (sink_cnt == 0), "sink_node count not reset"); 417 } 418 #if USE_TASK_SCHEDULER_OBSERVER 419 o.observe(false); 420 #endif 421 } // run_one_input_node_test 422 423 424 template<class ItemType, TestNodeTypeEnum inpThrowType, TestNodeTypeEnum absorbThrowType> 425 void run_input_node_test() { 426 run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(false,false); 427 run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(true,false); 428 run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(true,true); 429 } // run_input_node_test 430 431 void test_input_node() { 432 INFO("Testing input_node\n"); 433 CheckType<int>::check_type_counter = 0; 434 g_Wakeup_Msg = "input_node(1): Missed wakeup or machine is overloaded?"; 435 run_input_node_test<CheckType<int>, isThrowing, nonThrowing>(); 436 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test"); 437 g_Wakeup_Msg = "input_node(2): Missed wakeup or machine is overloaded?"; 438 run_input_node_test<int, isThrowing, nonThrowing>(); 439 g_Wakeup_Msg = "input_node(3): Missed wakeup or machine is overloaded?"; 440 run_input_node_test<int, nonThrowing, isThrowing>(); 441 g_Wakeup_Msg = "input_node(4): Missed wakeup or machine is overloaded?"; 442 run_input_node_test<int, isThrowing, isThrowing>(); 443 g_Wakeup_Msg = "input_node(5): Missed wakeup or machine is overloaded?"; 444 run_input_node_test<CheckType<int>, isThrowing, isThrowing>(); 445 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 446 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test"); 447 } 448 449 // -------- utilities & types to test function_node and multifunction_node. 450 451 // need to tell the template which node type I am using so it attaches successors correctly. 452 enum NodeFetchType { func_node_type, multifunc_node_type }; 453 454 template<class NodeType, class ItemType, int indx, NodeFetchType NFT> 455 struct AttachPoint; 456 457 template<class NodeType, class ItemType, int indx> 458 struct AttachPoint<NodeType,ItemType,indx,multifunc_node_type> { 459 static tbb::flow::sender<ItemType> &GetSender(NodeType &n) { 460 return tbb::flow::output_port<indx>(n); 461 } 462 }; 463 464 template<class NodeType, class ItemType, int indx> 465 struct AttachPoint<NodeType,ItemType,indx,func_node_type> { 466 static tbb::flow::sender<ItemType> &GetSender(NodeType &n) { 467 return n; 468 } 469 }; 470 471 472 // common template for running function_node, multifunction_node. continue_node 473 // has different firing requirements, so it needs a different graph topology. 474 template< 475 class InputNodeType, 476 class InputNodeBodyType0, 477 class InputNodeBodyType1, 478 NodeFetchType NFT, 479 class TestNodeType, 480 class TestNodeBodyType, 481 class TypeToSink0, // what kind of item are we sending to sink0 482 class TypeToSink1, // what kind of item are we sending to sink1 483 class SinkNodeType0, // will be same for function; 484 class SinkNodeType1, // may differ for multifunction_node 485 class SinkNodeBodyType0, 486 class SinkNodeBodyType1, 487 size_t Conc 488 > 489 void 490 run_one_functype_node_test(bool throwException, bool flog, const char * /*name*/) { 491 492 std::stringstream ss; 493 char *saved_msg = const_cast<char *>(g_Wakeup_Msg); 494 tbb::flow::graph g; 495 496 std::atomic<int> input0_count; 497 std::atomic<int> input1_count; 498 std::atomic<int> sink0_count; 499 std::atomic<int> sink1_count; 500 std::atomic<int> test_count; 501 input0_count = input1_count = sink0_count = sink1_count = test_count = 0; 502 503 #if USE_TASK_SCHEDULER_OBSERVER 504 eh_test_observer o; 505 o.observe(true); 506 #endif 507 508 g_Master = std::this_thread::get_id(); 509 InputNodeType input0(g, InputNodeBodyType0(input0_count)); 510 InputNodeType input1(g, InputNodeBodyType1(input1_count)); 511 TestNodeType node_to_test(g, Conc, TestNodeBodyType(test_count)); 512 SinkNodeType0 sink0(g,tbb::flow::unlimited,SinkNodeBodyType0(sink0_count)); 513 SinkNodeType1 sink1(g,tbb::flow::unlimited,SinkNodeBodyType1(sink1_count)); 514 make_edge(input0, node_to_test); 515 make_edge(input1, node_to_test); 516 make_edge(AttachPoint<TestNodeType, TypeToSink0, 0, NFT>::GetSender(node_to_test), sink0); 517 make_edge(AttachPoint<TestNodeType, TypeToSink1, 1, NFT>::GetSender(node_to_test), sink1); 518 519 for(int iter = 0; iter < 2; ++iter) { // run, reset, run again 520 ss.clear(); 521 ss << saved_msg << " iter=" << iter << ", threads=" << g_NumThreads << ", throw=" << (throwException ? "T" : "F") << ", flow=" << (flog ? "T" : "F"); 522 g_Wakeup_Msg = ss.str().c_str(); 523 ResetGlobals(throwException,flog); 524 if(throwException) { 525 TRY(); 526 input0.activate(); 527 input1.activate(); 528 g.wait_for_all(); 529 CATCH_AND_ASSERT(); 530 } 531 else { 532 TRY(); 533 input0.activate(); 534 input1.activate(); 535 g.wait_for_all(); 536 CATCH_AND_FAIL(); 537 } 538 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 539 int ib0_cnt = tbb::flow::copy_body<InputNodeBodyType0>(input0).count_value(); 540 int ib1_cnt = tbb::flow::copy_body<InputNodeBodyType1>(input1).count_value(); 541 int t_cnt = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value(); 542 int nb0_cnt = tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value(); 543 int nb1_cnt = tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value(); 544 if(throwException) { 545 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 546 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 547 CHECK_MESSAGE( (ib0_cnt + ib1_cnt <= 2*g_NumItems), "Too many items sent by inputs"); 548 CHECK_MESSAGE( (ib0_cnt + ib1_cnt >= t_cnt), "Too many items received by test node"); 549 CHECK_MESSAGE( (nb0_cnt + nb1_cnt <= t_cnt*2), "Too many items received by sink nodes"); 550 } 551 else { 552 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 553 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 554 CHECK_MESSAGE( (ib0_cnt + ib1_cnt == 2*g_NumItems), "Missing invocations of input_nodes"); 555 CHECK_MESSAGE( (t_cnt == 2*g_NumItems), "Not all items reached test node"); 556 CHECK_MESSAGE( (nb0_cnt == 2*g_NumItems && nb1_cnt == 2*g_NumItems), "Missing items in absorbers"); 557 } 558 g.reset(); // resets the body of the input_nodes, test_node and the absorb_nodes. 559 input0_count = input1_count = sink0_count = sink1_count = test_count = 0; 560 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType0>(input0).count_value()),"Reset input 0 failed"); 561 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType1>(input1).count_value()),"Reset input 1 failed"); 562 CHECK_MESSAGE( (0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value()),"Reset test_node failed"); 563 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value()),"Reset sink 0 failed"); 564 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value()),"Reset sink 1 failed"); 565 566 g_Wakeup_Msg = saved_msg; 567 } 568 #if USE_TASK_SCHEDULER_OBSERVER 569 o.observe(false); 570 #endif 571 } 572 573 // Test function_node 574 // 575 // graph being tested is 576 // 577 // input_node -\ /- parallel function_node 578 // \ / 579 // +function_node+ 580 // / \ x 581 // input_node -/ \- parallel function_node 582 // 583 // After each run the graph is reset(), to test the reset functionality. 584 // 585 template< 586 TestNodeTypeEnum IType1, // does input node 1 throw? 587 TestNodeTypeEnum IType2, // does input node 2 throw? 588 class Item12, // type of item passed between inputs and test node 589 TestNodeTypeEnum FType, // does function node throw? 590 class Item23, // type passed from function_node to sink nodes 591 TestNodeTypeEnum NType1, // does sink node 1 throw? 592 TestNodeTypeEnum NType2, // does sink node 1 throw? 593 class NodePolicy, // rejecting,queueing 594 size_t Conc // is node concurrent? {serial | limited | unlimited} 595 > 596 void run_function_node_test() { 597 598 typedef test_input_body<Item12,IType1> IBodyType1; 599 typedef test_input_body<Item12,IType2> IBodyType2; 600 typedef absorber_body<Item12, Item23, FType, Conc> TestBodyType; 601 typedef absorber_body<Item23,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1; 602 typedef absorber_body<Item23,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2; 603 604 typedef tbb::flow::input_node<Item12> InputType; 605 typedef tbb::flow::function_node<Item12, Item23, NodePolicy> TestType; 606 typedef tbb::flow::function_node<Item23,tbb::flow::continue_msg> SnkType; 607 608 for(int i = 0; i < 4; ++i ) { 609 if(i != 2) { // doesn't make sense to flog a non-throwing test 610 bool doThrow = (i & 0x1) != 0; 611 bool doFlog = (i & 0x2) != 0; 612 run_one_functype_node_test< 613 /*InputNodeType*/ InputType, 614 /*InputNodeBodyType0*/ IBodyType1, 615 /*InputNodeBodyType1*/ IBodyType2, 616 /* NFT */ func_node_type, 617 /*TestNodeType*/ TestType, 618 /*TestNodeBodyType*/ TestBodyType, 619 /*TypeToSink0 */ Item23, 620 /*TypeToSink1 */ Item23, 621 /*SinkNodeType0*/ SnkType, 622 /*SinkNodeType1*/ SnkType, 623 /*SinkNodeBodyType1*/ SinkBodyType1, 624 /*SinkNodeBodyType2*/ SinkBodyType2, 625 /*Conc*/ Conc> 626 (doThrow,doFlog,"function_node"); 627 } 628 } 629 } // run_function_node_test 630 631 void test_function_node() { 632 INFO("Testing function_node\n"); 633 // serial rejecting 634 g_Wakeup_Msg = "function_node(1a): Missed wakeup or machine is overloaded?"; 635 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); 636 g_Wakeup_Msg = "function_node(1b): Missed wakeup or machine is overloaded?"; 637 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); 638 g_Wakeup_Msg = "function_node(1c): Missed wakeup or machine is overloaded?"; 639 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); 640 641 // serial queueing 642 g_Wakeup_Msg = "function_node(2): Missed wakeup or machine is overloaded?"; 643 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 644 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 645 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 646 CheckType<int>::check_type_counter = 0; 647 run_function_node_test<nonThrowing, nonThrowing, CheckType<int>, nonThrowing, CheckType<int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 648 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test"); 649 650 // unlimited parallel rejecting 651 g_Wakeup_Msg = "function_node(3): Missed wakeup or machine is overloaded?"; 652 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>(); 653 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>(); 654 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>(); 655 656 // limited parallel rejecting 657 g_Wakeup_Msg = "function_node(4): Missed wakeup or machine is overloaded?"; 658 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>(); 659 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>(); 660 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>(); 661 662 // limited parallel queueing 663 g_Wakeup_Msg = "function_node(5): Missed wakeup or machine is overloaded?"; 664 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>(); 665 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>(); 666 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>(); 667 668 // everyone throwing 669 g_Wakeup_Msg = "function_node(6): Missed wakeup or machine is overloaded?"; 670 run_function_node_test<isThrowing, isThrowing, int, isThrowing, int, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>(); 671 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 672 } 673 674 // ----------------------------------- multifunction_node ---------------------------------- 675 // Test multifunction_node. 676 // 677 // graph being tested is 678 // 679 // input_node -\ /- parallel function_node 680 // \ / 681 // +multifunction_node+ 682 // / \ x 683 // input_node -/ \- parallel function_node 684 // 685 // After each run the graph is reset(), to test the reset functionality. The 686 // multifunction_node will put an item to each successor for every item 687 // received. 688 // 689 template< 690 TestNodeTypeEnum IType0, // does input node 1 throw? 691 TestNodeTypeEnum IType1, // does input node 2 thorw? 692 class Item12, // type of item passed between inputs and test node 693 TestNodeTypeEnum FType, // does multifunction node throw? 694 class ItemTuple, // tuple of types passed from multifunction_node to sink nodes 695 TestNodeTypeEnum NType1, // does sink node 1 throw? 696 TestNodeTypeEnum NType2, // does sink node 2 throw? 697 class NodePolicy, // rejecting,queueing 698 size_t Conc // is node concurrent? {serial | limited | unlimited} 699 > 700 void run_multifunction_node_test() { 701 702 typedef typename std::tuple_element<0,ItemTuple>::type Item23Type0; 703 typedef typename std::tuple_element<1,ItemTuple>::type Item23Type1; 704 typedef test_input_body<Item12,IType0> IBodyType1; 705 typedef test_input_body<Item12,IType1> IBodyType2; 706 typedef multifunction_node_body<Item12, ItemTuple, FType, Conc> TestBodyType; 707 typedef absorber_body<Item23Type0,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1; 708 typedef absorber_body<Item23Type1,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2; 709 710 typedef tbb::flow::input_node<Item12> InputType; 711 typedef tbb::flow::multifunction_node<Item12, ItemTuple, NodePolicy> TestType; 712 typedef tbb::flow::function_node<Item23Type0,tbb::flow::continue_msg> SnkType0; 713 typedef tbb::flow::function_node<Item23Type1,tbb::flow::continue_msg> SnkType1; 714 715 for(int i = 0; i < 4; ++i ) { 716 if(i != 2) { // doesn't make sense to flog a non-throwing test 717 bool doThrow = (i & 0x1) != 0; 718 bool doFlog = (i & 0x2) != 0; 719 run_one_functype_node_test< 720 /*InputNodeType*/ InputType, 721 /*InputNodeBodyType0*/ IBodyType1, 722 /*InputNodeBodyType1*/ IBodyType2, 723 /*NFT*/ multifunc_node_type, 724 /*TestNodeType*/ TestType, 725 /*TestNodeBodyType*/ TestBodyType, 726 /*TypeToSink0*/ Item23Type0, 727 /*TypeToSink1*/ Item23Type1, 728 /*SinkNodeType0*/ SnkType0, 729 /*SinkNodeType1*/ SnkType1, 730 /*SinkNodeBodyType0*/ SinkBodyType1, 731 /*SinkNodeBodyType1*/ SinkBodyType2, 732 /*Conc*/ Conc> 733 (doThrow,doFlog,"multifunction_node"); 734 } 735 } 736 } // run_multifunction_node_test 737 738 void test_multifunction_node() { 739 INFO("Testing multifunction_node\n"); 740 g_Wakeup_Msg = "multifunction_node(input throws,rejecting,serial): Missed wakeup or machine is overloaded?"; 741 // serial rejecting 742 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,float>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); 743 g_Wakeup_Msg = "multifunction_node(test throws,rejecting,serial): Missed wakeup or machine is overloaded?"; 744 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); 745 g_Wakeup_Msg = "multifunction_node(sink throws,rejecting,serial): Missed wakeup or machine is overloaded?"; 746 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); 747 748 g_Wakeup_Msg = "multifunction_node(2): Missed wakeup or machine is overloaded?"; 749 // serial queueing 750 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 751 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 752 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 753 CheckType<int>::check_type_counter = 0; 754 run_multifunction_node_test<nonThrowing, nonThrowing, CheckType<int>, nonThrowing, std::tuple<CheckType<int>, CheckType<int> >, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 755 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test"); 756 757 g_Wakeup_Msg = "multifunction_node(3): Missed wakeup or machine is overloaded?"; 758 // unlimited parallel rejecting 759 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>(); 760 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>(); 761 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>(); 762 763 g_Wakeup_Msg = "multifunction_node(4): Missed wakeup or machine is overloaded?"; 764 // limited parallel rejecting 765 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>(); 766 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>(); 767 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>(); 768 769 g_Wakeup_Msg = "multifunction_node(5): Missed wakeup or machine is overloaded?"; 770 // limited parallel queueing 771 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>(); 772 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>(); 773 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>(); 774 775 g_Wakeup_Msg = "multifunction_node(6): Missed wakeup or machine is overloaded?"; 776 // everyone throwing 777 run_multifunction_node_test<isThrowing, isThrowing, int, isThrowing, std::tuple<int,int>, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>(); 778 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 779 } 780 781 // 782 // Continue node has T predecessors. when it receives messages (continue_msg) on T predecessors 783 // it executes the body of the node, and forwards a continue_msg to its successors. 784 // However many predecessors the continue_node has, that's how many continue_msgs it receives 785 // on input before forwarding a message. 786 // 787 // The graph will look like 788 // 789 // +broadcast_node+ 790 // / \ ___ 791 // input_node+------>+broadcast_node+ +continue_node+--->+absorber 792 // \ / 793 // +broadcast_node+ 794 // 795 // The continue_node has unlimited parallelism, no input buffering, and broadcasts to successors. 796 // The absorber is parallel, so each item emitted by the input will result in one thread 797 // spinning. So for N threads we pass N-1 continue_messages, then spin wait and then throw if 798 // we are allowed to. 799 800 template < class InputNodeType, class InputNodeBodyType, class TTestNodeType, class TestNodeBodyType, 801 class SinkNodeType, class SinkNodeBodyType> 802 void run_one_continue_node_test (bool throwException, bool flog) { 803 tbb::flow::graph g; 804 805 std::atomic<int> input_count; 806 std::atomic<int> test_count; 807 std::atomic<int> sink_count; 808 input_count = test_count = sink_count = 0; 809 #if USE_TASK_SCHEDULER_OBSERVER 810 eh_test_observer o; 811 o.observe(true); 812 #endif 813 g_Master = std::this_thread::get_id(); 814 InputNodeType input(g, InputNodeBodyType(input_count)); 815 TTestNodeType node_to_test(g, TestNodeBodyType(test_count)); 816 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count)); 817 tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g), b2(g), b3(g); 818 make_edge(input, b1); 819 make_edge(b1,b2); 820 make_edge(b1,b3); 821 make_edge(b2,node_to_test); 822 make_edge(b3,node_to_test); 823 make_edge(node_to_test, sink); 824 for(int iter = 0; iter < 2; ++iter) { 825 ResetGlobals(throwException,flog); 826 if(throwException) { 827 TRY(); 828 input.activate(); 829 g.wait_for_all(); 830 CATCH_AND_ASSERT(); 831 } 832 else { 833 TRY(); 834 input.activate(); 835 g.wait_for_all(); 836 CATCH_AND_FAIL(); 837 } 838 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 839 int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value(); 840 int t_cnt = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value(); 841 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(); 842 if(throwException) { 843 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 844 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 845 CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs"); 846 CHECK_MESSAGE( (ib_cnt >= t_cnt), "Too many items received by test node"); 847 CHECK_MESSAGE( (nb_cnt <= t_cnt), "Too many items received by sink nodes"); 848 } 849 else { 850 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 851 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 852 CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node"); 853 CHECK_MESSAGE( (t_cnt == g_NumItems), "Not all items reached test node"); 854 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers"); 855 } 856 g.reset(); // resets the body of the input_nodes, test_node and the absorb_nodes. 857 input_count = test_count = sink_count = 0; 858 CHECK_MESSAGE( (0 == (int)test_count), "Atomic wasn't reset properly"); 859 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed"); 860 CHECK_MESSAGE( (0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value()),"Reset test_node failed"); 861 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed"); 862 } 863 #if USE_TASK_SCHEDULER_OBSERVER 864 o.observe(false); 865 #endif 866 } 867 868 template< 869 class ItemType, 870 TestNodeTypeEnum IType, // does input node throw? 871 TestNodeTypeEnum CType, // does continue_node throw? 872 TestNodeTypeEnum AType> // does absorber throw 873 void run_continue_node_test() { 874 typedef test_input_body<tbb::flow::continue_msg,IType> IBodyType; 875 typedef absorber_body<tbb::flow::continue_msg,ItemType,CType,unlimited_type> ContBodyType; 876 typedef absorber_body<ItemType,tbb::flow::continue_msg, AType, unlimited_type> SinkBodyType; 877 878 typedef tbb::flow::input_node<tbb::flow::continue_msg> InputType; 879 typedef tbb::flow::continue_node<ItemType> TestType; 880 typedef tbb::flow::function_node<ItemType,tbb::flow::continue_msg> SnkType; 881 882 for(int i = 0; i < 4; ++i ) { 883 if(i == 2) continue; // don't run (false,true); it doesn't make sense. 884 bool doThrow = (i & 0x1) != 0; 885 bool doFlog = (i & 0x2) != 0; 886 run_one_continue_node_test< 887 /*InputNodeType*/ InputType, 888 /*InputNodeBodyType*/ IBodyType, 889 /*TestNodeType*/ TestType, 890 /*TestNodeBodyType*/ ContBodyType, 891 /*SinkNodeType*/ SnkType, 892 /*SinkNodeBodyType*/ SinkBodyType> 893 (doThrow,doFlog); 894 } 895 } 896 897 // 898 void test_continue_node() { 899 INFO("Testing continue_node\n"); 900 g_Wakeup_Msg = "buffer_node(non,is,non): Missed wakeup or machine is overloaded?"; 901 run_continue_node_test<int,nonThrowing,isThrowing,nonThrowing>(); 902 g_Wakeup_Msg = "buffer_node(non,non,is): Missed wakeup or machine is overloaded?"; 903 run_continue_node_test<int,nonThrowing,nonThrowing,isThrowing>(); 904 g_Wakeup_Msg = "buffer_node(is,non,non): Missed wakeup or machine is overloaded?"; 905 run_continue_node_test<int,isThrowing,nonThrowing,nonThrowing>(); 906 g_Wakeup_Msg = "buffer_node(is,is,is): Missed wakeup or machine is overloaded?"; 907 run_continue_node_test<int,isThrowing,isThrowing,isThrowing>(); 908 CheckType<double>::check_type_counter = 0; 909 run_continue_node_test<CheckType<double>,isThrowing,isThrowing,isThrowing>(); 910 CHECK_MESSAGE( (!CheckType<double>::check_type_counter), "Dropped objects in continue_node test"); 911 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 912 } 913 914 // ---------- buffer_node queue_node overwrite_node -------------- 915 916 template< 917 class BufferItemType, // 918 class InputNodeType, 919 class InputNodeBodyType, 920 class TestNodeType, 921 class SinkNodeType, 922 class SinkNodeBodyType > 923 void run_one_buffer_node_test(bool throwException,bool flog) { 924 tbb::flow::graph g; 925 926 std::atomic<int> input_count; 927 std::atomic<int> sink_count; 928 input_count = sink_count = 0; 929 #if USE_TASK_SCHEDULER_OBSERVER 930 eh_test_observer o; 931 o.observe(true); 932 #endif 933 g_Master = std::this_thread::get_id(); 934 InputNodeType input(g, InputNodeBodyType(input_count)); 935 TestNodeType node_to_test(g); 936 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count)); 937 make_edge(input,node_to_test); 938 make_edge(node_to_test, sink); 939 for(int iter = 0; iter < 2; ++iter) { 940 ResetGlobals(throwException,flog); 941 if(throwException) { 942 TRY(); 943 input.activate(); 944 g.wait_for_all(); 945 CATCH_AND_ASSERT(); 946 } 947 else { 948 TRY(); 949 input.activate(); 950 g.wait_for_all(); 951 CATCH_AND_FAIL(); 952 } 953 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 954 int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value(); 955 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(); 956 if(throwException) { 957 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 958 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 959 CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs"); 960 CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes"); 961 } 962 else { 963 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 964 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 965 CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node"); 966 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers"); 967 } 968 if(iter == 0) { 969 remove_edge(node_to_test, sink); 970 node_to_test.try_put(BufferItemType()); 971 g.wait_for_all(); 972 g.reset(); 973 input_count = sink_count = 0; 974 BufferItemType tmp; 975 CHECK_MESSAGE( (!node_to_test.try_get(tmp)), "node not empty"); 976 make_edge(node_to_test, sink); 977 g.wait_for_all(); 978 } 979 else { 980 g.reset(); 981 input_count = sink_count = 0; 982 } 983 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed"); 984 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed"); 985 } 986 987 #if USE_TASK_SCHEDULER_OBSERVER 988 o.observe(false); 989 #endif 990 } 991 template<class BufferItemType, 992 TestNodeTypeEnum InputThrowType, 993 TestNodeTypeEnum SinkThrowType> 994 void run_buffer_queue_and_overwrite_node_test() { 995 typedef test_input_body<BufferItemType,InputThrowType> InputBodyType; 996 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; 997 998 typedef tbb::flow::input_node<BufferItemType> InputType; 999 typedef tbb::flow::buffer_node<BufferItemType> BufType; 1000 typedef tbb::flow::queue_node<BufferItemType> QueType; 1001 typedef tbb::flow::overwrite_node<BufferItemType> OvrType; 1002 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType; 1003 1004 for(int i = 0; i < 4; ++i) { 1005 if(i == 2) continue; // no need to test flog w/o throws 1006 bool throwException = (i & 0x1) != 0; 1007 bool doFlog = (i & 0x2) != 0; 1008 run_one_buffer_node_test< 1009 /* class BufferItemType*/ BufferItemType, 1010 /*class InputNodeType*/ InputType, 1011 /*class InputNodeBodyType*/ InputBodyType, 1012 /*class TestNodeType*/ BufType, 1013 /*class SinkNodeType*/ SnkType, 1014 /*class SinkNodeBodyType*/ SinkBodyType 1015 >(throwException, doFlog); 1016 run_one_buffer_node_test< 1017 /* class BufferItemType*/ BufferItemType, 1018 /*class InputNodeType*/ InputType, 1019 /*class InputNodeBodyType*/ InputBodyType, 1020 /*class TestNodeType*/ QueType, 1021 /*class SinkNodeType*/ SnkType, 1022 /*class SinkNodeBodyType*/ SinkBodyType 1023 >(throwException, doFlog); 1024 run_one_buffer_node_test< 1025 /* class BufferItemType*/ BufferItemType, 1026 /*class InputNodeType*/ InputType, 1027 /*class InputNodeBodyType*/ InputBodyType, 1028 /*class TestNodeType*/ OvrType, 1029 /*class SinkNodeType*/ SnkType, 1030 /*class SinkNodeBodyType*/ SinkBodyType 1031 >(throwException, doFlog); 1032 } 1033 } 1034 1035 void test_buffer_queue_and_overwrite_node() { 1036 INFO("Testing buffer_node, queue_node and overwrite_node\n"); 1037 g_Wakeup_Msg = "buffer, queue, overwrite(is,non): Missed wakeup or machine is overloaded?"; 1038 run_buffer_queue_and_overwrite_node_test<int,isThrowing,nonThrowing>(); 1039 g_Wakeup_Msg = "buffer, queue, overwrite(non,is): Missed wakeup or machine is overloaded?"; 1040 run_buffer_queue_and_overwrite_node_test<int,nonThrowing,isThrowing>(); 1041 g_Wakeup_Msg = "buffer, queue, overwrite(is,is): Missed wakeup or machine is overloaded?"; 1042 run_buffer_queue_and_overwrite_node_test<int,isThrowing,isThrowing>(); 1043 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 1044 } 1045 1046 // ---------- sequencer_node ------------------------- 1047 1048 1049 template< 1050 class BufferItemType, // 1051 class InputNodeType, 1052 class InputNodeBodyType, 1053 class TestNodeType, 1054 class SeqBodyType, 1055 class SinkNodeType, 1056 class SinkNodeBodyType > 1057 void run_one_sequencer_node_test(bool throwException,bool flog) { 1058 tbb::flow::graph g; 1059 1060 std::atomic<int> input_count; 1061 std::atomic<int> sink_count; 1062 input_count = sink_count = 0; 1063 #if USE_TASK_SCHEDULER_OBSERVER 1064 eh_test_observer o; 1065 o.observe(true); 1066 #endif 1067 g_Master = std::this_thread::get_id(); 1068 InputNodeType input(g, InputNodeBodyType(input_count)); 1069 TestNodeType node_to_test(g,SeqBodyType()); 1070 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count)); 1071 make_edge(input,node_to_test); 1072 make_edge(node_to_test, sink); 1073 for(int iter = 0; iter < 2; ++iter) { 1074 ResetGlobals(throwException,flog); 1075 if(throwException) { 1076 TRY(); 1077 input.activate(); 1078 g.wait_for_all(); 1079 CATCH_AND_ASSERT(); 1080 } 1081 else { 1082 TRY(); 1083 input.activate(); 1084 g.wait_for_all(); 1085 CATCH_AND_FAIL(); 1086 } 1087 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 1088 int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value(); 1089 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(); 1090 if(throwException) { 1091 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1092 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 1093 CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs"); 1094 CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes"); 1095 } 1096 else { 1097 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1098 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1099 CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node"); 1100 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers"); 1101 } 1102 if(iter == 0) { 1103 remove_edge(node_to_test, sink); 1104 node_to_test.try_put(BufferItemType(g_NumItems + 1)); 1105 node_to_test.try_put(BufferItemType(1)); 1106 g.wait_for_all(); 1107 g.reset(); 1108 input_count = sink_count = 0; 1109 make_edge(node_to_test, sink); 1110 g.wait_for_all(); 1111 } 1112 else { 1113 g.reset(); 1114 input_count = sink_count = 0; 1115 } 1116 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed"); 1117 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed"); 1118 } 1119 1120 #if USE_TASK_SCHEDULER_OBSERVER 1121 o.observe(false); 1122 #endif 1123 } 1124 1125 template<class BufferItemType, 1126 TestNodeTypeEnum InputThrowType, 1127 TestNodeTypeEnum SinkThrowType> 1128 void run_sequencer_node_test() { 1129 typedef test_input_body<BufferItemType,InputThrowType> InputBodyType; 1130 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; 1131 typedef sequencer_body<BufferItemType> SeqBodyType; 1132 1133 typedef tbb::flow::input_node<BufferItemType> InputType; 1134 typedef tbb::flow::sequencer_node<BufferItemType> SeqType; 1135 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType; 1136 1137 for(int i = 0; i < 4; ++i) { 1138 if(i == 2) continue; // no need to test flog w/o throws 1139 bool throwException = (i & 0x1) != 0; 1140 bool doFlog = (i & 0x2) != 0; 1141 run_one_sequencer_node_test< 1142 /* class BufferItemType*/ BufferItemType, 1143 /*class InputNodeType*/ InputType, 1144 /*class InputNodeBodyType*/ InputBodyType, 1145 /*class TestNodeType*/ SeqType, 1146 /*class SeqBodyType*/ SeqBodyType, 1147 /*class SinkNodeType*/ SnkType, 1148 /*class SinkNodeBodyType*/ SinkBodyType 1149 >(throwException, doFlog); 1150 } 1151 } 1152 1153 1154 1155 void test_sequencer_node() { 1156 INFO("Testing sequencer_node\n"); 1157 g_Wakeup_Msg = "sequencer_node(is,non): Missed wakeup or machine is overloaded?"; 1158 run_sequencer_node_test<int, isThrowing,nonThrowing>(); 1159 CheckType<int>::check_type_counter = 0; 1160 g_Wakeup_Msg = "sequencer_node(non,is): Missed wakeup or machine is overloaded?"; 1161 run_sequencer_node_test<CheckType<int>, nonThrowing,isThrowing>(); 1162 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped objects in sequencer_node test"); 1163 g_Wakeup_Msg = "sequencer_node(is,is): Missed wakeup or machine is overloaded?"; 1164 run_sequencer_node_test<int, isThrowing,isThrowing>(); 1165 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 1166 } 1167 1168 // ------------ priority_queue_node ------------------ 1169 1170 template< 1171 class BufferItemType, 1172 class InputNodeType, 1173 class InputNodeBodyType, 1174 class TestNodeType, 1175 class SinkNodeType, 1176 class SinkNodeBodyType > 1177 void run_one_priority_queue_node_test(bool throwException,bool flog) { 1178 tbb::flow::graph g; 1179 1180 std::atomic<int> input_count; 1181 std::atomic<int> sink_count; 1182 input_count = sink_count = 0; 1183 #if USE_TASK_SCHEDULER_OBSERVER 1184 eh_test_observer o; 1185 o.observe(true); 1186 #endif 1187 g_Master = std::this_thread::get_id(); 1188 InputNodeType input(g, InputNodeBodyType(input_count)); 1189 1190 TestNodeType node_to_test(g); 1191 1192 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count)); 1193 1194 make_edge(input,node_to_test); 1195 make_edge(node_to_test, sink); 1196 for(int iter = 0; iter < 2; ++iter) { 1197 ResetGlobals(throwException,flog); 1198 if(throwException) { 1199 TRY(); 1200 input.activate(); 1201 g.wait_for_all(); 1202 CATCH_AND_ASSERT(); 1203 } 1204 else { 1205 TRY(); 1206 input.activate(); 1207 g.wait_for_all(); 1208 CATCH_AND_FAIL(); 1209 } 1210 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 1211 int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value(); 1212 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(); 1213 if(throwException) { 1214 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1215 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 1216 CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs"); 1217 CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes"); 1218 } 1219 else { 1220 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1221 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1222 CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node"); 1223 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers"); 1224 } 1225 if(iter == 0) { 1226 remove_edge(node_to_test, sink); 1227 node_to_test.try_put(BufferItemType(g_NumItems + 1)); 1228 node_to_test.try_put(BufferItemType(g_NumItems + 2)); 1229 node_to_test.try_put(BufferItemType()); 1230 g.wait_for_all(); 1231 g.reset(); 1232 input_count = sink_count = 0; 1233 make_edge(node_to_test, sink); 1234 g.wait_for_all(); 1235 } 1236 else { 1237 g.reset(); 1238 input_count = sink_count = 0; 1239 } 1240 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed"); 1241 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed"); 1242 } 1243 1244 #if USE_TASK_SCHEDULER_OBSERVER 1245 o.observe(false); 1246 #endif 1247 } 1248 1249 template<class BufferItemType, 1250 TestNodeTypeEnum InputThrowType, 1251 TestNodeTypeEnum SinkThrowType> 1252 void run_priority_queue_node_test() { 1253 typedef test_input_body<BufferItemType,InputThrowType> InputBodyType; 1254 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; 1255 typedef less_body<BufferItemType> LessBodyType; 1256 1257 typedef tbb::flow::input_node<BufferItemType> InputType; 1258 typedef tbb::flow::priority_queue_node<BufferItemType,LessBodyType> PrqType; 1259 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType; 1260 1261 for(int i = 0; i < 4; ++i) { 1262 if(i == 2) continue; // no need to test flog w/o throws 1263 bool throwException = (i & 0x1) != 0; 1264 bool doFlog = (i & 0x2) != 0; 1265 run_one_priority_queue_node_test< 1266 /* class BufferItemType*/ BufferItemType, 1267 /*class InputNodeType*/ InputType, 1268 /*class InputNodeBodyType*/ InputBodyType, 1269 /*class TestNodeType*/ PrqType, 1270 /*class SinkNodeType*/ SnkType, 1271 /*class SinkNodeBodyType*/ SinkBodyType 1272 >(throwException, doFlog); 1273 } 1274 } 1275 1276 void test_priority_queue_node() { 1277 INFO("Testing priority_queue_node\n"); 1278 g_Wakeup_Msg = "priority_queue_node(is,non): Missed wakeup or machine is overloaded?"; 1279 run_priority_queue_node_test<int, isThrowing,nonThrowing>(); 1280 CheckType<int>::check_type_counter = 0; 1281 g_Wakeup_Msg = "priority_queue_node(non,is): Missed wakeup or machine is overloaded?"; 1282 run_priority_queue_node_test<CheckType<int>, nonThrowing,isThrowing>(); 1283 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped objects in priority_queue_node test"); 1284 g_Wakeup_Msg = "priority_queue_node(is,is): Missed wakeup or machine is overloaded?"; 1285 run_priority_queue_node_test<int, isThrowing,isThrowing>(); 1286 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 1287 } 1288 1289 // ------------------- join_node ---------------- 1290 template<class JP> struct graph_policy_name{ 1291 static const char* name() {return "unknown"; } 1292 }; 1293 template<> struct graph_policy_name<tbb::flow::queueing> { 1294 static const char* name() {return "queueing"; } 1295 }; 1296 template<> struct graph_policy_name<tbb::flow::reserving> { 1297 static const char* name() {return "reserving"; } 1298 }; 1299 template<> struct graph_policy_name<tbb::flow::tag_matching> { 1300 static const char* name() {return "tag_matching"; } 1301 }; 1302 1303 1304 template< 1305 class JP, 1306 class OutputTuple, 1307 class InputType0, 1308 class InputBodyType0, 1309 class InputType1, 1310 class InputBodyType1, 1311 class TestJoinType, 1312 class SinkType, 1313 class SinkBodyType 1314 > 1315 struct run_one_join_node_test { 1316 run_one_join_node_test() {} 1317 static void execute_test(bool throwException,bool flog) { 1318 typedef typename std::tuple_element<0,OutputTuple>::type ItemType0; 1319 typedef typename std::tuple_element<1,OutputTuple>::type ItemType1; 1320 1321 tbb::flow::graph g; 1322 std::atomic<int>input0_count; 1323 std::atomic<int>input1_count; 1324 std::atomic<int>sink_count; 1325 input0_count = input1_count = sink_count = 0; 1326 #if USE_TASK_SCHEDULER_OBSERVER 1327 eh_test_observer o; 1328 o.observe(true); 1329 #endif 1330 g_Master = std::this_thread::get_id(); 1331 InputType0 input0(g, InputBodyType0(input0_count)); 1332 InputType1 input1(g, InputBodyType1(input1_count)); 1333 TestJoinType node_to_test(g); 1334 SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count)); 1335 make_edge(input0,tbb::flow::input_port<0>(node_to_test)); 1336 make_edge(input1,tbb::flow::input_port<1>(node_to_test)); 1337 make_edge(node_to_test, sink); 1338 for(int iter = 0; iter < 2; ++iter) { 1339 ResetGlobals(throwException,flog); 1340 if(throwException) { 1341 TRY(); 1342 input0.activate(); 1343 input1.activate(); 1344 g.wait_for_all(); 1345 CATCH_AND_ASSERT(); 1346 } 1347 else { 1348 TRY(); 1349 input0.activate(); 1350 input1.activate(); 1351 g.wait_for_all(); 1352 CATCH_AND_FAIL(); 1353 } 1354 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 1355 int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value(); 1356 int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value(); 1357 int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); 1358 if(throwException) { 1359 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1360 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 1361 CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs"); 1362 CHECK_MESSAGE( (nb_cnt <= ((ib0_cnt < ib1_cnt) ? ib0_cnt : ib1_cnt)), "Too many items received by sink nodes"); 1363 } 1364 else { 1365 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1366 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1367 if(ib0_cnt != g_NumItems) { 1368 // INFO("throwException == %s\n" << (throwException ? "true" : "false")); 1369 // INFO("iter == " << iter << "\n"); 1370 // INFO("ib0_cnt == " << ib0_cnt << "\n"); 1371 // INFO("g_NumItems == " << g_NumItems << "\n"); 1372 } 1373 CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0"); // this one 1374 CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1"); 1375 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers"); 1376 } 1377 if(iter == 0) { 1378 remove_edge(node_to_test, sink); 1379 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 1)); 1380 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2)); 1381 g.wait_for_all(); 1382 g.reset(); 1383 input0_count = input1_count = sink_count = 0; 1384 make_edge(node_to_test, sink); 1385 g.wait_for_all(); 1386 } 1387 else { 1388 g.wait_for_all(); 1389 g.reset(); 1390 input0_count = input1_count = sink_count = 0; 1391 } 1392 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed"); 1393 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed"); 1394 nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); 1395 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed"); 1396 } 1397 1398 #if USE_TASK_SCHEDULER_OBSERVER 1399 o.observe(false); 1400 #endif 1401 } 1402 }; // run_one_join_node_test 1403 1404 template< 1405 class OutputTuple, 1406 class InputType0, 1407 class InputBodyType0, 1408 class InputType1, 1409 class InputBodyType1, 1410 class TestJoinType, 1411 class SinkType, 1412 class SinkBodyType 1413 > 1414 struct run_one_join_node_test< 1415 tbb::flow::tag_matching, 1416 OutputTuple, 1417 InputType0, 1418 InputBodyType0, 1419 InputType1, 1420 InputBodyType1, 1421 TestJoinType, 1422 SinkType, 1423 SinkBodyType 1424 > { 1425 run_one_join_node_test() {} 1426 static void execute_test(bool throwException,bool flog) { 1427 typedef typename std::tuple_element<0,OutputTuple>::type ItemType0; 1428 typedef typename std::tuple_element<1,OutputTuple>::type ItemType1; 1429 1430 tbb::flow::graph g; 1431 1432 std::atomic<int>input0_count; 1433 std::atomic<int>input1_count; 1434 std::atomic<int>sink_count; 1435 input0_count = input1_count = sink_count = 0; 1436 #if USE_TASK_SCHEDULER_OBSERVER 1437 eh_test_observer o; 1438 o.observe(true); 1439 #endif 1440 g_Master = std::this_thread::get_id(); 1441 InputType0 input0(g, InputBodyType0(input0_count, 2)); 1442 InputType1 input1(g, InputBodyType1(input1_count, 3)); 1443 TestJoinType node_to_test(g, tag_func<ItemType0>(ItemType0(2)), tag_func<ItemType1>(ItemType1(3))); 1444 SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count)); 1445 make_edge(input0,tbb::flow::input_port<0>(node_to_test)); 1446 make_edge(input1,tbb::flow::input_port<1>(node_to_test)); 1447 make_edge(node_to_test, sink); 1448 for(int iter = 0; iter < 2; ++iter) { 1449 ResetGlobals(throwException,flog); 1450 if(throwException) { 1451 TRY(); 1452 input0.activate(); 1453 input1.activate(); 1454 g.wait_for_all(); 1455 CATCH_AND_ASSERT(); 1456 } 1457 else { 1458 TRY(); 1459 input0.activate(); 1460 input1.activate(); 1461 g.wait_for_all(); 1462 CATCH_AND_FAIL(); 1463 } 1464 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 1465 int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value(); 1466 int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value(); 1467 int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); 1468 if(throwException) { 1469 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1470 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 1471 CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs"); 1472 CHECK_MESSAGE( (nb_cnt <= ((ib0_cnt < ib1_cnt) ? ib0_cnt : ib1_cnt)), "Too many items received by sink nodes"); 1473 } 1474 else { 1475 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1476 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1477 CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0"); 1478 CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1"); 1479 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers"); 1480 } 1481 if(iter == 0) { 1482 remove_edge(node_to_test, sink); 1483 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4)); 1484 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2)); 1485 g.wait_for_all(); // have to wait for the graph to stop again.... 1486 g.reset(); // resets the body of the input_nodes, test_node and the absorb_nodes. 1487 input0_count = input1_count = sink_count = 0; 1488 make_edge(node_to_test, sink); 1489 g.wait_for_all(); // have to wait for the graph to stop again.... 1490 } 1491 else { 1492 g.wait_for_all(); 1493 g.reset(); 1494 input0_count = input1_count = sink_count = 0; 1495 } 1496 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed"); 1497 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed"); 1498 nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); 1499 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed"); 1500 } 1501 1502 #if USE_TASK_SCHEDULER_OBSERVER 1503 o.observe(false); 1504 #endif 1505 } 1506 }; // run_one_join_node_test<tag_matching> 1507 1508 template<class JP, class OutputTuple, 1509 TestNodeTypeEnum InputThrowType, 1510 TestNodeTypeEnum SinkThrowType> 1511 void run_join_node_test() { 1512 typedef typename std::tuple_element<0,OutputTuple>::type ItemType0; 1513 typedef typename std::tuple_element<1,OutputTuple>::type ItemType1; 1514 typedef test_input_body<ItemType0,InputThrowType> InputBodyType0; 1515 typedef test_input_body<ItemType1,InputThrowType> InputBodyType1; 1516 typedef absorber_body<OutputTuple,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; 1517 1518 typedef typename tbb::flow::input_node<ItemType0> InputType0; 1519 typedef typename tbb::flow::input_node<ItemType1> InputType1; 1520 typedef typename tbb::flow::join_node<OutputTuple,JP> TestJoinType; 1521 typedef typename tbb::flow::function_node<OutputTuple,tbb::flow::continue_msg> SinkType; 1522 1523 for(int i = 0; i < 4; ++i) { 1524 if(2 == i) continue; 1525 bool throwException = (i & 0x1) != 0; 1526 bool doFlog = (i & 0x2) != 0; 1527 run_one_join_node_test< 1528 JP, 1529 OutputTuple, 1530 InputType0, 1531 InputBodyType0, 1532 InputType1, 1533 InputBodyType1, 1534 TestJoinType, 1535 SinkType, 1536 SinkBodyType>::execute_test(throwException,doFlog); 1537 } 1538 } 1539 1540 template<class JP> 1541 void test_join_node() { 1542 INFO("Testing join_node<" << graph_policy_name<JP>::name() << ">\n"); 1543 // only doing two-input joins 1544 g_Wakeup_Msg = "join(is,non): Missed wakeup or machine is overloaded?"; 1545 run_join_node_test<JP, std::tuple<int,int>, isThrowing, nonThrowing>(); 1546 CheckType<int>::check_type_counter = 0; 1547 g_Wakeup_Msg = "join(non,is): Missed wakeup or machine is overloaded?"; 1548 run_join_node_test<JP, std::tuple<CheckType<int>,int>, nonThrowing, isThrowing>(); 1549 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped items in test"); 1550 g_Wakeup_Msg = "join(is,is): Missed wakeup or machine is overloaded?"; 1551 run_join_node_test<JP, std::tuple<int,int>, isThrowing, isThrowing>(); 1552 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 1553 } 1554 1555 // ------------------- limiter_node ------------- 1556 1557 template< 1558 class BufferItemType, // 1559 class InputNodeType, 1560 class InputNodeBodyType, 1561 class TestNodeType, 1562 class SinkNodeType, 1563 class SinkNodeBodyType > 1564 void run_one_limiter_node_test(bool throwException,bool flog) { 1565 tbb::flow::graph g; 1566 1567 std::atomic<int> input_count; 1568 std::atomic<int> sink_count; 1569 input_count = sink_count = 0; 1570 #if USE_TASK_SCHEDULER_OBSERVER 1571 eh_test_observer o; 1572 o.observe(true); 1573 #endif 1574 g_Master = std::this_thread::get_id(); 1575 InputNodeType input(g, InputNodeBodyType(input_count)); 1576 TestNodeType node_to_test(g,g_NumThreads + 1); 1577 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count)); 1578 make_edge(input,node_to_test); 1579 make_edge(node_to_test, sink); 1580 for(int iter = 0; iter < 2; ++iter) { 1581 ResetGlobals(throwException,flog); 1582 if(throwException) { 1583 TRY(); 1584 input.activate(); 1585 g.wait_for_all(); 1586 CATCH_AND_ASSERT(); 1587 } 1588 else { 1589 TRY(); 1590 input.activate(); 1591 g.wait_for_all(); 1592 CATCH_AND_FAIL(); 1593 } 1594 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 1595 int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value(); 1596 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(); 1597 if(throwException) { 1598 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1599 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 1600 CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs"); 1601 CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes"); 1602 } 1603 else { 1604 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1605 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1606 // we stop after limiter's limit, which is g_NumThreads + 1. The input_node 1607 // is invoked one extra time, filling its buffer, so its limit is g_NumThreads + 2. 1608 CHECK_MESSAGE( (ib_cnt == g_NumThreads + 2), "Missing invocations of input_node"); 1609 CHECK_MESSAGE( (nb_cnt == g_NumThreads + 1), "Missing items in absorbers"); 1610 } 1611 if(iter == 0) { 1612 remove_edge(node_to_test, sink); 1613 node_to_test.try_put(BufferItemType()); 1614 node_to_test.try_put(BufferItemType()); 1615 g.wait_for_all(); 1616 g.reset(); 1617 input_count = sink_count = 0; 1618 BufferItemType tmp; 1619 CHECK_MESSAGE( (!node_to_test.try_get(tmp)), "node not empty"); 1620 make_edge(node_to_test, sink); 1621 g.wait_for_all(); 1622 } 1623 else { 1624 g.reset(); 1625 input_count = sink_count = 0; 1626 } 1627 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed"); 1628 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed"); 1629 } 1630 1631 #if USE_TASK_SCHEDULER_OBSERVER 1632 o.observe(false); 1633 #endif 1634 } 1635 1636 template<class BufferItemType, 1637 TestNodeTypeEnum InputThrowType, 1638 TestNodeTypeEnum SinkThrowType> 1639 void run_limiter_node_test() { 1640 typedef test_input_body<BufferItemType,InputThrowType> InputBodyType; 1641 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; 1642 1643 typedef tbb::flow::input_node<BufferItemType> InputType; 1644 typedef tbb::flow::limiter_node<BufferItemType> LmtType; 1645 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType; 1646 1647 for(int i = 0; i < 4; ++i) { 1648 if(i == 2) continue; // no need to test flog w/o throws 1649 bool throwException = (i & 0x1) != 0; 1650 bool doFlog = (i & 0x2) != 0; 1651 run_one_limiter_node_test< 1652 /* class BufferItemType*/ BufferItemType, 1653 /*class InputNodeType*/ InputType, 1654 /*class InputNodeBodyType*/ InputBodyType, 1655 /*class TestNodeType*/ LmtType, 1656 /*class SinkNodeType*/ SnkType, 1657 /*class SinkNodeBodyType*/ SinkBodyType 1658 >(throwException, doFlog); 1659 } 1660 } 1661 1662 void test_limiter_node() { 1663 INFO("Testing limiter_node\n"); 1664 g_Wakeup_Msg = "limiter_node(is,non): Missed wakeup or machine is overloaded?"; 1665 run_limiter_node_test<int,isThrowing,nonThrowing>(); 1666 g_Wakeup_Msg = "limiter_node(non,is): Missed wakeup or machine is overloaded?"; 1667 run_limiter_node_test<int,nonThrowing,isThrowing>(); 1668 g_Wakeup_Msg = "limiter_node(is,is): Missed wakeup or machine is overloaded?"; 1669 run_limiter_node_test<int,isThrowing,isThrowing>(); 1670 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 1671 } 1672 1673 // -------- split_node -------------------- 1674 1675 template< 1676 class InputTuple, 1677 class InputType, 1678 class InputBodyType, 1679 class TestSplitType, 1680 class SinkType0, 1681 class SinkBodyType0, 1682 class SinkType1, 1683 class SinkBodyType1> 1684 void run_one_split_node_test(bool throwException, bool flog) { 1685 1686 tbb::flow::graph g; 1687 1688 std::atomic<int> input_count; 1689 std::atomic<int> sink0_count; 1690 std::atomic<int> sink1_count; 1691 input_count = sink0_count = sink1_count = 0; 1692 #if USE_TASK_SCHEDULER_OBSERVER 1693 eh_test_observer o; 1694 o.observe(true); 1695 #endif 1696 1697 g_Master = std::this_thread::get_id(); 1698 InputType input(g, InputBodyType(input_count)); 1699 TestSplitType node_to_test(g); 1700 SinkType0 sink0(g,tbb::flow::unlimited,SinkBodyType0(sink0_count)); 1701 SinkType1 sink1(g,tbb::flow::unlimited,SinkBodyType1(sink1_count)); 1702 make_edge(input, node_to_test); 1703 make_edge(tbb::flow::output_port<0>(node_to_test), sink0); 1704 make_edge(tbb::flow::output_port<1>(node_to_test), sink1); 1705 1706 for(int iter = 0; iter < 2; ++iter) { // run, reset, run again 1707 ResetGlobals(throwException,flog); 1708 if(throwException) { 1709 TRY(); 1710 input.activate(); 1711 g.wait_for_all(); 1712 CATCH_AND_ASSERT(); 1713 } 1714 else { 1715 TRY(); 1716 input.activate(); 1717 g.wait_for_all(); 1718 CATCH_AND_FAIL(); 1719 } 1720 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 1721 int ib_cnt = tbb::flow::copy_body<InputBodyType>(input).count_value(); 1722 int nb0_cnt = tbb::flow::copy_body<SinkBodyType0>(sink0).count_value(); 1723 int nb1_cnt = tbb::flow::copy_body<SinkBodyType1>(sink1).count_value(); 1724 if(throwException) { 1725 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1726 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 1727 CHECK_MESSAGE( (ib_cnt <= 2*g_NumItems), "Too many items sent by input"); 1728 CHECK_MESSAGE( (nb0_cnt + nb1_cnt <= ib_cnt*2), "Too many items received by sink nodes"); 1729 } 1730 else { 1731 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1732 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1733 CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_nodes"); 1734 CHECK_MESSAGE( (nb0_cnt == g_NumItems && nb1_cnt == g_NumItems), "Missing items in absorbers"); 1735 } 1736 g.reset(); // resets the body of the input_nodes and the absorb_nodes. 1737 input_count = sink0_count = sink1_count = 0; 1738 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType>(input).count_value()),"Reset input failed"); 1739 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType0>(sink0).count_value()),"Reset sink 0 failed"); 1740 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType1>(sink1).count_value()),"Reset sink 1 failed"); 1741 } 1742 #if USE_TASK_SCHEDULER_OBSERVER 1743 o.observe(false); 1744 #endif 1745 } 1746 1747 template<class InputTuple, 1748 TestNodeTypeEnum InputThrowType, 1749 TestNodeTypeEnum SinkThrowType> 1750 void run_split_node_test() { 1751 typedef typename std::tuple_element<0,InputTuple>::type ItemType0; 1752 typedef typename std::tuple_element<1,InputTuple>::type ItemType1; 1753 typedef tuple_test_input_body<InputTuple,InputThrowType> InputBodyType; 1754 typedef absorber_body<ItemType0,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType0; 1755 typedef absorber_body<ItemType1,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType1; 1756 1757 typedef typename tbb::flow::input_node<InputTuple> InputType; 1758 typedef typename tbb::flow::split_node<InputTuple> TestSplitType; 1759 typedef typename tbb::flow::function_node<ItemType0,tbb::flow::continue_msg> SinkType0; 1760 typedef typename tbb::flow::function_node<ItemType1,tbb::flow::continue_msg> SinkType1; 1761 1762 for(int i = 0; i < 4; ++i) { 1763 if(2 == i) continue; 1764 bool throwException = (i & 0x1) != 0; 1765 bool doFlog = (i & 0x2) != 0; 1766 run_one_split_node_test< 1767 InputTuple, 1768 InputType, 1769 InputBodyType, 1770 TestSplitType, 1771 SinkType0, 1772 SinkBodyType0, 1773 SinkType1, 1774 SinkBodyType1> 1775 (throwException,doFlog); 1776 } 1777 } 1778 1779 void test_split_node() { 1780 INFO("Testing split_node\n"); 1781 g_Wakeup_Msg = "split_node(is,non): Missed wakeup or machine is overloaded?"; 1782 run_split_node_test<std::tuple<int,int>, isThrowing, nonThrowing>(); 1783 g_Wakeup_Msg = "split_node(non,is): Missed wakeup or machine is overloaded?"; 1784 run_split_node_test<std::tuple<int,int>, nonThrowing, isThrowing>(); 1785 g_Wakeup_Msg = "split_node(is,is): Missed wakeup or machine is overloaded?"; 1786 run_split_node_test<std::tuple<int,int>, isThrowing, isThrowing>(); 1787 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 1788 } 1789 1790 // --------- indexer_node ---------------------- 1791 1792 template < class InputTuple, 1793 class InputType0, 1794 class InputBodyType0, 1795 class InputType1, 1796 class InputBodyType1, 1797 class TestNodeType, 1798 class SinkType, 1799 class SinkBodyType> 1800 void run_one_indexer_node_test(bool throwException,bool flog) { 1801 typedef typename std::tuple_element<0,InputTuple>::type ItemType0; 1802 typedef typename std::tuple_element<1,InputTuple>::type ItemType1; 1803 1804 tbb::flow::graph g; 1805 1806 std::atomic<int> input0_count; 1807 std::atomic<int> input1_count; 1808 std::atomic<int> sink_count; 1809 input0_count = input1_count = sink_count = 0; 1810 #if USE_TASK_SCHEDULER_OBSERVER 1811 eh_test_observer o; 1812 o.observe(true); 1813 #endif 1814 g_Master = std::this_thread::get_id(); 1815 InputType0 input0(g, InputBodyType0(input0_count)); 1816 InputType1 input1(g, InputBodyType1(input1_count)); 1817 TestNodeType node_to_test(g); 1818 SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count)); 1819 make_edge(input0,tbb::flow::input_port<0>(node_to_test)); 1820 make_edge(input1,tbb::flow::input_port<1>(node_to_test)); 1821 make_edge(node_to_test, sink); 1822 for(int iter = 0; iter < 2; ++iter) { 1823 ResetGlobals(throwException,flog); 1824 if(throwException) { 1825 TRY(); 1826 input0.activate(); 1827 input1.activate(); 1828 g.wait_for_all(); 1829 CATCH_AND_ASSERT(); 1830 } 1831 else { 1832 TRY(); 1833 input0.activate(); 1834 input1.activate(); 1835 g.wait_for_all(); 1836 CATCH_AND_FAIL(); 1837 } 1838 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 1839 int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value(); 1840 int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value(); 1841 int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); 1842 if(throwException) { 1843 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1844 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 1845 CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs"); 1846 CHECK_MESSAGE( (nb_cnt <= ib0_cnt + ib1_cnt), "Too many items received by sink nodes"); 1847 } 1848 else { 1849 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1850 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1851 CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0"); 1852 CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1"); 1853 CHECK_MESSAGE( (nb_cnt == 2*g_NumItems), "Missing items in absorbers"); 1854 } 1855 if(iter == 0) { 1856 remove_edge(node_to_test, sink); 1857 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4)); 1858 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2)); 1859 g.wait_for_all(); 1860 g.reset(); 1861 input0_count = input1_count = sink_count = 0; 1862 make_edge(node_to_test, sink); 1863 g.wait_for_all(); 1864 } 1865 else { 1866 g.wait_for_all(); 1867 g.reset(); 1868 input0_count = input1_count = sink_count = 0; 1869 } 1870 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed"); 1871 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed"); 1872 nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); 1873 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed"); 1874 } 1875 1876 #if USE_TASK_SCHEDULER_OBSERVER 1877 o.observe(false); 1878 #endif 1879 } 1880 1881 template<class InputTuple, 1882 TestNodeTypeEnum InputThrowType, 1883 TestNodeTypeEnum SinkThrowType> 1884 void run_indexer_node_test() { 1885 typedef typename std::tuple_element<0,InputTuple>::type ItemType0; 1886 typedef typename std::tuple_element<1,InputTuple>::type ItemType1; 1887 typedef test_input_body<ItemType0,InputThrowType> InputBodyType0; 1888 typedef test_input_body<ItemType1,InputThrowType> InputBodyType1; 1889 typedef typename tbb::flow::indexer_node<ItemType0, ItemType1> TestNodeType; 1890 typedef absorber_body<typename TestNodeType::output_type,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; 1891 1892 typedef typename tbb::flow::input_node<ItemType0> InputType0; 1893 typedef typename tbb::flow::input_node<ItemType1> InputType1; 1894 typedef typename tbb::flow::function_node<typename TestNodeType::output_type,tbb::flow::continue_msg> SinkType; 1895 1896 for(int i = 0; i < 4; ++i) { 1897 if(2 == i) continue; 1898 bool throwException = (i & 0x1) != 0; 1899 bool doFlog = (i & 0x2) != 0; 1900 run_one_indexer_node_test< 1901 InputTuple, 1902 InputType0, 1903 InputBodyType0, 1904 InputType1, 1905 InputBodyType1, 1906 TestNodeType, 1907 SinkType, 1908 SinkBodyType>(throwException,doFlog); 1909 } 1910 } 1911 1912 void test_indexer_node() { 1913 INFO("Testing indexer_node\n"); 1914 g_Wakeup_Msg = "indexer_node(is,non): Missed wakeup or machine is overloaded?"; 1915 run_indexer_node_test<std::tuple<int,int>, isThrowing, nonThrowing>(); 1916 g_Wakeup_Msg = "indexer_node(non,is): Missed wakeup or machine is overloaded?"; 1917 run_indexer_node_test<std::tuple<int,int>, nonThrowing, isThrowing>(); 1918 g_Wakeup_Msg = "indexer_node(is,is): Missed wakeup or machine is overloaded?"; 1919 run_indexer_node_test<std::tuple<int,int>, isThrowing, isThrowing>(); 1920 g_Wakeup_Msg = g_Orig_Wakeup_Msg;; 1921 } 1922 1923 /////////////////////////////////////////////// 1924 // whole-graph exception test 1925 1926 class Foo { 1927 private: 1928 // std::vector<int>& m_vec; 1929 std::vector<int>* m_vec; 1930 public: 1931 Foo(std::vector<int>& vec) : m_vec(&vec) { } 1932 void operator() (tbb::flow::continue_msg) const { 1933 ++nExceptions; 1934 (void)m_vec->at(m_vec->size()); // Will throw out_of_range exception 1935 CHECK_MESSAGE( (false), "Exception not thrown by invalid access"); 1936 } 1937 }; 1938 1939 // test from user ahelwer: http://software.intel.com/en-us/forums/showthread.php?t=103786 1940 // exception thrown in graph node, not caught in wait_for_all() 1941 void 1942 test_flow_graph_exception0() { 1943 // Initializes body 1944 std::vector<int> vec; 1945 vec.push_back(0); 1946 Foo f(vec); 1947 nExceptions = 0; 1948 1949 // Construct graph and nodes 1950 tbb::flow::graph g; 1951 tbb::flow::broadcast_node<tbb::flow::continue_msg> start(g); 1952 tbb::flow::continue_node<tbb::flow::continue_msg> fooNode(g, f); 1953 1954 // Construct edge 1955 tbb::flow::make_edge(start, fooNode); 1956 1957 // Execute graph 1958 CHECK_MESSAGE( (!g.exception_thrown()), "exception_thrown flag already set"); 1959 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag already set"); 1960 try { 1961 start.try_put(tbb::flow::continue_msg()); 1962 g.wait_for_all(); 1963 CHECK_MESSAGE( (false), "Exception not thrown"); 1964 } 1965 catch(std::out_of_range& ex) { 1966 INFO("Exception: " << ex.what() << "(expected)\n"); 1967 } 1968 catch(...) { 1969 INFO("Unknown exception caught (expected)\n"); 1970 } 1971 CHECK_MESSAGE( (nExceptions > 0), "Exception caught, but no body signaled exception being thrown"); 1972 nExceptions = 0; 1973 CHECK_MESSAGE( (g.exception_thrown()), "Exception not intercepted"); 1974 // if exception set, cancellation also set. 1975 CHECK_MESSAGE( (g.is_cancelled()), "Exception cancellation not signaled"); 1976 // in case we got an exception 1977 try { 1978 g.wait_for_all(); // context still signalled canceled, my_exception still set. 1979 } 1980 catch(...) { 1981 CHECK_MESSAGE( (false), "Second exception thrown but no task executing"); 1982 } 1983 CHECK_MESSAGE( (nExceptions == 0), "body signaled exception being thrown, but no body executed"); 1984 CHECK_MESSAGE( (!g.exception_thrown()), "exception_thrown flag not reset"); 1985 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag not reset"); 1986 } 1987 1988 void TestOneThreadNum(int nThread) { 1989 INFO("Testing " << nThread << "%d threads\n"); 1990 g_NumItems = ((nThread > NUM_ITEMS) ? nThread *2 : NUM_ITEMS); 1991 g_NumThreads = nThread; 1992 tbb::task_arena arena(nThread); 1993 arena.execute( 1994 [&]() { 1995 // whole-graph exception catch and rethrow test 1996 test_flow_graph_exception0(); 1997 for(int i = 0; i < 4; ++i) { 1998 g_ExceptionInMaster = (i & 1) != 0; 1999 g_SolitaryException = (i & 2) != 0; 2000 INFO("g_ExceptionInMaster == " << (g_ExceptionInMaster ? "T":"F") 2001 << ", g_SolitaryException == " << (g_SolitaryException ? "T":"F") << "\n"); 2002 test_input_node(); 2003 test_function_node(); 2004 test_continue_node(); // also test broadcast_node 2005 test_multifunction_node(); 2006 // single- and multi-item buffering nodes 2007 test_buffer_queue_and_overwrite_node(); 2008 test_sequencer_node(); 2009 test_priority_queue_node(); 2010 2011 // join_nodes 2012 test_join_node<tbb::flow::queueing>(); 2013 test_join_node<tbb::flow::reserving>(); 2014 test_join_node<tbb::flow::tag_matching>(); 2015 2016 test_limiter_node(); 2017 test_split_node(); 2018 // graph for write_once_node will be complicated by the fact the node will 2019 // not do try_puts after it has been set. To get parallelism of N we have 2020 // to attach N successor nodes to the write_once (or play some similar game). 2021 // test_write_once_node(); 2022 test_indexer_node(); 2023 } 2024 } 2025 ); 2026 } 2027 2028 //! Test exceptions with parallelism 2029 //! \brief \ref error_guessing 2030 TEST_CASE("Testing several threads"){ 2031 // reverse order of tests 2032 for(unsigned int nThread=utils::MaxThread; nThread >= utils::MinThread; --nThread) { 2033 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, nThread); 2034 TestOneThreadNum(nThread); 2035 } 2036 } 2037 2038 #endif // TBB_USE_EXCEPTIONS 2039