xref: /oneTBB/test/tbb/test_eh_flow_graph.cpp (revision 5e91b2c0)
1 /*
2     Copyright (c) 2005-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 //! \file test_eh_flow_graph.cpp
18 //! \brief Test for [flow_graph.copy_body flow_graph.broadcast_node flow_graph.priority_queue_node flow_graph.indexer_node flow_graph.sequencer_node flow_graph.remove_edge flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification
19 
20 #include "common/config.h"
21 
22 #if USE_TASK_SCHEDULER_OBSERVER
23 #include "tbb/task_scheduler_observer.h"
24 #endif
25 #include "tbb/flow_graph.h"
26 #include "tbb/global_control.h"
27 
28 #include "common/test.h"
29 
30 #if TBB_USE_EXCEPTIONS
31 
32 #include "common/utils.h"
33 #include "common/checktype.h"
34 #include "common/concurrency_tracker.h"
35 
36 #if _MSC_VER
37     #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
38 #endif
39 
40 #if __TBB_MSVC_UNREACHABLE_CODE_IGNORED
41     // Suppress "unreachable code" warning by VC++ 17.0-18.0 (VS 2012 or newer)
42     #pragma warning (disable: 4702)
43 #endif
44 
45 // global task_scheduler_observer is an imperfect tool to find how many threads are really
46 // participating.  That was the hope, but it counts the entries into the marketplace,
47 // not the arena.
48 // TODO: Consider using local task scheduler observer
49 // #define USE_TASK_SCHEDULER_OBSERVER 1
50 
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
54 
55 #include "common/exception_handling.h"
56 
57 #include <stdexcept>
58 
// NOTE(review): NUM_ITEMS is not referenced in this part of the file; presumably it is the
// default used to seed g_NumItems -- confirm against the test driver.
#define NUM_ITEMS 15

// Number of items an input body emits before it stops its input_node.
int g_NumItems;

// Counters that ResetGlobals() zeroes before every run.
std::atomic<intptr_t> g_TGCCancelled;
std::atomic<unsigned> nExceptions;
64 
// Tells a node body whether its operator() is expected to throw.
enum TestNodeTypeEnum { nonThrowing, isThrowing };

// Concurrency tags used as template arguments by the node bodies and test drivers.
static const size_t unlimited_type = 0;
static const size_t serial_type = 1;
static const size_t limited_type = 4;

// Printable name for a TestNodeTypeEnum value; only the specializations are defined.
template<TestNodeTypeEnum T>
struct TestNodeTypeName;

template<>
struct TestNodeTypeName<isThrowing> {
    static const char *name() {
        return "isThrowing";
    }
};

template<>
struct TestNodeTypeName<nonThrowing> {
    static const char *name() {
        return "nonThrowing";
    }
};

// Printable name for a concurrency tag; only the specializations are defined.
template<size_t Conc>
struct concurrencyName;

template<>
struct concurrencyName<unlimited_type> {
    static const char *name() {
        return "unlimited";
    }
};

template<>
struct concurrencyName<serial_type> {
    static const char *name() {
        return "serial";
    }
};

template<>
struct concurrencyName<limited_type> {
    static const char *name() {
        return "limited";
    }
};
79 
// Class that provides waiting and throwing behavior.  Non-throwing variants only wait;
// throwing variants wait and then call ThrowTestException().
// If serial, we can't wait for concurrency to peak right away; we may be the bottleneck and
// would stop further processing.  A serial body therefore executes g_NumThreads + 10 times
// before it starts waiting (the "10" is somewhat arbitrary, and just makes sure there are
// enough items in the graph to keep it flowing).
// Whenever a body does wait, it uses utils::ConcurrencyTracker and WaitUntilConcurrencyPeaks().
85 
86 template<size_t Conc, TestNodeTypeEnum t = nonThrowing>
87 class WaitThrow;
88 
89 template<>
90 class WaitThrow<serial_type,nonThrowing> {
91 protected:
92     void WaitAndThrow(int cnt, const char * /*name*/) {
93         if(cnt > g_NumThreads + 10) {
94             utils::ConcurrencyTracker ct;
95             WaitUntilConcurrencyPeaks();
96         }
97     }
98 };
99 
100 template<>
101 class WaitThrow<serial_type,isThrowing> {
102 protected:
103     void WaitAndThrow(int cnt, const char * /*name*/) {
104         if(cnt > g_NumThreads + 10) {
105             utils::ConcurrencyTracker ct;
106             WaitUntilConcurrencyPeaks();
107             ThrowTestException(1);
108         }
109     }
110 };
111 
112 // for nodes with limited concurrency, if that concurrency is < g_NumThreads, we need
113 // to make sure enough other nodes wait for concurrency to peak.  If we are attached to
114 // N successors, for each item we pass to a successor, we will get N executions of the
115 // "absorbers" (because we broadcast to successors.)  for an odd number of threads we
116 // need (g_NumThreads - limited + 1) / 2 items (that will give us one extra execution
117 // of an "absorber", but we can't change that without changing the behavior of the node.)
118 template<>
119 class WaitThrow<limited_type,nonThrowing> {
120 protected:
121     void WaitAndThrow(int cnt, const char * /*name*/) {
122         if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) {
123             return;
124         }
125         utils::ConcurrencyTracker ct;
126         WaitUntilConcurrencyPeaks();
127     }
128 };
129 
130 template<>
131 class WaitThrow<limited_type,isThrowing> {
132 protected:
133     void WaitAndThrow(int cnt, const char * /*name*/) {
134         utils::ConcurrencyTracker ct;
135         if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) {
136             return;
137         }
138         WaitUntilConcurrencyPeaks();
139         ThrowTestException(1);
140     }
141 };
142 
143 template<>
144 class WaitThrow<unlimited_type,nonThrowing> {
145 protected:
146     void WaitAndThrow(int /*cnt*/, const char * /*name*/) {
147         utils::ConcurrencyTracker ct;
148         WaitUntilConcurrencyPeaks();
149     }
150 };
151 
152 template<>
153 class WaitThrow<unlimited_type,isThrowing> {
154 protected:
155     void WaitAndThrow(int /*cnt*/, const char * /*name*/) {
156         utils::ConcurrencyTracker ct;
157         WaitUntilConcurrencyPeaks();
158         ThrowTestException(1);
159     }
160 };
161 
162 void
163 ResetGlobals(bool throwException = true, bool flog = false) {
164     nExceptions = 0;
165     g_TGCCancelled = 0;
166     ResetEhGlobals(throwException, flog);
167 }
168 
169 // -------input_node body ------------------
170 template <class OutputType, TestNodeTypeEnum TType>
171 class test_input_body : WaitThrow<serial_type, TType> {
172     using WaitThrow<serial_type, TType>::WaitAndThrow;
173     std::atomic<int> *my_current_val;
174     int my_mult;
175 public:
176     test_input_body(std::atomic<int> &my_cnt, int multiplier = 1) : my_current_val(&my_cnt), my_mult(multiplier) {
177         // INFO("- --------- - - -   constructed " << (size_t)(my_current_val) << "\n");
178     }
179 
180     OutputType operator()(tbb::flow_control& fc) {
181         UPDATE_COUNTS();
182         OutputType ret = OutputType(my_mult * ++(*my_current_val));
183         // TODO revamp: reconsider logging for the tests.
184 
185         // The following line is known to cause double frees. Therefore, commenting out frequent
186         // calls to INFO() macro.
187 
188         // INFO("xx(" << (size_t)(my_current_val) << ") ret == " << (int)ret << "\n");
189         if(*my_current_val > g_NumItems) {
190             // INFO(" ------ End of the line!\n");
191             *my_current_val = g_NumItems;
192             fc.stop();
193             return OutputType();
194         }
195         WaitAndThrow((int)ret,"test_input_body");
196         return ret;
197     }
198 
199     int count_value() { return (int)*my_current_val; }
200 };
201 
202 template <TestNodeTypeEnum TType>
203 class test_input_body<tbb::flow::continue_msg, TType> : WaitThrow<serial_type, TType> {
204     using WaitThrow<serial_type, TType>::WaitAndThrow;
205     std::atomic<int> *my_current_val;
206 public:
207     test_input_body(std::atomic<int> &my_cnt) : my_current_val(&my_cnt) { }
208 
209     tbb::flow::continue_msg operator()( tbb::flow_control & fc) {
210         UPDATE_COUNTS();
211         int outint = ++(*my_current_val);
212         if(*my_current_val > g_NumItems) {
213             *my_current_val = g_NumItems;
214             fc.stop();
215             return tbb::flow::continue_msg();
216         }
217         WaitAndThrow(outint,"test_input_body");
218         return tbb::flow::continue_msg();
219     }
220 
221     int count_value() { return (int)*my_current_val; }
222 };
223 
224 // -------{function/continue}_node body ------------------
225 template<class InputType, class OutputType, TestNodeTypeEnum T, size_t Conc>
226 class absorber_body : WaitThrow<Conc,T> {
227     using WaitThrow<Conc,T>::WaitAndThrow;
228     std::atomic<int> *my_count;
229 public:
230     absorber_body(std::atomic<int> &my_cnt) : my_count(&my_cnt) { }
231     OutputType operator()(const InputType &/*p_in*/) {
232         UPDATE_COUNTS();
233         int out = ++(*my_count);
234         WaitAndThrow(out,"absorber_body");
235         return OutputType();
236     }
237     int count_value() { return *my_count; }
238 };
239 
240 // -------multifunction_node body ------------------
241 
242 // helper classes
// Puts one default-constructed item to each of the first N output ports of a ports tuple,
// from port N-1 down to port 0, checking that every try_put() succeeds.
template<int N,class PortsType>
struct IssueOutput {
    using my_type = typename std::tuple_element<N-1,PortsType>::type::output_type;

    static void issue_tuple_element( PortsType &my_ports) {
        CHECK_MESSAGE( (std::get<N-1>(my_ports).try_put(my_type())), "Error putting to successor");
        // Recurse into the remaining lower-numbered ports.
        IssueOutput<N-1,PortsType>::issue_tuple_element(my_ports);
    }
};

// Recursion terminator: only port 0 is left.
template<class PortsType>
struct IssueOutput<1,PortsType> {
    using my_type = typename std::tuple_element<0,PortsType>::type::output_type;

