1 /* 2 Copyright (c) 2005-2022 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 //! \file test_eh_flow_graph.cpp 18 //! \brief Test for [flow_graph.copy_body flow_graph.broadcast_node flow_graph.priority_queue_node flow_graph.indexer_node flow_graph.sequencer_node flow_graph.remove_edge flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification 19 20 #include "common/config.h" 21 22 #if USE_TASK_SCHEDULER_OBSERVER 23 #include "tbb/task_scheduler_observer.h" 24 #endif 25 #include "tbb/flow_graph.h" 26 #include "tbb/global_control.h" 27 28 #include "common/test.h" 29 30 #if TBB_USE_EXCEPTIONS 31 32 #include "common/utils.h" 33 #include "common/checktype.h" 34 #include "common/concurrency_tracker.h" 35 36 #if _MSC_VER 37 #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning 38 #endif 39 40 #if __TBB_MSVC_UNREACHABLE_CODE_IGNORED 41 // Suppress "unreachable code" warning by VC++ 17.0-18.0 (VS 2012 or newer) 42 #pragma warning (disable: 4702) 43 #endif 44 45 // global task_scheduler_observer is an imperfect tool to find how many threads are really 46 // participating. That was the hope, but it counts the entries into the marketplace, 47 // not the arena. 
// TODO: Consider using local task scheduler observer
// #define USE_TASK_SCHEDULER_OBSERVER 1

#include <iostream>
#include <sstream>
#include <vector>

#include "common/exception_handling.h"

#include <stdexcept>

// Item-count constant.  It is not referenced in the visible part of this file;
// presumably g_NumItems is initialised from it elsewhere -- TODO confirm.
#define NUM_ITEMS 15
// Number of items after which the input-node bodies below call fc.stop().
int g_NumItems;

// Both counters are set back to zero by ResetGlobals() before each run.
std::atomic<unsigned> nExceptions;
std::atomic<intptr_t> g_TGCCancelled;

// Template switch: selects the WaitThrow specialization that does (isThrowing)
// or does not (nonThrowing) call ThrowTestException().
enum TestNodeTypeEnum { nonThrowing, isThrowing };

// Concurrency selectors used as template arguments for WaitThrow and the node
// bodies.  The same value is passed as the concurrency argument when the node
// under test is constructed in run_one_functype_node_test().
static const size_t unlimited_type = 0;
static const size_t serial_type = 1;
static const size_t limited_type = 4;

// Maps a TestNodeTypeEnum value to a printable name.
template<TestNodeTypeEnum T> struct TestNodeTypeName;
template<> struct TestNodeTypeName<nonThrowing> { static const char *name() { return "nonThrowing"; } };
template<> struct TestNodeTypeName<isThrowing> { static const char *name() { return "isThrowing"; } };

// Maps one of the concurrency selectors above to a printable name.
template<size_t Conc> struct concurrencyName;
template<> struct concurrencyName<serial_type>{ static const char *name() { return "serial"; } };
template<> struct concurrencyName<unlimited_type>{ static const char *name() { return "unlimited"; } };
template<> struct concurrencyName<limited_type>{ static const char *name() { return "limited"; } };

// Class that provides waiting and throwing behavior.  If we are not throwing, do nothing
// If serial, we can't wait for concurrency to peak; we may be the bottleneck and will
// stop further processing.  We will execute g_NumThreads + 10 times (the "10" is somewhat
// arbitrary, and just makes sure there are enough items in the graph to keep it flowing),
// If parallel or serial and throwing, use utils::ConcurrencyTracker to wait.
85 86 template<size_t Conc, TestNodeTypeEnum t = nonThrowing> 87 class WaitThrow; 88 89 template<> 90 class WaitThrow<serial_type,nonThrowing> { 91 protected: 92 void WaitAndThrow(int cnt, const char * /*name*/) { 93 if(cnt > g_NumThreads + 10) { 94 utils::ConcurrencyTracker ct; 95 WaitUntilConcurrencyPeaks(); 96 } 97 } 98 }; 99 100 template<> 101 class WaitThrow<serial_type,isThrowing> { 102 protected: 103 void WaitAndThrow(int cnt, const char * /*name*/) { 104 if(cnt > g_NumThreads + 10) { 105 utils::ConcurrencyTracker ct; 106 WaitUntilConcurrencyPeaks(); 107 ThrowTestException(1); 108 } 109 } 110 }; 111 112 // for nodes with limited concurrency, if that concurrency is < g_NumThreads, we need 113 // to make sure enough other nodes wait for concurrency to peak. If we are attached to 114 // N successors, for each item we pass to a successor, we will get N executions of the 115 // "absorbers" (because we broadcast to successors.) for an odd number of threads we 116 // need (g_NumThreads - limited + 1) / 2 items (that will give us one extra execution 117 // of an "absorber", but we can't change that without changing the behavior of the node.) 
118 template<> 119 class WaitThrow<limited_type,nonThrowing> { 120 protected: 121 void WaitAndThrow(int cnt, const char * /*name*/) { 122 if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) { 123 return; 124 } 125 utils::ConcurrencyTracker ct; 126 WaitUntilConcurrencyPeaks(); 127 } 128 }; 129 130 template<> 131 class WaitThrow<limited_type,isThrowing> { 132 protected: 133 void WaitAndThrow(int cnt, const char * /*name*/) { 134 utils::ConcurrencyTracker ct; 135 if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) { 136 return; 137 } 138 WaitUntilConcurrencyPeaks(); 139 ThrowTestException(1); 140 } 141 }; 142 143 template<> 144 class WaitThrow<unlimited_type,nonThrowing> { 145 protected: 146 void WaitAndThrow(int /*cnt*/, const char * /*name*/) { 147 utils::ConcurrencyTracker ct; 148 WaitUntilConcurrencyPeaks(); 149 } 150 }; 151 152 template<> 153 class WaitThrow<unlimited_type,isThrowing> { 154 protected: 155 void WaitAndThrow(int /*cnt*/, const char * /*name*/) { 156 utils::ConcurrencyTracker ct; 157 WaitUntilConcurrencyPeaks(); 158 ThrowTestException(1); 159 } 160 }; 161 162 void 163 ResetGlobals(bool throwException = true, bool flog = false) { 164 nExceptions = 0; 165 g_TGCCancelled = 0; 166 ResetEhGlobals(throwException, flog); 167 } 168 169 // -------input_node body ------------------ 170 template <class OutputType, TestNodeTypeEnum TType> 171 class test_input_body : WaitThrow<serial_type, TType> { 172 using WaitThrow<serial_type, TType>::WaitAndThrow; 173 std::atomic<int> *my_current_val; 174 int my_mult; 175 public: 176 test_input_body(std::atomic<int> &my_cnt, int multiplier = 1) : my_current_val(&my_cnt), my_mult(multiplier) { 177 // INFO("- --------- - - - constructed " << (size_t)(my_current_val) << "\n"); 178 } 179 180 OutputType operator()(tbb::flow_control& fc) { 181 UPDATE_COUNTS(); 182 OutputType ret = OutputType(my_mult * ++(*my_current_val)); 183 // TODO revamp: reconsider logging for the tests. 
184 185 // The following line is known to cause double frees. Therefore, commenting out frequent 186 // calls to INFO() macro. 187 188 // INFO("xx(" << (size_t)(my_current_val) << ") ret == " << (int)ret << "\n"); 189 if(*my_current_val > g_NumItems) { 190 // INFO(" ------ End of the line!\n"); 191 *my_current_val = g_NumItems; 192 fc.stop(); 193 return OutputType(); 194 } 195 WaitAndThrow((int)ret,"test_input_body"); 196 return ret; 197 } 198 199 int count_value() { return (int)*my_current_val; } 200 }; 201 202 template <TestNodeTypeEnum TType> 203 class test_input_body<tbb::flow::continue_msg, TType> : WaitThrow<serial_type, TType> { 204 using WaitThrow<serial_type, TType>::WaitAndThrow; 205 std::atomic<int> *my_current_val; 206 public: 207 test_input_body(std::atomic<int> &my_cnt) : my_current_val(&my_cnt) { } 208 209 tbb::flow::continue_msg operator()( tbb::flow_control & fc) { 210 UPDATE_COUNTS(); 211 int outint = ++(*my_current_val); 212 if(*my_current_val > g_NumItems) { 213 *my_current_val = g_NumItems; 214 fc.stop(); 215 return tbb::flow::continue_msg(); 216 } 217 WaitAndThrow(outint,"test_input_body"); 218 return tbb::flow::continue_msg(); 219 } 220 221 int count_value() { return (int)*my_current_val; } 222 }; 223 224 // -------{function/continue}_node body ------------------ 225 template<class InputType, class OutputType, TestNodeTypeEnum T, size_t Conc> 226 class absorber_body : WaitThrow<Conc,T> { 227 using WaitThrow<Conc,T>::WaitAndThrow; 228 std::atomic<int> *my_count; 229 public: 230 absorber_body(std::atomic<int> &my_cnt) : my_count(&my_cnt) { } 231 OutputType operator()(const InputType &/*p_in*/) { 232 UPDATE_COUNTS(); 233 int out = ++(*my_count); 234 WaitAndThrow(out,"absorber_body"); 235 return OutputType(); 236 } 237 int count_value() { return *my_count; } 238 }; 239 240 // -------multifunction_node body ------------------ 241 242 // helper classes 243 template<int N,class PortsType> 244 struct IssueOutput { 245 typedef typename 
std::tuple_element<N-1,PortsType>::type::output_type my_type; 246 247 static void issue_tuple_element( PortsType &my_ports) { 248 CHECK_MESSAGE( (std::get<N-1>(my_ports).try_put(my_type())), "Error putting to successor"); 249 IssueOutput<N-1,PortsType>::issue_tuple_element(my_ports); 250 } 251 }; 252 253 template<class PortsType> 254 struct IssueOutput<1,PortsType> { 255 typedef typename std::tuple_element<0,PortsType>::type::output_type my_type; 256 257 static void issue_tuple_element( PortsType &my_ports) { 258 CHECK_MESSAGE( (std::get<0>(my_ports).try_put(my_type())), "Error putting to successor"); 259 } 260 }; 261 262 template<class InputType, class OutputTupleType, TestNodeTypeEnum T, size_t Conc> 263 class multifunction_node_body : WaitThrow<Conc,T> { 264 using WaitThrow<Conc,T>::WaitAndThrow; 265 static const int N = std::tuple_size<OutputTupleType>::value; 266 typedef typename tbb::flow::multifunction_node<InputType,OutputTupleType> NodeType; 267 typedef typename NodeType::output_ports_type PortsType; 268 std::atomic<int> *my_count; 269 public: 270 multifunction_node_body(std::atomic<int> &my_cnt) : my_count(&my_cnt) { } 271 void operator()(const InputType& /*in*/, PortsType &my_ports) { 272 UPDATE_COUNTS(); 273 int out = ++(*my_count); 274 WaitAndThrow(out,"multifunction_node_body"); 275 // issue an item to each output port. 276 IssueOutput<N,PortsType>::issue_tuple_element(my_ports); 277 } 278 279 int count_value() { return *my_count; } 280 }; 281 282 // --------- body to sort items in sequencer_node 283 template<class BufferItemType> 284 struct sequencer_body { 285 size_t operator()(const BufferItemType &s) { 286 CHECK_MESSAGE( (s), "sequencer item out of range (== 0)"); 287 return size_t(s) - 1; 288 } 289 }; 290 291 // --------- type for < comparison in priority_queue_node. 
292 template<class ItemType> 293 struct less_body { 294 bool operator()(const ItemType &lhs, const ItemType &rhs) { 295 return (int(lhs) % 3) < (int(rhs) % 3); 296 } 297 }; 298 299 // --------- tag methods for tag_matching join_node 300 template<typename TT> 301 class tag_func { 302 TT my_mult; 303 public: 304 tag_func(TT multiplier) : my_mult(multiplier) { } 305 // operator() will return [0 .. Count) 306 tbb::flow::tag_value operator()( TT v) { 307 tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult); 308 return t; 309 } 310 }; 311 312 // --------- Input body for split_node test. 313 template <class OutputTuple, TestNodeTypeEnum TType> 314 class tuple_test_input_body : WaitThrow<serial_type, TType> { 315 typedef typename std::tuple_element<0,OutputTuple>::type ItemType0; 316 typedef typename std::tuple_element<1,OutputTuple>::type ItemType1; 317 using WaitThrow<serial_type, TType>::WaitAndThrow; 318 std::atomic<int> *my_current_val; 319 public: 320 tuple_test_input_body(std::atomic<int> &my_cnt) : my_current_val(&my_cnt) { } 321 322 OutputTuple operator()(tbb::flow_control& fc) { 323 UPDATE_COUNTS(); 324 int ival = ++(*my_current_val); 325 if(*my_current_val > g_NumItems) { 326 *my_current_val = g_NumItems; // jam the final value; we assert on it later. 327 fc.stop(); 328 return OutputTuple(); 329 } 330 WaitAndThrow(ival,"tuple_test_input_body"); 331 return OutputTuple(ItemType0(ival),ItemType1(ival)); 332 } 333 334 int count_value() { return (int)*my_current_val; } 335 }; 336 337 // ------- end of node bodies 338 339 // input_node is only-serial. input_node can throw, or the function_node can throw. 340 // graph being tested is 341 // 342 // input_node+---+parallel function_node 343 // 344 // After each run the graph is reset(), to test the reset functionality. 
345 // 346 347 348 template<class ItemType, TestNodeTypeEnum inpThrowType, TestNodeTypeEnum absorbThrowType> 349 void run_one_input_node_test(bool throwException, bool flog) { 350 typedef test_input_body<ItemType,inpThrowType> src_body_type; 351 typedef absorber_body<ItemType, tbb::flow::continue_msg, absorbThrowType, unlimited_type> parallel_absorb_body_type; 352 std::atomic<int> input_body_count; 353 std::atomic<int> absorber_body_count; 354 input_body_count = 0; 355 absorber_body_count = 0; 356 357 tbb::flow::graph g; 358 359 g_Master = std::this_thread::get_id(); 360 361 #if USE_TASK_SCHEDULER_OBSERVER 362 eh_test_observer o; 363 o.observe(true); 364 #endif 365 366 tbb::flow::input_node<ItemType> sn(g, src_body_type(input_body_count)); 367 parallel_absorb_body_type ab2(absorber_body_count); 368 tbb::flow::function_node<ItemType> parallel_fn(g,tbb::flow::unlimited,ab2); 369 make_edge(sn, parallel_fn); 370 for(int runcnt = 0; runcnt < 2; ++runcnt) { 371 ResetGlobals(throwException,flog); 372 if(throwException) { 373 TRY(); 374 sn.activate(); 375 g.wait_for_all(); 376 CATCH_AND_ASSERT(); 377 } 378 else { 379 TRY(); 380 sn.activate(); 381 g.wait_for_all(); 382 CATCH_AND_FAIL(); 383 } 384 385 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 386 int src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value(); 387 int sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value(); 388 if(throwException) { 389 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception flag in flow::graph not set"); 390 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "canceled flag not set"); 391 CHECK_MESSAGE( (src_cnt <= g_NumItems), "Too many input_node items emitted"); 392 CHECK_MESSAGE( (sink_cnt <= src_cnt), "Too many input_node items received"); 393 } 394 else { 395 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag 
in flow::graph set but no throw occurred"); 396 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 397 CHECK_MESSAGE( (src_cnt == g_NumItems), "Incorrect # input_node items emitted"); 398 CHECK_MESSAGE( (sink_cnt == src_cnt), "Incorrect # input_node items received"); 399 } 400 g.reset(); // resets the body of the input_node and the absorb_nodes. 401 input_body_count = 0; 402 absorber_body_count = 0; 403 CHECK_MESSAGE( (!g.exception_thrown()), "Reset didn't clear exception_thrown()"); 404 CHECK_MESSAGE( (!g.is_cancelled()), "Reset didn't clear is_cancelled()"); 405 src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value(); 406 sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value(); 407 CHECK_MESSAGE( (src_cnt == 0), "input_node count not reset"); 408 CHECK_MESSAGE( (sink_cnt == 0), "sink_node count not reset"); 409 } 410 #if USE_TASK_SCHEDULER_OBSERVER 411 o.observe(false); 412 #endif 413 } // run_one_input_node_test 414 415 416 template<class ItemType, TestNodeTypeEnum inpThrowType, TestNodeTypeEnum absorbThrowType> 417 void run_input_node_test() { 418 run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(false,false); 419 run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(true,false); 420 run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(true,true); 421 } // run_input_node_test 422 423 void test_input_node() { 424 INFO("Testing input_node\n"); 425 CheckType<int>::check_type_counter = 0; 426 g_Wakeup_Msg = "input_node(1): Missed wakeup or machine is overloaded?"; 427 run_input_node_test<CheckType<int>, isThrowing, nonThrowing>(); 428 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test"); 429 g_Wakeup_Msg = "input_node(2): Missed wakeup or machine is overloaded?"; 430 run_input_node_test<int, isThrowing, nonThrowing>(); 431 g_Wakeup_Msg = "input_node(3): Missed wakeup or machine is overloaded?"; 432 run_input_node_test<int, nonThrowing, 
isThrowing>(); 433 g_Wakeup_Msg = "input_node(4): Missed wakeup or machine is overloaded?"; 434 run_input_node_test<int, isThrowing, isThrowing>(); 435 g_Wakeup_Msg = "input_node(5): Missed wakeup or machine is overloaded?"; 436 run_input_node_test<CheckType<int>, isThrowing, isThrowing>(); 437 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 438 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test"); 439 } 440 441 // -------- utilities & types to test function_node and multifunction_node. 442 443 // need to tell the template which node type I am using so it attaches successors correctly. 444 enum NodeFetchType { func_node_type, multifunc_node_type }; 445 446 template<class NodeType, class ItemType, int indx, NodeFetchType NFT> 447 struct AttachPoint; 448 449 template<class NodeType, class ItemType, int indx> 450 struct AttachPoint<NodeType,ItemType,indx,multifunc_node_type> { 451 static tbb::flow::sender<ItemType> &GetSender(NodeType &n) { 452 return tbb::flow::output_port<indx>(n); 453 } 454 }; 455 456 template<class NodeType, class ItemType, int indx> 457 struct AttachPoint<NodeType,ItemType,indx,func_node_type> { 458 static tbb::flow::sender<ItemType> &GetSender(NodeType &n) { 459 return n; 460 } 461 }; 462 463 464 // common template for running function_node, multifunction_node. continue_node 465 // has different firing requirements, so it needs a different graph topology. 
466 template< 467 class InputNodeType, 468 class InputNodeBodyType0, 469 class InputNodeBodyType1, 470 NodeFetchType NFT, 471 class TestNodeType, 472 class TestNodeBodyType, 473 class TypeToSink0, // what kind of item are we sending to sink0 474 class TypeToSink1, // what kind of item are we sending to sink1 475 class SinkNodeType0, // will be same for function; 476 class SinkNodeType1, // may differ for multifunction_node 477 class SinkNodeBodyType0, 478 class SinkNodeBodyType1, 479 size_t Conc 480 > 481 void 482 run_one_functype_node_test(bool throwException, bool flog, const char * /*name*/) { 483 484 std::stringstream ss; 485 char *saved_msg = const_cast<char *>(g_Wakeup_Msg); 486 tbb::flow::graph g; 487 488 std::atomic<int> input0_count; 489 std::atomic<int> input1_count; 490 std::atomic<int> sink0_count; 491 std::atomic<int> sink1_count; 492 std::atomic<int> test_count; 493 input0_count = input1_count = sink0_count = sink1_count = test_count = 0; 494 495 #if USE_TASK_SCHEDULER_OBSERVER 496 eh_test_observer o; 497 o.observe(true); 498 #endif 499 500 g_Master = std::this_thread::get_id(); 501 InputNodeType input0(g, InputNodeBodyType0(input0_count)); 502 InputNodeType input1(g, InputNodeBodyType1(input1_count)); 503 TestNodeType node_to_test(g, Conc, TestNodeBodyType(test_count)); 504 SinkNodeType0 sink0(g,tbb::flow::unlimited,SinkNodeBodyType0(sink0_count)); 505 SinkNodeType1 sink1(g,tbb::flow::unlimited,SinkNodeBodyType1(sink1_count)); 506 make_edge(input0, node_to_test); 507 make_edge(input1, node_to_test); 508 make_edge(AttachPoint<TestNodeType, TypeToSink0, 0, NFT>::GetSender(node_to_test), sink0); 509 make_edge(AttachPoint<TestNodeType, TypeToSink1, 1, NFT>::GetSender(node_to_test), sink1); 510 511 for(int iter = 0; iter < 2; ++iter) { // run, reset, run again 512 ss.clear(); 513 ss << saved_msg << " iter=" << iter << ", threads=" << g_NumThreads << ", throw=" << (throwException ? "T" : "F") << ", flow=" << (flog ? 
"T" : "F"); 514 g_Wakeup_Msg = ss.str().c_str(); 515 ResetGlobals(throwException,flog); 516 if(throwException) { 517 TRY(); 518 input0.activate(); 519 input1.activate(); 520 g.wait_for_all(); 521 CATCH_AND_ASSERT(); 522 } 523 else { 524 TRY(); 525 input0.activate(); 526 input1.activate(); 527 g.wait_for_all(); 528 CATCH_AND_FAIL(); 529 } 530 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 531 int ib0_cnt = tbb::flow::copy_body<InputNodeBodyType0>(input0).count_value(); 532 int ib1_cnt = tbb::flow::copy_body<InputNodeBodyType1>(input1).count_value(); 533 int t_cnt = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value(); 534 int nb0_cnt = tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value(); 535 int nb1_cnt = tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value(); 536 if(throwException) { 537 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 538 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 539 CHECK_MESSAGE( (ib0_cnt + ib1_cnt <= 2*g_NumItems), "Too many items sent by inputs"); 540 CHECK_MESSAGE( (ib0_cnt + ib1_cnt >= t_cnt), "Too many items received by test node"); 541 CHECK_MESSAGE( (nb0_cnt + nb1_cnt <= t_cnt*2), "Too many items received by sink nodes"); 542 } 543 else { 544 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 545 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 546 CHECK_MESSAGE( (ib0_cnt + ib1_cnt == 2*g_NumItems), "Missing invocations of input_nodes"); 547 CHECK_MESSAGE( (t_cnt == 2*g_NumItems), "Not all items reached test node"); 548 CHECK_MESSAGE( (nb0_cnt == 2*g_NumItems && nb1_cnt == 2*g_NumItems), "Missing items in absorbers"); 549 } 550 g.reset(); // resets the body of the input_nodes, test_node and the absorb_nodes. 
551 input0_count = input1_count = sink0_count = sink1_count = test_count = 0; 552 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType0>(input0).count_value()),"Reset input 0 failed"); 553 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType1>(input1).count_value()),"Reset input 1 failed"); 554 CHECK_MESSAGE( (0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value()),"Reset test_node failed"); 555 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value()),"Reset sink 0 failed"); 556 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value()),"Reset sink 1 failed"); 557 558 g_Wakeup_Msg = saved_msg; 559 } 560 #if USE_TASK_SCHEDULER_OBSERVER 561 o.observe(false); 562 #endif 563 } 564 565 // Test function_node 566 // 567 // graph being tested is 568 // 569 // input_node -\ /- parallel function_node 570 // \ / 571 // +function_node+ 572 // / \ x 573 // input_node -/ \- parallel function_node 574 // 575 // After each run the graph is reset(), to test the reset functionality. 576 // 577 template< 578 TestNodeTypeEnum IType1, // does input node 1 throw? 579 TestNodeTypeEnum IType2, // does input node 2 throw? 580 class Item12, // type of item passed between inputs and test node 581 TestNodeTypeEnum FType, // does function node throw? 582 class Item23, // type passed from function_node to sink nodes 583 TestNodeTypeEnum NType1, // does sink node 1 throw? 584 TestNodeTypeEnum NType2, // does sink node 1 throw? 585 class NodePolicy, // rejecting,queueing 586 size_t Conc // is node concurrent? 
{serial | limited | unlimited} 587 > 588 void run_function_node_test() { 589 590 typedef test_input_body<Item12,IType1> IBodyType1; 591 typedef test_input_body<Item12,IType2> IBodyType2; 592 typedef absorber_body<Item12, Item23, FType, Conc> TestBodyType; 593 typedef absorber_body<Item23,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1; 594 typedef absorber_body<Item23,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2; 595 596 typedef tbb::flow::input_node<Item12> InputType; 597 typedef tbb::flow::function_node<Item12, Item23, NodePolicy> TestType; 598 typedef tbb::flow::function_node<Item23,tbb::flow::continue_msg> SnkType; 599 600 for(int i = 0; i < 4; ++i ) { 601 if(i != 2) { // doesn't make sense to flog a non-throwing test 602 bool doThrow = (i & 0x1) != 0; 603 bool doFlog = (i & 0x2) != 0; 604 run_one_functype_node_test< 605 /*InputNodeType*/ InputType, 606 /*InputNodeBodyType0*/ IBodyType1, 607 /*InputNodeBodyType1*/ IBodyType2, 608 /* NFT */ func_node_type, 609 /*TestNodeType*/ TestType, 610 /*TestNodeBodyType*/ TestBodyType, 611 /*TypeToSink0 */ Item23, 612 /*TypeToSink1 */ Item23, 613 /*SinkNodeType0*/ SnkType, 614 /*SinkNodeType1*/ SnkType, 615 /*SinkNodeBodyType1*/ SinkBodyType1, 616 /*SinkNodeBodyType2*/ SinkBodyType2, 617 /*Conc*/ Conc> 618 (doThrow,doFlog,"function_node"); 619 } 620 } 621 } // run_function_node_test 622 623 void test_function_node() { 624 INFO("Testing function_node\n"); 625 // serial rejecting 626 g_Wakeup_Msg = "function_node(1a): Missed wakeup or machine is overloaded?"; 627 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); 628 g_Wakeup_Msg = "function_node(1b): Missed wakeup or machine is overloaded?"; 629 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); 630 g_Wakeup_Msg = "function_node(1c): Missed wakeup or machine is overloaded?"; 
631 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); 632 633 // serial queueing 634 g_Wakeup_Msg = "function_node(2): Missed wakeup or machine is overloaded?"; 635 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 636 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 637 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 638 CheckType<int>::check_type_counter = 0; 639 run_function_node_test<nonThrowing, nonThrowing, CheckType<int>, nonThrowing, CheckType<int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 640 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test"); 641 642 // unlimited parallel rejecting 643 g_Wakeup_Msg = "function_node(3): Missed wakeup or machine is overloaded?"; 644 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>(); 645 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>(); 646 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>(); 647 648 // limited parallel rejecting 649 g_Wakeup_Msg = "function_node(4): Missed wakeup or machine is overloaded?"; 650 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>(); 651 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>(); 652 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, 
isThrowing, tbb::flow::rejecting, (size_t)limited_type>(); 653 654 // limited parallel queueing 655 g_Wakeup_Msg = "function_node(5): Missed wakeup or machine is overloaded?"; 656 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>(); 657 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>(); 658 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>(); 659 660 // everyone throwing 661 g_Wakeup_Msg = "function_node(6): Missed wakeup or machine is overloaded?"; 662 run_function_node_test<isThrowing, isThrowing, int, isThrowing, int, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>(); 663 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 664 } 665 666 // ----------------------------------- multifunction_node ---------------------------------- 667 // Test multifunction_node. 668 // 669 // graph being tested is 670 // 671 // input_node -\ /- parallel function_node 672 // \ / 673 // +multifunction_node+ 674 // / \ x 675 // input_node -/ \- parallel function_node 676 // 677 // After each run the graph is reset(), to test the reset functionality. The 678 // multifunction_node will put an item to each successor for every item 679 // received. 680 // 681 template< 682 TestNodeTypeEnum IType0, // does input node 1 throw? 683 TestNodeTypeEnum IType1, // does input node 2 thorw? 684 class Item12, // type of item passed between inputs and test node 685 TestNodeTypeEnum FType, // does multifunction node throw? 686 class ItemTuple, // tuple of types passed from multifunction_node to sink nodes 687 TestNodeTypeEnum NType1, // does sink node 1 throw? 688 TestNodeTypeEnum NType2, // does sink node 2 throw? 689 class NodePolicy, // rejecting,queueing 690 size_t Conc // is node concurrent? 
{serial | limited | unlimited} 691 > 692 void run_multifunction_node_test() { 693 694 typedef typename std::tuple_element<0,ItemTuple>::type Item23Type0; 695 typedef typename std::tuple_element<1,ItemTuple>::type Item23Type1; 696 typedef test_input_body<Item12,IType0> IBodyType1; 697 typedef test_input_body<Item12,IType1> IBodyType2; 698 typedef multifunction_node_body<Item12, ItemTuple, FType, Conc> TestBodyType; 699 typedef absorber_body<Item23Type0,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1; 700 typedef absorber_body<Item23Type1,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2; 701 702 typedef tbb::flow::input_node<Item12> InputType; 703 typedef tbb::flow::multifunction_node<Item12, ItemTuple, NodePolicy> TestType; 704 typedef tbb::flow::function_node<Item23Type0,tbb::flow::continue_msg> SnkType0; 705 typedef tbb::flow::function_node<Item23Type1,tbb::flow::continue_msg> SnkType1; 706 707 for(int i = 0; i < 4; ++i ) { 708 if(i != 2) { // doesn't make sense to flog a non-throwing test 709 bool doThrow = (i & 0x1) != 0; 710 bool doFlog = (i & 0x2) != 0; 711 run_one_functype_node_test< 712 /*InputNodeType*/ InputType, 713 /*InputNodeBodyType0*/ IBodyType1, 714 /*InputNodeBodyType1*/ IBodyType2, 715 /*NFT*/ multifunc_node_type, 716 /*TestNodeType*/ TestType, 717 /*TestNodeBodyType*/ TestBodyType, 718 /*TypeToSink0*/ Item23Type0, 719 /*TypeToSink1*/ Item23Type1, 720 /*SinkNodeType0*/ SnkType0, 721 /*SinkNodeType1*/ SnkType1, 722 /*SinkNodeBodyType0*/ SinkBodyType1, 723 /*SinkNodeBodyType1*/ SinkBodyType2, 724 /*Conc*/ Conc> 725 (doThrow,doFlog,"multifunction_node"); 726 } 727 } 728 } // run_multifunction_node_test 729 730 void test_multifunction_node() { 731 INFO("Testing multifunction_node\n"); 732 g_Wakeup_Msg = "multifunction_node(input throws,rejecting,serial): Missed wakeup or machine is overloaded?"; 733 // serial rejecting 734 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,float>, 
nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); 735 g_Wakeup_Msg = "multifunction_node(test throws,rejecting,serial): Missed wakeup or machine is overloaded?"; 736 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); 737 g_Wakeup_Msg = "multifunction_node(sink throws,rejecting,serial): Missed wakeup or machine is overloaded?"; 738 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); 739 740 g_Wakeup_Msg = "multifunction_node(2): Missed wakeup or machine is overloaded?"; 741 // serial queueing 742 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 743 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 744 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 745 CheckType<int>::check_type_counter = 0; 746 run_multifunction_node_test<nonThrowing, nonThrowing, CheckType<int>, nonThrowing, std::tuple<CheckType<int>, CheckType<int> >, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 747 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test"); 748 749 g_Wakeup_Msg = "multifunction_node(3): Missed wakeup or machine is overloaded?"; 750 // unlimited parallel rejecting 751 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>(); 752 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>(); 753 
run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>(); 754 755 g_Wakeup_Msg = "multifunction_node(4): Missed wakeup or machine is overloaded?"; 756 // limited parallel rejecting 757 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>(); 758 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>(); 759 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>(); 760 761 g_Wakeup_Msg = "multifunction_node(5): Missed wakeup or machine is overloaded?"; 762 // limited parallel queueing 763 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>(); 764 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>(); 765 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>(); 766 767 g_Wakeup_Msg = "multifunction_node(6): Missed wakeup or machine is overloaded?"; 768 // everyone throwing 769 run_multifunction_node_test<isThrowing, isThrowing, int, isThrowing, std::tuple<int,int>, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>(); 770 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 771 } 772 773 // 774 // Continue node has T predecessors. when it receives messages (continue_msg) on T predecessors 775 // it executes the body of the node, and forwards a continue_msg to its successors. 
776 // However many predecessors the continue_node has, that's how many continue_msgs it receives 777 // on input before forwarding a message. 778 // 779 // The graph will look like 780 // 781 // +broadcast_node+ 782 // / \ ___ 783 // input_node+------>+broadcast_node+ +continue_node+--->+absorber 784 // \ / 785 // +broadcast_node+ 786 // 787 // The continue_node has unlimited parallelism, no input buffering, and broadcasts to successors. 788 // The absorber is parallel, so each item emitted by the input will result in one thread 789 // spinning. So for N threads we pass N-1 continue_messages, then spin wait and then throw if 790 // we are allowed to. 791 792 template < class InputNodeType, class InputNodeBodyType, class TTestNodeType, class TestNodeBodyType, 793 class SinkNodeType, class SinkNodeBodyType> 794 void run_one_continue_node_test (bool throwException, bool flog) { 795 tbb::flow::graph g; 796 797 std::atomic<int> input_count; 798 std::atomic<int> test_count; 799 std::atomic<int> sink_count; 800 input_count = test_count = sink_count = 0; 801 #if USE_TASK_SCHEDULER_OBSERVER 802 eh_test_observer o; 803 o.observe(true); 804 #endif 805 g_Master = std::this_thread::get_id(); 806 InputNodeType input(g, InputNodeBodyType(input_count)); 807 TTestNodeType node_to_test(g, TestNodeBodyType(test_count)); 808 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count)); 809 tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g), b2(g), b3(g); 810 make_edge(input, b1); 811 make_edge(b1,b2); 812 make_edge(b1,b3); 813 make_edge(b2,node_to_test); 814 make_edge(b3,node_to_test); 815 make_edge(node_to_test, sink); 816 for(int iter = 0; iter < 2; ++iter) { 817 ResetGlobals(throwException,flog); 818 if(throwException) { 819 TRY(); 820 input.activate(); 821 g.wait_for_all(); 822 CATCH_AND_ASSERT(); 823 } 824 else { 825 TRY(); 826 input.activate(); 827 g.wait_for_all(); 828 CATCH_AND_FAIL(); 829 } 830 bool okayNoExceptionsCaught = (g_ExceptionInMaster && 
!g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 831 int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value(); 832 int t_cnt = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value(); 833 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(); 834 if(throwException) { 835 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 836 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 837 CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs"); 838 CHECK_MESSAGE( (ib_cnt >= t_cnt), "Too many items received by test node"); 839 CHECK_MESSAGE( (nb_cnt <= t_cnt), "Too many items received by sink nodes"); 840 } 841 else { 842 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 843 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 844 CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node"); 845 CHECK_MESSAGE( (t_cnt == g_NumItems), "Not all items reached test node"); 846 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers"); 847 } 848 g.reset(); // resets the body of the input_nodes, test_node and the absorb_nodes. 849 input_count = test_count = sink_count = 0; 850 CHECK_MESSAGE( (0 == (int)test_count), "Atomic wasn't reset properly"); 851 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed"); 852 CHECK_MESSAGE( (0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value()),"Reset test_node failed"); 853 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed"); 854 } 855 #if USE_TASK_SCHEDULER_OBSERVER 856 o.observe(false); 857 #endif 858 } 859 860 template< 861 class ItemType, 862 TestNodeTypeEnum IType, // does input node throw? 
863 TestNodeTypeEnum CType, // does continue_node throw? 864 TestNodeTypeEnum AType> // does absorber throw 865 void run_continue_node_test() { 866 typedef test_input_body<tbb::flow::continue_msg,IType> IBodyType; 867 typedef absorber_body<tbb::flow::continue_msg,ItemType,CType,unlimited_type> ContBodyType; 868 typedef absorber_body<ItemType,tbb::flow::continue_msg, AType, unlimited_type> SinkBodyType; 869 870 typedef tbb::flow::input_node<tbb::flow::continue_msg> InputType; 871 typedef tbb::flow::continue_node<ItemType> TestType; 872 typedef tbb::flow::function_node<ItemType,tbb::flow::continue_msg> SnkType; 873 874 for(int i = 0; i < 4; ++i ) { 875 if(i == 2) continue; // don't run (false,true); it doesn't make sense. 876 bool doThrow = (i & 0x1) != 0; 877 bool doFlog = (i & 0x2) != 0; 878 run_one_continue_node_test< 879 /*InputNodeType*/ InputType, 880 /*InputNodeBodyType*/ IBodyType, 881 /*TestNodeType*/ TestType, 882 /*TestNodeBodyType*/ ContBodyType, 883 /*SinkNodeType*/ SnkType, 884 /*SinkNodeBodyType*/ SinkBodyType> 885 (doThrow,doFlog); 886 } 887 } 888 889 // 890 void test_continue_node() { 891 INFO("Testing continue_node\n"); 892 g_Wakeup_Msg = "buffer_node(non,is,non): Missed wakeup or machine is overloaded?"; 893 run_continue_node_test<int,nonThrowing,isThrowing,nonThrowing>(); 894 g_Wakeup_Msg = "buffer_node(non,non,is): Missed wakeup or machine is overloaded?"; 895 run_continue_node_test<int,nonThrowing,nonThrowing,isThrowing>(); 896 g_Wakeup_Msg = "buffer_node(is,non,non): Missed wakeup or machine is overloaded?"; 897 run_continue_node_test<int,isThrowing,nonThrowing,nonThrowing>(); 898 g_Wakeup_Msg = "buffer_node(is,is,is): Missed wakeup or machine is overloaded?"; 899 run_continue_node_test<int,isThrowing,isThrowing,isThrowing>(); 900 CheckType<double>::check_type_counter = 0; 901 run_continue_node_test<CheckType<double>,isThrowing,isThrowing,isThrowing>(); 902 CHECK_MESSAGE( (!CheckType<double>::check_type_counter), "Dropped objects in 
continue_node test"); 903 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 904 } 905 906 // ---------- buffer_node queue_node overwrite_node -------------- 907 908 template< 909 class BufferItemType, // 910 class InputNodeType, 911 class InputNodeBodyType, 912 class TestNodeType, 913 class SinkNodeType, 914 class SinkNodeBodyType > 915 void run_one_buffer_node_test(bool throwException,bool flog) { 916 tbb::flow::graph g; 917 918 std::atomic<int> input_count; 919 std::atomic<int> sink_count; 920 input_count = sink_count = 0; 921 #if USE_TASK_SCHEDULER_OBSERVER 922 eh_test_observer o; 923 o.observe(true); 924 #endif 925 g_Master = std::this_thread::get_id(); 926 InputNodeType input(g, InputNodeBodyType(input_count)); 927 TestNodeType node_to_test(g); 928 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count)); 929 make_edge(input,node_to_test); 930 make_edge(node_to_test, sink); 931 for(int iter = 0; iter < 2; ++iter) { 932 ResetGlobals(throwException,flog); 933 if(throwException) { 934 TRY(); 935 input.activate(); 936 g.wait_for_all(); 937 CATCH_AND_ASSERT(); 938 } 939 else { 940 TRY(); 941 input.activate(); 942 g.wait_for_all(); 943 CATCH_AND_FAIL(); 944 } 945 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 946 int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value(); 947 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(); 948 if(throwException) { 949 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 950 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 951 CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs"); 952 CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes"); 953 } 954 else { 955 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 
956 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 957 CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node"); 958 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers"); 959 } 960 if(iter == 0) { 961 remove_edge(node_to_test, sink); 962 node_to_test.try_put(BufferItemType()); 963 g.wait_for_all(); 964 g.reset(); 965 input_count = sink_count = 0; 966 BufferItemType tmp; 967 CHECK_MESSAGE( (!node_to_test.try_get(tmp)), "node not empty"); 968 make_edge(node_to_test, sink); 969 g.wait_for_all(); 970 } 971 else { 972 g.reset(); 973 input_count = sink_count = 0; 974 } 975 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed"); 976 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed"); 977 } 978 979 #if USE_TASK_SCHEDULER_OBSERVER 980 o.observe(false); 981 #endif 982 } 983 template<class BufferItemType, 984 TestNodeTypeEnum InputThrowType, 985 TestNodeTypeEnum SinkThrowType> 986 void run_buffer_queue_and_overwrite_node_test() { 987 typedef test_input_body<BufferItemType,InputThrowType> InputBodyType; 988 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; 989 990 typedef tbb::flow::input_node<BufferItemType> InputType; 991 typedef tbb::flow::buffer_node<BufferItemType> BufType; 992 typedef tbb::flow::queue_node<BufferItemType> QueType; 993 typedef tbb::flow::overwrite_node<BufferItemType> OvrType; 994 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType; 995 996 for(int i = 0; i < 4; ++i) { 997 if(i == 2) continue; // no need to test flog w/o throws 998 bool throwException = (i & 0x1) != 0; 999 bool doFlog = (i & 0x2) != 0; 1000 run_one_buffer_node_test< 1001 /* class BufferItemType*/ BufferItemType, 1002 /*class InputNodeType*/ InputType, 1003 /*class InputNodeBodyType*/ InputBodyType, 1004 /*class TestNodeType*/ BufType, 1005 
/*class SinkNodeType*/ SnkType, 1006 /*class SinkNodeBodyType*/ SinkBodyType 1007 >(throwException, doFlog); 1008 run_one_buffer_node_test< 1009 /* class BufferItemType*/ BufferItemType, 1010 /*class InputNodeType*/ InputType, 1011 /*class InputNodeBodyType*/ InputBodyType, 1012 /*class TestNodeType*/ QueType, 1013 /*class SinkNodeType*/ SnkType, 1014 /*class SinkNodeBodyType*/ SinkBodyType 1015 >(throwException, doFlog); 1016 run_one_buffer_node_test< 1017 /* class BufferItemType*/ BufferItemType, 1018 /*class InputNodeType*/ InputType, 1019 /*class InputNodeBodyType*/ InputBodyType, 1020 /*class TestNodeType*/ OvrType, 1021 /*class SinkNodeType*/ SnkType, 1022 /*class SinkNodeBodyType*/ SinkBodyType 1023 >(throwException, doFlog); 1024 } 1025 } 1026 1027 void test_buffer_queue_and_overwrite_node() { 1028 INFO("Testing buffer_node, queue_node and overwrite_node\n"); 1029 g_Wakeup_Msg = "buffer, queue, overwrite(is,non): Missed wakeup or machine is overloaded?"; 1030 run_buffer_queue_and_overwrite_node_test<int,isThrowing,nonThrowing>(); 1031 g_Wakeup_Msg = "buffer, queue, overwrite(non,is): Missed wakeup or machine is overloaded?"; 1032 run_buffer_queue_and_overwrite_node_test<int,nonThrowing,isThrowing>(); 1033 g_Wakeup_Msg = "buffer, queue, overwrite(is,is): Missed wakeup or machine is overloaded?"; 1034 run_buffer_queue_and_overwrite_node_test<int,isThrowing,isThrowing>(); 1035 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 1036 } 1037 1038 // ---------- sequencer_node ------------------------- 1039 1040 1041 template< 1042 class BufferItemType, // 1043 class InputNodeType, 1044 class InputNodeBodyType, 1045 class TestNodeType, 1046 class SeqBodyType, 1047 class SinkNodeType, 1048 class SinkNodeBodyType > 1049 void run_one_sequencer_node_test(bool throwException,bool flog) { 1050 tbb::flow::graph g; 1051 1052 std::atomic<int> input_count; 1053 std::atomic<int> sink_count; 1054 input_count = sink_count = 0; 1055 #if USE_TASK_SCHEDULER_OBSERVER 1056 eh_test_observer o; 1057 
o.observe(true); 1058 #endif 1059 g_Master = std::this_thread::get_id(); 1060 InputNodeType input(g, InputNodeBodyType(input_count)); 1061 TestNodeType node_to_test(g,SeqBodyType()); 1062 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count)); 1063 make_edge(input,node_to_test); 1064 make_edge(node_to_test, sink); 1065 for(int iter = 0; iter < 2; ++iter) { 1066 ResetGlobals(throwException,flog); 1067 if(throwException) { 1068 TRY(); 1069 input.activate(); 1070 g.wait_for_all(); 1071 CATCH_AND_ASSERT(); 1072 } 1073 else { 1074 TRY(); 1075 input.activate(); 1076 g.wait_for_all(); 1077 CATCH_AND_FAIL(); 1078 } 1079 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 1080 int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value(); 1081 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(); 1082 if(throwException) { 1083 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1084 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 1085 CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs"); 1086 CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes"); 1087 } 1088 else { 1089 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1090 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1091 CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node"); 1092 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers"); 1093 } 1094 if(iter == 0) { 1095 remove_edge(node_to_test, sink); 1096 node_to_test.try_put(BufferItemType(g_NumItems + 1)); 1097 node_to_test.try_put(BufferItemType(1)); 1098 g.wait_for_all(); 1099 g.reset(); 1100 input_count = sink_count = 0; 1101 make_edge(node_to_test, sink); 1102 
g.wait_for_all(); 1103 } 1104 else { 1105 g.reset(); 1106 input_count = sink_count = 0; 1107 } 1108 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed"); 1109 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed"); 1110 } 1111 1112 #if USE_TASK_SCHEDULER_OBSERVER 1113 o.observe(false); 1114 #endif 1115 } 1116 1117 template<class BufferItemType, 1118 TestNodeTypeEnum InputThrowType, 1119 TestNodeTypeEnum SinkThrowType> 1120 void run_sequencer_node_test() { 1121 typedef test_input_body<BufferItemType,InputThrowType> InputBodyType; 1122 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; 1123 typedef sequencer_body<BufferItemType> SeqBodyType; 1124 1125 typedef tbb::flow::input_node<BufferItemType> InputType; 1126 typedef tbb::flow::sequencer_node<BufferItemType> SeqType; 1127 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType; 1128 1129 for(int i = 0; i < 4; ++i) { 1130 if(i == 2) continue; // no need to test flog w/o throws 1131 bool throwException = (i & 0x1) != 0; 1132 bool doFlog = (i & 0x2) != 0; 1133 run_one_sequencer_node_test< 1134 /* class BufferItemType*/ BufferItemType, 1135 /*class InputNodeType*/ InputType, 1136 /*class InputNodeBodyType*/ InputBodyType, 1137 /*class TestNodeType*/ SeqType, 1138 /*class SeqBodyType*/ SeqBodyType, 1139 /*class SinkNodeType*/ SnkType, 1140 /*class SinkNodeBodyType*/ SinkBodyType 1141 >(throwException, doFlog); 1142 } 1143 } 1144 1145 1146 1147 void test_sequencer_node() { 1148 INFO("Testing sequencer_node\n"); 1149 g_Wakeup_Msg = "sequencer_node(is,non): Missed wakeup or machine is overloaded?"; 1150 run_sequencer_node_test<int, isThrowing,nonThrowing>(); 1151 CheckType<int>::check_type_counter = 0; 1152 g_Wakeup_Msg = "sequencer_node(non,is): Missed wakeup or machine is overloaded?"; 1153 run_sequencer_node_test<CheckType<int>, 
nonThrowing,isThrowing>(); 1154 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped objects in sequencer_node test"); 1155 g_Wakeup_Msg = "sequencer_node(is,is): Missed wakeup or machine is overloaded?"; 1156 run_sequencer_node_test<int, isThrowing,isThrowing>(); 1157 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 1158 } 1159 1160 // ------------ priority_queue_node ------------------ 1161 1162 template< 1163 class BufferItemType, 1164 class InputNodeType, 1165 class InputNodeBodyType, 1166 class TestNodeType, 1167 class SinkNodeType, 1168 class SinkNodeBodyType > 1169 void run_one_priority_queue_node_test(bool throwException,bool flog) { 1170 tbb::flow::graph g; 1171 1172 std::atomic<int> input_count; 1173 std::atomic<int> sink_count; 1174 input_count = sink_count = 0; 1175 #if USE_TASK_SCHEDULER_OBSERVER 1176 eh_test_observer o; 1177 o.observe(true); 1178 #endif 1179 g_Master = std::this_thread::get_id(); 1180 InputNodeType input(g, InputNodeBodyType(input_count)); 1181 1182 TestNodeType node_to_test(g); 1183 1184 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count)); 1185 1186 make_edge(input,node_to_test); 1187 make_edge(node_to_test, sink); 1188 for(int iter = 0; iter < 2; ++iter) { 1189 ResetGlobals(throwException,flog); 1190 if(throwException) { 1191 TRY(); 1192 input.activate(); 1193 g.wait_for_all(); 1194 CATCH_AND_ASSERT(); 1195 } 1196 else { 1197 TRY(); 1198 input.activate(); 1199 g.wait_for_all(); 1200 CATCH_AND_FAIL(); 1201 } 1202 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 1203 int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value(); 1204 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(); 1205 if(throwException) { 1206 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1207 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), 
"Cancellation not signalled in graph"); 1208 CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs"); 1209 CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes"); 1210 } 1211 else { 1212 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1213 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1214 CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node"); 1215 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers"); 1216 } 1217 if(iter == 0) { 1218 remove_edge(node_to_test, sink); 1219 node_to_test.try_put(BufferItemType(g_NumItems + 1)); 1220 node_to_test.try_put(BufferItemType(g_NumItems + 2)); 1221 node_to_test.try_put(BufferItemType()); 1222 g.wait_for_all(); 1223 g.reset(); 1224 input_count = sink_count = 0; 1225 make_edge(node_to_test, sink); 1226 g.wait_for_all(); 1227 } 1228 else { 1229 g.reset(); 1230 input_count = sink_count = 0; 1231 } 1232 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed"); 1233 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed"); 1234 } 1235 1236 #if USE_TASK_SCHEDULER_OBSERVER 1237 o.observe(false); 1238 #endif 1239 } 1240 1241 template<class BufferItemType, 1242 TestNodeTypeEnum InputThrowType, 1243 TestNodeTypeEnum SinkThrowType> 1244 void run_priority_queue_node_test() { 1245 typedef test_input_body<BufferItemType,InputThrowType> InputBodyType; 1246 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; 1247 typedef less_body<BufferItemType> LessBodyType; 1248 1249 typedef tbb::flow::input_node<BufferItemType> InputType; 1250 typedef tbb::flow::priority_queue_node<BufferItemType,LessBodyType> PrqType; 1251 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType; 1252 1253 for(int i = 0; i < 4; ++i) { 1254 if(i == 
2) continue; // no need to test flog w/o throws 1255 bool throwException = (i & 0x1) != 0; 1256 bool doFlog = (i & 0x2) != 0; 1257 run_one_priority_queue_node_test< 1258 /* class BufferItemType*/ BufferItemType, 1259 /*class InputNodeType*/ InputType, 1260 /*class InputNodeBodyType*/ InputBodyType, 1261 /*class TestNodeType*/ PrqType, 1262 /*class SinkNodeType*/ SnkType, 1263 /*class SinkNodeBodyType*/ SinkBodyType 1264 >(throwException, doFlog); 1265 } 1266 } 1267 1268 void test_priority_queue_node() { 1269 INFO("Testing priority_queue_node\n"); 1270 g_Wakeup_Msg = "priority_queue_node(is,non): Missed wakeup or machine is overloaded?"; 1271 run_priority_queue_node_test<int, isThrowing,nonThrowing>(); 1272 CheckType<int>::check_type_counter = 0; 1273 g_Wakeup_Msg = "priority_queue_node(non,is): Missed wakeup or machine is overloaded?"; 1274 run_priority_queue_node_test<CheckType<int>, nonThrowing,isThrowing>(); 1275 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped objects in priority_queue_node test"); 1276 g_Wakeup_Msg = "priority_queue_node(is,is): Missed wakeup or machine is overloaded?"; 1277 run_priority_queue_node_test<int, isThrowing,isThrowing>(); 1278 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 1279 } 1280 1281 // ------------------- join_node ---------------- 1282 template<class JP> struct graph_policy_name{ 1283 static const char* name() {return "unknown"; } 1284 }; 1285 template<> struct graph_policy_name<tbb::flow::queueing> { 1286 static const char* name() {return "queueing"; } 1287 }; 1288 template<> struct graph_policy_name<tbb::flow::reserving> { 1289 static const char* name() {return "reserving"; } 1290 }; 1291 template<> struct graph_policy_name<tbb::flow::tag_matching> { 1292 static const char* name() {return "tag_matching"; } 1293 }; 1294 1295 1296 template< 1297 class JP, 1298 class OutputTuple, 1299 class InputType0, 1300 class InputBodyType0, 1301 class InputType1, 1302 class InputBodyType1, 1303 class TestJoinType, 1304 class SinkType, 
1305 class SinkBodyType 1306 > 1307 struct run_one_join_node_test { 1308 run_one_join_node_test() {} 1309 static void execute_test(bool throwException,bool flog) { 1310 typedef typename std::tuple_element<0,OutputTuple>::type ItemType0; 1311 typedef typename std::tuple_element<1,OutputTuple>::type ItemType1; 1312 1313 tbb::flow::graph g; 1314 std::atomic<int>input0_count; 1315 std::atomic<int>input1_count; 1316 std::atomic<int>sink_count; 1317 input0_count = input1_count = sink_count = 0; 1318 #if USE_TASK_SCHEDULER_OBSERVER 1319 eh_test_observer o; 1320 o.observe(true); 1321 #endif 1322 g_Master = std::this_thread::get_id(); 1323 InputType0 input0(g, InputBodyType0(input0_count)); 1324 InputType1 input1(g, InputBodyType1(input1_count)); 1325 TestJoinType node_to_test(g); 1326 SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count)); 1327 make_edge(input0,tbb::flow::input_port<0>(node_to_test)); 1328 make_edge(input1,tbb::flow::input_port<1>(node_to_test)); 1329 make_edge(node_to_test, sink); 1330 for(int iter = 0; iter < 2; ++iter) { 1331 ResetGlobals(throwException,flog); 1332 if(throwException) { 1333 TRY(); 1334 input0.activate(); 1335 input1.activate(); 1336 g.wait_for_all(); 1337 CATCH_AND_ASSERT(); 1338 } 1339 else { 1340 TRY(); 1341 input0.activate(); 1342 input1.activate(); 1343 g.wait_for_all(); 1344 CATCH_AND_FAIL(); 1345 } 1346 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 1347 int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value(); 1348 int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value(); 1349 int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); 1350 if(throwException) { 1351 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1352 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 1353 
CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs"); 1354 CHECK_MESSAGE( (nb_cnt <= ((ib0_cnt < ib1_cnt) ? ib0_cnt : ib1_cnt)), "Too many items received by sink nodes"); 1355 } 1356 else { 1357 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1358 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1359 if(ib0_cnt != g_NumItems) { 1360 // INFO("throwException == %s\n" << (throwException ? "true" : "false")); 1361 // INFO("iter == " << iter << "\n"); 1362 // INFO("ib0_cnt == " << ib0_cnt << "\n"); 1363 // INFO("g_NumItems == " << g_NumItems << "\n"); 1364 } 1365 CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0"); // this one 1366 CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1"); 1367 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers"); 1368 } 1369 if(iter == 0) { 1370 remove_edge(node_to_test, sink); 1371 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 1)); 1372 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2)); 1373 g.wait_for_all(); 1374 g.reset(); 1375 input0_count = input1_count = sink_count = 0; 1376 make_edge(node_to_test, sink); 1377 g.wait_for_all(); 1378 } 1379 else { 1380 g.wait_for_all(); 1381 g.reset(); 1382 input0_count = input1_count = sink_count = 0; 1383 } 1384 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed"); 1385 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed"); 1386 nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); 1387 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed"); 1388 } 1389 1390 #if USE_TASK_SCHEDULER_OBSERVER 1391 o.observe(false); 1392 #endif 1393 } 1394 }; // run_one_join_node_test 1395 1396 template< 1397 class OutputTuple, 1398 
class InputType0, 1399 class InputBodyType0, 1400 class InputType1, 1401 class InputBodyType1, 1402 class TestJoinType, 1403 class SinkType, 1404 class SinkBodyType 1405 > 1406 struct run_one_join_node_test< 1407 tbb::flow::tag_matching, 1408 OutputTuple, 1409 InputType0, 1410 InputBodyType0, 1411 InputType1, 1412 InputBodyType1, 1413 TestJoinType, 1414 SinkType, 1415 SinkBodyType 1416 > { 1417 run_one_join_node_test() {} 1418 static void execute_test(bool throwException,bool flog) { 1419 typedef typename std::tuple_element<0,OutputTuple>::type ItemType0; 1420 typedef typename std::tuple_element<1,OutputTuple>::type ItemType1; 1421 1422 tbb::flow::graph g; 1423 1424 std::atomic<int>input0_count; 1425 std::atomic<int>input1_count; 1426 std::atomic<int>sink_count; 1427 input0_count = input1_count = sink_count = 0; 1428 #if USE_TASK_SCHEDULER_OBSERVER 1429 eh_test_observer o; 1430 o.observe(true); 1431 #endif 1432 g_Master = std::this_thread::get_id(); 1433 InputType0 input0(g, InputBodyType0(input0_count, 2)); 1434 InputType1 input1(g, InputBodyType1(input1_count, 3)); 1435 TestJoinType node_to_test(g, tag_func<ItemType0>(ItemType0(2)), tag_func<ItemType1>(ItemType1(3))); 1436 SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count)); 1437 make_edge(input0,tbb::flow::input_port<0>(node_to_test)); 1438 make_edge(input1,tbb::flow::input_port<1>(node_to_test)); 1439 make_edge(node_to_test, sink); 1440 for(int iter = 0; iter < 2; ++iter) { 1441 ResetGlobals(throwException,flog); 1442 if(throwException) { 1443 TRY(); 1444 input0.activate(); 1445 input1.activate(); 1446 g.wait_for_all(); 1447 CATCH_AND_ASSERT(); 1448 } 1449 else { 1450 TRY(); 1451 input0.activate(); 1452 input1.activate(); 1453 g.wait_for_all(); 1454 CATCH_AND_FAIL(); 1455 } 1456 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 1457 int ib0_cnt = 
tbb::flow::copy_body<InputBodyType0>(input0).count_value(); 1458 int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value(); 1459 int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); 1460 if(throwException) { 1461 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1462 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 1463 CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs"); 1464 CHECK_MESSAGE( (nb_cnt <= ((ib0_cnt < ib1_cnt) ? ib0_cnt : ib1_cnt)), "Too many items received by sink nodes"); 1465 } 1466 else { 1467 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1468 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1469 CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0"); 1470 CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1"); 1471 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers"); 1472 } 1473 if(iter == 0) { 1474 remove_edge(node_to_test, sink); 1475 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4)); 1476 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2)); 1477 g.wait_for_all(); // have to wait for the graph to stop again.... 1478 g.reset(); // resets the body of the input_nodes, test_node and the absorb_nodes. 1479 input0_count = input1_count = sink_count = 0; 1480 make_edge(node_to_test, sink); 1481 g.wait_for_all(); // have to wait for the graph to stop again.... 
1482 } 1483 else { 1484 g.wait_for_all(); 1485 g.reset(); 1486 input0_count = input1_count = sink_count = 0; 1487 } 1488 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed"); 1489 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed"); 1490 nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); 1491 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed"); 1492 } 1493 1494 #if USE_TASK_SCHEDULER_OBSERVER 1495 o.observe(false); 1496 #endif 1497 } 1498 }; // run_one_join_node_test<tag_matching> 1499 1500 template<class JP, class OutputTuple, 1501 TestNodeTypeEnum InputThrowType, 1502 TestNodeTypeEnum SinkThrowType> 1503 void run_join_node_test() { 1504 typedef typename std::tuple_element<0,OutputTuple>::type ItemType0; 1505 typedef typename std::tuple_element<1,OutputTuple>::type ItemType1; 1506 typedef test_input_body<ItemType0,InputThrowType> InputBodyType0; 1507 typedef test_input_body<ItemType1,InputThrowType> InputBodyType1; 1508 typedef absorber_body<OutputTuple,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; 1509 1510 typedef typename tbb::flow::input_node<ItemType0> InputType0; 1511 typedef typename tbb::flow::input_node<ItemType1> InputType1; 1512 typedef typename tbb::flow::join_node<OutputTuple,JP> TestJoinType; 1513 typedef typename tbb::flow::function_node<OutputTuple,tbb::flow::continue_msg> SinkType; 1514 1515 for(int i = 0; i < 4; ++i) { 1516 if(2 == i) continue; 1517 bool throwException = (i & 0x1) != 0; 1518 bool doFlog = (i & 0x2) != 0; 1519 run_one_join_node_test< 1520 JP, 1521 OutputTuple, 1522 InputType0, 1523 InputBodyType0, 1524 InputType1, 1525 InputBodyType1, 1526 TestJoinType, 1527 SinkType, 1528 SinkBodyType>::execute_test(throwException,doFlog); 1529 } 1530 } 1531 1532 template<class JP> 1533 void test_join_node() { 1534 INFO("Testing join_node<" << 
graph_policy_name<JP>::name() << ">\n"); 1535 // only doing two-input joins 1536 g_Wakeup_Msg = "join(is,non): Missed wakeup or machine is overloaded?"; 1537 run_join_node_test<JP, std::tuple<int,int>, isThrowing, nonThrowing>(); 1538 CheckType<int>::check_type_counter = 0; 1539 g_Wakeup_Msg = "join(non,is): Missed wakeup or machine is overloaded?"; 1540 run_join_node_test<JP, std::tuple<CheckType<int>,int>, nonThrowing, isThrowing>(); 1541 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped items in test"); 1542 g_Wakeup_Msg = "join(is,is): Missed wakeup or machine is overloaded?"; 1543 run_join_node_test<JP, std::tuple<int,int>, isThrowing, isThrowing>(); 1544 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 1545 } 1546 1547 // ------------------- limiter_node ------------- 1548 1549 template< 1550 class BufferItemType, // 1551 class InputNodeType, 1552 class InputNodeBodyType, 1553 class TestNodeType, 1554 class SinkNodeType, 1555 class SinkNodeBodyType > 1556 void run_one_limiter_node_test(bool throwException,bool flog) { 1557 tbb::flow::graph g; 1558 1559 std::atomic<int> input_count; 1560 std::atomic<int> sink_count; 1561 input_count = sink_count = 0; 1562 #if USE_TASK_SCHEDULER_OBSERVER 1563 eh_test_observer o; 1564 o.observe(true); 1565 #endif 1566 g_Master = std::this_thread::get_id(); 1567 InputNodeType input(g, InputNodeBodyType(input_count)); 1568 TestNodeType node_to_test(g,g_NumThreads + 1); 1569 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count)); 1570 make_edge(input,node_to_test); 1571 make_edge(node_to_test, sink); 1572 for(int iter = 0; iter < 2; ++iter) { 1573 ResetGlobals(throwException,flog); 1574 if(throwException) { 1575 TRY(); 1576 input.activate(); 1577 g.wait_for_all(); 1578 CATCH_AND_ASSERT(); 1579 } 1580 else { 1581 TRY(); 1582 input.activate(); 1583 g.wait_for_all(); 1584 CATCH_AND_FAIL(); 1585 } 1586 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && 
!g_NonMasterExecutedThrow) || !throwException; 1587 int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value(); 1588 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(); 1589 if(throwException) { 1590 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1591 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 1592 CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs"); 1593 CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes"); 1594 } 1595 else { 1596 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1597 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1598 // we stop after limiter's limit, which is g_NumThreads + 1. The input_node 1599 // is invoked one extra time, filling its buffer, so its limit is g_NumThreads + 2. 1600 CHECK_MESSAGE( (ib_cnt == g_NumThreads + 2), "Missing invocations of input_node"); 1601 CHECK_MESSAGE( (nb_cnt == g_NumThreads + 1), "Missing items in absorbers"); 1602 } 1603 if(iter == 0) { 1604 remove_edge(node_to_test, sink); 1605 node_to_test.try_put(BufferItemType()); 1606 node_to_test.try_put(BufferItemType()); 1607 g.wait_for_all(); 1608 g.reset(); 1609 input_count = sink_count = 0; 1610 BufferItemType tmp; 1611 CHECK_MESSAGE( (!node_to_test.try_get(tmp)), "node not empty"); 1612 make_edge(node_to_test, sink); 1613 g.wait_for_all(); 1614 } 1615 else { 1616 g.reset(); 1617 input_count = sink_count = 0; 1618 } 1619 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed"); 1620 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed"); 1621 } 1622 1623 #if USE_TASK_SCHEDULER_OBSERVER 1624 o.observe(false); 1625 #endif 1626 } 1627 1628 template<class BufferItemType, 1629 TestNodeTypeEnum InputThrowType, 
1630 TestNodeTypeEnum SinkThrowType> 1631 void run_limiter_node_test() { 1632 typedef test_input_body<BufferItemType,InputThrowType> InputBodyType; 1633 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; 1634 1635 typedef tbb::flow::input_node<BufferItemType> InputType; 1636 typedef tbb::flow::limiter_node<BufferItemType> LmtType; 1637 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType; 1638 1639 for(int i = 0; i < 4; ++i) { 1640 if(i == 2) continue; // no need to test flog w/o throws 1641 bool throwException = (i & 0x1) != 0; 1642 bool doFlog = (i & 0x2) != 0; 1643 run_one_limiter_node_test< 1644 /* class BufferItemType*/ BufferItemType, 1645 /*class InputNodeType*/ InputType, 1646 /*class InputNodeBodyType*/ InputBodyType, 1647 /*class TestNodeType*/ LmtType, 1648 /*class SinkNodeType*/ SnkType, 1649 /*class SinkNodeBodyType*/ SinkBodyType 1650 >(throwException, doFlog); 1651 } 1652 } 1653 1654 void test_limiter_node() { 1655 INFO("Testing limiter_node\n"); 1656 g_Wakeup_Msg = "limiter_node(is,non): Missed wakeup or machine is overloaded?"; 1657 run_limiter_node_test<int,isThrowing,nonThrowing>(); 1658 g_Wakeup_Msg = "limiter_node(non,is): Missed wakeup or machine is overloaded?"; 1659 run_limiter_node_test<int,nonThrowing,isThrowing>(); 1660 g_Wakeup_Msg = "limiter_node(is,is): Missed wakeup or machine is overloaded?"; 1661 run_limiter_node_test<int,isThrowing,isThrowing>(); 1662 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 1663 } 1664 1665 // -------- split_node -------------------- 1666 1667 template< 1668 class InputTuple, 1669 class InputType, 1670 class InputBodyType, 1671 class TestSplitType, 1672 class SinkType0, 1673 class SinkBodyType0, 1674 class SinkType1, 1675 class SinkBodyType1> 1676 void run_one_split_node_test(bool throwException, bool flog) { 1677 1678 tbb::flow::graph g; 1679 1680 std::atomic<int> input_count; 1681 std::atomic<int> sink0_count; 1682 std::atomic<int> 
sink1_count; 1683 input_count = sink0_count = sink1_count = 0; 1684 #if USE_TASK_SCHEDULER_OBSERVER 1685 eh_test_observer o; 1686 o.observe(true); 1687 #endif 1688 1689 g_Master = std::this_thread::get_id(); 1690 InputType input(g, InputBodyType(input_count)); 1691 TestSplitType node_to_test(g); 1692 SinkType0 sink0(g,tbb::flow::unlimited,SinkBodyType0(sink0_count)); 1693 SinkType1 sink1(g,tbb::flow::unlimited,SinkBodyType1(sink1_count)); 1694 make_edge(input, node_to_test); 1695 make_edge(tbb::flow::output_port<0>(node_to_test), sink0); 1696 make_edge(tbb::flow::output_port<1>(node_to_test), sink1); 1697 1698 for(int iter = 0; iter < 2; ++iter) { // run, reset, run again 1699 ResetGlobals(throwException,flog); 1700 if(throwException) { 1701 TRY(); 1702 input.activate(); 1703 g.wait_for_all(); 1704 CATCH_AND_ASSERT(); 1705 } 1706 else { 1707 TRY(); 1708 input.activate(); 1709 g.wait_for_all(); 1710 CATCH_AND_FAIL(); 1711 } 1712 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 1713 int ib_cnt = tbb::flow::copy_body<InputBodyType>(input).count_value(); 1714 int nb0_cnt = tbb::flow::copy_body<SinkBodyType0>(sink0).count_value(); 1715 int nb1_cnt = tbb::flow::copy_body<SinkBodyType1>(sink1).count_value(); 1716 if(throwException) { 1717 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1718 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 1719 CHECK_MESSAGE( (ib_cnt <= 2*g_NumItems), "Too many items sent by input"); 1720 CHECK_MESSAGE( (nb0_cnt + nb1_cnt <= ib_cnt*2), "Too many items received by sink nodes"); 1721 } 1722 else { 1723 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1724 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1725 CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing 
invocations of input_nodes"); 1726 CHECK_MESSAGE( (nb0_cnt == g_NumItems && nb1_cnt == g_NumItems), "Missing items in absorbers"); 1727 } 1728 g.reset(); // resets the body of the input_nodes and the absorb_nodes. 1729 input_count = sink0_count = sink1_count = 0; 1730 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType>(input).count_value()),"Reset input failed"); 1731 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType0>(sink0).count_value()),"Reset sink 0 failed"); 1732 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType1>(sink1).count_value()),"Reset sink 1 failed"); 1733 } 1734 #if USE_TASK_SCHEDULER_OBSERVER 1735 o.observe(false); 1736 #endif 1737 } 1738 1739 template<class InputTuple, 1740 TestNodeTypeEnum InputThrowType, 1741 TestNodeTypeEnum SinkThrowType> 1742 void run_split_node_test() { 1743 typedef typename std::tuple_element<0,InputTuple>::type ItemType0; 1744 typedef typename std::tuple_element<1,InputTuple>::type ItemType1; 1745 typedef tuple_test_input_body<InputTuple,InputThrowType> InputBodyType; 1746 typedef absorber_body<ItemType0,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType0; 1747 typedef absorber_body<ItemType1,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType1; 1748 1749 typedef typename tbb::flow::input_node<InputTuple> InputType; 1750 typedef typename tbb::flow::split_node<InputTuple> TestSplitType; 1751 typedef typename tbb::flow::function_node<ItemType0,tbb::flow::continue_msg> SinkType0; 1752 typedef typename tbb::flow::function_node<ItemType1,tbb::flow::continue_msg> SinkType1; 1753 1754 for(int i = 0; i < 4; ++i) { 1755 if(2 == i) continue; 1756 bool throwException = (i & 0x1) != 0; 1757 bool doFlog = (i & 0x2) != 0; 1758 run_one_split_node_test< 1759 InputTuple, 1760 InputType, 1761 InputBodyType, 1762 TestSplitType, 1763 SinkType0, 1764 SinkBodyType0, 1765 SinkType1, 1766 SinkBodyType1> 1767 (throwException,doFlog); 1768 } 1769 } 1770 1771 void test_split_node() { 1772 INFO("Testing 
split_node\n"); 1773 g_Wakeup_Msg = "split_node(is,non): Missed wakeup or machine is overloaded?"; 1774 run_split_node_test<std::tuple<int,int>, isThrowing, nonThrowing>(); 1775 g_Wakeup_Msg = "split_node(non,is): Missed wakeup or machine is overloaded?"; 1776 run_split_node_test<std::tuple<int,int>, nonThrowing, isThrowing>(); 1777 g_Wakeup_Msg = "split_node(is,is): Missed wakeup or machine is overloaded?"; 1778 run_split_node_test<std::tuple<int,int>, isThrowing, isThrowing>(); 1779 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 1780 } 1781 1782 // --------- indexer_node ---------------------- 1783 1784 template < class InputTuple, 1785 class InputType0, 1786 class InputBodyType0, 1787 class InputType1, 1788 class InputBodyType1, 1789 class TestNodeType, 1790 class SinkType, 1791 class SinkBodyType> 1792 void run_one_indexer_node_test(bool throwException,bool flog) { 1793 typedef typename std::tuple_element<0,InputTuple>::type ItemType0; 1794 typedef typename std::tuple_element<1,InputTuple>::type ItemType1; 1795 1796 tbb::flow::graph g; 1797 1798 std::atomic<int> input0_count; 1799 std::atomic<int> input1_count; 1800 std::atomic<int> sink_count; 1801 input0_count = input1_count = sink_count = 0; 1802 #if USE_TASK_SCHEDULER_OBSERVER 1803 eh_test_observer o; 1804 o.observe(true); 1805 #endif 1806 g_Master = std::this_thread::get_id(); 1807 InputType0 input0(g, InputBodyType0(input0_count)); 1808 InputType1 input1(g, InputBodyType1(input1_count)); 1809 TestNodeType node_to_test(g); 1810 SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count)); 1811 make_edge(input0,tbb::flow::input_port<0>(node_to_test)); 1812 make_edge(input1,tbb::flow::input_port<1>(node_to_test)); 1813 make_edge(node_to_test, sink); 1814 for(int iter = 0; iter < 2; ++iter) { 1815 ResetGlobals(throwException,flog); 1816 if(throwException) { 1817 TRY(); 1818 input0.activate(); 1819 input1.activate(); 1820 g.wait_for_all(); 1821 CATCH_AND_ASSERT(); 1822 } 1823 else { 1824 TRY(); 1825 input0.activate(); 
1826 input1.activate(); 1827 g.wait_for_all(); 1828 CATCH_AND_FAIL(); 1829 } 1830 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 1831 int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value(); 1832 int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value(); 1833 int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); 1834 if(throwException) { 1835 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1836 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 1837 CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs"); 1838 CHECK_MESSAGE( (nb_cnt <= ib0_cnt + ib1_cnt), "Too many items received by sink nodes"); 1839 } 1840 else { 1841 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1842 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1843 CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0"); 1844 CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1"); 1845 CHECK_MESSAGE( (nb_cnt == 2*g_NumItems), "Missing items in absorbers"); 1846 } 1847 if(iter == 0) { 1848 remove_edge(node_to_test, sink); 1849 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4)); 1850 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2)); 1851 g.wait_for_all(); 1852 g.reset(); 1853 input0_count = input1_count = sink_count = 0; 1854 make_edge(node_to_test, sink); 1855 g.wait_for_all(); 1856 } 1857 else { 1858 g.wait_for_all(); 1859 g.reset(); 1860 input0_count = input1_count = sink_count = 0; 1861 } 1862 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed"); 1863 CHECK_MESSAGE( (0 == 
tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed"); 1864 nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); 1865 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed"); 1866 } 1867 1868 #if USE_TASK_SCHEDULER_OBSERVER 1869 o.observe(false); 1870 #endif 1871 } 1872 1873 template<class InputTuple, 1874 TestNodeTypeEnum InputThrowType, 1875 TestNodeTypeEnum SinkThrowType> 1876 void run_indexer_node_test() { 1877 typedef typename std::tuple_element<0,InputTuple>::type ItemType0; 1878 typedef typename std::tuple_element<1,InputTuple>::type ItemType1; 1879 typedef test_input_body<ItemType0,InputThrowType> InputBodyType0; 1880 typedef test_input_body<ItemType1,InputThrowType> InputBodyType1; 1881 typedef typename tbb::flow::indexer_node<ItemType0, ItemType1> TestNodeType; 1882 typedef absorber_body<typename TestNodeType::output_type,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; 1883 1884 typedef typename tbb::flow::input_node<ItemType0> InputType0; 1885 typedef typename tbb::flow::input_node<ItemType1> InputType1; 1886 typedef typename tbb::flow::function_node<typename TestNodeType::output_type,tbb::flow::continue_msg> SinkType; 1887 1888 for(int i = 0; i < 4; ++i) { 1889 if(2 == i) continue; 1890 bool throwException = (i & 0x1) != 0; 1891 bool doFlog = (i & 0x2) != 0; 1892 run_one_indexer_node_test< 1893 InputTuple, 1894 InputType0, 1895 InputBodyType0, 1896 InputType1, 1897 InputBodyType1, 1898 TestNodeType, 1899 SinkType, 1900 SinkBodyType>(throwException,doFlog); 1901 } 1902 } 1903 1904 void test_indexer_node() { 1905 INFO("Testing indexer_node\n"); 1906 g_Wakeup_Msg = "indexer_node(is,non): Missed wakeup or machine is overloaded?"; 1907 run_indexer_node_test<std::tuple<int,int>, isThrowing, nonThrowing>(); 1908 g_Wakeup_Msg = "indexer_node(non,is): Missed wakeup or machine is overloaded?"; 1909 run_indexer_node_test<std::tuple<int,int>, nonThrowing, 
isThrowing>(); 1910 g_Wakeup_Msg = "indexer_node(is,is): Missed wakeup or machine is overloaded?"; 1911 run_indexer_node_test<std::tuple<int,int>, isThrowing, isThrowing>(); 1912 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 1913 } 1914 1915 /////////////////////////////////////////////// 1916 // whole-graph exception test 1917 1918 class Foo { 1919 private: 1920 // std::vector<int>& m_vec; 1921 std::vector<int>* m_vec; 1922 public: 1923 Foo(std::vector<int>& vec) : m_vec(&vec) { } 1924 void operator() (tbb::flow::continue_msg) const { 1925 ++nExceptions; 1926 (void)m_vec->at(m_vec->size()); // Will throw out_of_range exception 1927 CHECK_MESSAGE( (false), "Exception not thrown by invalid access"); 1928 } 1929 }; 1930 1931 // test from user ahelwer: http://software.intel.com/en-us/forums/showthread.php?t=103786 1932 // exception thrown in graph node, not caught in wait_for_all() 1933 void 1934 test_flow_graph_exception0() { 1935 // Initializes body 1936 std::vector<int> vec; 1937 vec.push_back(0); 1938 Foo f(vec); 1939 nExceptions = 0; 1940 1941 // Construct graph and nodes 1942 tbb::flow::graph g; 1943 tbb::flow::broadcast_node<tbb::flow::continue_msg> start(g); 1944 tbb::flow::continue_node<tbb::flow::continue_msg> fooNode(g, f); 1945 1946 // Construct edge 1947 tbb::flow::make_edge(start, fooNode); 1948 1949 // Execute graph 1950 CHECK_MESSAGE( (!g.exception_thrown()), "exception_thrown flag already set"); 1951 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag already set"); 1952 try { 1953 start.try_put(tbb::flow::continue_msg()); 1954 g.wait_for_all(); 1955 CHECK_MESSAGE( (false), "Exception not thrown"); 1956 } 1957 catch(std::out_of_range& ex) { 1958 INFO("Exception: " << ex.what() << "(expected)\n"); 1959 } 1960 catch(...) 
{ 1961 INFO("Unknown exception caught (expected)\n"); 1962 } 1963 CHECK_MESSAGE( (nExceptions > 0), "Exception caught, but no body signaled exception being thrown"); 1964 nExceptions = 0; 1965 CHECK_MESSAGE( (g.exception_thrown()), "Exception not intercepted"); 1966 // if exception set, cancellation also set. 1967 CHECK_MESSAGE( (g.is_cancelled()), "Exception cancellation not signaled"); 1968 // in case we got an exception 1969 try { 1970 g.wait_for_all(); // context still signalled canceled, my_exception still set. 1971 } 1972 catch(...) { 1973 CHECK_MESSAGE( (false), "Second exception thrown but no task executing"); 1974 } 1975 CHECK_MESSAGE( (nExceptions == 0), "body signaled exception being thrown, but no body executed"); 1976 CHECK_MESSAGE( (!g.exception_thrown()), "exception_thrown flag not reset"); 1977 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag not reset"); 1978 } 1979 1980 void TestOneThreadNum(int nThread) { 1981 INFO("Testing " << nThread << "%d threads\n"); 1982 g_NumItems = ((nThread > NUM_ITEMS) ? nThread *2 : NUM_ITEMS); 1983 g_NumThreads = nThread; 1984 tbb::task_arena arena(nThread); 1985 arena.execute( 1986 [&]() { 1987 // whole-graph exception catch and rethrow test 1988 test_flow_graph_exception0(); 1989 for(int i = 0; i < 4; ++i) { 1990 g_ExceptionInMaster = (i & 1) != 0; 1991 g_SolitaryException = (i & 2) != 0; 1992 INFO("g_ExceptionInMaster == " << (g_ExceptionInMaster ? "T":"F") 1993 << ", g_SolitaryException == " << (g_SolitaryException ? 
"T":"F") << "\n"); 1994 test_input_node(); 1995 test_function_node(); 1996 test_continue_node(); // also test broadcast_node 1997 test_multifunction_node(); 1998 // single- and multi-item buffering nodes 1999 test_buffer_queue_and_overwrite_node(); 2000 test_sequencer_node(); 2001 test_priority_queue_node(); 2002 2003 // join_nodes 2004 test_join_node<tbb::flow::queueing>(); 2005 test_join_node<tbb::flow::reserving>(); 2006 test_join_node<tbb::flow::tag_matching>(); 2007 2008 test_limiter_node(); 2009 test_split_node(); 2010 // graph for write_once_node will be complicated by the fact the node will 2011 // not do try_puts after it has been set. To get parallelism of N we have 2012 // to attach N successor nodes to the write_once (or play some similar game). 2013 // test_write_once_node(); 2014 test_indexer_node(); 2015 } 2016 } 2017 ); 2018 } 2019 2020 //! Test exceptions with parallelism 2021 //! \brief \ref error_guessing 2022 TEST_CASE("Testing several threads"){ 2023 // reverse order of tests 2024 for(unsigned int nThread=utils::MaxThread; nThread >= utils::MinThread; --nThread) { 2025 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, nThread); 2026 TestOneThreadNum(nThread); 2027 } 2028 } 2029 2030 #endif // TBB_USE_EXCEPTIONS 2031