1 /* 2 Copyright (c) 2005-2020 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 //! \file test_eh_flow_graph.cpp 18 //! \brief Test for [flow_graph.copy_body flow_graph.broadcast_node flow_graph.priority_queue_node flow_graph.indexer_node flow_graph.sequencer_node flow_graph.remove_edge flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification 19 20 #include "common/config.h" 21 22 #if USE_TASK_SCHEDULER_OBSERVER 23 #include "tbb/task_scheduler_observer.h" 24 #endif 25 #include "tbb/flow_graph.h" 26 27 #include "common/test.h" 28 29 #if TBB_USE_EXCEPTIONS 30 31 #include "common/utils.h" 32 #include "common/checktype.h" 33 #include "common/concurrency_tracker.h" 34 35 #if _MSC_VER 36 #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning 37 #endif 38 39 #if __TBB_MSVC_UNREACHABLE_CODE_IGNORED 40 // Suppress "unreachable code" warning by VC++ 17.0-18.0 (VS 2012 or newer) 41 #pragma warning (disable: 4702) 42 #endif 43 44 // global task_scheduler_observer is an imperfect tool to find how many threads are really 45 // participating. That was the hope, but it counts the entries into the marketplace, 46 // not the arena. 47 // TODO: Consider using local task scheduler observer 48 // #define USE_TASK_SCHEDULER_OBSERVER 1 49 50 #include <iostream> 51 #include <sstream> 52 #include <vector> 53 54 #include "common/exception_handling.h" 55 56 #include <stdexcept> 57 58 #define NUM_ITEMS 15 59 int g_NumItems; 60 61 std::atomic<unsigned> nExceptions; 62 std::atomic<intptr_t> g_TGCCancelled; 63 64 enum TestNodeTypeEnum { nonThrowing, isThrowing }; 65 66 static const size_t unlimited_type = 0; 67 static const size_t serial_type = 1; 68 static const size_t limited_type = 4; 69 70 template<TestNodeTypeEnum T> struct TestNodeTypeName; 71 template<> struct TestNodeTypeName<nonThrowing> { static const char *name() { return "nonThrowing"; } }; 72 template<> struct TestNodeTypeName<isThrowing> { static const char *name() { return "isThrowing"; } }; 73 74 template<size_t Conc> struct concurrencyName; 75 template<> struct concurrencyName<serial_type>{ static const char *name() { return "serial"; } }; 76 template<> struct concurrencyName<unlimited_type>{ static const char *name() { return "unlimited"; } }; 77 template<> struct concurrencyName<limited_type>{ static const char *name() { return "limited"; } }; 78 79 // Class that provides waiting and throwing behavior. If we are not throwing, do nothing 80 // If serial, we can't wait for concurrency to peak; we may be the bottleneck and will 81 // stop further processing. We will execute g_NumThreads + 10 times (the "10" is somewhat 82 // arbitrary, and just makes sure there are enough items in the graph to keep it flowing), 83 // If parallel or serial and throwing, use utils::ConcurrencyTracker to wait. 84 85 template<size_t Conc, TestNodeTypeEnum t = nonThrowing> 86 class WaitThrow; 87 88 template<> 89 class WaitThrow<serial_type,nonThrowing> { 90 protected: 91 void WaitAndThrow(int cnt, const char * /*name*/) { 92 if(cnt > g_NumThreads + 10) { 93 utils::ConcurrencyTracker ct; 94 WaitUntilConcurrencyPeaks(); 95 } 96 } 97 }; 98 99 template<> 100 class WaitThrow<serial_type,isThrowing> { 101 protected: 102 void WaitAndThrow(int cnt, const char * /*name*/) { 103 if(cnt > g_NumThreads + 10) { 104 utils::ConcurrencyTracker ct; 105 WaitUntilConcurrencyPeaks(); 106 ThrowTestException(1); 107 } 108 } 109 }; 110 111 // for nodes with limited concurrency, if that concurrency is < g_NumThreads, we need 112 // to make sure enough other nodes wait for concurrency to peak. If we are attached to 113 // N successors, for each item we pass to a successor, we will get N executions of the 114 // "absorbers" (because we broadcast to successors.) for an odd number of threads we 115 // need (g_NumThreads - limited + 1) / 2 items (that will give us one extra execution 116 // of an "absorber", but we can't change that without changing the behavior of the node.) 117 template<> 118 class WaitThrow<limited_type,nonThrowing> { 119 protected: 120 void WaitAndThrow(int cnt, const char * /*name*/) { 121 if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) { 122 return; 123 } 124 utils::ConcurrencyTracker ct; 125 WaitUntilConcurrencyPeaks(); 126 } 127 }; 128 129 template<> 130 class WaitThrow<limited_type,isThrowing> { 131 protected: 132 void WaitAndThrow(int cnt, const char * /*name*/) { 133 utils::ConcurrencyTracker ct; 134 if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) { 135 return; 136 } 137 WaitUntilConcurrencyPeaks(); 138 ThrowTestException(1); 139 } 140 }; 141 142 template<> 143 class WaitThrow<unlimited_type,nonThrowing> { 144 protected: 145 void WaitAndThrow(int /*cnt*/, const char * /*name*/) { 146 utils::ConcurrencyTracker ct; 147 WaitUntilConcurrencyPeaks(); 148 } 149 }; 150 151 template<> 152 class WaitThrow<unlimited_type,isThrowing> { 153 protected: 154 void WaitAndThrow(int /*cnt*/, const char * /*name*/) { 155 utils::ConcurrencyTracker ct; 156 WaitUntilConcurrencyPeaks(); 157 ThrowTestException(1); 158 } 159 }; 160 161 void 162 ResetGlobals(bool throwException = true, bool flog = false) { 163 nExceptions = 0; 164 g_TGCCancelled = 0; 165 ResetEhGlobals(throwException, flog); 166 } 167 168 // -------input_node body ------------------ 169 template <class OutputType, TestNodeTypeEnum TType> 170 class test_input_body : WaitThrow<serial_type, TType> { 171 using WaitThrow<serial_type, TType>::WaitAndThrow; 172 std::atomic<int> *my_current_val; 173 int my_mult; 174 public: 175 test_input_body(std::atomic<int> &my_cnt, int multiplier = 1) : my_current_val(&my_cnt), my_mult(multiplier) { 176 // INFO("- --------- - - - constructed " << (size_t)(my_current_val) << "\n"); 177 } 178 179 OutputType operator()(tbb::flow_control& fc) { 180 UPDATE_COUNTS(); 181 OutputType ret = OutputType(my_mult * ++(*my_current_val)); 182 // TODO revamp: reconsider logging for the tests. 183 184 // The following line is known to cause double frees. Therefore, commenting out frequent 185 // calls to INFO() macro. 186 187 // INFO("xx(" << (size_t)(my_current_val) << ") ret == " << (int)ret << "\n"); 188 if(*my_current_val > g_NumItems) { 189 // INFO(" ------ End of the line!\n"); 190 *my_current_val = g_NumItems; 191 fc.stop(); 192 return OutputType(); 193 } 194 WaitAndThrow((int)ret,"test_input_body"); 195 return ret; 196 } 197 198 int count_value() { return (int)*my_current_val; } 199 }; 200 201 template <TestNodeTypeEnum TType> 202 class test_input_body<tbb::flow::continue_msg, TType> : WaitThrow<serial_type, TType> { 203 using WaitThrow<serial_type, TType>::WaitAndThrow; 204 std::atomic<int> *my_current_val; 205 public: 206 test_input_body(std::atomic<int> &my_cnt) : my_current_val(&my_cnt) { } 207 208 tbb::flow::continue_msg operator()( tbb::flow_control & fc) { 209 UPDATE_COUNTS(); 210 int outint = ++(*my_current_val); 211 if(*my_current_val > g_NumItems) { 212 *my_current_val = g_NumItems; 213 fc.stop(); 214 return tbb::flow::continue_msg(); 215 } 216 WaitAndThrow(outint,"test_input_body"); 217 return tbb::flow::continue_msg(); 218 } 219 220 int count_value() { return (int)*my_current_val; } 221 }; 222 223 // -------{function/continue}_node body ------------------ 224 template<class InputType, class OutputType, TestNodeTypeEnum T, size_t Conc> 225 class absorber_body : WaitThrow<Conc,T> { 226 using WaitThrow<Conc,T>::WaitAndThrow; 227 std::atomic<int> *my_count; 228 public: 229 absorber_body(std::atomic<int> &my_cnt) : my_count(&my_cnt) { } 230 OutputType operator()(const InputType &/*p_in*/) { 231 UPDATE_COUNTS(); 232 int out = ++(*my_count); 233 WaitAndThrow(out,"absorber_body"); 234 return OutputType(); 235 } 236 int count_value() { return *my_count; } 237 }; 238 239 // -------multifunction_node body ------------------ 240 241 // helper classes 242 template<int N,class PortsType> 243 struct IssueOutput { 244 typedef typename std::tuple_element<N-1,PortsType>::type::output_type my_type; 245 246 static void issue_tuple_element( PortsType &my_ports) { 247 CHECK_MESSAGE( (std::get<N-1>(my_ports).try_put(my_type())), "Error putting to successor"); 248 IssueOutput<N-1,PortsType>::issue_tuple_element(my_ports); 249 } 250 }; 251 252 template<class PortsType> 253 struct IssueOutput<1,PortsType> { 254 typedef typename std::tuple_element<0,PortsType>::type::output_type my_type; 255 256 static void issue_tuple_element( PortsType &my_ports) { 257 CHECK_MESSAGE( (std::get<0>(my_ports).try_put(my_type())), "Error putting to successor"); 258 } 259 }; 260 261 template<class InputType, class OutputTupleType, TestNodeTypeEnum T, size_t Conc> 262 class multifunction_node_body : WaitThrow<Conc,T> { 263 using WaitThrow<Conc,T>::WaitAndThrow; 264 static const int N = std::tuple_size<OutputTupleType>::value; 265 typedef typename tbb::flow::multifunction_node<InputType,OutputTupleType> NodeType; 266 typedef typename NodeType::output_ports_type PortsType; 267 std::atomic<int> *my_count; 268 public: 269 multifunction_node_body(std::atomic<int> &my_cnt) : my_count(&my_cnt) { } 270 void operator()(const InputType& /*in*/, PortsType &my_ports) { 271 UPDATE_COUNTS(); 272 int out = ++(*my_count); 273 WaitAndThrow(out,"multifunction_node_body"); 274 // issue an item to each output port. 275 IssueOutput<N,PortsType>::issue_tuple_element(my_ports); 276 } 277 278 int count_value() { return *my_count; } 279 }; 280 281 // --------- body to sort items in sequencer_node 282 template<class BufferItemType> 283 struct sequencer_body { 284 size_t operator()(const BufferItemType &s) { 285 CHECK_MESSAGE( (s), "sequencer item out of range (== 0)"); 286 return size_t(s) - 1; 287 } 288 }; 289 290 // --------- body to compare the "priorities" of objects for priority_queue_node five priority levels 0-4. 291 template<class T> 292 struct myLess { 293 bool operator()(const T &t1, const T &t2) { 294 return (int(t1) % 5) < (int(t2) % 5); 295 } 296 }; 297 298 // --------- type for < comparison in priority_queue_node. 299 template<class ItemType> 300 struct less_body { 301 bool operator()(const ItemType &lhs, const ItemType &rhs) { 302 return (int(lhs) % 3) < (int(rhs) % 3); 303 } 304 }; 305 306 // --------- tag methods for tag_matching join_node 307 template<typename TT> 308 class tag_func { 309 TT my_mult; 310 public: 311 tag_func(TT multiplier) : my_mult(multiplier) { } 312 // operator() will return [0 .. Count) 313 tbb::flow::tag_value operator()( TT v) { 314 tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult); 315 return t; 316 } 317 }; 318 319 // --------- Input body for split_node test. 320 template <class OutputTuple, TestNodeTypeEnum TType> 321 class tuple_test_input_body : WaitThrow<serial_type, TType> { 322 typedef typename std::tuple_element<0,OutputTuple>::type ItemType0; 323 typedef typename std::tuple_element<1,OutputTuple>::type ItemType1; 324 using WaitThrow<serial_type, TType>::WaitAndThrow; 325 std::atomic<int> *my_current_val; 326 public: 327 tuple_test_input_body(std::atomic<int> &my_cnt) : my_current_val(&my_cnt) { } 328 329 OutputTuple operator()(tbb::flow_control& fc) { 330 UPDATE_COUNTS(); 331 int ival = ++(*my_current_val); 332 if(*my_current_val > g_NumItems) { 333 *my_current_val = g_NumItems; // jam the final value; we assert on it later. 334 fc.stop(); 335 return OutputTuple(); 336 } 337 WaitAndThrow(ival,"tuple_test_input_body"); 338 return OutputTuple(ItemType0(ival),ItemType1(ival)); 339 } 340 341 int count_value() { return (int)*my_current_val; } 342 }; 343 344 // ------- end of node bodies 345 346 // input_node is only-serial. input_node can throw, or the function_node can throw. 347 // graph being tested is 348 // 349 // input_node+---+parallel function_node 350 // 351 // After each run the graph is reset(), to test the reset functionality. 352 // 353 354 355 template<class ItemType, TestNodeTypeEnum inpThrowType, TestNodeTypeEnum absorbThrowType> 356 void run_one_input_node_test(bool throwException, bool flog) { 357 typedef test_input_body<ItemType,inpThrowType> src_body_type; 358 typedef absorber_body<ItemType, tbb::flow::continue_msg, absorbThrowType, unlimited_type> parallel_absorb_body_type; 359 std::atomic<int> input_body_count; 360 std::atomic<int> absorber_body_count; 361 input_body_count = 0; 362 absorber_body_count = 0; 363 364 tbb::flow::graph g; 365 366 g_Master = std::this_thread::get_id(); 367 368 #if USE_TASK_SCHEDULER_OBSERVER 369 eh_test_observer o; 370 o.observe(true); 371 #endif 372 373 tbb::flow::input_node<ItemType> sn(g, src_body_type(input_body_count)); 374 parallel_absorb_body_type ab2(absorber_body_count); 375 tbb::flow::function_node<ItemType> parallel_fn(g,tbb::flow::unlimited,ab2); 376 make_edge(sn, parallel_fn); 377 for(int runcnt = 0; runcnt < 2; ++runcnt) { 378 ResetGlobals(throwException,flog); 379 if(throwException) { 380 TRY(); 381 sn.activate(); 382 g.wait_for_all(); 383 CATCH_AND_ASSERT(); 384 } 385 else { 386 TRY(); 387 sn.activate(); 388 g.wait_for_all(); 389 CATCH_AND_FAIL(); 390 } 391 392 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 393 int src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value(); 394 int sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value(); 395 if(throwException) { 396 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception flag in flow::graph not set"); 397 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "canceled flag not set"); 398 CHECK_MESSAGE( (src_cnt <= g_NumItems), "Too many input_node items emitted"); 399 CHECK_MESSAGE( (sink_cnt <= src_cnt), "Too many input_node items received"); 400 } 401 else { 402 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 403 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 404 CHECK_MESSAGE( (src_cnt == g_NumItems), "Incorrect # input_node items emitted"); 405 CHECK_MESSAGE( (sink_cnt == src_cnt), "Incorrect # input_node items received"); 406 } 407 g.reset(); // resets the body of the input_node and the absorb_nodes. 408 input_body_count = 0; 409 absorber_body_count = 0; 410 CHECK_MESSAGE( (!g.exception_thrown()), "Reset didn't clear exception_thrown()"); 411 CHECK_MESSAGE( (!g.is_cancelled()), "Reset didn't clear is_cancelled()"); 412 src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value(); 413 sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value(); 414 CHECK_MESSAGE( (src_cnt == 0), "input_node count not reset"); 415 CHECK_MESSAGE( (sink_cnt == 0), "sink_node count not reset"); 416 } 417 #if USE_TASK_SCHEDULER_OBSERVER 418 o.observe(false); 419 #endif 420 } // run_one_input_node_test 421 422 423 template<class ItemType, TestNodeTypeEnum inpThrowType, TestNodeTypeEnum absorbThrowType> 424 void run_input_node_test() { 425 run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(false,false); 426 run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(true,false); 427 run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(true,true); 428 } // run_input_node_test 429 430 void test_input_node() { 431 INFO("Testing input_node\n"); 432 CheckType<int>::check_type_counter = 0; 433 g_Wakeup_Msg = "input_node(1): Missed wakeup or machine is overloaded?"; 434 run_input_node_test<CheckType<int>, isThrowing, nonThrowing>(); 435 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test"); 436 g_Wakeup_Msg = "input_node(2): Missed wakeup or machine is overloaded?"; 437 run_input_node_test<int, isThrowing, nonThrowing>(); 438 g_Wakeup_Msg = "input_node(3): Missed wakeup or machine is overloaded?"; 439 run_input_node_test<int, nonThrowing, isThrowing>(); 440 g_Wakeup_Msg = "input_node(4): Missed wakeup or machine is overloaded?"; 441 run_input_node_test<int, isThrowing, isThrowing>(); 442 g_Wakeup_Msg = "input_node(5): Missed wakeup or machine is overloaded?"; 443 run_input_node_test<CheckType<int>, isThrowing, isThrowing>(); 444 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 445 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test"); 446 } 447 448 // -------- utilities & types to test function_node and multifunction_node. 449 450 // need to tell the template which node type I am using so it attaches successors correctly. 451 enum NodeFetchType { func_node_type, multifunc_node_type }; 452 453 template<class NodeType, class ItemType, int indx, NodeFetchType NFT> 454 struct AttachPoint; 455 456 template<class NodeType, class ItemType, int indx> 457 struct AttachPoint<NodeType,ItemType,indx,multifunc_node_type> { 458 static tbb::flow::sender<ItemType> &GetSender(NodeType &n) { 459 return tbb::flow::output_port<indx>(n); 460 } 461 }; 462 463 template<class NodeType, class ItemType, int indx> 464 struct AttachPoint<NodeType,ItemType,indx,func_node_type> { 465 static tbb::flow::sender<ItemType> &GetSender(NodeType &n) { 466 return n; 467 } 468 }; 469 470 471 // common template for running function_node, multifunction_node. continue_node 472 // has different firing requirements, so it needs a different graph topology. 473 template< 474 class InputNodeType, 475 class InputNodeBodyType0, 476 class InputNodeBodyType1, 477 NodeFetchType NFT, 478 class TestNodeType, 479 class TestNodeBodyType, 480 class TypeToSink0, // what kind of item are we sending to sink0 481 class TypeToSink1, // what kind of item are we sending to sink1 482 class SinkNodeType0, // will be same for function; 483 class SinkNodeType1, // may differ for multifunction_node 484 class SinkNodeBodyType0, 485 class SinkNodeBodyType1, 486 size_t Conc 487 > 488 void 489 run_one_functype_node_test(bool throwException, bool flog, const char * /*name*/) { 490 491 std::stringstream ss; 492 char *saved_msg = const_cast<char *>(g_Wakeup_Msg); 493 tbb::flow::graph g; 494 495 std::atomic<int> input0_count; 496 std::atomic<int> input1_count; 497 std::atomic<int> sink0_count; 498 std::atomic<int> sink1_count; 499 std::atomic<int> test_count; 500 input0_count = input1_count = sink0_count = sink1_count = test_count = 0; 501 502 #if USE_TASK_SCHEDULER_OBSERVER 503 eh_test_observer o; 504 o.observe(true); 505 #endif 506 507 g_Master = std::this_thread::get_id(); 508 InputNodeType input0(g, InputNodeBodyType0(input0_count)); 509 InputNodeType input1(g, InputNodeBodyType1(input1_count)); 510 TestNodeType node_to_test(g, Conc, TestNodeBodyType(test_count)); 511 SinkNodeType0 sink0(g,tbb::flow::unlimited,SinkNodeBodyType0(sink0_count)); 512 SinkNodeType1 sink1(g,tbb::flow::unlimited,SinkNodeBodyType1(sink1_count)); 513 make_edge(input0, node_to_test); 514 make_edge(input1, node_to_test); 515 make_edge(AttachPoint<TestNodeType, TypeToSink0, 0, NFT>::GetSender(node_to_test), sink0); 516 make_edge(AttachPoint<TestNodeType, TypeToSink1, 1, NFT>::GetSender(node_to_test), sink1); 517 518 for(int iter = 0; iter < 2; ++iter) { // run, reset, run again 519 ss.clear(); 520 ss << saved_msg << " iter=" << iter << ", threads=" << g_NumThreads << ", throw=" << (throwException ? "T" : "F") << ", flow=" << (flog ? "T" : "F"); 521 g_Wakeup_Msg = ss.str().c_str(); 522 ResetGlobals(throwException,flog); 523 if(throwException) { 524 TRY(); 525 input0.activate(); 526 input1.activate(); 527 g.wait_for_all(); 528 CATCH_AND_ASSERT(); 529 } 530 else { 531 TRY(); 532 input0.activate(); 533 input1.activate(); 534 g.wait_for_all(); 535 CATCH_AND_FAIL(); 536 } 537 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 538 int ib0_cnt = tbb::flow::copy_body<InputNodeBodyType0>(input0).count_value(); 539 int ib1_cnt = tbb::flow::copy_body<InputNodeBodyType1>(input1).count_value(); 540 int t_cnt = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value(); 541 int nb0_cnt = tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value(); 542 int nb1_cnt = tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value(); 543 if(throwException) { 544 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 545 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 546 CHECK_MESSAGE( (ib0_cnt + ib1_cnt <= 2*g_NumItems), "Too many items sent by inputs"); 547 CHECK_MESSAGE( (ib0_cnt + ib1_cnt >= t_cnt), "Too many items received by test node"); 548 CHECK_MESSAGE( (nb0_cnt + nb1_cnt <= t_cnt*2), "Too many items received by sink nodes"); 549 } 550 else { 551 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 552 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 553 CHECK_MESSAGE( (ib0_cnt + ib1_cnt == 2*g_NumItems), "Missing invocations of input_nodes"); 554 CHECK_MESSAGE( (t_cnt == 2*g_NumItems), "Not all items reached test node"); 555 CHECK_MESSAGE( (nb0_cnt == 2*g_NumItems && nb1_cnt == 2*g_NumItems), "Missing items in absorbers"); 556 } 557 g.reset(); // resets the body of the input_nodes, test_node and the absorb_nodes. 558 input0_count = input1_count = sink0_count = sink1_count = test_count = 0; 559 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType0>(input0).count_value()),"Reset input 0 failed"); 560 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType1>(input1).count_value()),"Reset input 1 failed"); 561 CHECK_MESSAGE( (0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value()),"Reset test_node failed"); 562 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value()),"Reset sink 0 failed"); 563 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value()),"Reset sink 1 failed"); 564 565 g_Wakeup_Msg = saved_msg; 566 } 567 #if USE_TASK_SCHEDULER_OBSERVER 568 o.observe(false); 569 #endif 570 } 571 572 // Test function_node 573 // 574 // graph being tested is 575 // 576 // input_node -\ /- parallel function_node 577 // \ / 578 // +function_node+ 579 // / \ x 580 // input_node -/ \- parallel function_node 581 // 582 // After each run the graph is reset(), to test the reset functionality. 583 // 584 template< 585 TestNodeTypeEnum IType1, // does input node 1 throw? 586 TestNodeTypeEnum IType2, // does input node 2 throw? 587 class Item12, // type of item passed between inputs and test node 588 TestNodeTypeEnum FType, // does function node throw? 589 class Item23, // type passed from function_node to sink nodes 590 TestNodeTypeEnum NType1, // does sink node 1 throw? 591 TestNodeTypeEnum NType2, // does sink node 1 throw? 592 class NodePolicy, // rejecting,queueing 593 size_t Conc // is node concurrent? {serial | limited | unlimited} 594 > 595 void run_function_node_test() { 596 597 typedef test_input_body<Item12,IType1> IBodyType1; 598 typedef test_input_body<Item12,IType2> IBodyType2; 599 typedef absorber_body<Item12, Item23, FType, Conc> TestBodyType; 600 typedef absorber_body<Item23,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1; 601 typedef absorber_body<Item23,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2; 602 603 typedef tbb::flow::input_node<Item12> InputType; 604 typedef tbb::flow::function_node<Item12, Item23, NodePolicy> TestType; 605 typedef tbb::flow::function_node<Item23,tbb::flow::continue_msg> SnkType; 606 607 for(int i = 0; i < 4; ++i ) { 608 if(i != 2) { // doesn't make sense to flog a non-throwing test 609 bool doThrow = (i & 0x1) != 0; 610 bool doFlog = (i & 0x2) != 0; 611 run_one_functype_node_test< 612 /*InputNodeType*/ InputType, 613 /*InputNodeBodyType0*/ IBodyType1, 614 /*InputNodeBodyType1*/ IBodyType2, 615 /* NFT */ func_node_type, 616 /*TestNodeType*/ TestType, 617 /*TestNodeBodyType*/ TestBodyType, 618 /*TypeToSink0 */ Item23, 619 /*TypeToSink1 */ Item23, 620 /*SinkNodeType0*/ SnkType, 621 /*SinkNodeType1*/ SnkType, 622 /*SinkNodeBodyType1*/ SinkBodyType1, 623 /*SinkNodeBodyType2*/ SinkBodyType2, 624 /*Conc*/ Conc> 625 (doThrow,doFlog,"function_node"); 626 } 627 } 628 } // run_function_node_test 629 630 void test_function_node() { 631 INFO("Testing function_node\n"); 632 // serial rejecting 633 g_Wakeup_Msg = "function_node(1a): Missed wakeup or machine is overloaded?"; 634 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); 635 g_Wakeup_Msg = "function_node(1b): Missed wakeup or machine is overloaded?"; 636 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); 637 g_Wakeup_Msg = "function_node(1c): Missed wakeup or machine is overloaded?"; 638 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); 639 640 // serial queueing 641 g_Wakeup_Msg = "function_node(2): Missed wakeup or machine is overloaded?"; 642 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 643 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 644 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 645 CheckType<int>::check_type_counter = 0; 646 run_function_node_test<nonThrowing, nonThrowing, CheckType<int>, nonThrowing, CheckType<int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 647 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test"); 648 649 // unlimited parallel rejecting 650 g_Wakeup_Msg = "function_node(3): Missed wakeup or machine is overloaded?"; 651 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>(); 652 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>(); 653 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>(); 654 655 // limited parallel rejecting 656 g_Wakeup_Msg = "function_node(4): Missed wakeup or machine is overloaded?"; 657 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>(); 658 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>(); 659 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>(); 660 661 // limited parallel queueing 662 g_Wakeup_Msg = "function_node(5): Missed wakeup or machine is overloaded?"; 663 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>(); 664 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>(); 665 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>(); 666 667 // everyone throwing 668 g_Wakeup_Msg = "function_node(6): Missed wakeup or machine is overloaded?"; 669 run_function_node_test<isThrowing, isThrowing, int, isThrowing, int, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>(); 670 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 671 } 672 673 // ----------------------------------- multifunction_node ---------------------------------- 674 // Test multifunction_node. 675 // 676 // graph being tested is 677 // 678 // input_node -\ /- parallel function_node 679 // \ / 680 // +multifunction_node+ 681 // / \ x 682 // input_node -/ \- parallel function_node 683 // 684 // After each run the graph is reset(), to test the reset functionality. The 685 // multifunction_node will put an item to each successor for every item 686 // received. 687 // 688 template< 689 TestNodeTypeEnum IType0, // does input node 1 throw? 690 TestNodeTypeEnum IType1, // does input node 2 thorw? 691 class Item12, // type of item passed between inputs and test node 692 TestNodeTypeEnum FType, // does multifunction node throw? 693 class ItemTuple, // tuple of types passed from multifunction_node to sink nodes 694 TestNodeTypeEnum NType1, // does sink node 1 throw? 695 TestNodeTypeEnum NType2, // does sink node 2 throw? 696 class NodePolicy, // rejecting,queueing 697 size_t Conc // is node concurrent? {serial | limited | unlimited} 698 > 699 void run_multifunction_node_test() { 700 701 typedef typename std::tuple_element<0,ItemTuple>::type Item23Type0; 702 typedef typename std::tuple_element<1,ItemTuple>::type Item23Type1; 703 typedef test_input_body<Item12,IType0> IBodyType1; 704 typedef test_input_body<Item12,IType1> IBodyType2; 705 typedef multifunction_node_body<Item12, ItemTuple, FType, Conc> TestBodyType; 706 typedef absorber_body<Item23Type0,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1; 707 typedef absorber_body<Item23Type1,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2; 708 709 typedef tbb::flow::input_node<Item12> InputType; 710 typedef tbb::flow::multifunction_node<Item12, ItemTuple, NodePolicy> TestType; 711 typedef tbb::flow::function_node<Item23Type0,tbb::flow::continue_msg> SnkType0; 712 typedef tbb::flow::function_node<Item23Type1,tbb::flow::continue_msg> SnkType1; 713 714 for(int i = 0; i < 4; ++i ) { 715 if(i != 2) { // doesn't make sense to flog a non-throwing test 716 bool doThrow = (i & 0x1) != 0; 717 bool doFlog = (i & 0x2) != 0; 718 run_one_functype_node_test< 719 /*InputNodeType*/ InputType, 720 /*InputNodeBodyType0*/ IBodyType1, 721 /*InputNodeBodyType1*/ IBodyType2, 722 /*NFT*/ multifunc_node_type, 723 /*TestNodeType*/ TestType, 724 /*TestNodeBodyType*/ TestBodyType, 725 /*TypeToSink0*/ Item23Type0, 726 /*TypeToSink1*/ Item23Type1, 727 /*SinkNodeType0*/ SnkType0, 728 /*SinkNodeType1*/ SnkType1, 729 /*SinkNodeBodyType0*/ SinkBodyType1, 730 /*SinkNodeBodyType1*/ SinkBodyType2, 731 /*Conc*/ Conc> 732 (doThrow,doFlog,"multifunction_node"); 733 } 734 } 735 } // run_multifunction_node_test 736 737 void test_multifunction_node() { 738 INFO("Testing multifunction_node\n"); 739 g_Wakeup_Msg = "multifunction_node(input throws,rejecting,serial): Missed wakeup or machine is overloaded?"; 740 // serial rejecting 741 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,float>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); 742 g_Wakeup_Msg = "multifunction_node(test throws,rejecting,serial): Missed wakeup or machine is overloaded?"; 743 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); 744 g_Wakeup_Msg = "multifunction_node(sink throws,rejecting,serial): Missed wakeup or machine is overloaded?"; 745 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); 746 747 g_Wakeup_Msg = "multifunction_node(2): Missed wakeup or machine is overloaded?"; 748 // serial queueing 749 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 750 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 751 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 752 CheckType<int>::check_type_counter = 0; 753 run_multifunction_node_test<nonThrowing, nonThrowing, CheckType<int>, nonThrowing, std::tuple<CheckType<int>, CheckType<int> >, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); 754 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test"); 755 756 g_Wakeup_Msg = "multifunction_node(3): Missed wakeup or machine is overloaded?"; 757 // unlimited parallel rejecting 758 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>(); 759 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>(); 760 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>(); 761 762 g_Wakeup_Msg = "multifunction_node(4): Missed wakeup or machine is overloaded?"; 763 // limited parallel rejecting 764 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>(); 765 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>(); 766 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>(); 767 768 g_Wakeup_Msg = "multifunction_node(5): Missed wakeup or machine is overloaded?"; 769 // limited parallel queueing 770 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>(); 771 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>(); 772 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>(); 773 774 g_Wakeup_Msg = "multifunction_node(6): Missed wakeup or machine is overloaded?"; 775 // everyone throwing 776 run_multifunction_node_test<isThrowing, isThrowing, int, isThrowing, std::tuple<int,int>, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>(); 777 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 778 } 779 780 // 781 // Continue node has T predecessors. when it receives messages (continue_msg) on T predecessors 782 // it executes the body of the node, and forwards a continue_msg to its successors. 783 // However many predecessors the continue_node has, that's how many continue_msgs it receives 784 // on input before forwarding a message. 785 // 786 // The graph will look like 787 // 788 // +broadcast_node+ 789 // / \ ___ 790 // input_node+------>+broadcast_node+ +continue_node+--->+absorber 791 // \ / 792 // +broadcast_node+ 793 // 794 // The continue_node has unlimited parallelism, no input buffering, and broadcasts to successors. 795 // The absorber is parallel, so each item emitted by the input will result in one thread 796 // spinning. So for N threads we pass N-1 continue_messages, then spin wait and then throw if 797 // we are allowed to. 798 799 template < class InputNodeType, class InputNodeBodyType, class TTestNodeType, class TestNodeBodyType, 800 class SinkNodeType, class SinkNodeBodyType> 801 void run_one_continue_node_test (bool throwException, bool flog) { 802 tbb::flow::graph g; 803 804 std::atomic<int> input_count; 805 std::atomic<int> test_count; 806 std::atomic<int> sink_count; 807 input_count = test_count = sink_count = 0; 808 #if USE_TASK_SCHEDULER_OBSERVER 809 eh_test_observer o; 810 o.observe(true); 811 #endif 812 g_Master = std::this_thread::get_id(); 813 InputNodeType input(g, InputNodeBodyType(input_count)); 814 TTestNodeType node_to_test(g, TestNodeBodyType(test_count)); 815 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count)); 816 tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g), b2(g), b3(g); 817 make_edge(input, b1); 818 make_edge(b1,b2); 819 make_edge(b1,b3); 820 make_edge(b2,node_to_test); 821 make_edge(b3,node_to_test); 822 make_edge(node_to_test, sink); 823 for(int iter = 0; iter < 2; ++iter) { 824 ResetGlobals(throwException,flog); 825 if(throwException) { 826 TRY(); 827 input.activate(); 828 g.wait_for_all(); 829 CATCH_AND_ASSERT(); 830 } 831 else { 832 TRY(); 833 input.activate(); 834 g.wait_for_all(); 835 CATCH_AND_FAIL(); 836 } 837 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 838 int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value(); 839 int t_cnt = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value(); 840 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(); 841 if(throwException) { 842 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 843 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 844 CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs"); 845 CHECK_MESSAGE( (ib_cnt >= t_cnt), "Too many items received by test node"); 846 CHECK_MESSAGE( (nb_cnt <= t_cnt), "Too many items received by sink nodes"); 847 } 848 else { 849 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 850 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 851 CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node"); 852 CHECK_MESSAGE( (t_cnt == g_NumItems), "Not all items reached test node"); 853 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers"); 854 } 855 g.reset(); // resets the body of the input_nodes, test_node and the absorb_nodes. 856 input_count = test_count = sink_count = 0; 857 CHECK_MESSAGE( (0 == (int)test_count), "Atomic wasn't reset properly"); 858 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed"); 859 CHECK_MESSAGE( (0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value()),"Reset test_node failed"); 860 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed"); 861 } 862 #if USE_TASK_SCHEDULER_OBSERVER 863 o.observe(false); 864 #endif 865 } 866 867 template< 868 class ItemType, 869 TestNodeTypeEnum IType, // does input node throw? 870 TestNodeTypeEnum CType, // does continue_node throw? 871 TestNodeTypeEnum AType> // does absorber throw 872 void run_continue_node_test() { 873 typedef test_input_body<tbb::flow::continue_msg,IType> IBodyType; 874 typedef absorber_body<tbb::flow::continue_msg,ItemType,CType,unlimited_type> ContBodyType; 875 typedef absorber_body<ItemType,tbb::flow::continue_msg, AType, unlimited_type> SinkBodyType; 876 877 typedef tbb::flow::input_node<tbb::flow::continue_msg> InputType; 878 typedef tbb::flow::continue_node<ItemType> TestType; 879 typedef tbb::flow::function_node<ItemType,tbb::flow::continue_msg> SnkType; 880 881 for(int i = 0; i < 4; ++i ) { 882 if(i == 2) continue; // don't run (false,true); it doesn't make sense. 883 bool doThrow = (i & 0x1) != 0; 884 bool doFlog = (i & 0x2) != 0; 885 run_one_continue_node_test< 886 /*InputNodeType*/ InputType, 887 /*InputNodeBodyType*/ IBodyType, 888 /*TestNodeType*/ TestType, 889 /*TestNodeBodyType*/ ContBodyType, 890 /*SinkNodeType*/ SnkType, 891 /*SinkNodeBodyType*/ SinkBodyType> 892 (doThrow,doFlog); 893 } 894 } 895 896 // 897 void test_continue_node() { 898 INFO("Testing continue_node\n"); 899 g_Wakeup_Msg = "buffer_node(non,is,non): Missed wakeup or machine is overloaded?"; 900 run_continue_node_test<int,nonThrowing,isThrowing,nonThrowing>(); 901 g_Wakeup_Msg = "buffer_node(non,non,is): Missed wakeup or machine is overloaded?"; 902 run_continue_node_test<int,nonThrowing,nonThrowing,isThrowing>(); 903 g_Wakeup_Msg = "buffer_node(is,non,non): Missed wakeup or machine is overloaded?"; 904 run_continue_node_test<int,isThrowing,nonThrowing,nonThrowing>(); 905 g_Wakeup_Msg = "buffer_node(is,is,is): Missed wakeup or machine is overloaded?"; 906 run_continue_node_test<int,isThrowing,isThrowing,isThrowing>(); 907 CheckType<double>::check_type_counter = 0; 908 run_continue_node_test<CheckType<double>,isThrowing,isThrowing,isThrowing>(); 909 CHECK_MESSAGE( (!CheckType<double>::check_type_counter), "Dropped objects in continue_node test"); 910 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 911 } 912 913 // ---------- buffer_node queue_node overwrite_node -------------- 914 915 template< 916 class BufferItemType, // 917 class InputNodeType, 918 class InputNodeBodyType, 919 class TestNodeType, 920 class SinkNodeType, 921 class SinkNodeBodyType > 922 void run_one_buffer_node_test(bool throwException,bool flog) { 923 tbb::flow::graph g; 924 925 std::atomic<int> input_count; 926 std::atomic<int> sink_count; 927 input_count = sink_count = 0; 928 #if USE_TASK_SCHEDULER_OBSERVER 929 eh_test_observer o; 930 o.observe(true); 931 #endif 932 g_Master = std::this_thread::get_id(); 933 InputNodeType input(g, InputNodeBodyType(input_count)); 934 TestNodeType node_to_test(g); 935 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count)); 936 make_edge(input,node_to_test); 937 make_edge(node_to_test, sink); 938 for(int iter = 0; iter < 2; ++iter) { 939 ResetGlobals(throwException,flog); 940 if(throwException) { 941 TRY(); 942 input.activate(); 943 g.wait_for_all(); 944 CATCH_AND_ASSERT(); 945 } 946 else { 947 TRY(); 948 input.activate(); 949 g.wait_for_all(); 950 CATCH_AND_FAIL(); 951 } 952 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 953 int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value(); 954 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(); 955 if(throwException) { 956 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 957 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 958 CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs"); 959 CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes"); 960 } 961 else { 962 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 963 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 964 CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node"); 965 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers"); 966 } 967 if(iter == 0) { 968 remove_edge(node_to_test, sink); 969 node_to_test.try_put(BufferItemType()); 970 g.wait_for_all(); 971 g.reset(); 972 input_count = sink_count = 0; 973 BufferItemType tmp; 974 CHECK_MESSAGE( (!node_to_test.try_get(tmp)), "node not empty"); 975 make_edge(node_to_test, sink); 976 g.wait_for_all(); 977 } 978 else { 979 g.reset(); 980 input_count = sink_count = 0; 981 } 982 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed"); 983 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed"); 984 } 985 986 #if USE_TASK_SCHEDULER_OBSERVER 987 o.observe(false); 988 #endif 989 } 990 template<class BufferItemType, 991 TestNodeTypeEnum InputThrowType, 992 TestNodeTypeEnum SinkThrowType> 993 void run_buffer_queue_and_overwrite_node_test() { 994 typedef test_input_body<BufferItemType,InputThrowType> InputBodyType; 995 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; 996 997 typedef tbb::flow::input_node<BufferItemType> InputType; 998 typedef tbb::flow::buffer_node<BufferItemType> BufType; 999 typedef tbb::flow::queue_node<BufferItemType> QueType; 1000 typedef tbb::flow::overwrite_node<BufferItemType> OvrType; 1001 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType; 1002 1003 for(int i = 0; i < 4; ++i) { 1004 if(i == 2) continue; // no need to test flog w/o throws 1005 bool throwException = (i & 0x1) != 0; 1006 bool doFlog = (i & 0x2) != 0; 1007 run_one_buffer_node_test< 1008 /* class BufferItemType*/ BufferItemType, 1009 /*class InputNodeType*/ InputType, 1010 /*class InputNodeBodyType*/ InputBodyType, 1011 /*class TestNodeType*/ BufType, 1012 /*class SinkNodeType*/ SnkType, 1013 /*class SinkNodeBodyType*/ SinkBodyType 1014 >(throwException, doFlog); 1015 run_one_buffer_node_test< 1016 /* class BufferItemType*/ BufferItemType, 1017 /*class InputNodeType*/ InputType, 1018 /*class InputNodeBodyType*/ InputBodyType, 1019 /*class TestNodeType*/ QueType, 1020 /*class SinkNodeType*/ SnkType, 1021 /*class SinkNodeBodyType*/ SinkBodyType 1022 >(throwException, doFlog); 1023 run_one_buffer_node_test< 1024 /* class BufferItemType*/ BufferItemType, 1025 /*class InputNodeType*/ InputType, 1026 /*class InputNodeBodyType*/ InputBodyType, 1027 /*class TestNodeType*/ OvrType, 1028 /*class SinkNodeType*/ SnkType, 1029 /*class SinkNodeBodyType*/ SinkBodyType 1030 >(throwException, doFlog); 1031 } 1032 } 1033 1034 void test_buffer_queue_and_overwrite_node() { 1035 INFO("Testing buffer_node, queue_node and overwrite_node\n"); 1036 g_Wakeup_Msg = "buffer, queue, overwrite(is,non): Missed wakeup or machine is overloaded?"; 1037 run_buffer_queue_and_overwrite_node_test<int,isThrowing,nonThrowing>(); 1038 g_Wakeup_Msg = "buffer, queue, overwrite(non,is): Missed wakeup or machine is overloaded?"; 1039 run_buffer_queue_and_overwrite_node_test<int,nonThrowing,isThrowing>(); 1040 g_Wakeup_Msg = "buffer, queue, overwrite(is,is): Missed wakeup or machine is overloaded?"; 1041 run_buffer_queue_and_overwrite_node_test<int,isThrowing,isThrowing>(); 1042 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 1043 } 1044 1045 // ---------- sequencer_node ------------------------- 1046 1047 1048 template< 1049 class BufferItemType, // 1050 class InputNodeType, 1051 class InputNodeBodyType, 1052 class TestNodeType, 1053 class SeqBodyType, 1054 class SinkNodeType, 1055 class SinkNodeBodyType > 1056 void run_one_sequencer_node_test(bool throwException,bool flog) { 1057 tbb::flow::graph g; 1058 1059 std::atomic<int> input_count; 1060 std::atomic<int> sink_count; 1061 input_count = sink_count = 0; 1062 #if USE_TASK_SCHEDULER_OBSERVER 1063 eh_test_observer o; 1064 o.observe(true); 1065 #endif 1066 g_Master = std::this_thread::get_id(); 1067 InputNodeType input(g, InputNodeBodyType(input_count)); 1068 TestNodeType node_to_test(g,SeqBodyType()); 1069 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count)); 1070 make_edge(input,node_to_test); 1071 make_edge(node_to_test, sink); 1072 for(int iter = 0; iter < 2; ++iter) { 1073 ResetGlobals(throwException,flog); 1074 if(throwException) { 1075 TRY(); 1076 input.activate(); 1077 g.wait_for_all(); 1078 CATCH_AND_ASSERT(); 1079 } 1080 else { 1081 TRY(); 1082 input.activate(); 1083 g.wait_for_all(); 1084 CATCH_AND_FAIL(); 1085 } 1086 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 1087 int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value(); 1088 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(); 1089 if(throwException) { 1090 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1091 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 1092 CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs"); 1093 CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes"); 1094 } 1095 else { 1096 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1097 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1098 CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node"); 1099 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers"); 1100 } 1101 if(iter == 0) { 1102 remove_edge(node_to_test, sink); 1103 node_to_test.try_put(BufferItemType(g_NumItems + 1)); 1104 node_to_test.try_put(BufferItemType(1)); 1105 g.wait_for_all(); 1106 g.reset(); 1107 input_count = sink_count = 0; 1108 make_edge(node_to_test, sink); 1109 g.wait_for_all(); 1110 } 1111 else { 1112 g.reset(); 1113 input_count = sink_count = 0; 1114 } 1115 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed"); 1116 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed"); 1117 } 1118 1119 #if USE_TASK_SCHEDULER_OBSERVER 1120 o.observe(false); 1121 #endif 1122 } 1123 1124 template<class BufferItemType, 1125 TestNodeTypeEnum InputThrowType, 1126 TestNodeTypeEnum SinkThrowType> 1127 void run_sequencer_node_test() { 1128 typedef test_input_body<BufferItemType,InputThrowType> InputBodyType; 1129 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; 1130 typedef sequencer_body<BufferItemType> SeqBodyType; 1131 1132 typedef tbb::flow::input_node<BufferItemType> InputType; 1133 typedef tbb::flow::sequencer_node<BufferItemType> SeqType; 1134 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType; 1135 1136 for(int i = 0; i < 4; ++i) { 1137 if(i == 2) continue; // no need to test flog w/o throws 1138 bool throwException = (i & 0x1) != 0; 1139 bool doFlog = (i & 0x2) != 0; 1140 run_one_sequencer_node_test< 1141 /* class BufferItemType*/ BufferItemType, 1142 /*class InputNodeType*/ InputType, 1143 /*class InputNodeBodyType*/ InputBodyType, 1144 /*class TestNodeType*/ SeqType, 1145 /*class SeqBodyType*/ SeqBodyType, 1146 /*class SinkNodeType*/ SnkType, 1147 /*class SinkNodeBodyType*/ SinkBodyType 1148 >(throwException, doFlog); 1149 } 1150 } 1151 1152 1153 1154 void test_sequencer_node() { 1155 INFO("Testing sequencer_node\n"); 1156 g_Wakeup_Msg = "sequencer_node(is,non): Missed wakeup or machine is overloaded?"; 1157 run_sequencer_node_test<int, isThrowing,nonThrowing>(); 1158 CheckType<int>::check_type_counter = 0; 1159 g_Wakeup_Msg = "sequencer_node(non,is): Missed wakeup or machine is overloaded?"; 1160 run_sequencer_node_test<CheckType<int>, nonThrowing,isThrowing>(); 1161 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped objects in sequencer_node test"); 1162 g_Wakeup_Msg = "sequencer_node(is,is): Missed wakeup or machine is overloaded?"; 1163 run_sequencer_node_test<int, isThrowing,isThrowing>(); 1164 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 1165 } 1166 1167 // ------------ priority_queue_node ------------------ 1168 1169 template< 1170 class BufferItemType, 1171 class InputNodeType, 1172 class InputNodeBodyType, 1173 class TestNodeType, 1174 class SinkNodeType, 1175 class SinkNodeBodyType > 1176 void run_one_priority_queue_node_test(bool throwException,bool flog) { 1177 tbb::flow::graph g; 1178 1179 std::atomic<int> input_count; 1180 std::atomic<int> sink_count; 1181 input_count = sink_count = 0; 1182 #if USE_TASK_SCHEDULER_OBSERVER 1183 eh_test_observer o; 1184 o.observe(true); 1185 #endif 1186 g_Master = std::this_thread::get_id(); 1187 InputNodeType input(g, InputNodeBodyType(input_count)); 1188 1189 TestNodeType node_to_test(g); 1190 1191 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count)); 1192 1193 make_edge(input,node_to_test); 1194 make_edge(node_to_test, sink); 1195 for(int iter = 0; iter < 2; ++iter) { 1196 ResetGlobals(throwException,flog); 1197 if(throwException) { 1198 TRY(); 1199 input.activate(); 1200 g.wait_for_all(); 1201 CATCH_AND_ASSERT(); 1202 } 1203 else { 1204 TRY(); 1205 input.activate(); 1206 g.wait_for_all(); 1207 CATCH_AND_FAIL(); 1208 } 1209 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 1210 int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value(); 1211 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(); 1212 if(throwException) { 1213 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1214 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 1215 CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs"); 1216 CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes"); 1217 } 1218 else { 1219 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1220 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1221 CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node"); 1222 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers"); 1223 } 1224 if(iter == 0) { 1225 remove_edge(node_to_test, sink); 1226 node_to_test.try_put(BufferItemType(g_NumItems + 1)); 1227 node_to_test.try_put(BufferItemType(g_NumItems + 2)); 1228 node_to_test.try_put(BufferItemType()); 1229 g.wait_for_all(); 1230 g.reset(); 1231 input_count = sink_count = 0; 1232 make_edge(node_to_test, sink); 1233 g.wait_for_all(); 1234 } 1235 else { 1236 g.reset(); 1237 input_count = sink_count = 0; 1238 } 1239 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed"); 1240 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed"); 1241 } 1242 1243 #if USE_TASK_SCHEDULER_OBSERVER 1244 o.observe(false); 1245 #endif 1246 } 1247 1248 template<class BufferItemType, 1249 TestNodeTypeEnum InputThrowType, 1250 TestNodeTypeEnum SinkThrowType> 1251 void run_priority_queue_node_test() { 1252 typedef test_input_body<BufferItemType,InputThrowType> InputBodyType; 1253 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; 1254 typedef less_body<BufferItemType> LessBodyType; 1255 1256 typedef tbb::flow::input_node<BufferItemType> InputType; 1257 typedef tbb::flow::priority_queue_node<BufferItemType,LessBodyType> PrqType; 1258 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType; 1259 1260 for(int i = 0; i < 4; ++i) { 1261 if(i == 2) continue; // no need to test flog w/o throws 1262 bool throwException = (i & 0x1) != 0; 1263 bool doFlog = (i & 0x2) != 0; 1264 run_one_priority_queue_node_test< 1265 /* class BufferItemType*/ BufferItemType, 1266 /*class InputNodeType*/ InputType, 1267 /*class InputNodeBodyType*/ InputBodyType, 1268 /*class TestNodeType*/ PrqType, 1269 /*class SinkNodeType*/ SnkType, 1270 /*class SinkNodeBodyType*/ SinkBodyType 1271 >(throwException, doFlog); 1272 } 1273 } 1274 1275 void test_priority_queue_node() { 1276 INFO("Testing priority_queue_node\n"); 1277 g_Wakeup_Msg = "priority_queue_node(is,non): Missed wakeup or machine is overloaded?"; 1278 run_priority_queue_node_test<int, isThrowing,nonThrowing>(); 1279 CheckType<int>::check_type_counter = 0; 1280 g_Wakeup_Msg = "priority_queue_node(non,is): Missed wakeup or machine is overloaded?"; 1281 run_priority_queue_node_test<CheckType<int>, nonThrowing,isThrowing>(); 1282 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped objects in priority_queue_node test"); 1283 g_Wakeup_Msg = "priority_queue_node(is,is): Missed wakeup or machine is overloaded?"; 1284 run_priority_queue_node_test<int, isThrowing,isThrowing>(); 1285 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 1286 } 1287 1288 // ------------------- join_node ---------------- 1289 template<class JP> struct graph_policy_name{ 1290 static const char* name() {return "unknown"; } 1291 }; 1292 template<> struct graph_policy_name<tbb::flow::queueing> { 1293 static const char* name() {return "queueing"; } 1294 }; 1295 template<> struct graph_policy_name<tbb::flow::reserving> { 1296 static const char* name() {return "reserving"; } 1297 }; 1298 template<> struct graph_policy_name<tbb::flow::tag_matching> { 1299 static const char* name() {return "tag_matching"; } 1300 }; 1301 1302 1303 template< 1304 class JP, 1305 class OutputTuple, 1306 class InputType0, 1307 class InputBodyType0, 1308 class InputType1, 1309 class InputBodyType1, 1310 class TestJoinType, 1311 class SinkType, 1312 class SinkBodyType 1313 > 1314 struct run_one_join_node_test { 1315 run_one_join_node_test() {} 1316 static void execute_test(bool throwException,bool flog) { 1317 typedef typename std::tuple_element<0,OutputTuple>::type ItemType0; 1318 typedef typename std::tuple_element<1,OutputTuple>::type ItemType1; 1319 1320 tbb::flow::graph g; 1321 std::atomic<int>input0_count; 1322 std::atomic<int>input1_count; 1323 std::atomic<int>sink_count; 1324 input0_count = input1_count = sink_count = 0; 1325 #if USE_TASK_SCHEDULER_OBSERVER 1326 eh_test_observer o; 1327 o.observe(true); 1328 #endif 1329 g_Master = std::this_thread::get_id(); 1330 InputType0 input0(g, InputBodyType0(input0_count)); 1331 InputType1 input1(g, InputBodyType1(input1_count)); 1332 TestJoinType node_to_test(g); 1333 SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count)); 1334 make_edge(input0,tbb::flow::input_port<0>(node_to_test)); 1335 make_edge(input1,tbb::flow::input_port<1>(node_to_test)); 1336 make_edge(node_to_test, sink); 1337 for(int iter = 0; iter < 2; ++iter) { 1338 ResetGlobals(throwException,flog); 1339 if(throwException) { 1340 TRY(); 1341 input0.activate(); 1342 input1.activate(); 1343 g.wait_for_all(); 1344 CATCH_AND_ASSERT(); 1345 } 1346 else { 1347 TRY(); 1348 input0.activate(); 1349 input1.activate(); 1350 g.wait_for_all(); 1351 CATCH_AND_FAIL(); 1352 } 1353 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 1354 int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value(); 1355 int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value(); 1356 int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); 1357 if(throwException) { 1358 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1359 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 1360 CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs"); 1361 CHECK_MESSAGE( (nb_cnt <= ((ib0_cnt < ib1_cnt) ? ib0_cnt : ib1_cnt)), "Too many items received by sink nodes"); 1362 } 1363 else { 1364 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1365 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1366 if(ib0_cnt != g_NumItems) { 1367 // INFO("throwException == %s\n" << (throwException ? "true" : "false")); 1368 // INFO("iter == " << iter << "\n"); 1369 // INFO("ib0_cnt == " << ib0_cnt << "\n"); 1370 // INFO("g_NumItems == " << g_NumItems << "\n"); 1371 } 1372 CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0"); // this one 1373 CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1"); 1374 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers"); 1375 } 1376 if(iter == 0) { 1377 remove_edge(node_to_test, sink); 1378 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 1)); 1379 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2)); 1380 g.wait_for_all(); 1381 g.reset(); 1382 input0_count = input1_count = sink_count = 0; 1383 make_edge(node_to_test, sink); 1384 g.wait_for_all(); 1385 } 1386 else { 1387 g.wait_for_all(); 1388 g.reset(); 1389 input0_count = input1_count = sink_count = 0; 1390 } 1391 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed"); 1392 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed"); 1393 nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); 1394 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed"); 1395 } 1396 1397 #if USE_TASK_SCHEDULER_OBSERVER 1398 o.observe(false); 1399 #endif 1400 } 1401 }; // run_one_join_node_test 1402 1403 template< 1404 class OutputTuple, 1405 class InputType0, 1406 class InputBodyType0, 1407 class InputType1, 1408 class InputBodyType1, 1409 class TestJoinType, 1410 class SinkType, 1411 class SinkBodyType 1412 > 1413 struct run_one_join_node_test< 1414 tbb::flow::tag_matching, 1415 OutputTuple, 1416 InputType0, 1417 InputBodyType0, 1418 InputType1, 1419 InputBodyType1, 1420 TestJoinType, 1421 SinkType, 1422 SinkBodyType 1423 > { 1424 run_one_join_node_test() {} 1425 static void execute_test(bool throwException,bool flog) { 1426 typedef typename std::tuple_element<0,OutputTuple>::type ItemType0; 1427 typedef typename std::tuple_element<1,OutputTuple>::type ItemType1; 1428 1429 tbb::flow::graph g; 1430 1431 std::atomic<int>input0_count; 1432 std::atomic<int>input1_count; 1433 std::atomic<int>sink_count; 1434 input0_count = input1_count = sink_count = 0; 1435 #if USE_TASK_SCHEDULER_OBSERVER 1436 eh_test_observer o; 1437 o.observe(true); 1438 #endif 1439 g_Master = std::this_thread::get_id(); 1440 InputType0 input0(g, InputBodyType0(input0_count, 2)); 1441 InputType1 input1(g, InputBodyType1(input1_count, 3)); 1442 TestJoinType node_to_test(g, tag_func<ItemType0>(ItemType0(2)), tag_func<ItemType1>(ItemType1(3))); 1443 SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count)); 1444 make_edge(input0,tbb::flow::input_port<0>(node_to_test)); 1445 make_edge(input1,tbb::flow::input_port<1>(node_to_test)); 1446 make_edge(node_to_test, sink); 1447 for(int iter = 0; iter < 2; ++iter) { 1448 ResetGlobals(throwException,flog); 1449 if(throwException) { 1450 TRY(); 1451 input0.activate(); 1452 input1.activate(); 1453 g.wait_for_all(); 1454 CATCH_AND_ASSERT(); 1455 } 1456 else { 1457 TRY(); 1458 input0.activate(); 1459 input1.activate(); 1460 g.wait_for_all(); 1461 CATCH_AND_FAIL(); 1462 } 1463 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 1464 int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value(); 1465 int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value(); 1466 int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); 1467 if(throwException) { 1468 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1469 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 1470 CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs"); 1471 CHECK_MESSAGE( (nb_cnt <= ((ib0_cnt < ib1_cnt) ? ib0_cnt : ib1_cnt)), "Too many items received by sink nodes"); 1472 } 1473 else { 1474 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1475 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1476 CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0"); 1477 CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1"); 1478 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers"); 1479 } 1480 if(iter == 0) { 1481 remove_edge(node_to_test, sink); 1482 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4)); 1483 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2)); 1484 g.wait_for_all(); // have to wait for the graph to stop again.... 1485 g.reset(); // resets the body of the input_nodes, test_node and the absorb_nodes. 1486 input0_count = input1_count = sink_count = 0; 1487 make_edge(node_to_test, sink); 1488 g.wait_for_all(); // have to wait for the graph to stop again.... 1489 } 1490 else { 1491 g.wait_for_all(); 1492 g.reset(); 1493 input0_count = input1_count = sink_count = 0; 1494 } 1495 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed"); 1496 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed"); 1497 nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); 1498 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed"); 1499 } 1500 1501 #if USE_TASK_SCHEDULER_OBSERVER 1502 o.observe(false); 1503 #endif 1504 } 1505 }; // run_one_join_node_test<tag_matching> 1506 1507 template<class JP, class OutputTuple, 1508 TestNodeTypeEnum InputThrowType, 1509 TestNodeTypeEnum SinkThrowType> 1510 void run_join_node_test() { 1511 typedef typename std::tuple_element<0,OutputTuple>::type ItemType0; 1512 typedef typename std::tuple_element<1,OutputTuple>::type ItemType1; 1513 typedef test_input_body<ItemType0,InputThrowType> InputBodyType0; 1514 typedef test_input_body<ItemType1,InputThrowType> InputBodyType1; 1515 typedef absorber_body<OutputTuple,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; 1516 1517 typedef typename tbb::flow::input_node<ItemType0> InputType0; 1518 typedef typename tbb::flow::input_node<ItemType1> InputType1; 1519 typedef typename tbb::flow::join_node<OutputTuple,JP> TestJoinType; 1520 typedef typename tbb::flow::function_node<OutputTuple,tbb::flow::continue_msg> SinkType; 1521 1522 for(int i = 0; i < 4; ++i) { 1523 if(2 == i) continue; 1524 bool throwException = (i & 0x1) != 0; 1525 bool doFlog = (i & 0x2) != 0; 1526 run_one_join_node_test< 1527 JP, 1528 OutputTuple, 1529 InputType0, 1530 InputBodyType0, 1531 InputType1, 1532 InputBodyType1, 1533 TestJoinType, 1534 SinkType, 1535 SinkBodyType>::execute_test(throwException,doFlog); 1536 } 1537 } 1538 1539 template<class JP> 1540 void test_join_node() { 1541 INFO("Testing join_node<" << graph_policy_name<JP>::name() << ">\n"); 1542 // only doing two-input joins 1543 g_Wakeup_Msg = "join(is,non): Missed wakeup or machine is overloaded?"; 1544 run_join_node_test<JP, std::tuple<int,int>, isThrowing, nonThrowing>(); 1545 CheckType<int>::check_type_counter = 0; 1546 g_Wakeup_Msg = "join(non,is): Missed wakeup or machine is overloaded?"; 1547 run_join_node_test<JP, std::tuple<CheckType<int>,int>, nonThrowing, isThrowing>(); 1548 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped items in test"); 1549 g_Wakeup_Msg = "join(is,is): Missed wakeup or machine is overloaded?"; 1550 run_join_node_test<JP, std::tuple<int,int>, isThrowing, isThrowing>(); 1551 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 1552 } 1553 1554 // ------------------- limiter_node ------------- 1555 1556 template< 1557 class BufferItemType, // 1558 class InputNodeType, 1559 class InputNodeBodyType, 1560 class TestNodeType, 1561 class SinkNodeType, 1562 class SinkNodeBodyType > 1563 void run_one_limiter_node_test(bool throwException,bool flog) { 1564 tbb::flow::graph g; 1565 1566 std::atomic<int> input_count; 1567 std::atomic<int> sink_count; 1568 input_count = sink_count = 0; 1569 #if USE_TASK_SCHEDULER_OBSERVER 1570 eh_test_observer o; 1571 o.observe(true); 1572 #endif 1573 g_Master = std::this_thread::get_id(); 1574 InputNodeType input(g, InputNodeBodyType(input_count)); 1575 TestNodeType node_to_test(g,g_NumThreads + 1); 1576 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count)); 1577 make_edge(input,node_to_test); 1578 make_edge(node_to_test, sink); 1579 for(int iter = 0; iter < 2; ++iter) { 1580 ResetGlobals(throwException,flog); 1581 if(throwException) { 1582 TRY(); 1583 input.activate(); 1584 g.wait_for_all(); 1585 CATCH_AND_ASSERT(); 1586 } 1587 else { 1588 TRY(); 1589 input.activate(); 1590 g.wait_for_all(); 1591 CATCH_AND_FAIL(); 1592 } 1593 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 1594 int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value(); 1595 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(); 1596 if(throwException) { 1597 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1598 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 1599 CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs"); 1600 CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes"); 1601 } 1602 else { 1603 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1604 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1605 // we stop after limiter's limit, which is g_NumThreads + 1. The input_node 1606 // is invoked one extra time, filling its buffer, so its limit is g_NumThreads + 2. 1607 CHECK_MESSAGE( (ib_cnt == g_NumThreads + 2), "Missing invocations of input_node"); 1608 CHECK_MESSAGE( (nb_cnt == g_NumThreads + 1), "Missing items in absorbers"); 1609 } 1610 if(iter == 0) { 1611 remove_edge(node_to_test, sink); 1612 node_to_test.try_put(BufferItemType()); 1613 node_to_test.try_put(BufferItemType()); 1614 g.wait_for_all(); 1615 g.reset(); 1616 input_count = sink_count = 0; 1617 BufferItemType tmp; 1618 CHECK_MESSAGE( (!node_to_test.try_get(tmp)), "node not empty"); 1619 make_edge(node_to_test, sink); 1620 g.wait_for_all(); 1621 } 1622 else { 1623 g.reset(); 1624 input_count = sink_count = 0; 1625 } 1626 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed"); 1627 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed"); 1628 } 1629 1630 #if USE_TASK_SCHEDULER_OBSERVER 1631 o.observe(false); 1632 #endif 1633 } 1634 1635 template<class BufferItemType, 1636 TestNodeTypeEnum InputThrowType, 1637 TestNodeTypeEnum SinkThrowType> 1638 void run_limiter_node_test() { 1639 typedef test_input_body<BufferItemType,InputThrowType> InputBodyType; 1640 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; 1641 1642 typedef tbb::flow::input_node<BufferItemType> InputType; 1643 typedef tbb::flow::limiter_node<BufferItemType> LmtType; 1644 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType; 1645 1646 for(int i = 0; i < 4; ++i) { 1647 if(i == 2) continue; // no need to test flog w/o throws 1648 bool throwException = (i & 0x1) != 0; 1649 bool doFlog = (i & 0x2) != 0; 1650 run_one_limiter_node_test< 1651 /* class BufferItemType*/ BufferItemType, 1652 /*class InputNodeType*/ InputType, 1653 /*class InputNodeBodyType*/ InputBodyType, 1654 /*class TestNodeType*/ LmtType, 1655 /*class SinkNodeType*/ SnkType, 1656 /*class SinkNodeBodyType*/ SinkBodyType 1657 >(throwException, doFlog); 1658 } 1659 } 1660 1661 void test_limiter_node() { 1662 INFO("Testing limiter_node\n"); 1663 g_Wakeup_Msg = "limiter_node(is,non): Missed wakeup or machine is overloaded?"; 1664 run_limiter_node_test<int,isThrowing,nonThrowing>(); 1665 g_Wakeup_Msg = "limiter_node(non,is): Missed wakeup or machine is overloaded?"; 1666 run_limiter_node_test<int,nonThrowing,isThrowing>(); 1667 g_Wakeup_Msg = "limiter_node(is,is): Missed wakeup or machine is overloaded?"; 1668 run_limiter_node_test<int,isThrowing,isThrowing>(); 1669 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 1670 } 1671 1672 // -------- split_node -------------------- 1673 1674 template< 1675 class InputTuple, 1676 class InputType, 1677 class InputBodyType, 1678 class TestSplitType, 1679 class SinkType0, 1680 class SinkBodyType0, 1681 class SinkType1, 1682 class SinkBodyType1> 1683 void run_one_split_node_test(bool throwException, bool flog) { 1684 1685 tbb::flow::graph g; 1686 1687 std::atomic<int> input_count; 1688 std::atomic<int> sink0_count; 1689 std::atomic<int> sink1_count; 1690 input_count = sink0_count = sink1_count = 0; 1691 #if USE_TASK_SCHEDULER_OBSERVER 1692 eh_test_observer o; 1693 o.observe(true); 1694 #endif 1695 1696 g_Master = std::this_thread::get_id(); 1697 InputType input(g, InputBodyType(input_count)); 1698 TestSplitType node_to_test(g); 1699 SinkType0 sink0(g,tbb::flow::unlimited,SinkBodyType0(sink0_count)); 1700 SinkType1 sink1(g,tbb::flow::unlimited,SinkBodyType1(sink1_count)); 1701 make_edge(input, node_to_test); 1702 make_edge(tbb::flow::output_port<0>(node_to_test), sink0); 1703 make_edge(tbb::flow::output_port<1>(node_to_test), sink1); 1704 1705 for(int iter = 0; iter < 2; ++iter) { // run, reset, run again 1706 ResetGlobals(throwException,flog); 1707 if(throwException) { 1708 TRY(); 1709 input.activate(); 1710 g.wait_for_all(); 1711 CATCH_AND_ASSERT(); 1712 } 1713 else { 1714 TRY(); 1715 input.activate(); 1716 g.wait_for_all(); 1717 CATCH_AND_FAIL(); 1718 } 1719 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 1720 int ib_cnt = tbb::flow::copy_body<InputBodyType>(input).count_value(); 1721 int nb0_cnt = tbb::flow::copy_body<SinkBodyType0>(sink0).count_value(); 1722 int nb1_cnt = tbb::flow::copy_body<SinkBodyType1>(sink1).count_value(); 1723 if(throwException) { 1724 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1725 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 1726 CHECK_MESSAGE( (ib_cnt <= 2*g_NumItems), "Too many items sent by input"); 1727 CHECK_MESSAGE( (nb0_cnt + nb1_cnt <= ib_cnt*2), "Too many items received by sink nodes"); 1728 } 1729 else { 1730 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1731 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1732 CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_nodes"); 1733 CHECK_MESSAGE( (nb0_cnt == g_NumItems && nb1_cnt == g_NumItems), "Missing items in absorbers"); 1734 } 1735 g.reset(); // resets the body of the input_nodes and the absorb_nodes. 1736 input_count = sink0_count = sink1_count = 0; 1737 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType>(input).count_value()),"Reset input failed"); 1738 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType0>(sink0).count_value()),"Reset sink 0 failed"); 1739 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType1>(sink1).count_value()),"Reset sink 1 failed"); 1740 } 1741 #if USE_TASK_SCHEDULER_OBSERVER 1742 o.observe(false); 1743 #endif 1744 } 1745 1746 template<class InputTuple, 1747 TestNodeTypeEnum InputThrowType, 1748 TestNodeTypeEnum SinkThrowType> 1749 void run_split_node_test() { 1750 typedef typename std::tuple_element<0,InputTuple>::type ItemType0; 1751 typedef typename std::tuple_element<1,InputTuple>::type ItemType1; 1752 typedef tuple_test_input_body<InputTuple,InputThrowType> InputBodyType; 1753 typedef absorber_body<ItemType0,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType0; 1754 typedef absorber_body<ItemType1,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType1; 1755 1756 typedef typename tbb::flow::input_node<InputTuple> InputType; 1757 typedef typename tbb::flow::split_node<InputTuple> TestSplitType; 1758 typedef typename tbb::flow::function_node<ItemType0,tbb::flow::continue_msg> SinkType0; 1759 typedef typename tbb::flow::function_node<ItemType1,tbb::flow::continue_msg> SinkType1; 1760 1761 for(int i = 0; i < 4; ++i) { 1762 if(2 == i) continue; 1763 bool throwException = (i & 0x1) != 0; 1764 bool doFlog = (i & 0x2) != 0; 1765 run_one_split_node_test< 1766 InputTuple, 1767 InputType, 1768 InputBodyType, 1769 TestSplitType, 1770 SinkType0, 1771 SinkBodyType0, 1772 SinkType1, 1773 SinkBodyType1> 1774 (throwException,doFlog); 1775 } 1776 } 1777 1778 void test_split_node() { 1779 INFO("Testing split_node\n"); 1780 g_Wakeup_Msg = "split_node(is,non): Missed wakeup or machine is overloaded?"; 1781 run_split_node_test<std::tuple<int,int>, isThrowing, nonThrowing>(); 1782 g_Wakeup_Msg = "split_node(non,is): Missed wakeup or machine is overloaded?"; 1783 run_split_node_test<std::tuple<int,int>, nonThrowing, isThrowing>(); 1784 g_Wakeup_Msg = "split_node(is,is): Missed wakeup or machine is overloaded?"; 1785 run_split_node_test<std::tuple<int,int>, isThrowing, isThrowing>(); 1786 g_Wakeup_Msg = g_Orig_Wakeup_Msg; 1787 } 1788 1789 // --------- indexer_node ---------------------- 1790 1791 template < class InputTuple, 1792 class InputType0, 1793 class InputBodyType0, 1794 class InputType1, 1795 class InputBodyType1, 1796 class TestNodeType, 1797 class SinkType, 1798 class SinkBodyType> 1799 void run_one_indexer_node_test(bool throwException,bool flog) { 1800 typedef typename std::tuple_element<0,InputTuple>::type ItemType0; 1801 typedef typename std::tuple_element<1,InputTuple>::type ItemType1; 1802 1803 tbb::flow::graph g; 1804 1805 std::atomic<int> input0_count; 1806 std::atomic<int> input1_count; 1807 std::atomic<int> sink_count; 1808 input0_count = input1_count = sink_count = 0; 1809 #if USE_TASK_SCHEDULER_OBSERVER 1810 eh_test_observer o; 1811 o.observe(true); 1812 #endif 1813 g_Master = std::this_thread::get_id(); 1814 InputType0 input0(g, InputBodyType0(input0_count)); 1815 InputType1 input1(g, InputBodyType1(input1_count)); 1816 TestNodeType node_to_test(g); 1817 SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count)); 1818 make_edge(input0,tbb::flow::input_port<0>(node_to_test)); 1819 make_edge(input1,tbb::flow::input_port<1>(node_to_test)); 1820 make_edge(node_to_test, sink); 1821 for(int iter = 0; iter < 2; ++iter) { 1822 ResetGlobals(throwException,flog); 1823 if(throwException) { 1824 TRY(); 1825 input0.activate(); 1826 input1.activate(); 1827 g.wait_for_all(); 1828 CATCH_AND_ASSERT(); 1829 } 1830 else { 1831 TRY(); 1832 input0.activate(); 1833 input1.activate(); 1834 g.wait_for_all(); 1835 CATCH_AND_FAIL(); 1836 } 1837 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; 1838 int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value(); 1839 int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value(); 1840 int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); 1841 if(throwException) { 1842 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph"); 1843 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph"); 1844 CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs"); 1845 CHECK_MESSAGE( (nb_cnt <= ib0_cnt + ib1_cnt), "Too many items received by sink nodes"); 1846 } 1847 else { 1848 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred"); 1849 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred"); 1850 CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0"); 1851 CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1"); 1852 CHECK_MESSAGE( (nb_cnt == 2*g_NumItems), "Missing items in absorbers"); 1853 } 1854 if(iter == 0) { 1855 remove_edge(node_to_test, sink); 1856 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4)); 1857 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2)); 1858 g.wait_for_all(); 1859 g.reset(); 1860 input0_count = input1_count = sink_count = 0; 1861 make_edge(node_to_test, sink); 1862 g.wait_for_all(); 1863 } 1864 else { 1865 g.wait_for_all(); 1866 g.reset(); 1867 input0_count = input1_count = sink_count = 0; 1868 } 1869 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed"); 1870 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed"); 1871 nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); 1872 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed"); 1873 } 1874 1875 #if USE_TASK_SCHEDULER_OBSERVER 1876 o.observe(false); 1877 #endif 1878 } 1879 1880 template<class InputTuple, 1881 TestNodeTypeEnum InputThrowType, 1882 TestNodeTypeEnum SinkThrowType> 1883 void run_indexer_node_test() { 1884 typedef typename std::tuple_element<0,InputTuple>::type ItemType0; 1885 typedef typename std::tuple_element<1,InputTuple>::type ItemType1; 1886 typedef test_input_body<ItemType0,InputThrowType> InputBodyType0; 1887 typedef test_input_body<ItemType1,InputThrowType> InputBodyType1; 1888 typedef typename tbb::flow::indexer_node<ItemType0, ItemType1> TestNodeType; 1889 typedef absorber_body<typename TestNodeType::output_type,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; 1890 1891 typedef typename tbb::flow::input_node<ItemType0> InputType0; 1892 typedef typename tbb::flow::input_node<ItemType1> InputType1; 1893 typedef typename tbb::flow::function_node<typename TestNodeType::output_type,tbb::flow::continue_msg> SinkType; 1894 1895 for(int i = 0; i < 4; ++i) { 1896 if(2 == i) continue; 1897 bool throwException = (i & 0x1) != 0; 1898 bool doFlog = (i & 0x2) != 0; 1899 run_one_indexer_node_test< 1900 InputTuple, 1901 InputType0, 1902 InputBodyType0, 1903 InputType1, 1904 InputBodyType1, 1905 TestNodeType, 1906 SinkType, 1907 SinkBodyType>(throwException,doFlog); 1908 } 1909 } 1910 1911 void test_indexer_node() { 1912 INFO("Testing indexer_node\n"); 1913 g_Wakeup_Msg = "indexer_node(is,non): Missed wakeup or machine is overloaded?"; 1914 run_indexer_node_test<std::tuple<int,int>, isThrowing, nonThrowing>(); 1915 g_Wakeup_Msg = "indexer_node(non,is): Missed wakeup or machine is overloaded?"; 1916 run_indexer_node_test<std::tuple<int,int>, nonThrowing, isThrowing>(); 1917 g_Wakeup_Msg = "indexer_node(is,is): Missed wakeup or machine is overloaded?"; 1918 run_indexer_node_test<std::tuple<int,int>, isThrowing, isThrowing>(); 1919 g_Wakeup_Msg = g_Orig_Wakeup_Msg;; 1920 } 1921 1922 /////////////////////////////////////////////// 1923 // whole-graph exception test 1924 1925 class Foo { 1926 private: 1927 // std::vector<int>& m_vec; 1928 std::vector<int>* m_vec; 1929 public: 1930 Foo(std::vector<int>& vec) : m_vec(&vec) { } 1931 void operator() (tbb::flow::continue_msg) const { 1932 ++nExceptions; 1933 (void)m_vec->at(m_vec->size()); // Will throw out_of_range exception 1934 CHECK_MESSAGE( (false), "Exception not thrown by invalid access"); 1935 } 1936 }; 1937 1938 // test from user ahelwer: http://software.intel.com/en-us/forums/showthread.php?t=103786 1939 // exception thrown in graph node, not caught in wait_for_all() 1940 void 1941 test_flow_graph_exception0() { 1942 // Initializes body 1943 std::vector<int> vec; 1944 vec.push_back(0); 1945 Foo f(vec); 1946 nExceptions = 0; 1947 1948 // Construct graph and nodes 1949 tbb::flow::graph g; 1950 tbb::flow::broadcast_node<tbb::flow::continue_msg> start(g); 1951 tbb::flow::continue_node<tbb::flow::continue_msg> fooNode(g, f); 1952 1953 // Construct edge 1954 tbb::flow::make_edge(start, fooNode); 1955 1956 // Execute graph 1957 CHECK_MESSAGE( (!g.exception_thrown()), "exception_thrown flag already set"); 1958 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag already set"); 1959 try { 1960 start.try_put(tbb::flow::continue_msg()); 1961 g.wait_for_all(); 1962 CHECK_MESSAGE( (false), "Exception not thrown"); 1963 } 1964 catch(std::out_of_range& ex) { 1965 INFO("Exception: " << ex.what() << "(expected)\n"); 1966 } 1967 catch(...) { 1968 INFO("Unknown exception caught (expected)\n"); 1969 } 1970 CHECK_MESSAGE( (nExceptions > 0), "Exception caught, but no body signaled exception being thrown"); 1971 nExceptions = 0; 1972 CHECK_MESSAGE( (g.exception_thrown()), "Exception not intercepted"); 1973 // if exception set, cancellation also set. 1974 CHECK_MESSAGE( (g.is_cancelled()), "Exception cancellation not signaled"); 1975 // in case we got an exception 1976 try { 1977 g.wait_for_all(); // context still signalled canceled, my_exception still set. 1978 } 1979 catch(...) { 1980 CHECK_MESSAGE( (false), "Second exception thrown but no task executing"); 1981 } 1982 CHECK_MESSAGE( (nExceptions == 0), "body signaled exception being thrown, but no body executed"); 1983 CHECK_MESSAGE( (!g.exception_thrown()), "exception_thrown flag not reset"); 1984 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag not reset"); 1985 } 1986 1987 void TestOneThreadNum(int nThread) { 1988 INFO("Testing " << nThread << "%d threads\n"); 1989 g_NumItems = ((nThread > NUM_ITEMS) ? nThread *2 : NUM_ITEMS); 1990 g_NumThreads = nThread; 1991 tbb::task_arena arena(nThread); 1992 arena.execute( 1993 [&]() { 1994 // whole-graph exception catch and rethrow test 1995 test_flow_graph_exception0(); 1996 for(int i = 0; i < 4; ++i) { 1997 g_ExceptionInMaster = (i & 1) != 0; 1998 g_SolitaryException = (i & 2) != 0; 1999 INFO("g_ExceptionInMaster == " << (g_ExceptionInMaster ? "T":"F") 2000 << ", g_SolitaryException == " << (g_SolitaryException ? "T":"F") << "\n"); 2001 test_input_node(); 2002 test_function_node(); 2003 test_continue_node(); // also test broadcast_node 2004 test_multifunction_node(); 2005 // single- and multi-item buffering nodes 2006 test_buffer_queue_and_overwrite_node(); 2007 test_sequencer_node(); 2008 test_priority_queue_node(); 2009 2010 // join_nodes 2011 test_join_node<tbb::flow::queueing>(); 2012 test_join_node<tbb::flow::reserving>(); 2013 test_join_node<tbb::flow::tag_matching>(); 2014 2015 test_limiter_node(); 2016 test_split_node(); 2017 // graph for write_once_node will be complicated by the fact the node will 2018 // not do try_puts after it has been set. To get parallelism of N we have 2019 // to attach N successor nodes to the write_once (or play some similar game). 2020 // test_write_once_node(); 2021 test_indexer_node(); 2022 } 2023 } 2024 ); 2025 } 2026 2027 //! Test exceptions with parallelism 2028 //! \brief \ref error_guessing 2029 TEST_CASE("Testing several threads"){ 2030 // reverse order of tests 2031 for(unsigned int nThread=utils::MaxThread; nThread >= utils::MinThread; --nThread) { 2032 TestOneThreadNum(nThread); 2033 } 2034 } 2035 2036 #endif // TBB_USE_EXCEPTIONS 2037