    static void issue_tuple_element( PortsType &my_ports) {
        CHECK_MESSAGE( (std::get<0>(my_ports).try_put(my_type())), "Error putting to successor");
    }
};
261 
262 template<class InputType, class OutputTupleType, TestNodeTypeEnum T, size_t Conc>
263 class multifunction_node_body : WaitThrow<Conc,T> {
264     using WaitThrow<Conc,T>::WaitAndThrow;
265     static const int N = std::tuple_size<OutputTupleType>::value;
266     typedef typename tbb::flow::multifunction_node<InputType,OutputTupleType> NodeType;
267     typedef typename NodeType::output_ports_type PortsType;
268     std::atomic<int> *my_count;
269 public:
270     multifunction_node_body(std::atomic<int> &my_cnt) : my_count(&my_cnt) { }
271     void operator()(const InputType& /*in*/, PortsType &my_ports) {
272         UPDATE_COUNTS();
273         int out = ++(*my_count);
274         WaitAndThrow(out,"multifunction_node_body");
275         // issue an item to each output port.
276         IssueOutput<N,PortsType>::issue_tuple_element(my_ports);
277     }
278 
279     int count_value() { return *my_count; }
280 };
281 
282 // --------- body to sort items in sequencer_node
// Sequencing functor for sequencer_node: checks that the item is non-zero and returns its
// value minus one as the sequence index.
template<class BufferItemType>
struct sequencer_body {
    size_t operator()(const BufferItemType &s) {
        CHECK_MESSAGE( (s), "sequencer item out of range (== 0)");
        const size_t one_based = size_t(s);
        return one_based - 1;
    }
};
290 
291 // --------- type for < comparison in priority_queue_node.
// Comparison for priority_queue_node: orders items by their value modulo 3.
template<class ItemType>
struct less_body {
    bool operator()(const ItemType &lhs, const ItemType &rhs) {
        const int lhs_rank = int(lhs) % 3;
        const int rhs_rank = int(rhs) % 3;
        return lhs_rank < rhs_rank;
    }
};
298 
299 // --------- tag methods for tag_matching join_node
300 template<typename TT>
301 class tag_func {
302     TT my_mult;
303 public:
304     tag_func(TT multiplier) : my_mult(multiplier) { }
305     // operator() will return [0 .. Count)
306     tbb::flow::tag_value operator()( TT v) {
307         tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult);
308         return t;
309     }
310 };
311 
312 // --------- Input body for split_node test.
313 template <class OutputTuple, TestNodeTypeEnum TType>
314 class tuple_test_input_body : WaitThrow<serial_type, TType> {
315     typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
316     typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
317     using WaitThrow<serial_type, TType>::WaitAndThrow;
318     std::atomic<int> *my_current_val;
319 public:
320     tuple_test_input_body(std::atomic<int> &my_cnt) : my_current_val(&my_cnt) { }
321 
322     OutputTuple operator()(tbb::flow_control& fc) {
323         UPDATE_COUNTS();
324         int ival = ++(*my_current_val);
325         if(*my_current_val > g_NumItems) {
326             *my_current_val = g_NumItems;  // jam the final value; we assert on it later.
327             fc.stop();
328             return OutputTuple();
329         }
330         WaitAndThrow(ival,"tuple_test_input_body");
331         return OutputTuple(ItemType0(ival),ItemType1(ival));
332     }
333 
334     int count_value() { return (int)*my_current_val; }
335 };
336 
337 // ------- end of node bodies
338 
339 // input_node is only-serial.  input_node can throw, or the function_node can throw.
340 // graph being tested is
341 //
342 //      input_node+---+parallel function_node
343 //
344 //    After each run the graph is reset(), to test the reset functionality.
345 //
346 
347 
348 template<class ItemType, TestNodeTypeEnum inpThrowType, TestNodeTypeEnum absorbThrowType>
349 void run_one_input_node_test(bool throwException, bool flog) {
350     typedef test_input_body<ItemType,inpThrowType> src_body_type;
351     typedef absorber_body<ItemType, tbb::flow::continue_msg, absorbThrowType, unlimited_type> parallel_absorb_body_type;
352     std::atomic<int> input_body_count;
353     std::atomic<int> absorber_body_count;
354     input_body_count = 0;
355     absorber_body_count = 0;
356 
357     tbb::flow::graph g;
358 
359     g_Master = std::this_thread::get_id();
360 
361 #if USE_TASK_SCHEDULER_OBSERVER
362     eh_test_observer o;
363     o.observe(true);
364 #endif
365 
366     tbb::flow::input_node<ItemType> sn(g, src_body_type(input_body_count));
367     parallel_absorb_body_type ab2(absorber_body_count);
368     tbb::flow::function_node<ItemType> parallel_fn(g,tbb::flow::unlimited,ab2);
369     make_edge(sn, parallel_fn);
370     for(int runcnt = 0; runcnt < 2; ++runcnt) {
371         ResetGlobals(throwException,flog);
372         if(throwException) {
373             TRY();
374                 sn.activate();
375                 g.wait_for_all();
376             CATCH_AND_ASSERT();
377         }
378         else {
379             TRY();
380                 sn.activate();
381                 g.wait_for_all();
382             CATCH_AND_FAIL();
383         }
384 
385         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
386         int src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value();
387         int sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value();
388         if(throwException) {
389             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception flag in flow::graph not set");
390             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "canceled flag not set");
391             CHECK_MESSAGE( (src_cnt <= g_NumItems), "Too many input_node items emitted");
392             CHECK_MESSAGE( (sink_cnt <= src_cnt), "Too many input_node items received");
393         }
394         else {
395             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
396             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
397             CHECK_MESSAGE( (src_cnt == g_NumItems), "Incorrect # input_node items emitted");
398             CHECK_MESSAGE( (sink_cnt == src_cnt), "Incorrect # input_node items received");
399         }
400         g.reset();  // resets the body of the input_node and the absorb_nodes.
401         input_body_count = 0;
402         absorber_body_count = 0;
403         CHECK_MESSAGE( (!g.exception_thrown()), "Reset didn't clear exception_thrown()");
404         CHECK_MESSAGE( (!g.is_cancelled()), "Reset didn't clear is_cancelled()");
405         src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value();
406         sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value();
407         CHECK_MESSAGE( (src_cnt == 0), "input_node count not reset");
408         CHECK_MESSAGE( (sink_cnt == 0), "sink_node count not reset");
409     }
410 #if USE_TASK_SCHEDULER_OBSERVER
411     o.observe(false);
412 #endif
413 }  // run_one_input_node_test
414 
415 
416 template<class ItemType, TestNodeTypeEnum inpThrowType, TestNodeTypeEnum absorbThrowType>
417 void run_input_node_test() {
418     run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(false,false);
419     run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(true,false);
420     run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(true,true);
421 }  // run_input_node_test
422 
423 void test_input_node() {
424     INFO("Testing input_node\n");
425     CheckType<int>::check_type_counter = 0;
426     g_Wakeup_Msg = "input_node(1): Missed wakeup or machine is overloaded?";
427     run_input_node_test<CheckType<int>, isThrowing, nonThrowing>();
428     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
429     g_Wakeup_Msg = "input_node(2): Missed wakeup or machine is overloaded?";
430     run_input_node_test<int, isThrowing, nonThrowing>();
431     g_Wakeup_Msg = "input_node(3): Missed wakeup or machine is overloaded?";
432     run_input_node_test<int, nonThrowing, isThrowing>();
433     g_Wakeup_Msg = "input_node(4): Missed wakeup or machine is overloaded?";
434     run_input_node_test<int, isThrowing, isThrowing>();
435     g_Wakeup_Msg = "input_node(5): Missed wakeup or machine is overloaded?";
436     run_input_node_test<CheckType<int>, isThrowing, isThrowing>();
437     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
438     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
439 }
440 
441 // -------- utilities & types to test function_node and multifunction_node.
442 
443 // need to tell the template which node type I am using so it attaches successors correctly.
444 enum NodeFetchType { func_node_type, multifunc_node_type };
445 
446 template<class NodeType, class ItemType, int indx, NodeFetchType NFT>
447 struct AttachPoint;
448 
449 template<class NodeType, class ItemType, int indx>
450 struct AttachPoint<NodeType,ItemType,indx,multifunc_node_type> {
451     static tbb::flow::sender<ItemType> &GetSender(NodeType &n) {
452         return tbb::flow::output_port<indx>(n);
453     }
454 };
455 
456 template<class NodeType, class ItemType, int indx>
457 struct AttachPoint<NodeType,ItemType,indx,func_node_type> {
458     static tbb::flow::sender<ItemType> &GetSender(NodeType &n) {
459         return n;
460     }
461 };
462 
463 
464 // common template for running function_node, multifunction_node.  continue_node
465 // has different firing requirements, so it needs a different graph topology.
466 template<
467     class InputNodeType,
468     class InputNodeBodyType0,
469     class InputNodeBodyType1,
470     NodeFetchType NFT,
471     class TestNodeType,
472     class TestNodeBodyType,
473     class TypeToSink0,          // what kind of item are we sending to sink0
474     class TypeToSink1,          // what kind of item are we sending to sink1
475     class SinkNodeType0,        // will be same for function;
476     class SinkNodeType1,        // may differ for multifunction_node
477     class SinkNodeBodyType0,
478     class SinkNodeBodyType1,
479     size_t Conc
480     >
481 void
482 run_one_functype_node_test(bool throwException, bool flog, const char * /*name*/) {
483 
484     std::stringstream ss;
485     char *saved_msg = const_cast<char *>(g_Wakeup_Msg);
486     tbb::flow::graph g;
487 
488     std::atomic<int> input0_count;
489     std::atomic<int> input1_count;
490     std::atomic<int> sink0_count;
491     std::atomic<int> sink1_count;
492     std::atomic<int> test_count;
493     input0_count = input1_count = sink0_count = sink1_count = test_count = 0;
494 
495 #if USE_TASK_SCHEDULER_OBSERVER
496     eh_test_observer o;
497     o.observe(true);
498 #endif
499 
500     g_Master = std::this_thread::get_id();
501     InputNodeType input0(g, InputNodeBodyType0(input0_count));
502     InputNodeType input1(g, InputNodeBodyType1(input1_count));
503     TestNodeType node_to_test(g, Conc, TestNodeBodyType(test_count));
504     SinkNodeType0 sink0(g,tbb::flow::unlimited,SinkNodeBodyType0(sink0_count));
505     SinkNodeType1 sink1(g,tbb::flow::unlimited,SinkNodeBodyType1(sink1_count));
506     make_edge(input0, node_to_test);
507     make_edge(input1, node_to_test);
508     make_edge(AttachPoint<TestNodeType, TypeToSink0, 0, NFT>::GetSender(node_to_test), sink0);
509     make_edge(AttachPoint<TestNodeType, TypeToSink1, 1, NFT>::GetSender(node_to_test), sink1);
510 
511     for(int iter = 0; iter < 2; ++iter) {  // run, reset, run again
512         ss.clear();
513         ss << saved_msg << " iter=" << iter << ", threads=" << g_NumThreads << ", throw=" << (throwException ? "T" : "F") << ", flow=" << (flog ? "T" : "F");
514         g_Wakeup_Msg = ss.str().c_str();
515         ResetGlobals(throwException,flog);
516         if(throwException) {
517             TRY();
518                 input0.activate();
519                 input1.activate();
520                 g.wait_for_all();
521             CATCH_AND_ASSERT();
522         }
523         else {
524             TRY();
525                 input0.activate();
526                 input1.activate();
527                 g.wait_for_all();
528             CATCH_AND_FAIL();
529         }
530         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
531         int ib0_cnt = tbb::flow::copy_body<InputNodeBodyType0>(input0).count_value();
532         int ib1_cnt = tbb::flow::copy_body<InputNodeBodyType1>(input1).count_value();
533         int t_cnt   = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value();
534         int nb0_cnt = tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value();
535         int nb1_cnt = tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value();
536         if(throwException) {
537             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
538             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
539             CHECK_MESSAGE( (ib0_cnt + ib1_cnt <= 2*g_NumItems), "Too many items sent by inputs");
540             CHECK_MESSAGE( (ib0_cnt + ib1_cnt >= t_cnt), "Too many items received by test node");
541             CHECK_MESSAGE( (nb0_cnt + nb1_cnt <= t_cnt*2), "Too many items received by sink nodes");
542         }
543         else {
544             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
545             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
546             CHECK_MESSAGE( (ib0_cnt + ib1_cnt == 2*g_NumItems), "Missing invocations of input_nodes");
547             CHECK_MESSAGE( (t_cnt == 2*g_NumItems), "Not all items reached test node");
548             CHECK_MESSAGE( (nb0_cnt == 2*g_NumItems && nb1_cnt == 2*g_NumItems), "Missing items in absorbers");
549         }
550         g.reset();  // resets the body of the input_nodes, test_node and the absorb_nodes.
551         input0_count = input1_count = sink0_count = sink1_count = test_count = 0;
552         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType0>(input0).count_value()),"Reset input 0 failed");
553         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType1>(input1).count_value()),"Reset input 1 failed");
554         CHECK_MESSAGE( (0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value()),"Reset test_node failed");
555         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value()),"Reset sink 0 failed");
556         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value()),"Reset sink 1 failed");
557 
558         g_Wakeup_Msg = saved_msg;
559     }
560 #if USE_TASK_SCHEDULER_OBSERVER
561     o.observe(false);
562 #endif
563 }
564 
565 //  Test function_node
566 //
567 // graph being tested is
568 //
569 //         input_node -\                 /- parallel function_node
570 //                      \               /
571 //                       +function_node+
572 //                      /               \                                  x
573 //         input_node -/                 \- parallel function_node
574 //
575 //    After each run the graph is reset(), to test the reset functionality.
576 //
577 template<
578     TestNodeTypeEnum IType1,                          // does input node 1 throw?
579     TestNodeTypeEnum IType2,                          // does input node 2 throw?
580     class Item12,                                     // type of item passed between inputs and test node
581     TestNodeTypeEnum FType,                           // does function node throw?
582     class Item23,                                     // type passed from function_node to sink nodes
583     TestNodeTypeEnum NType1,                          // does sink node 1 throw?
584     TestNodeTypeEnum NType2,                          // does sink node 1 throw?
585     class NodePolicy,                                 // rejecting,queueing
586     size_t Conc                                       // is node concurrent? {serial | limited | unlimited}
587 >
588 void run_function_node_test() {
589 
590     typedef test_input_body<Item12,IType1> IBodyType1;
591     typedef test_input_body<Item12,IType2> IBodyType2;
592     typedef absorber_body<Item12, Item23, FType, Conc> TestBodyType;
593     typedef absorber_body<Item23,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1;
594     typedef absorber_body<Item23,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2;
595 
596     typedef tbb::flow::input_node<Item12> InputType;
597     typedef tbb::flow::function_node<Item12, Item23, NodePolicy> TestType;
598     typedef tbb::flow::function_node<Item23,tbb::flow::continue_msg> SnkType;
599 
600     for(int i = 0; i < 4; ++i ) {
601         if(i != 2) {  // doesn't make sense to flog a non-throwing test
602             bool doThrow = (i & 0x1) != 0;
603             bool doFlog = (i & 0x2) != 0;
604             run_one_functype_node_test<
605                 /*InputNodeType*/       InputType,
606                 /*InputNodeBodyType0*/  IBodyType1,
607                 /*InputNodeBodyType1*/  IBodyType2,
608                 /* NFT */               func_node_type,
609                 /*TestNodeType*/        TestType,
610                 /*TestNodeBodyType*/    TestBodyType,
611                 /*TypeToSink0 */        Item23,
612                 /*TypeToSink1 */        Item23,
613                 /*SinkNodeType0*/       SnkType,
614                 /*SinkNodeType1*/       SnkType,
615                 /*SinkNodeBodyType1*/   SinkBodyType1,
616                 /*SinkNodeBodyType2*/   SinkBodyType2,
617                 /*Conc*/                Conc>
618                     (doThrow,doFlog,"function_node");
619         }
620     }
621 }  // run_function_node_test
622 
623 void test_function_node() {
624     INFO("Testing function_node\n");
625     // serial rejecting
626     g_Wakeup_Msg = "function_node(1a): Missed wakeup or machine is overloaded?";
627     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
628     g_Wakeup_Msg = "function_node(1b): Missed wakeup or machine is overloaded?";
629     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
630     g_Wakeup_Msg = "function_node(1c): Missed wakeup or machine is overloaded?";
631     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
632 
633     // serial queueing
634     g_Wakeup_Msg = "function_node(2): Missed wakeup or machine is overloaded?";
635     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
636     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
637     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
638     CheckType<int>::check_type_counter = 0;
639     run_function_node_test<nonThrowing, nonThrowing, CheckType<int>, nonThrowing, CheckType<int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
640     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
641 
642     // unlimited parallel rejecting
643     g_Wakeup_Msg = "function_node(3): Missed wakeup or machine is overloaded?";
644     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
645     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
646     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
647 
648     // limited parallel rejecting
649     g_Wakeup_Msg = "function_node(4): Missed wakeup or machine is overloaded?";
650     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>();
651     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>();
652     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>();
653 
654     // limited parallel queueing
655     g_Wakeup_Msg = "function_node(5): Missed wakeup or machine is overloaded?";
656     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
657     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
658     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>();
659 
660     // everyone throwing
661     g_Wakeup_Msg = "function_node(6): Missed wakeup or machine is overloaded?";
662     run_function_node_test<isThrowing, isThrowing, int, isThrowing, int, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
663     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
664 }
665 
666 // ----------------------------------- multifunction_node ----------------------------------
667 //  Test multifunction_node.
668 //
669 // graph being tested is
670 //
671 //         input_node -\                      /- parallel function_node
672 //                      \                    /
673 //                       +multifunction_node+
674 //                      /                    \                                  x
675 //         input_node -/                      \- parallel function_node
676 //
677 //    After each run the graph is reset(), to test the reset functionality.  The
678 //    multifunction_node will put an item to each successor for every item
679 //    received.
680 //
// Drives run_one_functype_node_test for a multifunction_node fed by two input_nodes and
// draining into two function_node sinks.  The test is executed for the (throw, flog)
// combinations (false,false), (true,false) and (true,true).
681 template<
682     TestNodeTypeEnum IType0,      // throwing behavior of input node 1
683     TestNodeTypeEnum IType1,      // throwing behavior of input node 2
684     class Item12,                 // item type passed from the inputs to the node under test
685     TestNodeTypeEnum FType,       // throwing behavior of the multifunction_node body
686     class ItemTuple,              // tuple of the item types sent from the multifunction_node to the sinks
687     TestNodeTypeEnum NType1,      // throwing behavior of sink node 1
688     TestNodeTypeEnum NType2,      // throwing behavior of sink node 2
689     class  NodePolicy,            // buffering policy of the node: rejecting or queueing
690     size_t Conc                   // concurrency of the node: {serial | limited | unlimited}
691 >
692 void run_multifunction_node_test() {
693 
        // Item types of the two output ports, and the bodies built around them.
694     using PortItem0 = typename std::tuple_element<0,ItemTuple>::type;
695     using PortItem1 = typename std::tuple_element<1,ItemTuple>::type;
696     using InputBody0 = test_input_body<Item12,IType0>;
697     using InputBody1 = test_input_body<Item12,IType1>;
698     using NodeBody = multifunction_node_body<Item12, ItemTuple, FType, Conc>;
699     using SinkBody0 = absorber_body<PortItem0,tbb::flow::continue_msg, NType1, unlimited_type>;
700     using SinkBody1 = absorber_body<PortItem1,tbb::flow::continue_msg, NType2, unlimited_type>;
701 
        // Node types wired together by run_one_functype_node_test.
702     using InputNode = tbb::flow::input_node<Item12>;
703     using NodeUnderTest = tbb::flow::multifunction_node<Item12, ItemTuple, NodePolicy>;
704     using SinkNode0 = tbb::flow::function_node<PortItem0,tbb::flow::continue_msg>;
705     using SinkNode1 = tbb::flow::function_node<PortItem1,tbb::flow::continue_msg>;
706 
707     const int passes[] = {0, 1, 3};  // pass 2 is left out: it doesn't make sense to flog a non-throwing test
708     for(int pass : passes) {
709         const bool shouldThrow = (pass & 0x1) != 0;  // bit 0: throw an exception
710         const bool shouldFlog = (pass & 0x2) != 0;   // bit 1: flog
711         run_one_functype_node_test<
712             /*InputNodeType*/       InputNode,
713             /*InputNodeBodyType0*/  InputBody0,
714             /*InputNodeBodyType1*/  InputBody1,
715             /*NFT*/                 multifunc_node_type,
716             /*TestNodeType*/        NodeUnderTest,
717             /*TestNodeBodyType*/    NodeBody,
718             /*TypeToSink0*/         PortItem0,
719             /*TypeToSink1*/         PortItem1,
720             /*SinkNodeType0*/       SinkNode0,
721             /*SinkNodeType1*/       SinkNode1,
722             /*SinkNodeBodyType0*/   SinkBody0,
723             /*SinkNodeBodyType1*/   SinkBody1,
724             /*Conc*/                Conc>
725             (shouldThrow, shouldFlog, "multifunction_node");
726     }
727 
728 }  // run_multifunction_node_test
729 
// Runs the multifunction_node exception-handling scenarios.  Each group below fixes a
// buffering policy and a concurrency level and instantiates run_multifunction_node_test with
// the throwing position moved between an input node, the node under test and a sink.
// g_Wakeup_Msg is set before the runs it describes and restored to g_Orig_Wakeup_Msg at the end
// (presumably it is the diagnostic printed when a wait stalls -- confirm at its point of use).
730 void test_multifunction_node() {
731     INFO("Testing multifunction_node\n");
732     g_Wakeup_Msg = "multifunction_node(input throws,rejecting,serial): Missed wakeup or machine is overloaded?";
733     // serial rejecting: input 1 throws, then the test node throws, then sink 1 throws
734     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,float>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
735     g_Wakeup_Msg = "multifunction_node(test throws,rejecting,serial): Missed wakeup or machine is overloaded?";
736     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
737     g_Wakeup_Msg = "multifunction_node(sink throws,rejecting,serial): Missed wakeup or machine is overloaded?";
738     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
739 
740     g_Wakeup_Msg = "multifunction_node(2): Missed wakeup or machine is overloaded?";
741     // serial queueing: input 1 throws, then the test node throws, then sink 1 throws
742     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
743     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
744     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
        // Same sink-throws configuration with CheckType<int> items: the counter is zeroed before
        // the run and must be zero again afterwards, otherwise items leaked.
745     CheckType<int>::check_type_counter = 0;
746     run_multifunction_node_test<nonThrowing, nonThrowing, CheckType<int>, nonThrowing, std::tuple<CheckType<int>, CheckType<int> >, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
747     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
748 
749     g_Wakeup_Msg = "multifunction_node(3): Missed wakeup or machine is overloaded?";
750     // unlimited parallel rejecting: input 1 throws, then the test node throws, then sink 2 throws
751     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
752     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
753     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
754 
755     g_Wakeup_Msg = "multifunction_node(4): Missed wakeup or machine is overloaded?";
756     // limited parallel rejecting: input 1 throws, then the test node throws, then sink 2 throws
757     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>();
758     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>();
759     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>();
760 
761     g_Wakeup_Msg = "multifunction_node(5): Missed wakeup or machine is overloaded?";
762     // limited parallel queueing: input 1 throws, then the test node throws, then sink 2 throws
763     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
764     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
765     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>();
766 
767     g_Wakeup_Msg = "multifunction_node(6): Missed wakeup or machine is overloaded?";
768     // everyone throwing: both inputs, the test node and both sinks (unlimited, rejecting)
769     run_multifunction_node_test<isThrowing, isThrowing, int, isThrowing, std::tuple<int,int>, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
770     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
771 }
772 
773 //
774 // A continue_node has T predecessors.  When it has received a continue_msg from each of its
775 // T predecessors it executes the body of the node, and forwards the body's output to its successors.
776 // However many predecessors the continue_node has, that's how many continue_msgs it receives
777 // on input before forwarding a message.
778 //
779 // The graph will look like
780 //
781 //                                          +broadcast_node+
782 //                                         /                \             ___
783 //       input_node+------>+broadcast_node+                  +continue_node+--->+absorber
784 //                                         \                /
785 //                                          +broadcast_node+
786 //
787 // The continue_node has unlimited parallelism, no input buffering, and broadcasts to successors.
788 // The absorber is parallel, so each item emitted by the input will result in one thread
789 // spinning.  So for N threads we pass N-1 continue_messages, then spin wait and then throw if
790 // we are allowed to.
791 
// Builds the graph  input -> b1 -> {b2, b3} -> node_to_test -> sink  and runs it twice.
//
// Template parameters give the node types and the body types used to construct them; each body
// is constructed from an atomic counter owned by this function.
//   throwException, flog  -- forwarded to ResetGlobals() before every run.  When throwException
//                            is set the run is wrapped in TRY()/CATCH_AND_ASSERT(), otherwise in
//                            TRY()/CATCH_AND_FAIL() (macros defined elsewhere in this file).
// After each run the invocation counts reported by the node bodies are checked, the graph is
// reset(), and the bodies are checked to report a count of zero again.
792 template < class InputNodeType, class InputNodeBodyType, class TTestNodeType, class TestNodeBodyType,
793         class SinkNodeType, class SinkNodeBodyType>
794 void run_one_continue_node_test (bool throwException, bool flog) {
795     tbb::flow::graph g;
796 
797     std::atomic<int> input_count;
798     std::atomic<int> test_count;
799     std::atomic<int> sink_count;
800     input_count = test_count = sink_count = 0;
801 #if USE_TASK_SCHEDULER_OBSERVER
802     eh_test_observer o;
803     o.observe(true);
804 #endif
        // Record the calling thread as the master thread for this test.
805     g_Master = std::this_thread::get_id();
806     InputNodeType input(g, InputNodeBodyType(input_count));
807     TTestNodeType node_to_test(g, TestNodeBodyType(test_count));
808     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
809     tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g), b2(g), b3(g);
        // b1 fans out to b2 and b3, which are the two predecessors of node_to_test.
