xref: /oneTBB/test/tbb/test_eh_flow_graph.cpp (revision adf44fbf)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 //! \file test_eh_flow_graph.cpp
18 //! \brief Test for [flow_graph.copy_body flow_graph.broadcast_node flow_graph.priority_queue_node flow_graph.indexer_node flow_graph.sequencer_node flow_graph.remove_edge flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification
19 
20 #include "common/config.h"
21 
22 #if USE_TASK_SCHEDULER_OBSERVER
23 #include "tbb/task_scheduler_observer.h"
24 #endif
25 #include "tbb/flow_graph.h"
26 #include "tbb/global_control.h"
27 
28 #include "common/test.h"
29 
30 #if TBB_USE_EXCEPTIONS
31 
32 #include "common/utils.h"
33 #include "common/checktype.h"
34 #include "common/concurrency_tracker.h"
35 
36 #if _MSC_VER
37     #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
38 #endif
39 
40 #if __TBB_MSVC_UNREACHABLE_CODE_IGNORED
41     // Suppress "unreachable code" warning by VC++ 17.0-18.0 (VS 2012 or newer)
42     #pragma warning (disable: 4702)
43 #endif
44 
45 // global task_scheduler_observer is an imperfect tool to find how many threads are really
46 // participating.  That was the hope, but it counts the entries into the marketplace,
47 // not the arena.
48 // TODO: Consider using local task scheduler observer
49 // #define USE_TASK_SCHEDULER_OBSERVER 1
50 
51 #include <iostream>
52 #include <sstream>
53 #include <vector>
54 
55 #include "common/exception_handling.h"
56 
57 #include <stdexcept>
58 
59 #define NUM_ITEMS 15
60 int g_NumItems;
61 
62 std::atomic<unsigned> nExceptions;
63 std::atomic<intptr_t> g_TGCCancelled;
64 
65 enum TestNodeTypeEnum { nonThrowing, isThrowing };
66 
67 static const size_t unlimited_type = 0;
68 static const size_t serial_type = 1;
69 static const size_t limited_type = 4;
70 
71 template<TestNodeTypeEnum T> struct TestNodeTypeName;
72 template<> struct TestNodeTypeName<nonThrowing> { static const char *name() { return "nonThrowing"; } };
73 template<> struct TestNodeTypeName<isThrowing> { static const char *name() { return "isThrowing"; } };
74 
75 template<size_t Conc> struct concurrencyName;
76 template<> struct concurrencyName<serial_type>{ static const char *name() { return "serial"; } };
77 template<> struct concurrencyName<unlimited_type>{ static const char *name() { return "unlimited"; } };
78 template<> struct concurrencyName<limited_type>{ static const char *name() { return "limited"; } };
79 
80 // Class that provides waiting and throwing behavior.  If we are not throwing, do nothing
81 // If serial, we can't wait for concurrency to peak; we may be the bottleneck and will
82 // stop further processing.  We will execute g_NumThreads + 10 times (the "10" is somewhat
83 // arbitrary, and just makes sure there are enough items in the graph to keep it flowing),
84 // If parallel or serial and throwing, use utils::ConcurrencyTracker to wait.
85 
86 template<size_t Conc, TestNodeTypeEnum t = nonThrowing>
87 class WaitThrow;
88 
89 template<>
90 class WaitThrow<serial_type,nonThrowing> {
91 protected:
92     void WaitAndThrow(int cnt, const char * /*name*/) {
93         if(cnt > g_NumThreads + 10) {
94             utils::ConcurrencyTracker ct;
95             WaitUntilConcurrencyPeaks();
96         }
97     }
98 };
99 
100 template<>
101 class WaitThrow<serial_type,isThrowing> {
102 protected:
103     void WaitAndThrow(int cnt, const char * /*name*/) {
104         if(cnt > g_NumThreads + 10) {
105             utils::ConcurrencyTracker ct;
106             WaitUntilConcurrencyPeaks();
107             ThrowTestException(1);
108         }
109     }
110 };
111 
112 // for nodes with limited concurrency, if that concurrency is < g_NumThreads, we need
113 // to make sure enough other nodes wait for concurrency to peak.  If we are attached to
114 // N successors, for each item we pass to a successor, we will get N executions of the
115 // "absorbers" (because we broadcast to successors.)  for an odd number of threads we
116 // need (g_NumThreads - limited + 1) / 2 items (that will give us one extra execution
117 // of an "absorber", but we can't change that without changing the behavior of the node.)
118 template<>
119 class WaitThrow<limited_type,nonThrowing> {
120 protected:
121     void WaitAndThrow(int cnt, const char * /*name*/) {
122         if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) {
123             return;
124         }
125         utils::ConcurrencyTracker ct;
126         WaitUntilConcurrencyPeaks();
127     }
128 };
129 
130 template<>
131 class WaitThrow<limited_type,isThrowing> {
132 protected:
133     void WaitAndThrow(int cnt, const char * /*name*/) {
134         utils::ConcurrencyTracker ct;
135         if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) {
136             return;
137         }
138         WaitUntilConcurrencyPeaks();
139         ThrowTestException(1);
140     }
141 };
142 
143 template<>
144 class WaitThrow<unlimited_type,nonThrowing> {
145 protected:
146     void WaitAndThrow(int /*cnt*/, const char * /*name*/) {
147         utils::ConcurrencyTracker ct;
148         WaitUntilConcurrencyPeaks();
149     }
150 };
151 
152 template<>
153 class WaitThrow<unlimited_type,isThrowing> {
154 protected:
155     void WaitAndThrow(int /*cnt*/, const char * /*name*/) {
156         utils::ConcurrencyTracker ct;
157         WaitUntilConcurrencyPeaks();
158         ThrowTestException(1);
159     }
160 };
161 
162 void
163 ResetGlobals(bool throwException = true, bool flog = false) {
164     nExceptions = 0;
165     g_TGCCancelled = 0;
166     ResetEhGlobals(throwException, flog);
167 }
168 
169 // -------input_node body ------------------
170 template <class OutputType, TestNodeTypeEnum TType>
171 class test_input_body : WaitThrow<serial_type, TType> {
172     using WaitThrow<serial_type, TType>::WaitAndThrow;
173     std::atomic<int> *my_current_val;
174     int my_mult;
175 public:
176     test_input_body(std::atomic<int> &my_cnt, int multiplier = 1) : my_current_val(&my_cnt), my_mult(multiplier) {
177         // INFO("- --------- - - -   constructed " << (size_t)(my_current_val) << "\n");
178     }
179 
180     OutputType operator()(tbb::flow_control& fc) {
181         UPDATE_COUNTS();
182         OutputType ret = OutputType(my_mult * ++(*my_current_val));
183         // TODO revamp: reconsider logging for the tests.
184 
185         // The following line is known to cause double frees. Therefore, commenting out frequent
186         // calls to INFO() macro.
187 
188         // INFO("xx(" << (size_t)(my_current_val) << ") ret == " << (int)ret << "\n");
189         if(*my_current_val > g_NumItems) {
190             // INFO(" ------ End of the line!\n");
191             *my_current_val = g_NumItems;
192             fc.stop();
193             return OutputType();
194         }
195         WaitAndThrow((int)ret,"test_input_body");
196         return ret;
197     }
198 
199     int count_value() { return (int)*my_current_val; }
200 };
201 
202 template <TestNodeTypeEnum TType>
203 class test_input_body<tbb::flow::continue_msg, TType> : WaitThrow<serial_type, TType> {
204     using WaitThrow<serial_type, TType>::WaitAndThrow;
205     std::atomic<int> *my_current_val;
206 public:
207     test_input_body(std::atomic<int> &my_cnt) : my_current_val(&my_cnt) { }
208 
209     tbb::flow::continue_msg operator()( tbb::flow_control & fc) {
210         UPDATE_COUNTS();
211         int outint = ++(*my_current_val);
212         if(*my_current_val > g_NumItems) {
213             *my_current_val = g_NumItems;
214             fc.stop();
215             return tbb::flow::continue_msg();
216         }
217         WaitAndThrow(outint,"test_input_body");
218         return tbb::flow::continue_msg();
219     }
220 
221     int count_value() { return (int)*my_current_val; }
222 };
223 
224 // -------{function/continue}_node body ------------------
225 template<class InputType, class OutputType, TestNodeTypeEnum T, size_t Conc>
226 class absorber_body : WaitThrow<Conc,T> {
227     using WaitThrow<Conc,T>::WaitAndThrow;
228     std::atomic<int> *my_count;
229 public:
230     absorber_body(std::atomic<int> &my_cnt) : my_count(&my_cnt) { }
231     OutputType operator()(const InputType &/*p_in*/) {
232         UPDATE_COUNTS();
233         int out = ++(*my_count);
234         WaitAndThrow(out,"absorber_body");
235         return OutputType();
236     }
237     int count_value() { return *my_count; }
238 };
239 
240 // -------multifunction_node body ------------------
241 
242 // helper classes
243 template<int N,class PortsType>
244 struct IssueOutput {
245     typedef typename std::tuple_element<N-1,PortsType>::type::output_type my_type;
246 
247     static void issue_tuple_element( PortsType &my_ports) {
248         CHECK_MESSAGE( (std::get<N-1>(my_ports).try_put(my_type())), "Error putting to successor");
249         IssueOutput<N-1,PortsType>::issue_tuple_element(my_ports);
250     }
251 };
252 
253 template<class PortsType>
254 struct IssueOutput<1,PortsType> {
255     typedef typename std::tuple_element<0,PortsType>::type::output_type my_type;
256 
257     static void issue_tuple_element( PortsType &my_ports) {
258         CHECK_MESSAGE( (std::get<0>(my_ports).try_put(my_type())), "Error putting to successor");
259     }
260 };
261 
262 template<class InputType, class OutputTupleType, TestNodeTypeEnum T, size_t Conc>
263 class multifunction_node_body : WaitThrow<Conc,T> {
264     using WaitThrow<Conc,T>::WaitAndThrow;
265     static const int N = std::tuple_size<OutputTupleType>::value;
266     typedef typename tbb::flow::multifunction_node<InputType,OutputTupleType> NodeType;
267     typedef typename NodeType::output_ports_type PortsType;
268     std::atomic<int> *my_count;
269 public:
270     multifunction_node_body(std::atomic<int> &my_cnt) : my_count(&my_cnt) { }
271     void operator()(const InputType& /*in*/, PortsType &my_ports) {
272         UPDATE_COUNTS();
273         int out = ++(*my_count);
274         WaitAndThrow(out,"multifunction_node_body");
275         // issue an item to each output port.
276         IssueOutput<N,PortsType>::issue_tuple_element(my_ports);
277     }
278 
279     int count_value() { return *my_count; }
280 };
281 
282 // --------- body to sort items in sequencer_node
283 template<class BufferItemType>
284 struct sequencer_body {
285     size_t operator()(const BufferItemType &s) {
286         CHECK_MESSAGE( (s), "sequencer item out of range (== 0)");
287         return size_t(s) - 1;
288     }
289 };
290 
291 // --------- body to compare the "priorities" of objects for priority_queue_node  five priority levels 0-4.
292 template<class T>
293 struct myLess {
294     bool operator()(const T &t1, const T &t2) {
295         return (int(t1) % 5) < (int(t2) % 5);
296     }
297 };
298 
299 // --------- type for < comparison in priority_queue_node.
300 template<class ItemType>
301 struct less_body {
302     bool operator()(const ItemType &lhs, const ItemType &rhs) {
303         return (int(lhs) % 3) < (int(rhs) % 3);
304     }
305 };
306 
307 // --------- tag methods for tag_matching join_node
308 template<typename TT>
309 class tag_func {
310     TT my_mult;
311 public:
312     tag_func(TT multiplier) : my_mult(multiplier) { }
313     // operator() will return [0 .. Count)
314     tbb::flow::tag_value operator()( TT v) {
315         tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult);
316         return t;
317     }
318 };
319 
320 // --------- Input body for split_node test.
321 template <class OutputTuple, TestNodeTypeEnum TType>
322 class tuple_test_input_body : WaitThrow<serial_type, TType> {
323     typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
324     typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
325     using WaitThrow<serial_type, TType>::WaitAndThrow;
326     std::atomic<int> *my_current_val;
327 public:
328     tuple_test_input_body(std::atomic<int> &my_cnt) : my_current_val(&my_cnt) { }
329 
330     OutputTuple operator()(tbb::flow_control& fc) {
331         UPDATE_COUNTS();
332         int ival = ++(*my_current_val);
333         if(*my_current_val > g_NumItems) {
334             *my_current_val = g_NumItems;  // jam the final value; we assert on it later.
335             fc.stop();
336             return OutputTuple();
337         }
338         WaitAndThrow(ival,"tuple_test_input_body");
339         return OutputTuple(ItemType0(ival),ItemType1(ival));
340     }
341 
342     int count_value() { return (int)*my_current_val; }
343 };
344 
345 // ------- end of node bodies
346 
347 // input_node is only-serial.  input_node can throw, or the function_node can throw.
348 // graph being tested is
349 //
350 //      input_node+---+parallel function_node
351 //
352 //    After each run the graph is reset(), to test the reset functionality.
353 //
354 
355 
356 template<class ItemType, TestNodeTypeEnum inpThrowType, TestNodeTypeEnum absorbThrowType>
357 void run_one_input_node_test(bool throwException, bool flog) {
358     typedef test_input_body<ItemType,inpThrowType> src_body_type;
359     typedef absorber_body<ItemType, tbb::flow::continue_msg, absorbThrowType, unlimited_type> parallel_absorb_body_type;
360     std::atomic<int> input_body_count;
361     std::atomic<int> absorber_body_count;
362     input_body_count = 0;
363     absorber_body_count = 0;
364 
365     tbb::flow::graph g;
366 
367     g_Master = std::this_thread::get_id();
368 
369 #if USE_TASK_SCHEDULER_OBSERVER
370     eh_test_observer o;
371     o.observe(true);
372 #endif
373 
374     tbb::flow::input_node<ItemType> sn(g, src_body_type(input_body_count));
375     parallel_absorb_body_type ab2(absorber_body_count);
376     tbb::flow::function_node<ItemType> parallel_fn(g,tbb::flow::unlimited,ab2);
377     make_edge(sn, parallel_fn);
378     for(int runcnt = 0; runcnt < 2; ++runcnt) {
379         ResetGlobals(throwException,flog);
380         if(throwException) {
381             TRY();
382                 sn.activate();
383                 g.wait_for_all();
384             CATCH_AND_ASSERT();
385         }
386         else {
387             TRY();
388                 sn.activate();
389                 g.wait_for_all();
390             CATCH_AND_FAIL();
391         }
392 
393         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
394         int src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value();
395         int sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value();
396         if(throwException) {
397             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception flag in flow::graph not set");
398             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "canceled flag not set");
399             CHECK_MESSAGE( (src_cnt <= g_NumItems), "Too many input_node items emitted");
400             CHECK_MESSAGE( (sink_cnt <= src_cnt), "Too many input_node items received");
401         }
402         else {
403             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
404             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
405             CHECK_MESSAGE( (src_cnt == g_NumItems), "Incorrect # input_node items emitted");
406             CHECK_MESSAGE( (sink_cnt == src_cnt), "Incorrect # input_node items received");
407         }
408         g.reset();  // resets the body of the input_node and the absorb_nodes.
409         input_body_count = 0;
410         absorber_body_count = 0;
411         CHECK_MESSAGE( (!g.exception_thrown()), "Reset didn't clear exception_thrown()");
412         CHECK_MESSAGE( (!g.is_cancelled()), "Reset didn't clear is_cancelled()");
413         src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value();
414         sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value();
415         CHECK_MESSAGE( (src_cnt == 0), "input_node count not reset");
416         CHECK_MESSAGE( (sink_cnt == 0), "sink_node count not reset");
417     }
418 #if USE_TASK_SCHEDULER_OBSERVER
419     o.observe(false);
420 #endif
421 }  // run_one_input_node_test
422 
423 
424 template<class ItemType, TestNodeTypeEnum inpThrowType, TestNodeTypeEnum absorbThrowType>
425 void run_input_node_test() {
426     run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(false,false);
427     run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(true,false);
428     run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(true,true);
429 }  // run_input_node_test
430 
431 void test_input_node() {
432     INFO("Testing input_node\n");
433     CheckType<int>::check_type_counter = 0;
434     g_Wakeup_Msg = "input_node(1): Missed wakeup or machine is overloaded?";
435     run_input_node_test<CheckType<int>, isThrowing, nonThrowing>();
436     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
437     g_Wakeup_Msg = "input_node(2): Missed wakeup or machine is overloaded?";
438     run_input_node_test<int, isThrowing, nonThrowing>();
439     g_Wakeup_Msg = "input_node(3): Missed wakeup or machine is overloaded?";
440     run_input_node_test<int, nonThrowing, isThrowing>();
441     g_Wakeup_Msg = "input_node(4): Missed wakeup or machine is overloaded?";
442     run_input_node_test<int, isThrowing, isThrowing>();
443     g_Wakeup_Msg = "input_node(5): Missed wakeup or machine is overloaded?";
444     run_input_node_test<CheckType<int>, isThrowing, isThrowing>();
445     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
446     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
447 }
448 
449 // -------- utilities & types to test function_node and multifunction_node.
450 
451 // need to tell the template which node type I am using so it attaches successors correctly.
452 enum NodeFetchType { func_node_type, multifunc_node_type };
453 
454 template<class NodeType, class ItemType, int indx, NodeFetchType NFT>
455 struct AttachPoint;
456 
457 template<class NodeType, class ItemType, int indx>
458 struct AttachPoint<NodeType,ItemType,indx,multifunc_node_type> {
459     static tbb::flow::sender<ItemType> &GetSender(NodeType &n) {
460         return tbb::flow::output_port<indx>(n);
461     }
462 };
463 
464 template<class NodeType, class ItemType, int indx>
465 struct AttachPoint<NodeType,ItemType,indx,func_node_type> {
466     static tbb::flow::sender<ItemType> &GetSender(NodeType &n) {
467         return n;
468     }
469 };
470 
471 
472 // common template for running function_node, multifunction_node.  continue_node
473 // has different firing requirements, so it needs a different graph topology.
474 template<
475     class InputNodeType,
476     class InputNodeBodyType0,
477     class InputNodeBodyType1,
478     NodeFetchType NFT,
479     class TestNodeType,
480     class TestNodeBodyType,
481     class TypeToSink0,          // what kind of item are we sending to sink0
482     class TypeToSink1,          // what kind of item are we sending to sink1
483     class SinkNodeType0,        // will be same for function;
484     class SinkNodeType1,        // may differ for multifunction_node
485     class SinkNodeBodyType0,
486     class SinkNodeBodyType1,
487     size_t Conc
488     >
489 void
490 run_one_functype_node_test(bool throwException, bool flog, const char * /*name*/) {
491 
492     std::stringstream ss;
493     char *saved_msg = const_cast<char *>(g_Wakeup_Msg);
494     tbb::flow::graph g;
495 
496     std::atomic<int> input0_count;
497     std::atomic<int> input1_count;
498     std::atomic<int> sink0_count;
499     std::atomic<int> sink1_count;
500     std::atomic<int> test_count;
501     input0_count = input1_count = sink0_count = sink1_count = test_count = 0;
502 
503 #if USE_TASK_SCHEDULER_OBSERVER
504     eh_test_observer o;
505     o.observe(true);
506 #endif
507 
508     g_Master = std::this_thread::get_id();
509     InputNodeType input0(g, InputNodeBodyType0(input0_count));
510     InputNodeType input1(g, InputNodeBodyType1(input1_count));
511     TestNodeType node_to_test(g, Conc, TestNodeBodyType(test_count));
512     SinkNodeType0 sink0(g,tbb::flow::unlimited,SinkNodeBodyType0(sink0_count));
513     SinkNodeType1 sink1(g,tbb::flow::unlimited,SinkNodeBodyType1(sink1_count));
514     make_edge(input0, node_to_test);
515     make_edge(input1, node_to_test);
516     make_edge(AttachPoint<TestNodeType, TypeToSink0, 0, NFT>::GetSender(node_to_test), sink0);
517     make_edge(AttachPoint<TestNodeType, TypeToSink1, 1, NFT>::GetSender(node_to_test), sink1);
518 
519     for(int iter = 0; iter < 2; ++iter) {  // run, reset, run again
520         ss.clear();
521         ss << saved_msg << " iter=" << iter << ", threads=" << g_NumThreads << ", throw=" << (throwException ? "T" : "F") << ", flow=" << (flog ? "T" : "F");
522         g_Wakeup_Msg = ss.str().c_str();
523         ResetGlobals(throwException,flog);
524         if(throwException) {
525             TRY();
526                 input0.activate();
527                 input1.activate();
528                 g.wait_for_all();
529             CATCH_AND_ASSERT();
530         }
531         else {
532             TRY();
533                 input0.activate();
534                 input1.activate();
535                 g.wait_for_all();
536             CATCH_AND_FAIL();
537         }
538         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
539         int ib0_cnt = tbb::flow::copy_body<InputNodeBodyType0>(input0).count_value();
540         int ib1_cnt = tbb::flow::copy_body<InputNodeBodyType1>(input1).count_value();
541         int t_cnt   = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value();
542         int nb0_cnt = tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value();
543         int nb1_cnt = tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value();
544         if(throwException) {
545             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
546             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
547             CHECK_MESSAGE( (ib0_cnt + ib1_cnt <= 2*g_NumItems), "Too many items sent by inputs");
548             CHECK_MESSAGE( (ib0_cnt + ib1_cnt >= t_cnt), "Too many items received by test node");
549             CHECK_MESSAGE( (nb0_cnt + nb1_cnt <= t_cnt*2), "Too many items received by sink nodes");
550         }
551         else {
552             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
553             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
554             CHECK_MESSAGE( (ib0_cnt + ib1_cnt == 2*g_NumItems), "Missing invocations of input_nodes");
555             CHECK_MESSAGE( (t_cnt == 2*g_NumItems), "Not all items reached test node");
556             CHECK_MESSAGE( (nb0_cnt == 2*g_NumItems && nb1_cnt == 2*g_NumItems), "Missing items in absorbers");
557         }
558         g.reset();  // resets the body of the input_nodes, test_node and the absorb_nodes.
559         input0_count = input1_count = sink0_count = sink1_count = test_count = 0;
560         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType0>(input0).count_value()),"Reset input 0 failed");
561         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType1>(input1).count_value()),"Reset input 1 failed");
562         CHECK_MESSAGE( (0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value()),"Reset test_node failed");
563         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value()),"Reset sink 0 failed");
564         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value()),"Reset sink 1 failed");
565 
566         g_Wakeup_Msg = saved_msg;
567     }
568 #if USE_TASK_SCHEDULER_OBSERVER
569     o.observe(false);
570 #endif
571 }
572 
573 //  Test function_node
574 //
575 // graph being tested is
576 //
577 //         input_node -\                 /- parallel function_node
578 //                      \               /
579 //                       +function_node+
580 //                      /               \                                  x
581 //         input_node -/                 \- parallel function_node
582 //
583 //    After each run the graph is reset(), to test the reset functionality.
584 //
585 template<
586     TestNodeTypeEnum IType1,                          // does input node 1 throw?
587     TestNodeTypeEnum IType2,                          // does input node 2 throw?
588     class Item12,                                     // type of item passed between inputs and test node
589     TestNodeTypeEnum FType,                           // does function node throw?
590     class Item23,                                     // type passed from function_node to sink nodes
591     TestNodeTypeEnum NType1,                          // does sink node 1 throw?
592     TestNodeTypeEnum NType2,                          // does sink node 1 throw?
593     class NodePolicy,                                 // rejecting,queueing
594     size_t Conc                                       // is node concurrent? {serial | limited | unlimited}
595 >
596 void run_function_node_test() {
597 
598     typedef test_input_body<Item12,IType1> IBodyType1;
599     typedef test_input_body<Item12,IType2> IBodyType2;
600     typedef absorber_body<Item12, Item23, FType, Conc> TestBodyType;
601     typedef absorber_body<Item23,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1;
602     typedef absorber_body<Item23,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2;
603 
604     typedef tbb::flow::input_node<Item12> InputType;
605     typedef tbb::flow::function_node<Item12, Item23, NodePolicy> TestType;
606     typedef tbb::flow::function_node<Item23,tbb::flow::continue_msg> SnkType;
607 
608     for(int i = 0; i < 4; ++i ) {
609         if(i != 2) {  // doesn't make sense to flog a non-throwing test
610             bool doThrow = (i & 0x1) != 0;
611             bool doFlog = (i & 0x2) != 0;
612             run_one_functype_node_test<
613                 /*InputNodeType*/       InputType,
614                 /*InputNodeBodyType0*/  IBodyType1,
615                 /*InputNodeBodyType1*/  IBodyType2,
616                 /* NFT */               func_node_type,
617                 /*TestNodeType*/        TestType,
618                 /*TestNodeBodyType*/    TestBodyType,
619                 /*TypeToSink0 */        Item23,
620                 /*TypeToSink1 */        Item23,
621                 /*SinkNodeType0*/       SnkType,
622                 /*SinkNodeType1*/       SnkType,
623                 /*SinkNodeBodyType1*/   SinkBodyType1,
624                 /*SinkNodeBodyType2*/   SinkBodyType2,
625                 /*Conc*/                Conc>
626                     (doThrow,doFlog,"function_node");
627         }
628     }
629 }  // run_function_node_test
630 
631 void test_function_node() {
632     INFO("Testing function_node\n");
633     // serial rejecting
634     g_Wakeup_Msg = "function_node(1a): Missed wakeup or machine is overloaded?";
635     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
636     g_Wakeup_Msg = "function_node(1b): Missed wakeup or machine is overloaded?";
637     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
638     g_Wakeup_Msg = "function_node(1c): Missed wakeup or machine is overloaded?";
639     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
640 
641     // serial queueing
642     g_Wakeup_Msg = "function_node(2): Missed wakeup or machine is overloaded?";
643     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
644     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
645     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
646     CheckType<int>::check_type_counter = 0;
647     run_function_node_test<nonThrowing, nonThrowing, CheckType<int>, nonThrowing, CheckType<int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
648     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
649 
650     // unlimited parallel rejecting
651     g_Wakeup_Msg = "function_node(3): Missed wakeup or machine is overloaded?";
652     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
653     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
654     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
655 
656     // limited parallel rejecting
657     g_Wakeup_Msg = "function_node(4): Missed wakeup or machine is overloaded?";
658     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>();
659     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>();
660     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>();
661 
662     // limited parallel queueing
663     g_Wakeup_Msg = "function_node(5): Missed wakeup or machine is overloaded?";
664     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
665     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
666     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>();
667 
668     // everyone throwing
669     g_Wakeup_Msg = "function_node(6): Missed wakeup or machine is overloaded?";
670     run_function_node_test<isThrowing, isThrowing, int, isThrowing, int, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
671     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
672 }
673 
674 // ----------------------------------- multifunction_node ----------------------------------
675 //  Test multifunction_node.
676 //
677 // graph being tested is
678 //
679 //         input_node -\                      /- parallel function_node
680 //                      \                    /
681 //                       +multifunction_node+
682 //                      /                    \                                  x
683 //         input_node -/                      \- parallel function_node
684 //
685 //    After each run the graph is reset(), to test the reset functionality.  The
686 //    multifunction_node will put an item to each successor for every item
687 //    received.
688 //
689 template<
690     TestNodeTypeEnum IType0,                          // does input node 1 throw?
691     TestNodeTypeEnum IType1,                          // does input node 2 thorw?
692     class Item12,                                 // type of item passed between inputs and test node
693     TestNodeTypeEnum FType,                           // does multifunction node throw?
694     class ItemTuple,                              // tuple of types passed from multifunction_node to sink nodes
695     TestNodeTypeEnum NType1,                          // does sink node 1 throw?
696     TestNodeTypeEnum NType2,                          // does sink node 2 throw?
697     class  NodePolicy,                            // rejecting,queueing
698     size_t Conc                                   // is node concurrent? {serial | limited | unlimited}
699 >
700 void run_multifunction_node_test() {
701 
702     typedef typename std::tuple_element<0,ItemTuple>::type Item23Type0;
703     typedef typename std::tuple_element<1,ItemTuple>::type Item23Type1;
704     typedef test_input_body<Item12,IType0> IBodyType1;
705     typedef test_input_body<Item12,IType1> IBodyType2;
706     typedef multifunction_node_body<Item12, ItemTuple, FType, Conc> TestBodyType;
707     typedef absorber_body<Item23Type0,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1;
708     typedef absorber_body<Item23Type1,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2;
709 
710     typedef tbb::flow::input_node<Item12> InputType;
711     typedef tbb::flow::multifunction_node<Item12, ItemTuple, NodePolicy> TestType;
712     typedef tbb::flow::function_node<Item23Type0,tbb::flow::continue_msg> SnkType0;
713     typedef tbb::flow::function_node<Item23Type1,tbb::flow::continue_msg> SnkType1;
714 
715     for(int i = 0; i < 4; ++i ) {
716         if(i != 2) {  // doesn't make sense to flog a non-throwing test
717             bool doThrow = (i & 0x1) != 0;
718             bool doFlog = (i & 0x2) != 0;
719     run_one_functype_node_test<
720         /*InputNodeType*/       InputType,
721         /*InputNodeBodyType0*/  IBodyType1,
722         /*InputNodeBodyType1*/  IBodyType2,
723         /*NFT*/                 multifunc_node_type,
724         /*TestNodeType*/        TestType,
725         /*TestNodeBodyType*/    TestBodyType,
726         /*TypeToSink0*/         Item23Type0,
727         /*TypeToSink1*/         Item23Type1,
728         /*SinkNodeType0*/       SnkType0,
729         /*SinkNodeType1*/       SnkType1,
730         /*SinkNodeBodyType0*/   SinkBodyType1,
731         /*SinkNodeBodyType1*/   SinkBodyType2,
732         /*Conc*/                Conc>
733             (doThrow,doFlog,"multifunction_node");
734         }
735     }
736 }  // run_multifunction_node_test
737 
738 void test_multifunction_node() {
739     INFO("Testing multifunction_node\n");
740     g_Wakeup_Msg = "multifunction_node(input throws,rejecting,serial): Missed wakeup or machine is overloaded?";
741     // serial rejecting
742     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,float>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
743     g_Wakeup_Msg = "multifunction_node(test throws,rejecting,serial): Missed wakeup or machine is overloaded?";
744     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
745     g_Wakeup_Msg = "multifunction_node(sink throws,rejecting,serial): Missed wakeup or machine is overloaded?";
746     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
747 
748     g_Wakeup_Msg = "multifunction_node(2): Missed wakeup or machine is overloaded?";
749     // serial queueing
750     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
751     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
752     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
753     CheckType<int>::check_type_counter = 0;
754     run_multifunction_node_test<nonThrowing, nonThrowing, CheckType<int>, nonThrowing, std::tuple<CheckType<int>, CheckType<int> >, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
755     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
756 
757     g_Wakeup_Msg = "multifunction_node(3): Missed wakeup or machine is overloaded?";
758     // unlimited parallel rejecting
759     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
760     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
761     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
762 
763     g_Wakeup_Msg = "multifunction_node(4): Missed wakeup or machine is overloaded?";
764     // limited parallel rejecting
765     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>();
766     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>();
767     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>();
768 
769     g_Wakeup_Msg = "multifunction_node(5): Missed wakeup or machine is overloaded?";
770     // limited parallel queueing
771     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
772     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
773     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>();
774 
775     g_Wakeup_Msg = "multifunction_node(6): Missed wakeup or machine is overloaded?";
776     // everyone throwing
777     run_multifunction_node_test<isThrowing, isThrowing, int, isThrowing, std::tuple<int,int>, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
778     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
779 }
780 
781 //
782 // Continue node has T predecessors.  when it receives messages (continue_msg) on T predecessors
783 // it executes the body of the node, and forwards a continue_msg to its successors.
784 // However many predecessors the continue_node has, that's how many continue_msgs it receives
785 // on input before forwarding a message.
786 //
787 // The graph will look like
788 //
789 //                                          +broadcast_node+
790 //                                         /                \             ___
791 //       input_node+------>+broadcast_node+                  +continue_node+--->+absorber
792 //                                         \                /
793 //                                          +broadcast_node+
794 //
795 // The continue_node has unlimited parallelism, no input buffering, and broadcasts to successors.
796 // The absorber is parallel, so each item emitted by the input will result in one thread
797 // spinning.  So for N threads we pass N-1 continue_messages, then spin wait and then throw if
798 // we are allowed to.
799 
800 template < class InputNodeType, class InputNodeBodyType, class TTestNodeType, class TestNodeBodyType,
801         class SinkNodeType, class SinkNodeBodyType>
802 void run_one_continue_node_test (bool throwException, bool flog) {
803     tbb::flow::graph g;
804 
805     std::atomic<int> input_count;
806     std::atomic<int> test_count;
807     std::atomic<int> sink_count;
808     input_count = test_count = sink_count = 0;
809 #if USE_TASK_SCHEDULER_OBSERVER
810     eh_test_observer o;
811     o.observe(true);
812 #endif
813     g_Master = std::this_thread::get_id();
814     InputNodeType input(g, InputNodeBodyType(input_count));
815     TTestNodeType node_to_test(g, TestNodeBodyType(test_count));
816     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
817     tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g), b2(g), b3(g);
818     make_edge(input, b1);
819     make_edge(b1,b2);
820     make_edge(b1,b3);
821     make_edge(b2,node_to_test);
822     make_edge(b3,node_to_test);
823     make_edge(node_to_test, sink);
824     for(int iter = 0; iter < 2; ++iter) {
825         ResetGlobals(throwException,flog);
826         if(throwException) {
827             TRY();
828                 input.activate();
829                 g.wait_for_all();
830             CATCH_AND_ASSERT();
831         }
832         else {
833             TRY();
834                 input.activate();
835                 g.wait_for_all();
836             CATCH_AND_FAIL();
837         }
838         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
839         int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
840         int t_cnt   = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value();
841         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
842         if(throwException) {
843             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
844             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
845             CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
846             CHECK_MESSAGE( (ib_cnt >= t_cnt), "Too many items received by test node");
847             CHECK_MESSAGE( (nb_cnt <= t_cnt), "Too many items received by sink nodes");
848         }
849         else {
850             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
851             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
852             CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
853             CHECK_MESSAGE( (t_cnt == g_NumItems), "Not all items reached test node");
854             CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
855         }
856         g.reset();  // resets the body of the input_nodes, test_node and the absorb_nodes.
857         input_count = test_count = sink_count = 0;
858         CHECK_MESSAGE( (0 == (int)test_count), "Atomic wasn't reset properly");
859         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
860         CHECK_MESSAGE( (0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value()),"Reset test_node failed");
861         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
862     }
863 #if USE_TASK_SCHEDULER_OBSERVER
864     o.observe(false);
865 #endif
866 }
867 
868 template<
869     class ItemType,
870     TestNodeTypeEnum IType,   // does input node throw?
871     TestNodeTypeEnum CType,   // does continue_node throw?
872     TestNodeTypeEnum AType>    // does absorber throw
873 void run_continue_node_test() {
874     typedef test_input_body<tbb::flow::continue_msg,IType> IBodyType;
875     typedef absorber_body<tbb::flow::continue_msg,ItemType,CType,unlimited_type> ContBodyType;
876     typedef absorber_body<ItemType,tbb::flow::continue_msg, AType, unlimited_type> SinkBodyType;
877 
878     typedef tbb::flow::input_node<tbb::flow::continue_msg> InputType;
879     typedef tbb::flow::continue_node<ItemType> TestType;
880     typedef tbb::flow::function_node<ItemType,tbb::flow::continue_msg> SnkType;
881 
882     for(int i = 0; i < 4; ++i ) {
883         if(i == 2) continue;  // don't run (false,true); it doesn't make sense.
884         bool doThrow = (i & 0x1) != 0;
885         bool doFlog = (i & 0x2) != 0;
886         run_one_continue_node_test<
887             /*InputNodeType*/       InputType,
888             /*InputNodeBodyType*/   IBodyType,
889             /*TestNodeType*/        TestType,
890             /*TestNodeBodyType*/    ContBodyType,
891             /*SinkNodeType*/        SnkType,
892             /*SinkNodeBodyType*/    SinkBodyType>
893             (doThrow,doFlog);
894     }
895 }
896 
897 //
898 void test_continue_node() {
899     INFO("Testing continue_node\n");
900     g_Wakeup_Msg = "buffer_node(non,is,non): Missed wakeup or machine is overloaded?";
901     run_continue_node_test<int,nonThrowing,isThrowing,nonThrowing>();
902     g_Wakeup_Msg = "buffer_node(non,non,is): Missed wakeup or machine is overloaded?";
903     run_continue_node_test<int,nonThrowing,nonThrowing,isThrowing>();
904     g_Wakeup_Msg = "buffer_node(is,non,non): Missed wakeup or machine is overloaded?";
905     run_continue_node_test<int,isThrowing,nonThrowing,nonThrowing>();
906     g_Wakeup_Msg = "buffer_node(is,is,is): Missed wakeup or machine is overloaded?";
907     run_continue_node_test<int,isThrowing,isThrowing,isThrowing>();
908     CheckType<double>::check_type_counter = 0;
909     run_continue_node_test<CheckType<double>,isThrowing,isThrowing,isThrowing>();
910     CHECK_MESSAGE( (!CheckType<double>::check_type_counter), "Dropped objects in continue_node test");
911     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
912 }
913 
914 // ---------- buffer_node queue_node overwrite_node --------------
915 
916 template<
917     class BufferItemType,       //
918     class InputNodeType,
919     class InputNodeBodyType,
920     class TestNodeType,
921     class SinkNodeType,
922     class SinkNodeBodyType >
923 void run_one_buffer_node_test(bool throwException,bool flog) {
924     tbb::flow::graph g;
925 
926     std::atomic<int> input_count;
927     std::atomic<int> sink_count;
928     input_count = sink_count = 0;
929 #if USE_TASK_SCHEDULER_OBSERVER
930     eh_test_observer o;
931     o.observe(true);
932 #endif
933     g_Master = std::this_thread::get_id();
934     InputNodeType input(g, InputNodeBodyType(input_count));
935     TestNodeType node_to_test(g);
936     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
937     make_edge(input,node_to_test);
938     make_edge(node_to_test, sink);
939     for(int iter = 0; iter < 2; ++iter) {
940         ResetGlobals(throwException,flog);
941         if(throwException) {
942             TRY();
943                 input.activate();
944                 g.wait_for_all();
945             CATCH_AND_ASSERT();
946         }
947         else {
948             TRY();
949                 input.activate();
950                 g.wait_for_all();
951             CATCH_AND_FAIL();
952         }
953         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
954         int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
955         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
956         if(throwException) {
957             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
958             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
959             CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
960             CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
961         }
962         else {
963             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
964             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
965             CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
966             CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
967         }
968         if(iter == 0) {
969             remove_edge(node_to_test, sink);
970             node_to_test.try_put(BufferItemType());
971             g.wait_for_all();
972             g.reset();
973             input_count = sink_count = 0;
974             BufferItemType tmp;
975             CHECK_MESSAGE( (!node_to_test.try_get(tmp)), "node not empty");
976             make_edge(node_to_test, sink);
977             g.wait_for_all();
978         }
979         else {
980             g.reset();
981             input_count = sink_count = 0;
982         }
983         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
984         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
985     }
986 
987 #if USE_TASK_SCHEDULER_OBSERVER
988     o.observe(false);
989 #endif
990 }
991 template<class BufferItemType,
992          TestNodeTypeEnum InputThrowType,
993          TestNodeTypeEnum SinkThrowType>
994 void run_buffer_queue_and_overwrite_node_test() {
995     typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
996     typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
997 
998     typedef tbb::flow::input_node<BufferItemType> InputType;
999     typedef tbb::flow::buffer_node<BufferItemType> BufType;
1000     typedef tbb::flow::queue_node<BufferItemType>  QueType;
1001     typedef tbb::flow::overwrite_node<BufferItemType>  OvrType;
1002     typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
1003 
1004     for(int i = 0; i < 4; ++i) {
1005         if(i == 2) continue;  // no need to test flog w/o throws
1006         bool throwException = (i & 0x1) != 0;
1007         bool doFlog = (i & 0x2) != 0;
1008         run_one_buffer_node_test<
1009             /* class BufferItemType*/     BufferItemType,
1010             /*class InputNodeType*/       InputType,
1011             /*class InputNodeBodyType*/   InputBodyType,
1012             /*class TestNodeType*/        BufType,
1013             /*class SinkNodeType*/        SnkType,
1014             /*class SinkNodeBodyType*/    SinkBodyType
1015             >(throwException, doFlog);
1016         run_one_buffer_node_test<
1017             /* class BufferItemType*/     BufferItemType,
1018             /*class InputNodeType*/       InputType,
1019             /*class InputNodeBodyType*/   InputBodyType,
1020             /*class TestNodeType*/        QueType,
1021             /*class SinkNodeType*/        SnkType,
1022             /*class SinkNodeBodyType*/    SinkBodyType
1023             >(throwException, doFlog);
1024         run_one_buffer_node_test<
1025             /* class BufferItemType*/     BufferItemType,
1026             /*class InputNodeType*/       InputType,
1027             /*class InputNodeBodyType*/   InputBodyType,
1028             /*class TestNodeType*/        OvrType,
1029             /*class SinkNodeType*/        SnkType,
1030             /*class SinkNodeBodyType*/    SinkBodyType
1031             >(throwException, doFlog);
1032     }
1033 }
1034 
1035 void test_buffer_queue_and_overwrite_node() {
1036     INFO("Testing buffer_node, queue_node and overwrite_node\n");
1037     g_Wakeup_Msg = "buffer, queue, overwrite(is,non): Missed wakeup or machine is overloaded?";
1038     run_buffer_queue_and_overwrite_node_test<int,isThrowing,nonThrowing>();
1039     g_Wakeup_Msg = "buffer, queue, overwrite(non,is): Missed wakeup or machine is overloaded?";
1040     run_buffer_queue_and_overwrite_node_test<int,nonThrowing,isThrowing>();
1041     g_Wakeup_Msg = "buffer, queue, overwrite(is,is): Missed wakeup or machine is overloaded?";
1042     run_buffer_queue_and_overwrite_node_test<int,isThrowing,isThrowing>();
1043     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1044 }
1045 
1046 // ---------- sequencer_node -------------------------
1047 
1048 
1049 template<
1050     class BufferItemType,       //
1051     class InputNodeType,
1052     class InputNodeBodyType,
1053     class TestNodeType,
1054     class SeqBodyType,
1055     class SinkNodeType,
1056     class SinkNodeBodyType >
1057 void run_one_sequencer_node_test(bool throwException,bool flog) {
1058     tbb::flow::graph g;
1059 
1060     std::atomic<int> input_count;
1061     std::atomic<int> sink_count;
1062     input_count = sink_count = 0;
1063 #if USE_TASK_SCHEDULER_OBSERVER
1064     eh_test_observer o;
1065     o.observe(true);
1066 #endif
1067     g_Master = std::this_thread::get_id();
1068     InputNodeType input(g, InputNodeBodyType(input_count));
1069     TestNodeType node_to_test(g,SeqBodyType());
1070     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1071     make_edge(input,node_to_test);
1072     make_edge(node_to_test, sink);
1073     for(int iter = 0; iter < 2; ++iter) {
1074         ResetGlobals(throwException,flog);
1075         if(throwException) {
1076             TRY();
1077                 input.activate();
1078                 g.wait_for_all();
1079             CATCH_AND_ASSERT();
1080         }
1081         else {
1082             TRY();
1083                 input.activate();
1084                 g.wait_for_all();
1085             CATCH_AND_FAIL();
1086         }
1087         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1088         int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
1089         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
1090         if(throwException) {
1091             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1092             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1093             CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
1094             CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
1095         }
1096         else {
1097             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1098             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1099             CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
1100             CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
1101         }
1102         if(iter == 0) {
1103             remove_edge(node_to_test, sink);
1104             node_to_test.try_put(BufferItemType(g_NumItems + 1));
1105             node_to_test.try_put(BufferItemType(1));
1106             g.wait_for_all();
1107             g.reset();
1108             input_count = sink_count = 0;
1109             make_edge(node_to_test, sink);
1110             g.wait_for_all();
1111         }
1112         else {
1113             g.reset();
1114             input_count = sink_count = 0;
1115         }
1116         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
1117         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
1118     }
1119 
1120 #if USE_TASK_SCHEDULER_OBSERVER
1121     o.observe(false);
1122 #endif
1123 }
1124 
1125 template<class BufferItemType,
1126          TestNodeTypeEnum InputThrowType,
1127          TestNodeTypeEnum SinkThrowType>
1128 void run_sequencer_node_test() {
1129     typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
1130     typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1131     typedef sequencer_body<BufferItemType> SeqBodyType;
1132 
1133     typedef tbb::flow::input_node<BufferItemType> InputType;
1134     typedef tbb::flow::sequencer_node<BufferItemType>  SeqType;
1135     typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
1136 
1137     for(int i = 0; i < 4; ++i) {
1138         if(i == 2) continue;  // no need to test flog w/o throws
1139         bool throwException = (i & 0x1) != 0;
1140         bool doFlog = (i & 0x2) != 0;
1141         run_one_sequencer_node_test<
1142             /* class BufferItemType*/     BufferItemType,
1143             /*class InputNodeType*/       InputType,
1144             /*class InputNodeBodyType*/   InputBodyType,
1145             /*class TestNodeType*/        SeqType,
1146             /*class SeqBodyType*/         SeqBodyType,
1147             /*class SinkNodeType*/        SnkType,
1148             /*class SinkNodeBodyType*/    SinkBodyType
1149             >(throwException, doFlog);
1150     }
1151 }
1152 
1153 
1154 
1155 void test_sequencer_node() {
1156     INFO("Testing sequencer_node\n");
1157     g_Wakeup_Msg = "sequencer_node(is,non): Missed wakeup or machine is overloaded?";
1158     run_sequencer_node_test<int, isThrowing,nonThrowing>();
1159     CheckType<int>::check_type_counter = 0;
1160     g_Wakeup_Msg = "sequencer_node(non,is): Missed wakeup or machine is overloaded?";
1161     run_sequencer_node_test<CheckType<int>, nonThrowing,isThrowing>();
1162     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped objects in sequencer_node test");
1163     g_Wakeup_Msg = "sequencer_node(is,is): Missed wakeup or machine is overloaded?";
1164     run_sequencer_node_test<int, isThrowing,isThrowing>();
1165     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1166 }
1167 
1168 // ------------ priority_queue_node ------------------
1169 
1170 template<
1171     class BufferItemType,
1172     class InputNodeType,
1173     class InputNodeBodyType,
1174     class TestNodeType,
1175     class SinkNodeType,
1176     class SinkNodeBodyType >
1177 void run_one_priority_queue_node_test(bool throwException,bool flog) {
1178     tbb::flow::graph g;
1179 
1180     std::atomic<int> input_count;
1181     std::atomic<int> sink_count;
1182     input_count = sink_count = 0;
1183 #if USE_TASK_SCHEDULER_OBSERVER
1184     eh_test_observer o;
1185     o.observe(true);
1186 #endif
1187     g_Master = std::this_thread::get_id();
1188     InputNodeType input(g, InputNodeBodyType(input_count));
1189 
1190     TestNodeType node_to_test(g);
1191 
1192     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1193 
1194     make_edge(input,node_to_test);
1195     make_edge(node_to_test, sink);
1196     for(int iter = 0; iter < 2; ++iter) {
1197         ResetGlobals(throwException,flog);
1198         if(throwException) {
1199             TRY();
1200                 input.activate();
1201                 g.wait_for_all();
1202             CATCH_AND_ASSERT();
1203         }
1204         else {
1205             TRY();
1206                 input.activate();
1207                 g.wait_for_all();
1208             CATCH_AND_FAIL();
1209         }
1210         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1211         int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
1212         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
1213         if(throwException) {
1214             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1215             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1216             CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
1217             CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
1218         }
1219         else {
1220             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1221             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1222             CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
1223             CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
1224         }
1225         if(iter == 0) {
1226             remove_edge(node_to_test, sink);
1227             node_to_test.try_put(BufferItemType(g_NumItems + 1));
1228             node_to_test.try_put(BufferItemType(g_NumItems + 2));
1229             node_to_test.try_put(BufferItemType());
1230             g.wait_for_all();
1231             g.reset();
1232             input_count = sink_count = 0;
1233             make_edge(node_to_test, sink);
1234             g.wait_for_all();
1235         }
1236         else {
1237             g.reset();
1238             input_count = sink_count = 0;
1239         }
1240         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
1241         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
1242     }
1243 
1244 #if USE_TASK_SCHEDULER_OBSERVER
1245     o.observe(false);
1246 #endif
1247 }
1248 
1249 template<class BufferItemType,
1250          TestNodeTypeEnum InputThrowType,
1251          TestNodeTypeEnum SinkThrowType>
1252 void run_priority_queue_node_test() {
1253     typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
1254     typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1255     typedef less_body<BufferItemType> LessBodyType;
1256 
1257     typedef tbb::flow::input_node<BufferItemType> InputType;
1258     typedef tbb::flow::priority_queue_node<BufferItemType,LessBodyType>  PrqType;
1259     typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
1260 
1261     for(int i = 0; i < 4; ++i) {
1262         if(i == 2) continue;  // no need to test flog w/o throws
1263         bool throwException = (i & 0x1) != 0;
1264         bool doFlog = (i & 0x2) != 0;
1265         run_one_priority_queue_node_test<
1266             /* class BufferItemType*/     BufferItemType,
1267             /*class InputNodeType*/       InputType,
1268             /*class InputNodeBodyType*/   InputBodyType,
1269             /*class TestNodeType*/        PrqType,
1270             /*class SinkNodeType*/        SnkType,
1271             /*class SinkNodeBodyType*/    SinkBodyType
1272             >(throwException, doFlog);
1273     }
1274 }
1275 
1276 void test_priority_queue_node() {
1277     INFO("Testing priority_queue_node\n");
1278     g_Wakeup_Msg = "priority_queue_node(is,non): Missed wakeup or machine is overloaded?";
1279     run_priority_queue_node_test<int, isThrowing,nonThrowing>();
1280     CheckType<int>::check_type_counter = 0;
1281     g_Wakeup_Msg = "priority_queue_node(non,is): Missed wakeup or machine is overloaded?";
1282     run_priority_queue_node_test<CheckType<int>, nonThrowing,isThrowing>();
1283     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped objects in priority_queue_node test");
1284     g_Wakeup_Msg = "priority_queue_node(is,is): Missed wakeup or machine is overloaded?";
1285     run_priority_queue_node_test<int, isThrowing,isThrowing>();
1286     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1287 }
1288 
1289 // ------------------- join_node ----------------
1290 template<class JP> struct graph_policy_name{
1291     static const char* name() {return "unknown"; }
1292 };
1293 template<> struct graph_policy_name<tbb::flow::queueing>  {
1294     static const char* name() {return "queueing"; }
1295 };
1296 template<> struct graph_policy_name<tbb::flow::reserving> {
1297     static const char* name() {return "reserving"; }
1298 };
1299 template<> struct graph_policy_name<tbb::flow::tag_matching> {
1300     static const char* name() {return "tag_matching"; }
1301 };
1302 
1303 
1304 template<
1305     class JP,
1306     class OutputTuple,
1307     class InputType0,
1308     class InputBodyType0,
1309     class InputType1,
1310     class InputBodyType1,
1311     class TestJoinType,
1312     class SinkType,
1313     class SinkBodyType
1314     >
1315 struct run_one_join_node_test {
1316     run_one_join_node_test() {}
1317     static void execute_test(bool throwException,bool flog) {
1318         typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
1319         typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
1320 
1321         tbb::flow::graph g;
1322         std::atomic<int>input0_count;
1323         std::atomic<int>input1_count;
1324         std::atomic<int>sink_count;
1325         input0_count = input1_count = sink_count = 0;
1326 #if USE_TASK_SCHEDULER_OBSERVER
1327         eh_test_observer o;
1328         o.observe(true);
1329 #endif
1330         g_Master = std::this_thread::get_id();
1331         InputType0 input0(g, InputBodyType0(input0_count));
1332         InputType1 input1(g, InputBodyType1(input1_count));
1333         TestJoinType node_to_test(g);
1334         SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
1335         make_edge(input0,tbb::flow::input_port<0>(node_to_test));
1336         make_edge(input1,tbb::flow::input_port<1>(node_to_test));
1337         make_edge(node_to_test, sink);
1338         for(int iter = 0; iter < 2; ++iter) {
1339             ResetGlobals(throwException,flog);
1340             if(throwException) {
1341                 TRY();
1342                     input0.activate();
1343                     input1.activate();
1344                     g.wait_for_all();
1345                 CATCH_AND_ASSERT();
1346             }
1347             else {
1348                 TRY();
1349                     input0.activate();
1350                     input1.activate();
1351                     g.wait_for_all();
1352                 CATCH_AND_FAIL();
1353             }
1354             bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1355             int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value();
1356             int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value();
1357             int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1358             if(throwException) {
1359                 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1360                 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1361                 CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs");
1362                 CHECK_MESSAGE( (nb_cnt <= ((ib0_cnt < ib1_cnt) ? ib0_cnt : ib1_cnt)), "Too many items received by sink nodes");
1363             }
1364             else {
1365                 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1366                 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1367                 if(ib0_cnt != g_NumItems) {
1368                 //     INFO("throwException == %s\n" << (throwException ? "true" : "false"));
1369                 //     INFO("iter == " << iter << "\n");
1370                 //     INFO("ib0_cnt == " << ib0_cnt << "\n");
1371                 //     INFO("g_NumItems == " << g_NumItems << "\n");
1372                 }
1373                 CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0");  // this one
1374                 CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1");
1375                 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
1376             }
1377             if(iter == 0) {
1378                 remove_edge(node_to_test, sink);
1379                 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 1));
1380                 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
1381                 g.wait_for_all();
1382                 g.reset();
1383                 input0_count = input1_count = sink_count = 0;
1384                 make_edge(node_to_test, sink);
1385                 g.wait_for_all();
1386             }
1387             else {
1388                 g.wait_for_all();
1389                 g.reset();
1390                 input0_count = input1_count = sink_count = 0;
1391             }
1392             CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed");
1393             CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed");
1394             nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1395             CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed");
1396         }
1397 
1398 #if USE_TASK_SCHEDULER_OBSERVER
1399         o.observe(false);
1400 #endif
1401     }
1402 };  // run_one_join_node_test
1403 
1404 template<
1405     class OutputTuple,
1406     class InputType0,
1407     class InputBodyType0,
1408     class InputType1,
1409     class InputBodyType1,
1410     class TestJoinType,
1411     class SinkType,
1412     class SinkBodyType
1413     >
1414 struct run_one_join_node_test<
1415         tbb::flow::tag_matching,
1416         OutputTuple,
1417         InputType0,
1418         InputBodyType0,
1419         InputType1,
1420         InputBodyType1,
1421         TestJoinType,
1422         SinkType,
1423         SinkBodyType
1424     > {
1425     run_one_join_node_test() {}
1426     static void execute_test(bool throwException,bool flog) {
1427         typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
1428         typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
1429 
1430         tbb::flow::graph g;
1431 
1432         std::atomic<int>input0_count;
1433         std::atomic<int>input1_count;
1434         std::atomic<int>sink_count;
1435         input0_count = input1_count = sink_count = 0;
1436 #if USE_TASK_SCHEDULER_OBSERVER
1437         eh_test_observer o;
1438         o.observe(true);
1439 #endif
1440         g_Master = std::this_thread::get_id();
1441         InputType0 input0(g, InputBodyType0(input0_count, 2));
1442         InputType1 input1(g, InputBodyType1(input1_count, 3));
1443         TestJoinType node_to_test(g, tag_func<ItemType0>(ItemType0(2)), tag_func<ItemType1>(ItemType1(3)));
1444         SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
1445         make_edge(input0,tbb::flow::input_port<0>(node_to_test));
1446         make_edge(input1,tbb::flow::input_port<1>(node_to_test));
1447         make_edge(node_to_test, sink);
1448         for(int iter = 0; iter < 2; ++iter) {
1449             ResetGlobals(throwException,flog);
1450             if(throwException) {
1451                 TRY();
1452                     input0.activate();
1453                     input1.activate();
1454                     g.wait_for_all();
1455                 CATCH_AND_ASSERT();
1456             }
1457             else {
1458                 TRY();
1459                     input0.activate();
1460                     input1.activate();
1461                     g.wait_for_all();
1462                 CATCH_AND_FAIL();
1463             }
1464             bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1465             int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value();
1466             int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value();
1467             int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1468             if(throwException) {
1469                 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1470                 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1471                 CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs");
1472                 CHECK_MESSAGE( (nb_cnt <= ((ib0_cnt < ib1_cnt) ? ib0_cnt : ib1_cnt)), "Too many items received by sink nodes");
1473             }
1474             else {
1475                 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1476                 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1477                 CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0");
1478                 CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1");
1479                 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
1480             }
1481             if(iter == 0) {
1482                 remove_edge(node_to_test, sink);
1483                 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4));
1484                 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
1485                 g.wait_for_all();   // have to wait for the graph to stop again....
1486                 g.reset();  // resets the body of the input_nodes, test_node and the absorb_nodes.
1487                 input0_count = input1_count = sink_count = 0;
1488                 make_edge(node_to_test, sink);
1489                 g.wait_for_all();   // have to wait for the graph to stop again....
1490             }
1491             else {
1492                 g.wait_for_all();
1493                 g.reset();
1494                 input0_count = input1_count = sink_count = 0;
1495             }
1496             CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed");
1497             CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed");
1498             nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1499             CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed");
1500         }
1501 
1502 #if USE_TASK_SCHEDULER_OBSERVER
1503         o.observe(false);
1504 #endif
1505     }
1506 };  // run_one_join_node_test<tag_matching>
1507 
1508 template<class JP, class OutputTuple,
1509              TestNodeTypeEnum InputThrowType,
1510              TestNodeTypeEnum SinkThrowType>
1511 void run_join_node_test() {
1512     typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
1513     typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
1514     typedef test_input_body<ItemType0,InputThrowType> InputBodyType0;
1515     typedef test_input_body<ItemType1,InputThrowType> InputBodyType1;
1516     typedef absorber_body<OutputTuple,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1517 
1518     typedef typename tbb::flow::input_node<ItemType0> InputType0;
1519     typedef typename tbb::flow::input_node<ItemType1> InputType1;
1520     typedef typename tbb::flow::join_node<OutputTuple,JP> TestJoinType;
1521     typedef typename tbb::flow::function_node<OutputTuple,tbb::flow::continue_msg> SinkType;
1522 
1523     for(int i = 0; i < 4; ++i) {
1524         if(2 == i) continue;
1525         bool throwException = (i & 0x1) != 0;
1526         bool doFlog = (i & 0x2) != 0;
1527         run_one_join_node_test<
1528              JP,
1529              OutputTuple,
1530              InputType0,
1531              InputBodyType0,
1532              InputType1,
1533              InputBodyType1,
1534              TestJoinType,
1535              SinkType,
1536              SinkBodyType>::execute_test(throwException,doFlog);
1537     }
1538 }
1539 
1540 template<class JP>
1541 void test_join_node() {
1542     INFO("Testing join_node<" << graph_policy_name<JP>::name() << ">\n");
1543     // only doing two-input joins
1544     g_Wakeup_Msg = "join(is,non): Missed wakeup or machine is overloaded?";
1545     run_join_node_test<JP, std::tuple<int,int>,  isThrowing, nonThrowing>();
1546     CheckType<int>::check_type_counter = 0;
1547     g_Wakeup_Msg = "join(non,is): Missed wakeup or machine is overloaded?";
1548     run_join_node_test<JP, std::tuple<CheckType<int>,int>, nonThrowing, isThrowing>();
1549     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped items in test");
1550     g_Wakeup_Msg = "join(is,is): Missed wakeup or machine is overloaded?";
1551     run_join_node_test<JP, std::tuple<int,int>,  isThrowing, isThrowing>();
1552     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1553 }
1554 
1555 // ------------------- limiter_node -------------
1556 
1557 template<
1558     class BufferItemType,       //
1559     class InputNodeType,
1560     class InputNodeBodyType,
1561     class TestNodeType,
1562     class SinkNodeType,
1563     class SinkNodeBodyType >
1564 void run_one_limiter_node_test(bool throwException,bool flog) {
1565     tbb::flow::graph g;
1566 
1567     std::atomic<int> input_count;
1568     std::atomic<int> sink_count;
1569     input_count = sink_count = 0;
1570 #if USE_TASK_SCHEDULER_OBSERVER
1571     eh_test_observer o;
1572     o.observe(true);
1573 #endif
1574     g_Master = std::this_thread::get_id();
1575     InputNodeType input(g, InputNodeBodyType(input_count));
1576     TestNodeType node_to_test(g,g_NumThreads + 1);
1577     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1578     make_edge(input,node_to_test);
1579     make_edge(node_to_test, sink);
1580     for(int iter = 0; iter < 2; ++iter) {
1581         ResetGlobals(throwException,flog);
1582         if(throwException) {
1583             TRY();
1584                 input.activate();
1585                 g.wait_for_all();
1586             CATCH_AND_ASSERT();
1587         }
1588         else {
1589             TRY();
1590                 input.activate();
1591                 g.wait_for_all();
1592             CATCH_AND_FAIL();
1593         }
1594         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1595         int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
1596         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
1597         if(throwException) {
1598             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1599             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1600             CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
1601             CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
1602         }
1603         else {
1604             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1605             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1606             // we stop after limiter's limit, which is g_NumThreads + 1.  The input_node
1607             // is invoked one extra time, filling its buffer, so its limit is g_NumThreads + 2.
1608             CHECK_MESSAGE( (ib_cnt == g_NumThreads + 2), "Missing invocations of input_node");
1609             CHECK_MESSAGE( (nb_cnt == g_NumThreads + 1), "Missing items in absorbers");
1610         }
1611         if(iter == 0) {
1612             remove_edge(node_to_test, sink);
1613             node_to_test.try_put(BufferItemType());
1614             node_to_test.try_put(BufferItemType());
1615             g.wait_for_all();
1616             g.reset();
1617             input_count = sink_count = 0;
1618             BufferItemType tmp;
1619             CHECK_MESSAGE( (!node_to_test.try_get(tmp)), "node not empty");
1620             make_edge(node_to_test, sink);
1621             g.wait_for_all();
1622         }
1623         else {
1624             g.reset();
1625             input_count = sink_count = 0;
1626         }
1627         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
1628         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
1629     }
1630 
1631 #if USE_TASK_SCHEDULER_OBSERVER
1632     o.observe(false);
1633 #endif
1634 }
1635 
1636 template<class BufferItemType,
1637          TestNodeTypeEnum InputThrowType,
1638          TestNodeTypeEnum SinkThrowType>
1639 void run_limiter_node_test() {
1640     typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
1641     typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1642 
1643     typedef tbb::flow::input_node<BufferItemType> InputType;
1644     typedef tbb::flow::limiter_node<BufferItemType>  LmtType;
1645     typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
1646 
1647     for(int i = 0; i < 4; ++i) {
1648         if(i == 2) continue;  // no need to test flog w/o throws
1649         bool throwException = (i & 0x1) != 0;
1650         bool doFlog = (i & 0x2) != 0;
1651         run_one_limiter_node_test<
1652             /* class BufferItemType*/     BufferItemType,
1653             /*class InputNodeType*/       InputType,
1654             /*class InputNodeBodyType*/   InputBodyType,
1655             /*class TestNodeType*/        LmtType,
1656             /*class SinkNodeType*/        SnkType,
1657             /*class SinkNodeBodyType*/    SinkBodyType
1658             >(throwException, doFlog);
1659     }
1660 }
1661 
1662 void test_limiter_node() {
1663     INFO("Testing limiter_node\n");
1664     g_Wakeup_Msg = "limiter_node(is,non): Missed wakeup or machine is overloaded?";
1665     run_limiter_node_test<int,isThrowing,nonThrowing>();
1666     g_Wakeup_Msg = "limiter_node(non,is): Missed wakeup or machine is overloaded?";
1667     run_limiter_node_test<int,nonThrowing,isThrowing>();
1668     g_Wakeup_Msg = "limiter_node(is,is): Missed wakeup or machine is overloaded?";
1669     run_limiter_node_test<int,isThrowing,isThrowing>();
1670     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1671 }
1672 
1673 // -------- split_node --------------------
1674 
1675 template<
1676     class InputTuple,
1677     class InputType,
1678     class InputBodyType,
1679     class TestSplitType,
1680     class SinkType0,
1681     class SinkBodyType0,
1682     class SinkType1,
1683     class SinkBodyType1>
1684 void run_one_split_node_test(bool throwException, bool flog) {
1685 
1686     tbb::flow::graph g;
1687 
1688     std::atomic<int> input_count;
1689     std::atomic<int> sink0_count;
1690     std::atomic<int> sink1_count;
1691     input_count = sink0_count = sink1_count = 0;
1692 #if USE_TASK_SCHEDULER_OBSERVER
1693     eh_test_observer o;
1694     o.observe(true);
1695 #endif
1696 
1697     g_Master = std::this_thread::get_id();
1698     InputType input(g, InputBodyType(input_count));
1699     TestSplitType node_to_test(g);
1700     SinkType0 sink0(g,tbb::flow::unlimited,SinkBodyType0(sink0_count));
1701     SinkType1 sink1(g,tbb::flow::unlimited,SinkBodyType1(sink1_count));
1702     make_edge(input, node_to_test);
1703     make_edge(tbb::flow::output_port<0>(node_to_test), sink0);
1704     make_edge(tbb::flow::output_port<1>(node_to_test), sink1);
1705 
1706     for(int iter = 0; iter < 2; ++iter) {  // run, reset, run again
1707         ResetGlobals(throwException,flog);
1708         if(throwException) {
1709             TRY();
1710                 input.activate();
1711                 g.wait_for_all();
1712             CATCH_AND_ASSERT();
1713         }
1714         else {
1715             TRY();
1716                 input.activate();
1717                 g.wait_for_all();
1718             CATCH_AND_FAIL();
1719         }
1720         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1721         int ib_cnt = tbb::flow::copy_body<InputBodyType>(input).count_value();
1722         int nb0_cnt = tbb::flow::copy_body<SinkBodyType0>(sink0).count_value();
1723         int nb1_cnt = tbb::flow::copy_body<SinkBodyType1>(sink1).count_value();
1724         if(throwException) {
1725             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1726             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1727             CHECK_MESSAGE( (ib_cnt <= 2*g_NumItems), "Too many items sent by input");
1728             CHECK_MESSAGE( (nb0_cnt + nb1_cnt <= ib_cnt*2), "Too many items received by sink nodes");
1729         }
1730         else {
1731             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1732             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1733             CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_nodes");
1734             CHECK_MESSAGE( (nb0_cnt == g_NumItems && nb1_cnt == g_NumItems), "Missing items in absorbers");
1735         }
1736         g.reset();  // resets the body of the input_nodes and the absorb_nodes.
1737         input_count = sink0_count = sink1_count = 0;
1738         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType>(input).count_value()),"Reset input failed");
1739         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType0>(sink0).count_value()),"Reset sink 0 failed");
1740         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType1>(sink1).count_value()),"Reset sink 1 failed");
1741     }
1742 #if USE_TASK_SCHEDULER_OBSERVER
1743     o.observe(false);
1744 #endif
1745 }
1746 
1747 template<class InputTuple,
1748              TestNodeTypeEnum InputThrowType,
1749              TestNodeTypeEnum SinkThrowType>
1750 void run_split_node_test() {
1751     typedef typename std::tuple_element<0,InputTuple>::type ItemType0;
1752     typedef typename std::tuple_element<1,InputTuple>::type ItemType1;
1753     typedef tuple_test_input_body<InputTuple,InputThrowType> InputBodyType;
1754     typedef absorber_body<ItemType0,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType0;
1755     typedef absorber_body<ItemType1,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType1;
1756 
1757     typedef typename tbb::flow::input_node<InputTuple> InputType;
1758     typedef typename tbb::flow::split_node<InputTuple> TestSplitType;
1759     typedef typename tbb::flow::function_node<ItemType0,tbb::flow::continue_msg> SinkType0;
1760     typedef typename tbb::flow::function_node<ItemType1,tbb::flow::continue_msg> SinkType1;
1761 
1762     for(int i = 0; i < 4; ++i) {
1763         if(2 == i) continue;
1764         bool throwException = (i & 0x1) != 0;
1765         bool doFlog = (i & 0x2) != 0;
1766         run_one_split_node_test<
1767             InputTuple,
1768             InputType,
1769             InputBodyType,
1770             TestSplitType,
1771             SinkType0,
1772             SinkBodyType0,
1773             SinkType1,
1774             SinkBodyType1>
1775                 (throwException,doFlog);
1776     }
1777 }
1778 
1779 void test_split_node() {
1780     INFO("Testing split_node\n");
1781     g_Wakeup_Msg = "split_node(is,non): Missed wakeup or machine is overloaded?";
1782     run_split_node_test<std::tuple<int,int>, isThrowing, nonThrowing>();
1783     g_Wakeup_Msg = "split_node(non,is): Missed wakeup or machine is overloaded?";
1784     run_split_node_test<std::tuple<int,int>, nonThrowing, isThrowing>();
1785     g_Wakeup_Msg = "split_node(is,is): Missed wakeup or machine is overloaded?";
1786     run_split_node_test<std::tuple<int,int>, isThrowing,  isThrowing>();
1787     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1788 }
1789 
1790 // --------- indexer_node ----------------------
1791 
1792 template < class InputTuple,
1793     class InputType0,
1794     class InputBodyType0,
1795     class InputType1,
1796     class InputBodyType1,
1797     class TestNodeType,
1798     class SinkType,
1799     class SinkBodyType>
1800 void run_one_indexer_node_test(bool throwException,bool flog) {
1801     typedef typename std::tuple_element<0,InputTuple>::type ItemType0;
1802     typedef typename std::tuple_element<1,InputTuple>::type ItemType1;
1803 
1804     tbb::flow::graph g;
1805 
1806     std::atomic<int> input0_count;
1807     std::atomic<int> input1_count;
1808     std::atomic<int> sink_count;
1809     input0_count = input1_count = sink_count = 0;
1810 #if USE_TASK_SCHEDULER_OBSERVER
1811     eh_test_observer o;
1812     o.observe(true);
1813 #endif
1814     g_Master = std::this_thread::get_id();
1815     InputType0 input0(g, InputBodyType0(input0_count));
1816     InputType1 input1(g, InputBodyType1(input1_count));
1817     TestNodeType node_to_test(g);
1818     SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
1819     make_edge(input0,tbb::flow::input_port<0>(node_to_test));
1820     make_edge(input1,tbb::flow::input_port<1>(node_to_test));
1821     make_edge(node_to_test, sink);
1822     for(int iter = 0; iter < 2; ++iter) {
1823         ResetGlobals(throwException,flog);
1824         if(throwException) {
1825             TRY();
1826                 input0.activate();
1827                 input1.activate();
1828                 g.wait_for_all();
1829             CATCH_AND_ASSERT();
1830         }
1831         else {
1832             TRY();
1833                 input0.activate();
1834                 input1.activate();
1835                 g.wait_for_all();
1836             CATCH_AND_FAIL();
1837         }
1838         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1839         int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value();
1840         int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value();
1841         int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1842         if(throwException) {
1843             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1844             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1845             CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs");
1846             CHECK_MESSAGE( (nb_cnt <= ib0_cnt + ib1_cnt), "Too many items received by sink nodes");
1847         }
1848         else {
1849             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1850             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1851             CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0");
1852             CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1");
1853             CHECK_MESSAGE( (nb_cnt == 2*g_NumItems), "Missing items in absorbers");
1854         }
1855         if(iter == 0) {
1856             remove_edge(node_to_test, sink);
1857             tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4));
1858             tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
1859             g.wait_for_all();
1860             g.reset();
1861             input0_count = input1_count = sink_count = 0;
1862             make_edge(node_to_test, sink);
1863             g.wait_for_all();
1864         }
1865         else {
1866             g.wait_for_all();
1867             g.reset();
1868             input0_count = input1_count = sink_count = 0;
1869         }
1870         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed");
1871         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed");
1872         nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1873         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed");
1874     }
1875 
1876 #if USE_TASK_SCHEDULER_OBSERVER
1877     o.observe(false);
1878 #endif
1879 }
1880 
1881 template<class InputTuple,
1882     TestNodeTypeEnum InputThrowType,
1883     TestNodeTypeEnum SinkThrowType>
1884 void run_indexer_node_test() {
1885     typedef typename std::tuple_element<0,InputTuple>::type ItemType0;
1886     typedef typename std::tuple_element<1,InputTuple>::type ItemType1;
1887     typedef test_input_body<ItemType0,InputThrowType> InputBodyType0;
1888     typedef test_input_body<ItemType1,InputThrowType> InputBodyType1;
1889     typedef typename tbb::flow::indexer_node<ItemType0, ItemType1> TestNodeType;
1890     typedef absorber_body<typename TestNodeType::output_type,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1891 
1892     typedef typename tbb::flow::input_node<ItemType0> InputType0;
1893     typedef typename tbb::flow::input_node<ItemType1> InputType1;
1894     typedef typename tbb::flow::function_node<typename TestNodeType::output_type,tbb::flow::continue_msg> SinkType;
1895 
1896     for(int i = 0; i < 4; ++i) {
1897         if(2 == i) continue;
1898         bool throwException = (i & 0x1) != 0;
1899         bool doFlog = (i & 0x2) != 0;
1900         run_one_indexer_node_test<
1901              InputTuple,
1902              InputType0,
1903              InputBodyType0,
1904              InputType1,
1905              InputBodyType1,
1906              TestNodeType,
1907              SinkType,
1908              SinkBodyType>(throwException,doFlog);
1909     }
1910 }
1911 
1912 void test_indexer_node() {
1913     INFO("Testing indexer_node\n");
1914     g_Wakeup_Msg = "indexer_node(is,non): Missed wakeup or machine is overloaded?";
1915     run_indexer_node_test<std::tuple<int,int>, isThrowing, nonThrowing>();
1916     g_Wakeup_Msg = "indexer_node(non,is): Missed wakeup or machine is overloaded?";
1917     run_indexer_node_test<std::tuple<int,int>, nonThrowing, isThrowing>();
1918     g_Wakeup_Msg = "indexer_node(is,is): Missed wakeup or machine is overloaded?";
1919     run_indexer_node_test<std::tuple<int,int>, isThrowing,  isThrowing>();
1920     g_Wakeup_Msg = g_Orig_Wakeup_Msg;;
1921 }
1922 
1923 ///////////////////////////////////////////////
1924 // whole-graph exception test
1925 
1926 class Foo {
1927 private:
1928     // std::vector<int>& m_vec;
1929     std::vector<int>* m_vec;
1930 public:
1931     Foo(std::vector<int>& vec) : m_vec(&vec) { }
1932     void operator() (tbb::flow::continue_msg) const {
1933         ++nExceptions;
1934         (void)m_vec->at(m_vec->size()); // Will throw out_of_range exception
1935         CHECK_MESSAGE( (false), "Exception not thrown by invalid access");
1936     }
1937 };
1938 
1939 // test from user ahelwer: http://software.intel.com/en-us/forums/showthread.php?t=103786
1940 // exception thrown in graph node, not caught in wait_for_all()
1941 void
1942 test_flow_graph_exception0() {
1943     // Initializes body
1944     std::vector<int> vec;
1945     vec.push_back(0);
1946     Foo f(vec);
1947     nExceptions = 0;
1948 
1949     // Construct graph and nodes
1950     tbb::flow::graph g;
1951     tbb::flow::broadcast_node<tbb::flow::continue_msg> start(g);
1952     tbb::flow::continue_node<tbb::flow::continue_msg> fooNode(g, f);
1953 
1954     // Construct edge
1955     tbb::flow::make_edge(start, fooNode);
1956 
1957     // Execute graph
1958     CHECK_MESSAGE( (!g.exception_thrown()), "exception_thrown flag already set");
1959     CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag already set");
1960     try {
1961         start.try_put(tbb::flow::continue_msg());
1962         g.wait_for_all();
1963         CHECK_MESSAGE( (false), "Exception not thrown");
1964     }
1965     catch(std::out_of_range& ex) {
1966         INFO("Exception: " << ex.what() << "(expected)\n");
1967     }
1968     catch(...) {
1969         INFO("Unknown exception caught (expected)\n");
1970     }
1971     CHECK_MESSAGE( (nExceptions > 0), "Exception caught, but no body signaled exception being thrown");
1972     nExceptions = 0;
1973     CHECK_MESSAGE( (g.exception_thrown()), "Exception not intercepted");
1974     // if exception set, cancellation also set.
1975     CHECK_MESSAGE( (g.is_cancelled()), "Exception cancellation not signaled");
1976     // in case we got an exception
1977     try {
1978         g.wait_for_all();  // context still signalled canceled, my_exception still set.
1979     }
1980     catch(...) {
1981         CHECK_MESSAGE( (false), "Second exception thrown but no task executing");
1982     }
1983     CHECK_MESSAGE( (nExceptions == 0), "body signaled exception being thrown, but no body executed");
1984     CHECK_MESSAGE( (!g.exception_thrown()), "exception_thrown flag not reset");
1985     CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag not reset");
1986 }
1987 
1988 void TestOneThreadNum(int nThread) {
1989     INFO("Testing " << nThread << "%d threads\n");
1990     g_NumItems = ((nThread > NUM_ITEMS) ? nThread *2 : NUM_ITEMS);
1991     g_NumThreads = nThread;
1992     tbb::task_arena arena(nThread);
1993 	arena.execute(
1994         [&]() {
1995             // whole-graph exception catch and rethrow test
1996             test_flow_graph_exception0();
1997             for(int i = 0; i < 4; ++i) {
1998                 g_ExceptionInMaster = (i & 1) != 0;
1999                 g_SolitaryException = (i & 2) != 0;
2000                 INFO("g_ExceptionInMaster == " << (g_ExceptionInMaster ? "T":"F")
2001                      << ", g_SolitaryException == " << (g_SolitaryException ? "T":"F") << "\n");
2002                 test_input_node();
2003                 test_function_node();
2004                 test_continue_node();  // also test broadcast_node
2005                 test_multifunction_node();
2006                 // single- and multi-item buffering nodes
2007                 test_buffer_queue_and_overwrite_node();
2008                 test_sequencer_node();
2009                 test_priority_queue_node();
2010 
2011                 // join_nodes
2012                 test_join_node<tbb::flow::queueing>();
2013                 test_join_node<tbb::flow::reserving>();
2014                 test_join_node<tbb::flow::tag_matching>();
2015 
2016                 test_limiter_node();
2017                 test_split_node();
2018                 // graph for write_once_node will be complicated by the fact the node will
2019                 // not do try_puts after it has been set.  To get parallelism of N we have
2020                 // to attach N successor nodes to the write_once (or play some similar game).
2021                 // test_write_once_node();
2022                 test_indexer_node();
2023             }
2024         }
2025     );
2026 }
2027 
2028 //! Test exceptions with parallelism
2029 //! \brief \ref error_guessing
2030 TEST_CASE("Testing several threads"){
2031     // reverse order of tests
2032     for(unsigned int nThread=utils::MaxThread; nThread >= utils::MinThread; --nThread) {
2033         tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, nThread);
2034         TestOneThreadNum(nThread);
2035     }
2036 }
2037 
2038 #endif // TBB_USE_EXCEPTIONS
2039