810     make_edge(input, b1);
811     make_edge(b1,b2);
812     make_edge(b1,b3);
813     make_edge(b2,node_to_test);
814     make_edge(b3,node_to_test);
815     make_edge(node_to_test, sink);
816     for(int iter = 0; iter < 2; ++iter) {
817         ResetGlobals(throwException,flog);
818         if(throwException) {
819             TRY();
820                 input.activate();
821                 g.wait_for_all();
822             CATCH_AND_ASSERT();
823         }
824         else {
825             TRY();
826                 input.activate();
827                 g.wait_for_all();
828             CATCH_AND_FAIL();
829         }
            // True when the graph is allowed to report no exception: either no throw was requested,
            // or the thread class selected by g_ExceptionInMaster never executed the throw.
830         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
            // Counts are read from copies of the bodies held by the nodes.
831         int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
832         int t_cnt   = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value();
833         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
834         if(throwException) {
                // With an exception in flight only upper bounds on the counts can be checked.
835             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
836             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
837             CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
838             CHECK_MESSAGE( (ib_cnt >= t_cnt), "Too many items received by test node");
839             CHECK_MESSAGE( (nb_cnt <= t_cnt), "Too many items received by sink nodes");
840         }
841         else {
                // Without a throw every stage must have been invoked exactly g_NumItems times.
842             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
843             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
844             CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
845             CHECK_MESSAGE( (t_cnt == g_NumItems), "Not all items reached test node");
846             CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
847         }
848         g.reset();  // resets the body of the input_nodes, test_node and the absorb_nodes.
849         input_count = test_count = sink_count = 0;
850         CHECK_MESSAGE( (0 == (int)test_count), "Atomic wasn't reset properly");
            // After reset() the body copies must report a zero count again.
851         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
852         CHECK_MESSAGE( (0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value()),"Reset test_node failed");
853         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
854     }
855 #if USE_TASK_SCHEDULER_OBSERVER
856     o.observe(false);
857 #endif
858 }
859 
// Selects the body and node types for one continue_node configuration and hands them to
// run_one_continue_node_test for the (throw, flog) combinations (false,false), (true,false)
// and (true,true).
860 template<
861     class ItemType,           // output type of the continue_node, consumed by the sink
862     TestNodeTypeEnum IType,   // throwing behavior of the input node body
863     TestNodeTypeEnum CType,   // throwing behavior of the continue_node body
864     TestNodeTypeEnum AType>   // throwing behavior of the absorber body
865 void run_continue_node_test() {
866     using InputBody = test_input_body<tbb::flow::continue_msg,IType>;
867     using ContinueBody = absorber_body<tbb::flow::continue_msg,ItemType,CType,unlimited_type>;
868     using AbsorberBody = absorber_body<ItemType,tbb::flow::continue_msg, AType, unlimited_type>;
869 
870     using InputNode = tbb::flow::input_node<tbb::flow::continue_msg>;
871     using ContinueNode = tbb::flow::continue_node<ItemType>;
872     using AbsorberNode = tbb::flow::function_node<ItemType,tbb::flow::continue_msg>;
873 
874     const int passes[] = {0, 1, 3};  // pass 2 would be (false,true), which doesn't make sense, so it is left out
875     for(int pass : passes) {
876         const bool shouldThrow = (pass & 0x1) != 0;  // bit 0: throw an exception
877         const bool shouldFlog = (pass & 0x2) != 0;   // bit 1: flog
878         run_one_continue_node_test<
879             /*InputNodeType*/       InputNode,
880             /*InputNodeBodyType*/   InputBody,
881             /*TestNodeType*/        ContinueNode,
882             /*TestNodeBodyType*/    ContinueBody,
883             /*SinkNodeType*/        AbsorberNode,
884             /*SinkNodeBodyType*/    AbsorberBody>
885             (shouldThrow, shouldFlog);
886     }
887 }
888 
889 // Runs the continue_node exception-handling scenarios.  The triple in each wakeup message
// names, in order, whether the input node, the continue_node and the absorber throw, matching
// the template arguments of the run_continue_node_test call that follows it.  The messages
// previously said "buffer_node", which misattributed a stalled continue_node run.
// g_Wakeup_Msg is restored to g_Orig_Wakeup_Msg at the end.
890 void test_continue_node() {
891     INFO("Testing continue_node\n");
892     g_Wakeup_Msg = "continue_node(non,is,non): Missed wakeup or machine is overloaded?";
893     run_continue_node_test<int,nonThrowing,isThrowing,nonThrowing>();
894     g_Wakeup_Msg = "continue_node(non,non,is): Missed wakeup or machine is overloaded?";
895     run_continue_node_test<int,nonThrowing,nonThrowing,isThrowing>();
896     g_Wakeup_Msg = "continue_node(is,non,non): Missed wakeup or machine is overloaded?";
897     run_continue_node_test<int,isThrowing,nonThrowing,nonThrowing>();
898     g_Wakeup_Msg = "continue_node(is,is,is): Missed wakeup or machine is overloaded?";
899     run_continue_node_test<int,isThrowing,isThrowing,isThrowing>();
        // Same all-throwing configuration with CheckType<double> items: the counter is zeroed
        // before the run and must be zero again afterwards, otherwise objects were dropped.
900     CheckType<double>::check_type_counter = 0;
901     run_continue_node_test<CheckType<double>,isThrowing,isThrowing,isThrowing>();
902     CHECK_MESSAGE( (!CheckType<double>::check_type_counter), "Dropped objects in continue_node test");
903     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
904 }
905 
906 // ---------- buffer_node queue_node overwrite_node --------------
907 
// Builds the graph  input -> node_to_test -> sink  around a buffering node and runs it twice.
//
//   throwException, flog  -- forwarded to ResetGlobals() before every run.  When throwException
//                            is set the run is wrapped in TRY()/CATCH_AND_ASSERT(), otherwise in
//                            TRY()/CATCH_AND_FAIL() (macros defined elsewhere in this file).
// After the first run the node is additionally loaded with one item while it has no successor,
// and reset() is checked to leave it empty.  After every run the input and sink bodies are
// checked to report a count of zero.
908 template<
909     class BufferItemType,       // type of the items passed through the node under test
910     class InputNodeType,
911     class InputNodeBodyType,
912     class TestNodeType,
913     class SinkNodeType,
914     class SinkNodeBodyType >
915 void run_one_buffer_node_test(bool throwException,bool flog) {
916     tbb::flow::graph g;
917 
918     std::atomic<int> input_count;
919     std::atomic<int> sink_count;
920     input_count = sink_count = 0;
921 #if USE_TASK_SCHEDULER_OBSERVER
922     eh_test_observer o;
923     o.observe(true);
924 #endif
        // Record the calling thread as the master thread for this test.
925     g_Master = std::this_thread::get_id();
926     InputNodeType input(g, InputNodeBodyType(input_count));
927     TestNodeType node_to_test(g);
928     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
929     make_edge(input,node_to_test);
930     make_edge(node_to_test, sink);
931     for(int iter = 0; iter < 2; ++iter) {
932         ResetGlobals(throwException,flog);
933         if(throwException) {
934             TRY();
935                 input.activate();
936                 g.wait_for_all();
937             CATCH_AND_ASSERT();
938         }
939         else {
940             TRY();
941                 input.activate();
942                 g.wait_for_all();
943             CATCH_AND_FAIL();
944         }
            // True when the graph is allowed to report no exception: either no throw was requested,
            // or the thread class selected by g_ExceptionInMaster never executed the throw.
945         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
            // Counts are read from copies of the bodies held by the nodes.
946         int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
947         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
948         if(throwException) {
                // With an exception in flight only upper bounds on the counts can be checked.
949             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
950             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
951             CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
952             CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
953         }
954         else {
                // Without a throw both ends must have been invoked exactly g_NumItems times.
955             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
956             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
957             CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
958             CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
959         }
960         if(iter == 0) {
                // Detach the sink, put one default-constructed item into the node, then reset the
                // graph and verify that nothing can be retrieved from the node afterwards.
961             remove_edge(node_to_test, sink);
962             node_to_test.try_put(BufferItemType());
963             g.wait_for_all();
964             g.reset();
965             input_count = sink_count = 0;
966             BufferItemType tmp;
967             CHECK_MESSAGE( (!node_to_test.try_get(tmp)), "node not empty");
                // Restore the edge so the second iteration runs on the original topology.
968             make_edge(node_to_test, sink);
969             g.wait_for_all();
970         }
971         else {
972             g.reset();
973             input_count = sink_count = 0;
974         }
            // After reset() the body copies must report a zero count again.
975         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
976         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
977     }
978 
979 #if USE_TASK_SCHEDULER_OBSERVER
980     o.observe(false);
981 #endif
982 }
// Runs run_one_buffer_node_test with a buffer_node, a queue_node and an overwrite_node as the
// node under test, for the (throw, flog) combinations (false,false), (true,false) and (true,true).
983 template<class BufferItemType,              // type of the items passed through the node under test
984          TestNodeTypeEnum InputThrowType,   // throwing behavior of the input node body
985          TestNodeTypeEnum SinkThrowType>    // throwing behavior of the sink body
986 void run_buffer_queue_and_overwrite_node_test() {
987     using InputBody = test_input_body<BufferItemType,InputThrowType>;
988     using SinkBody = absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type>;
989 
990     using InputNode = tbb::flow::input_node<BufferItemType>;
991     using BufferNode = tbb::flow::buffer_node<BufferItemType>;
992     using QueueNode = tbb::flow::queue_node<BufferItemType>;
993     using OverwriteNode = tbb::flow::overwrite_node<BufferItemType>;
994     using SinkNode = tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg>;
995 
996     const int passes[] = {0, 1, 3};  // pass 2 is left out: no need to test flog w/o throws
997     for(int pass : passes) {
998         const bool shouldThrow = (pass & 0x1) != 0;  // bit 0: throw an exception
999         const bool shouldFlog = (pass & 0x2) != 0;   // bit 1: flog
          // buffer_node
1000         run_one_buffer_node_test<
1001             /*BufferItemType*/     BufferItemType,
1002             /*InputNodeType*/      InputNode,
1003             /*InputNodeBodyType*/  InputBody,
1004             /*TestNodeType*/       BufferNode,
1005             /*SinkNodeType*/       SinkNode,
1006             /*SinkNodeBodyType*/   SinkBody
1007             >(shouldThrow, shouldFlog);
          // queue_node
1008         run_one_buffer_node_test<
1009             /*BufferItemType*/     BufferItemType,
1010             /*InputNodeType*/      InputNode,
1011             /*InputNodeBodyType*/  InputBody,
1012             /*TestNodeType*/       QueueNode,
1013             /*SinkNodeType*/       SinkNode,
1014             /*SinkNodeBodyType*/   SinkBody
1015             >(shouldThrow, shouldFlog);
          // overwrite_node
1016         run_one_buffer_node_test<
1017             /*BufferItemType*/     BufferItemType,
1018             /*InputNodeType*/      InputNode,
1019             /*InputNodeBodyType*/  InputBody,
1020             /*TestNodeType*/       OverwriteNode,
1021             /*SinkNodeType*/       SinkNode,
1022             /*SinkNodeBodyType*/   SinkBody
1023             >(shouldThrow, shouldFlog);
1024     }
1025 }
1026 
// Runs the buffer_node / queue_node / overwrite_node exception-handling scenarios with int
// items.  The pair in each wakeup message names, in order, whether the input node and the sink
// throw, matching the template arguments of the call that follows it.  g_Wakeup_Msg is restored
// to g_Orig_Wakeup_Msg at the end.
1027 void test_buffer_queue_and_overwrite_node() {
1028     INFO("Testing buffer_node, queue_node and overwrite_node\n");
1029     g_Wakeup_Msg = "buffer, queue, overwrite(is,non): Missed wakeup or machine is overloaded?";
1030     run_buffer_queue_and_overwrite_node_test<int,isThrowing,nonThrowing>();
1031     g_Wakeup_Msg = "buffer, queue, overwrite(non,is): Missed wakeup or machine is overloaded?";
1032     run_buffer_queue_and_overwrite_node_test<int,nonThrowing,isThrowing>();
1033     g_Wakeup_Msg = "buffer, queue, overwrite(is,is): Missed wakeup or machine is overloaded?";
1034     run_buffer_queue_and_overwrite_node_test<int,isThrowing,isThrowing>();
1035     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1036 }
1037 
1038 // ---------- sequencer_node -------------------------
1039 
1040 
// Builds the graph  input -> node_to_test -> sink  around a node constructed from a
// default-constructed SeqBodyType, and runs it twice.
//
//   throwException, flog  -- forwarded to ResetGlobals() before every run.  When throwException
//                            is set the run is wrapped in TRY()/CATCH_AND_ASSERT(), otherwise in
//                            TRY()/CATCH_AND_FAIL() (macros defined elsewhere in this file).
// After the first run two extra items are put into the node while it has no successor and the
// graph is reset.  After every run the input and sink bodies are checked to report a count of zero.
1041 template<
1042     class BufferItemType,       // type of the items passed through the node under test
1043     class InputNodeType,
1044     class InputNodeBodyType,
1045     class TestNodeType,
1046     class SeqBodyType,
1047     class SinkNodeType,
1048     class SinkNodeBodyType >
1049 void run_one_sequencer_node_test(bool throwException,bool flog) {
1050     tbb::flow::graph g;
1051 
1052     std::atomic<int> input_count;
1053     std::atomic<int> sink_count;
1054     input_count = sink_count = 0;
1055 #if USE_TASK_SCHEDULER_OBSERVER
1056     eh_test_observer o;
1057     o.observe(true);
1058 #endif
         // Record the calling thread as the master thread for this test.
1059     g_Master = std::this_thread::get_id();
1060     InputNodeType input(g, InputNodeBodyType(input_count));
1061     TestNodeType node_to_test(g,SeqBodyType());
1062     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1063     make_edge(input,node_to_test);
1064     make_edge(node_to_test, sink);
1065     for(int iter = 0; iter < 2; ++iter) {
1066         ResetGlobals(throwException,flog);
1067         if(throwException) {
1068             TRY();
1069                 input.activate();
1070                 g.wait_for_all();
1071             CATCH_AND_ASSERT();
1072         }
1073         else {
1074             TRY();
1075                 input.activate();
1076                 g.wait_for_all();
1077             CATCH_AND_FAIL();
1078         }
             // True when the graph is allowed to report no exception: either no throw was requested,
             // or the thread class selected by g_ExceptionInMaster never executed the throw.
1079         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
             // Counts are read from copies of the bodies held by the nodes.
1080         int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
1081         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
1082         if(throwException) {
                 // With an exception in flight only upper bounds on the counts can be checked.
1083             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1084             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1085             CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
1086             CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
1087         }
1088         else {
                 // Without a throw both ends must have been invoked exactly g_NumItems times.
1089             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1090             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1091             CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
1092             CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
1093         }
1094         if(iter == 0) {
                 // Detach the sink and put two extra items into the node before resetting the graph.
                 // NOTE(review): the values g_NumItems + 1 and 1 are presumably chosen relative to the
                 // sequence numbers produced by SeqBodyType -- confirm against sequencer_body.
1095             remove_edge(node_to_test, sink);
1096             node_to_test.try_put(BufferItemType(g_NumItems + 1));
1097             node_to_test.try_put(BufferItemType(1));
1098             g.wait_for_all();
1099             g.reset();
1100             input_count = sink_count = 0;
                 // Restore the edge so the second iteration runs on the original topology.
1101             make_edge(node_to_test, sink);
1102             g.wait_for_all();
1103         }
1104         else {
1105             g.reset();
1106             input_count = sink_count = 0;
1107         }
             // After reset() the body copies must report a zero count again.
1108         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
1109         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
1110     }
1111 
1112 #if USE_TASK_SCHEDULER_OBSERVER
1113     o.observe(false);
1114 #endif
1115 }
1116 
// Selects the body and node types for one sequencer_node configuration and hands them to
// run_one_sequencer_node_test for the (throw, flog) combinations (false,false), (true,false)
// and (true,true).
1117 template<class BufferItemType,              // type of the items passed through the sequencer_node
1118          TestNodeTypeEnum InputThrowType,   // throwing behavior of the input node body
1119          TestNodeTypeEnum SinkThrowType>    // throwing behavior of the sink body
1120 void run_sequencer_node_test() {
1121     using InputBody = test_input_body<BufferItemType,InputThrowType>;
1122     using SinkBody = absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type>;
1123     using SequencerBody = sequencer_body<BufferItemType>;
1124 
1125     using InputNode = tbb::flow::input_node<BufferItemType>;
1126     using SequencerNode = tbb::flow::sequencer_node<BufferItemType>;
1127     using SinkNode = tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg>;
1128 
1129     const int passes[] = {0, 1, 3};  // pass 2 is left out: no need to test flog w/o throws
1130     for(int pass : passes) {
1131         const bool shouldThrow = (pass & 0x1) != 0;  // bit 0: throw an exception
1132         const bool shouldFlog = (pass & 0x2) != 0;   // bit 1: flog
1133         run_one_sequencer_node_test<
1134             /*BufferItemType*/     BufferItemType,
1135             /*InputNodeType*/      InputNode,
1136             /*InputNodeBodyType*/  InputBody,
1137             /*TestNodeType*/       SequencerNode,
1138             /*SeqBodyType*/        SequencerBody,
1139             /*SinkNodeType*/       SinkNode,
1140             /*SinkNodeBodyType*/   SinkBody
1141             >(shouldThrow, shouldFlog);
1142     }
1143 }
1144 
1145 
1146 
// Runs the sequencer_node exception-handling scenarios.  The pair in each wakeup message names,
// in order, whether the input node and the sink throw, matching the template arguments of the
// call that follows it.  g_Wakeup_Msg is restored to g_Orig_Wakeup_Msg at the end.
1147 void test_sequencer_node() {
1148     INFO("Testing sequencer_node\n");
1149     g_Wakeup_Msg = "sequencer_node(is,non): Missed wakeup or machine is overloaded?";
1150     run_sequencer_node_test<int, isThrowing,nonThrowing>();
         // The sink-throws case uses CheckType<int> items: the counter is zeroed before the run
         // and must be zero again afterwards, otherwise objects were dropped.
1151     CheckType<int>::check_type_counter = 0;
1152     g_Wakeup_Msg = "sequencer_node(non,is): Missed wakeup or machine is overloaded?";
1153     run_sequencer_node_test<CheckType<int>, nonThrowing,isThrowing>();
1154     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped objects in sequencer_node test");
1155     g_Wakeup_Msg = "sequencer_node(is,is): Missed wakeup or machine is overloaded?";
1156     run_sequencer_node_test<int, isThrowing,isThrowing>();
1157     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1158 }
1159 
1160 // ------------ priority_queue_node ------------------
1161 
// Builds the graph  input -> node_to_test -> sink  around a priority-queue style node and runs
// it twice.
//
//   throwException, flog  -- forwarded to ResetGlobals() before every run.  When throwException
//                            is set the run is wrapped in TRY()/CATCH_AND_ASSERT(), otherwise in
//                            TRY()/CATCH_AND_FAIL() (macros defined elsewhere in this file).
// After the first run three extra items are put into the node while it has no successor and the
// graph is reset.  After every run the input and sink bodies are checked to report a count of zero.
1162 template<
1163     class BufferItemType,       // type of the items passed through the node under test
1164     class InputNodeType,
1165     class InputNodeBodyType,
1166     class TestNodeType,
1167     class SinkNodeType,
1168     class SinkNodeBodyType >
1169 void run_one_priority_queue_node_test(bool throwException,bool flog) {
1170     tbb::flow::graph g;
1171 
1172     std::atomic<int> input_count;
1173     std::atomic<int> sink_count;
1174     input_count = sink_count = 0;
1175 #if USE_TASK_SCHEDULER_OBSERVER
1176     eh_test_observer o;
1177     o.observe(true);
1178 #endif
         // Record the calling thread as the master thread for this test.
1179     g_Master = std::this_thread::get_id();
1180     InputNodeType input(g, InputNodeBodyType(input_count));
1181 
1182     TestNodeType node_to_test(g);
1183 
1184     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1185 
1186     make_edge(input,node_to_test);
1187     make_edge(node_to_test, sink);
1188     for(int iter = 0; iter < 2; ++iter) {
1189         ResetGlobals(throwException,flog);
1190         if(throwException) {
1191             TRY();
1192                 input.activate();
1193                 g.wait_for_all();
1194             CATCH_AND_ASSERT();
1195         }
1196         else {
1197             TRY();
1198                 input.activate();
1199                 g.wait_for_all();
1200             CATCH_AND_FAIL();
1201         }
             // True when the graph is allowed to report no exception: either no throw was requested,
             // or the thread class selected by g_ExceptionInMaster never executed the throw.
1202         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
             // Counts are read from copies of the bodies held by the nodes.
1203         int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
1204         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
1205         if(throwException) {
                 // With an exception in flight only upper bounds on the counts can be checked.
1206             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1207             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1208             CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
1209             CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
1210         }
1211         else {
                 // Without a throw both ends must have been invoked exactly g_NumItems times.
1212             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1213             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1214             CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
1215             CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
1216         }
1217         if(iter == 0) {
                 // Detach the sink and put three extra items (two above g_NumItems and one
                 // default-constructed) into the node before resetting the graph.
1218             remove_edge(node_to_test, sink);
1219             node_to_test.try_put(BufferItemType(g_NumItems + 1));
1220             node_to_test.try_put(BufferItemType(g_NumItems + 2));
1221             node_to_test.try_put(BufferItemType());
1222             g.wait_for_all();
1223             g.reset();
1224             input_count = sink_count = 0;
                 // Restore the edge so the second iteration runs on the original topology.
1225             make_edge(node_to_test, sink);
1226             g.wait_for_all();
1227         }
1228         else {
1229             g.reset();
1230             input_count = sink_count = 0;
1231         }
             // After reset() the body copies must report a zero count again.
1232         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
1233         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
1234     }
1235 
1236 #if USE_TASK_SCHEDULER_OBSERVER
1237     o.observe(false);
1238 #endif
1239 }
1240 
// Selects the body and node types for one priority_queue_node configuration (ordered by
// less_body<BufferItemType>) and hands them to run_one_priority_queue_node_test for the
// (throw, flog) combinations (false,false), (true,false) and (true,true).
1241 template<class BufferItemType,              // type of the items passed through the priority_queue_node
1242          TestNodeTypeEnum InputThrowType,   // throwing behavior of the input node body
1243          TestNodeTypeEnum SinkThrowType>    // throwing behavior of the sink body
1244 void run_priority_queue_node_test() {
1245     using InputBody = test_input_body<BufferItemType,InputThrowType>;
1246     using SinkBody = absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type>;
1247     using Compare = less_body<BufferItemType>;
1248 
1249     using InputNode = tbb::flow::input_node<BufferItemType>;
1250     using PriorityQueueNode = tbb::flow::priority_queue_node<BufferItemType,Compare>;
1251     using SinkNode = tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg>;
1252 
1253     const int passes[] = {0, 1, 3};  // pass 2 is left out: no need to test flog w/o throws
1254     for(int pass : passes) {
1255         const bool shouldThrow = (pass & 0x1) != 0;  // bit 0: throw an exception
1256         const bool shouldFlog = (pass & 0x2) != 0;   // bit 1: flog
1257         run_one_priority_queue_node_test<
1258             /*BufferItemType*/     BufferItemType,
1259             /*InputNodeType*/      InputNode,
1260             /*InputNodeBodyType*/  InputBody,
1261             /*TestNodeType*/       PriorityQueueNode,
1262             /*SinkNodeType*/       SinkNode,
1263             /*SinkNodeBodyType*/   SinkBody
1264             >(shouldThrow, shouldFlog);
1265     }
1266 }
1267 
// Runs the priority_queue_node exception-handling scenarios.  The pair in each wakeup message
// names, in order, whether the input node and the sink throw, matching the template arguments
// of the call that follows it.  g_Wakeup_Msg is restored to g_Orig_Wakeup_Msg at the end.
1268 void test_priority_queue_node() {
1269     INFO("Testing priority_queue_node\n");
1270     g_Wakeup_Msg = "priority_queue_node(is,non): Missed wakeup or machine is overloaded?";
1271     run_priority_queue_node_test<int, isThrowing,nonThrowing>();
         // The sink-throws case uses CheckType<int> items: the counter is zeroed before the run
         // and must be zero again afterwards, otherwise objects were dropped.
1272     CheckType<int>::check_type_counter = 0;
1273     g_Wakeup_Msg = "priority_queue_node(non,is): Missed wakeup or machine is overloaded?";
1274     run_priority_queue_node_test<CheckType<int>, nonThrowing,isThrowing>();
1275     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped objects in priority_queue_node test");
1276     g_Wakeup_Msg = "priority_queue_node(is,is): Missed wakeup or machine is overloaded?";
1277     run_priority_queue_node_test<int, isThrowing,isThrowing>();
1278     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1279 }
1280 
1281 // ------------------- join_node ----------------
// Maps a join policy type to a printable name. This primary template is the
// fallback for any policy without a dedicated specialization.
template<class JP>
struct graph_policy_name {
    static const char* name() {
        return "unknown";
    }
};
1285 template<> struct graph_policy_name<tbb::flow::queueing>  {
1286     static const char* name() {return "queueing"; }
1287 };
1288 template<> struct graph_policy_name<tbb::flow::reserving> {
1289     static const char* name() {return "reserving"; }
1290 };
1291 template<> struct graph_policy_name<tbb::flow::tag_matching> {
1292     static const char* name() {return "tag_matching"; }
1293 };
1294 
1295 
1296 template<
1297     class JP,
1298     class OutputTuple,
1299     class InputType0,
1300     class InputBodyType0,
1301     class InputType1,
1302     class InputBodyType1,
1303     class TestJoinType,
1304     class SinkType,
1305     class SinkBodyType
1306     >
1307 struct run_one_join_node_test {
1308     run_one_join_node_test() {}
1309     static void execute_test(bool throwException,bool flog) {
1310         typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
1311         typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
1312 
1313         tbb::flow::graph g;
1314         std::atomic<int>input0_count;
1315         std::atomic<int>input1_count;
1316         std::atomic<int>sink_count;
1317         input0_count = input1_count = sink_count = 0;
1318 #if USE_TASK_SCHEDULER_OBSERVER
1319         eh_test_observer o;
1320         o.observe(true);
1321 #endif
1322         g_Master = std::this_thread::get_id();
1323         InputType0 input0(g, InputBodyType0(input0_count));
1324         InputType1 input1(g, InputBodyType1(input1_count));
1325         TestJoinType node_to_test(g);
1326         SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
1327         make_edge(input0,tbb::flow::input_port<0>(node_to_test));
1328         make_edge(input1,tbb::flow::input_port<1>(node_to_test));
1329         make_edge(node_to_test, sink);
1330         for(int iter = 0; iter < 2; ++iter) {
1331             ResetGlobals(throwException,flog);
1332             if(throwException) {
1333                 TRY();
1334                     input0.activate();
1335                     input1.activate();
1336                     g.wait_for_all();
1337                 CATCH_AND_ASSERT();
1338             }
1339             else {
1340                 TRY();
1341                     input0.activate();
1342                     input1.activate();
1343                     g.wait_for_all();
1344                 CATCH_AND_FAIL();
1345             }
1346             bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1347             int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value();
1348             int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value();
1349             int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1350             if(throwException) {
1351                 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1352                 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1353                 CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs");
1354                 CHECK_MESSAGE( (nb_cnt <= ((ib0_cnt < ib1_cnt) ? ib0_cnt : ib1_cnt)), "Too many items received by sink nodes");
1355             }
1356             else {
1357                 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1358                 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1359                 if(ib0_cnt != g_NumItems) {
1360                 //     INFO("throwException == %s\n" << (throwException ? "true" : "false"));
1361                 //     INFO("iter == " << iter << "\n");
1362                 //     INFO("ib0_cnt == " << ib0_cnt << "\n");
1363                 //     INFO("g_NumItems == " << g_NumItems << "\n");
1364                 }
1365                 CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0");  // this one
1366                 CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1");
1367                 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
1368             }
1369             if(iter == 0) {
1370                 remove_edge(node_to_test, sink);
1371                 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 1));
1372                 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
1373                 g.wait_for_all();
1374                 g.reset();
1375                 input0_count = input1_count = sink_count = 0;
1376                 make_edge(node_to_test, sink);
1377                 g.wait_for_all();
1378             }
1379             else {
1380                 g.wait_for_all();
1381                 g.reset();
1382                 input0_count = input1_count = sink_count = 0;
1383             }
1384             CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed");
1385             CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed");
1386             nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1387             CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed");
1388         }
1389 
1390 #if USE_TASK_SCHEDULER_OBSERVER
1391         o.observe(false);
1392 #endif
1393     }
1394 };  // run_one_join_node_test
1395 
1396 template<
1397     class OutputTuple,
1398     class InputType0,
1399     class InputBodyType0,
1400     class InputType1,
1401     class InputBodyType1,
1402     class TestJoinType,
1403     class SinkType,
1404     class SinkBodyType
1405     >
1406 struct run_one_join_node_test<
1407         tbb::flow::tag_matching,
1408         OutputTuple,
1409         InputType0,
1410         InputBodyType0,
1411         InputType1,
1412         InputBodyType1,
1413         TestJoinType,
1414         SinkType,
1415         SinkBodyType
1416     > {
1417     run_one_join_node_test() {}
1418     static void execute_test(bool throwException,bool flog) {
1419         typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
1420         typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
1421 
1422         tbb::flow::graph g;
1423 
1424         std::atomic<int>input0_count;
1425         std::atomic<int>input1_count;
1426         std::atomic<int>sink_count;
1427         input0_count = input1_count = sink_count = 0;
1428 #if USE_TASK_SCHEDULER_OBSERVER
1429         eh_test_observer o;
1430         o.observe(true);
1431 #endif
1432         g_Master = std::this_thread::get_id();
1433         InputType0 input0(g, InputBodyType0(input0_count, 2));
1434         InputType1 input1(g, InputBodyType1(input1_count, 3));
1435         TestJoinType node_to_test(g, tag_func<ItemType0>(ItemType0(2)), tag_func<ItemType1>(ItemType1(3)));
1436         SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
1437         make_edge(input0,tbb::flow::input_port<0>(node_to_test));
1438         make_edge(input1,tbb::flow::input_port<1>(node_to_test));
1439         make_edge(node_to_test, sink);
1440         for(int iter = 0; iter < 2; ++iter) {
1441             ResetGlobals(throwException,flog);
1442             if(throwException) {
1443                 TRY();
1444                     input0.activate();
1445                     input1.activate();
1446                     g.wait_for_all();
1447                 CATCH_AND_ASSERT();
1448             }
1449             else {
1450                 TRY();
1451                     input0.activate();
1452                     input1.activate();
1453                     g.wait_for_all();
1454                 CATCH_AND_FAIL();
1455             }
1456             bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1457             int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value();
1458             int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value();
1459             int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1460             if(throwException) {
1461                 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1462                 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1463                 CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs");
1464                 CHECK_MESSAGE( (nb_cnt <= ((ib0_cnt < ib1_cnt) ? ib0_cnt : ib1_cnt)), "Too many items received by sink nodes");
1465             }
1466             else {
1467                 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1468                 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1469                 CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0");
1470                 CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1");
1471                 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
1472             }
1473             if(iter == 0) {
1474                 remove_edge(node_to_test, sink);
1475                 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4));
1476                 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
1477                 g.wait_for_all();   // have to wait for the graph to stop again....
1478                 g.reset();  // resets the body of the input_nodes, test_node and the absorb_nodes.
1479                 input0_count = input1_count = sink_count = 0;
1480                 make_edge(node_to_test, sink);
1481                 g.wait_for_all();   // have to wait for the graph to stop again....
1482             }
1483             else {
1484                 g.wait_for_all();
1485                 g.reset();
1486                 input0_count = input1_count = sink_count = 0;
1487             }
1488             CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed");
1489             CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed");
1490             nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1491             CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed");
1492         }
1493 
1494 #if USE_TASK_SCHEDULER_OBSERVER
1495         o.observe(false);
1496 #endif
1497     }
1498 };  // run_one_join_node_test<tag_matching>
1499 
1500 template<class JP, class OutputTuple,
1501              TestNodeTypeEnum InputThrowType,
1502              TestNodeTypeEnum SinkThrowType>
1503 void run_join_node_test() {
1504     typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
1505     typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
1506     typedef test_input_body<ItemType0,InputThrowType> InputBodyType0;
1507     typedef test_input_body<ItemType1,InputThrowType> InputBodyType1;
1508     typedef absorber_body<OutputTuple,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1509 
1510     typedef typename tbb::flow::input_node<ItemType0> InputType0;
1511     typedef typename tbb::flow::input_node<ItemType1> InputType1;
1512     typedef typename tbb::flow::join_node<OutputTuple,JP> TestJoinType;
1513     typedef typename tbb::flow::function_node<OutputTuple,tbb::flow::continue_msg> SinkType;
1514 
1515     for(int i = 0; i < 4; ++i) {
1516         if(2 == i) continue;
1517         bool throwException = (i & 0x1) != 0;
1518         bool doFlog = (i & 0x2) != 0;
1519         run_one_join_node_test<
1520              JP,
1521              OutputTuple,
1522              InputType0,
1523              InputBodyType0,
1524              InputType1,
1525              InputBodyType1,
1526              TestJoinType,
1527              SinkType,
1528              SinkBodyType>::execute_test(throwException,doFlog);
1529     }
1530 }
1531 
1532 template<class JP>
1533 void test_join_node() {
1534     INFO("Testing join_node<" << graph_policy_name<JP>::name() << ">\n");
1535     // only doing two-input joins
1536     g_Wakeup_Msg = "join(is,non): Missed wakeup or machine is overloaded?";
1537     run_join_node_test<JP, std::tuple<int,int>,  isThrowing, nonThrowing>();
1538     CheckType<int>::check_type_counter = 0;
1539     g_Wakeup_Msg = "join(non,is): Missed wakeup or machine is overloaded?";
1540     run_join_node_test<JP, std::tuple<CheckType<int>,int>, nonThrowing, isThrowing>();
1541     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped items in test");
1542     g_Wakeup_Msg = "join(is,is): Missed wakeup or machine is overloaded?";
1543     run_join_node_test<JP, std::tuple<int,int>,  isThrowing, isThrowing>();
1544     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1545 }
1546 
1547 // ------------------- limiter_node -------------
1548 
1549 template<
1550     class BufferItemType,       //
1551     class InputNodeType,
1552     class InputNodeBodyType,
1553     class TestNodeType,
1554     class SinkNodeType,
1555     class SinkNodeBodyType >
1556 void run_one_limiter_node_test(bool throwException,bool flog) {
1557     tbb::flow::graph g;
1558 
1559     std::atomic<int> input_count;
1560     std::atomic<int> sink_count;
1561     input_count = sink_count = 0;
1562 #if USE_TASK_SCHEDULER_OBSERVER
1563     eh_test_observer o;
1564     o.observe(true);
1565 #endif
1566     g_Master = std::this_thread::get_id();
1567     InputNodeType input(g, InputNodeBodyType(input_count));
1568     TestNodeType node_to_test(g,g_NumThreads + 1);
1569     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1570     make_edge(input,node_to_test);
1571     make_edge(node_to_test, sink);
1572     for(int iter = 0; iter < 2; ++iter) {
1573         ResetGlobals(throwException,flog);
1574         if(throwException) {
1575             TRY();
1576                 input.activate();
1577                 g.wait_for_all();
1578             CATCH_AND_ASSERT();
1579         }
1580         else {
1581             TRY();
1582                 input.activate();
1583                 g.wait_for_all();
1584             CATCH_AND_FAIL();
1585         }
1586         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1587         int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
1588         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
1589         if(throwException) {
1590             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1591             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1592             CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
1593             CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
1594         }
1595         else {
1596             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1597             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1598             // we stop after limiter's limit, which is g_NumThreads + 1.  The input_node
1599             // is invoked one extra time, filling its buffer, so its limit is g_NumThreads + 2.
1600             CHECK_MESSAGE( (ib_cnt == g_NumThreads + 2), "Missing invocations of input_node");
1601             CHECK_MESSAGE( (nb_cnt == g_NumThreads + 1), "Missing items in absorbers");
1602         }
1603         if(iter == 0) {
1604             remove_edge(node_to_test, sink);
1605             node_to_test.try_put(BufferItemType());
1606             node_to_test.try_put(BufferItemType());
1607             g.wait_for_all();
1608             g.reset();
1609             input_count = sink_count = 0;
1610             BufferItemType tmp;
1611             CHECK_MESSAGE( (!node_to_test.try_get(tmp)), "node not empty");
1612             make_edge(node_to_test, sink);
1613             g.wait_for_all();
1614         }
1615         else {
1616             g.reset();
1617             input_count = sink_count = 0;
1618         }
1619         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
1620         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
1621     }
1622 
1623 #if USE_TASK_SCHEDULER_OBSERVER
1624     o.observe(false);
1625 #endif
1626 }
1627 
1628 template<class BufferItemType,
1629          TestNodeTypeEnum InputThrowType,
1630          TestNodeTypeEnum SinkThrowType>
1631 void run_limiter_node_test() {
1632     typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
1633     typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1634 
1635     typedef tbb::flow::input_node<BufferItemType> InputType;
1636     typedef tbb::flow::limiter_node<BufferItemType>  LmtType;
1637     typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
1638 
1639     for(int i = 0; i < 4; ++i) {
1640         if(i == 2) continue;  // no need to test flog w/o throws
1641         bool throwException = (i & 0x1) != 0;
1642         bool doFlog = (i & 0x2) != 0;
1643         run_one_limiter_node_test<
1644             /* class BufferItemType*/     BufferItemType,
1645             /*class InputNodeType*/       InputType,
1646             /*class InputNodeBodyType*/   InputBodyType,
1647             /*class TestNodeType*/        LmtType,
1648             /*class SinkNodeType*/        SnkType,
1649             /*class SinkNodeBodyType*/    SinkBodyType
1650             >(throwException, doFlog);
1651     }
1652 }
1653 
1654 void test_limiter_node() {
1655     INFO("Testing limiter_node\n");
1656     g_Wakeup_Msg = "limiter_node(is,non): Missed wakeup or machine is overloaded?";
1657     run_limiter_node_test<int,isThrowing,nonThrowing>();
1658     g_Wakeup_Msg = "limiter_node(non,is): Missed wakeup or machine is overloaded?";
1659     run_limiter_node_test<int,nonThrowing,isThrowing>();
1660     g_Wakeup_Msg = "limiter_node(is,is): Missed wakeup or machine is overloaded?";
1661     run_limiter_node_test<int,isThrowing,isThrowing>();
1662     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1663 }
1664 
1665 // -------- split_node --------------------
1666 
1667 template<
1668     class InputTuple,
1669     class InputType,
1670     class InputBodyType,
1671     class TestSplitType,
1672     class SinkType0,
1673     class SinkBodyType0,
1674     class SinkType1,
1675     class SinkBodyType1>
1676 void run_one_split_node_test(bool throwException, bool flog) {
1677 
1678     tbb::flow::graph g;
1679 
1680     std::atomic<int> input_count;
1681     std::atomic<int> sink0_count;
1682     std::atomic<int> sink1_count;
1683     input_count = sink0_count = sink1_count = 0;
1684 #if USE_TASK_SCHEDULER_OBSERVER
1685     eh_test_observer o;
1686     o.observe(true);
1687 #endif
1688 
1689     g_Master = std::this_thread::get_id();
1690     InputType input(g, InputBodyType(input_count));
1691     TestSplitType node_to_test(g);
1692     SinkType0 sink0(g,tbb::flow::unlimited,SinkBodyType0(sink0_count));
1693     SinkType1 sink1(g,tbb::flow::unlimited,SinkBodyType1(sink1_count));
1694     make_edge(input, node_to_test);
1695     make_edge(tbb::flow::output_port<0>(node_to_test), sink0);
1696     make_edge(tbb::flow::output_port<1>(node_to_test), sink1);
1697 
1698     for(int iter = 0; iter < 2; ++iter) {  // run, reset, run again
1699         ResetGlobals(throwException,flog);
1700         if(throwException) {
1701             TRY();
1702                 input.activate();
1703                 g.wait_for_all();
1704             CATCH_AND_ASSERT();
1705         }
1706         else {
1707             TRY();
1708                 input.activate();
1709                 g.wait_for_all();
1710             CATCH_AND_FAIL();
1711         }
1712         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1713         int ib_cnt = tbb::flow::copy_body<InputBodyType>(input).count_value();
1714         int nb0_cnt = tbb::flow::copy_body<SinkBodyType0>(sink0).count_value();
1715         int nb1_cnt = tbb::flow::copy_body<SinkBodyType1>(sink1).count_value();
1716         if(throwException) {
1717             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1718             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1719             CHECK_MESSAGE( (ib_cnt <= 2*g_NumItems), "Too many items sent by input");
1720             CHECK_MESSAGE( (nb0_cnt + nb1_cnt <= ib_cnt*2), "Too many items received by sink nodes");
1721         }
1722         else {
1723             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1724             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1725             CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_nodes");
1726             CHECK_MESSAGE( (nb0_cnt == g_NumItems && nb1_cnt == g_NumItems), "Missing items in absorbers");
1727         }
1728         g.reset();  // resets the body of the input_nodes and the absorb_nodes.
1729         input_count = sink0_count = sink1_count = 0;
1730         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType>(input).count_value()),"Reset input failed");
1731         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType0>(sink0).count_value()),"Reset sink 0 failed");
1732         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType1>(sink1).count_value()),"Reset sink 1 failed");
1733     }
1734 #if USE_TASK_SCHEDULER_OBSERVER
1735     o.observe(false);
1736 #endif
1737 }
1738 
1739 template<class InputTuple,
1740              TestNodeTypeEnum InputThrowType,
1741              TestNodeTypeEnum SinkThrowType>
1742 void run_split_node_test() {
1743     typedef typename std::tuple_element<0,InputTuple>::type ItemType0;
1744     typedef typename std::tuple_element<1,InputTuple>::type ItemType1;
1745     typedef tuple_test_input_body<InputTuple,InputThrowType> InputBodyType;
1746     typedef absorber_body<ItemType0,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType0;
1747     typedef absorber_body<ItemType1,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType1;
1748 
1749     typedef typename tbb::flow::input_node<InputTuple> InputType;
1750     typedef typename tbb::flow::split_node<InputTuple> TestSplitType;
1751     typedef typename tbb::flow::function_node<ItemType0,tbb::flow::continue_msg> SinkType0;
1752     typedef typename tbb::flow::function_node<ItemType1,tbb::flow::continue_msg> SinkType1;
1753 
1754     for(int i = 0; i < 4; ++i) {
1755         if(2 == i) continue;
1756         bool throwException = (i & 0x1) != 0;
1757         bool doFlog = (i & 0x2) != 0;
1758         run_one_split_node_test<
1759             InputTuple,
1760             InputType,
1761             InputBodyType,
1762             TestSplitType,
1763             SinkType0,
1764             SinkBodyType0,
1765             SinkType1,
1766             SinkBodyType1>
1767                 (throwException,doFlog);
1768     }
1769 }
1770 
1771 void test_split_node() {
1772     INFO("Testing split_node\n");
1773     g_Wakeup_Msg = "split_node(is,non): Missed wakeup or machine is overloaded?";
1774     run_split_node_test<std::tuple<int,int>, isThrowing, nonThrowing>();
1775     g_Wakeup_Msg = "split_node(non,is): Missed wakeup or machine is overloaded?";
1776     run_split_node_test<std::tuple<int,int>, nonThrowing, isThrowing>();
1777     g_Wakeup_Msg = "split_node(is,is): Missed wakeup or machine is overloaded?";
1778     run_split_node_test<std::tuple<int,int>, isThrowing,  isThrowing>();
1779     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1780 }
1781 
1782 // --------- indexer_node ----------------------
1783 
1784 template < class InputTuple,
1785     class InputType0,
1786     class InputBodyType0,
1787     class InputType1,
1788     class InputBodyType1,
1789     class TestNodeType,
1790     class SinkType,
1791     class SinkBodyType>
1792 void run_one_indexer_node_test(bool throwException,bool flog) {
1793     typedef typename std::tuple_element<0,InputTuple>::type ItemType0;
1794     typedef typename std::tuple_element<1,InputTuple>::type ItemType1;
1795 
1796     tbb::flow::graph g;
1797 
1798     std::atomic<int> input0_count;
1799     std::atomic<int> input1_count;
1800     std::atomic<int> sink_count;
1801     input0_count = input1_count = sink_count = 0;
1802 #if USE_TASK_SCHEDULER_OBSERVER
1803     eh_test_observer o;
1804     o.observe(true);
1805 #endif
1806     g_Master = std::this_thread::get_id();
1807     InputType0 input0(g, InputBodyType0(input0_count));
1808     InputType1 input1(g, InputBodyType1(input1_count));
1809     TestNodeType node_to_test(g);
1810     SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
1811     make_edge(input0,tbb::flow::input_port<0>(node_to_test));
1812     make_edge(input1,tbb::flow::input_port<1>(node_to_test));
1813     make_edge(node_to_test, sink);
1814     for(int iter = 0; iter < 2; ++iter) {
1815         ResetGlobals(throwException,flog);
1816         if(throwException) {
1817             TRY();
1818                 input0.activate();
1819                 input1.activate();
1820                 g.wait_for_all();
1821             CATCH_AND_ASSERT();
1822         }
1823         else {
1824             TRY();
1825                 input0.activate();
1826                 input1.activate();
1827                 g.wait_for_all();
1828             CATCH_AND_FAIL();
1829         }
1830         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1831         int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value();
1832         int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value();
1833         int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1834         if(throwException) {
1835             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1836             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1837             CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs");
1838             CHECK_MESSAGE( (nb_cnt <= ib0_cnt + ib1_cnt), "Too many items received by sink nodes");
1839         }
1840         else {
1841             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1842             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1843             CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0");
1844             CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1");
1845             CHECK_MESSAGE( (nb_cnt == 2*g_NumItems), "Missing items in absorbers");
1846         }
1847         if(iter == 0) {
1848             remove_edge(node_to_test, sink);
1849             tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4));
1850             tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
1851             g.wait_for_all();
1852             g.reset();
1853             input0_count = input1_count = sink_count = 0;
1854             make_edge(node_to_test, sink);
1855             g.wait_for_all();
1856         }
1857         else {
1858             g.wait_for_all();
1859             g.reset();
1860             input0_count = input1_count = sink_count = 0;
1861         }
1862         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed");
1863         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed");
1864         nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1865         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed");
1866     }
1867 
1868 #if USE_TASK_SCHEDULER_OBSERVER
1869     o.observe(false);
1870 #endif
1871 }
1872 
1873 template<class InputTuple,
1874     TestNodeTypeEnum InputThrowType,
1875     TestNodeTypeEnum SinkThrowType>
1876 void run_indexer_node_test() {
1877     typedef typename std::tuple_element<0,InputTuple>::type ItemType0;
1878     typedef typename std::tuple_element<1,InputTuple>::type ItemType1;
1879     typedef test_input_body<ItemType0,InputThrowType> InputBodyType0;
1880     typedef test_input_body<ItemType1,InputThrowType> InputBodyType1;
1881     typedef typename tbb::flow::indexer_node<ItemType0, ItemType1> TestNodeType;
1882     typedef absorber_body<typename TestNodeType::output_type,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1883 
1884     typedef typename tbb::flow::input_node<ItemType0> InputType0;
1885     typedef typename tbb::flow::input_node<ItemType1> InputType1;
1886     typedef typename tbb::flow::function_node<typename TestNodeType::output_type,tbb::flow::continue_msg> SinkType;
1887 
1888     for(int i = 0; i < 4; ++i) {
1889         if(2 == i) continue;
1890         bool throwException = (i & 0x1) != 0;
1891         bool doFlog = (i & 0x2) != 0;
1892         run_one_indexer_node_test<
1893              InputTuple,
1894              InputType0,
1895              InputBodyType0,
1896              InputType1,
1897              InputBodyType1,
1898              TestNodeType,
1899              SinkType,
1900              SinkBodyType>(throwException,doFlog);
1901     }
1902 }
1903 
1904 void test_indexer_node() {
1905     INFO("Testing indexer_node\n");
1906     g_Wakeup_Msg = "indexer_node(is,non): Missed wakeup or machine is overloaded?";
1907     run_indexer_node_test<std::tuple<int,int>, isThrowing, nonThrowing>();
1908     g_Wakeup_Msg = "indexer_node(non,is): Missed wakeup or machine is overloaded?";
1909     run_indexer_node_test<std::tuple<int,int>, nonThrowing, isThrowing>();
1910     g_Wakeup_Msg = "indexer_node(is,is): Missed wakeup or machine is overloaded?";
1911     run_indexer_node_test<std::tuple<int,int>, isThrowing,  isThrowing>();
1912     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1913 }
1914 
1915 ///////////////////////////////////////////////
1916 // whole-graph exception test
1917 
1918 class Foo {
1919 private:
1920     // std::vector<int>& m_vec;
1921     std::vector<int>* m_vec;
1922 public:
1923     Foo(std::vector<int>& vec) : m_vec(&vec) { }
1924     void operator() (tbb::flow::continue_msg) const {
1925         ++nExceptions;
1926         (void)m_vec->at(m_vec->size()); // Will throw out_of_range exception
1927         CHECK_MESSAGE( (false), "Exception not thrown by invalid access");
1928     }
1929 };
1930 
1931 // test from user ahelwer: http://software.intel.com/en-us/forums/showthread.php?t=103786
1932 // exception thrown in graph node, not caught in wait_for_all()
1933 void
1934 test_flow_graph_exception0() {
1935     // Initializes body
1936     std::vector<int> vec;
1937     vec.push_back(0);
1938     Foo f(vec);
1939     nExceptions = 0;
1940 
1941     // Construct graph and nodes
1942     tbb::flow::graph g;
1943     tbb::flow::broadcast_node<tbb::flow::continue_msg> start(g);
1944     tbb::flow::continue_node<tbb::flow::continue_msg> fooNode(g, f);
1945 
1946     // Construct edge
1947     tbb::flow::make_edge(start, fooNode);
1948 
1949     // Execute graph
1950     CHECK_MESSAGE( (!g.exception_thrown()), "exception_thrown flag already set");
1951     CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag already set");
1952     try {
1953         start.try_put(tbb::flow::continue_msg());
1954         g.wait_for_all();
1955         CHECK_MESSAGE( (false), "Exception not thrown");
1956     }
1957     catch(std::out_of_range& ex) {
1958         INFO("Exception: " << ex.what() << "(expected)\n");
1959     }
1960     catch(...) {
1961         INFO("Unknown exception caught (expected)\n");
1962     }
1963     CHECK_MESSAGE( (nExceptions > 0), "Exception caught, but no body signaled exception being thrown");
1964     nExceptions = 0;
1965     CHECK_MESSAGE( (g.exception_thrown()), "Exception not intercepted");
1966     // if exception set, cancellation also set.
1967     CHECK_MESSAGE( (g.is_cancelled()), "Exception cancellation not signaled");
1968     // in case we got an exception
1969     try {
1970         g.wait_for_all();  // context still signalled canceled, my_exception still set.
1971     }
1972     catch(...) {
1973         CHECK_MESSAGE( (false), "Second exception thrown but no task executing");
1974     }
1975     CHECK_MESSAGE( (nExceptions == 0), "body signaled exception being thrown, but no body executed");
1976     CHECK_MESSAGE( (!g.exception_thrown()), "exception_thrown flag not reset");
1977     CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag not reset");
1978 }
1979 
1980 void TestOneThreadNum(int nThread) {
1981     INFO("Testing " << nThread << "%d threads\n");
1982     g_NumItems = ((nThread > NUM_ITEMS) ? nThread *2 : NUM_ITEMS);
1983     g_NumThreads = nThread;
1984     tbb::task_arena arena(nThread);
1985 	arena.execute(
1986         [&]() {
1987             // whole-graph exception catch and rethrow test
1988             test_flow_graph_exception0();
1989             for(int i = 0; i < 4; ++i) {
1990                 g_ExceptionInMaster = (i & 1) != 0;
1991                 g_SolitaryException = (i & 2) != 0;
1992                 INFO("g_ExceptionInMaster == " << (g_ExceptionInMaster ? "T":"F")
1993                      << ", g_SolitaryException == " << (g_SolitaryException ? "T":"F") << "\n");
1994                 test_input_node();
1995                 test_function_node();
1996                 test_continue_node();  // also test broadcast_node
1997                 test_multifunction_node();
1998                 // single- and multi-item buffering nodes
1999                 test_buffer_queue_and_overwrite_node();
2000                 test_sequencer_node();
2001                 test_priority_queue_node();
2002 
2003                 // join_nodes
2004                 test_join_node<tbb::flow::queueing>();
2005                 test_join_node<tbb::flow::reserving>();
2006                 test_join_node<tbb::flow::tag_matching>();
2007 
2008                 test_limiter_node();
2009                 test_split_node();
2010                 // graph for write_once_node will be complicated by the fact the node will
2011                 // not do try_puts after it has been set.  To get parallelism of N we have
2012                 // to attach N successor nodes to the write_once (or play some similar game).
2013                 // test_write_once_node();
2014                 test_indexer_node();
2015             }
2016         }
2017     );
2018 }
2019 
2020 //! Test exceptions with parallelism
2021 //! \brief \ref error_guessing
2022 TEST_CASE("Testing several threads"){
2023     // reverse order of tests
2024     for(unsigned int nThread=utils::MaxThread; nThread >= utils::MinThread; --nThread) {
2025         tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, nThread);
2026         TestOneThreadNum(nThread);
2027     }
2028 }
2029 
2030 #endif // TBB_USE_EXCEPTIONS
2031