xref: /oneTBB/test/tbb/test_eh_flow_graph.cpp (revision d86ed7fb)
1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 //! \file test_eh_flow_graph.cpp
18 //! \brief Test for [flow_graph.copy_body flow_graph.broadcast_node flow_graph.priority_queue_node flow_graph.indexer_node flow_graph.sequencer_node flow_graph.remove_edge flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification
19 
20 #include "common/config.h"
21 
22 #if USE_TASK_SCHEDULER_OBSERVER
23 #include "tbb/task_scheduler_observer.h"
24 #endif
25 #include "tbb/flow_graph.h"
26 
27 #include "common/test.h"
28 
29 #if TBB_USE_EXCEPTIONS
30 
31 #include "common/utils.h"
32 #include "common/checktype.h"
33 #include "common/concurrency_tracker.h"
34 
35 #if _MSC_VER
36     #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
37 #endif
38 
39 #if __TBB_MSVC_UNREACHABLE_CODE_IGNORED
40     // Suppress "unreachable code" warning by VC++ 17.0-18.0 (VS 2012 or newer)
41     #pragma warning (disable: 4702)
42 #endif
43 
44 // global task_scheduler_observer is an imperfect tool to find how many threads are really
45 // participating.  That was the hope, but it counts the entries into the marketplace,
46 // not the arena.
47 // TODO: Consider using local task scheduler observer
48 // #define USE_TASK_SCHEDULER_OBSERVER 1
49 
50 #include <iostream>
51 #include <sstream>
52 #include <vector>
53 
54 #include "common/exception_handling.h"
55 
56 #include <stdexcept>
57 
58 #define NUM_ITEMS 15
59 int g_NumItems;
60 
61 std::atomic<unsigned> nExceptions;
62 std::atomic<intptr_t> g_TGCCancelled;
63 
64 enum TestNodeTypeEnum { nonThrowing, isThrowing };
65 
66 static const size_t unlimited_type = 0;
67 static const size_t serial_type = 1;
68 static const size_t limited_type = 4;
69 
70 template<TestNodeTypeEnum T> struct TestNodeTypeName;
71 template<> struct TestNodeTypeName<nonThrowing> { static const char *name() { return "nonThrowing"; } };
72 template<> struct TestNodeTypeName<isThrowing> { static const char *name() { return "isThrowing"; } };
73 
74 template<size_t Conc> struct concurrencyName;
75 template<> struct concurrencyName<serial_type>{ static const char *name() { return "serial"; } };
76 template<> struct concurrencyName<unlimited_type>{ static const char *name() { return "unlimited"; } };
77 template<> struct concurrencyName<limited_type>{ static const char *name() { return "limited"; } };
78 
79 // Class that provides waiting and throwing behavior.  If we are not throwing, do nothing
80 // If serial, we can't wait for concurrency to peak; we may be the bottleneck and will
81 // stop further processing.  We will execute g_NumThreads + 10 times (the "10" is somewhat
82 // arbitrary, and just makes sure there are enough items in the graph to keep it flowing),
83 // If parallel or serial and throwing, use utils::ConcurrencyTracker to wait.
84 
85 template<size_t Conc, TestNodeTypeEnum t = nonThrowing>
86 class WaitThrow;
87 
88 template<>
89 class WaitThrow<serial_type,nonThrowing> {
90 protected:
91     void WaitAndThrow(int cnt, const char * /*name*/) {
92         if(cnt > g_NumThreads + 10) {
93             utils::ConcurrencyTracker ct;
94             WaitUntilConcurrencyPeaks();
95         }
96     }
97 };
98 
99 template<>
100 class WaitThrow<serial_type,isThrowing> {
101 protected:
102     void WaitAndThrow(int cnt, const char * /*name*/) {
103         if(cnt > g_NumThreads + 10) {
104             utils::ConcurrencyTracker ct;
105             WaitUntilConcurrencyPeaks();
106             ThrowTestException(1);
107         }
108     }
109 };
110 
111 // for nodes with limited concurrency, if that concurrency is < g_NumThreads, we need
112 // to make sure enough other nodes wait for concurrency to peak.  If we are attached to
113 // N successors, for each item we pass to a successor, we will get N executions of the
114 // "absorbers" (because we broadcast to successors.)  for an odd number of threads we
115 // need (g_NumThreads - limited + 1) / 2 items (that will give us one extra execution
116 // of an "absorber", but we can't change that without changing the behavior of the node.)
117 template<>
118 class WaitThrow<limited_type,nonThrowing> {
119 protected:
120     void WaitAndThrow(int cnt, const char * /*name*/) {
121         if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) {
122             return;
123         }
124         utils::ConcurrencyTracker ct;
125         WaitUntilConcurrencyPeaks();
126     }
127 };
128 
129 template<>
130 class WaitThrow<limited_type,isThrowing> {
131 protected:
132     void WaitAndThrow(int cnt, const char * /*name*/) {
133         utils::ConcurrencyTracker ct;
134         if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) {
135             return;
136         }
137         WaitUntilConcurrencyPeaks();
138         ThrowTestException(1);
139     }
140 };
141 
142 template<>
143 class WaitThrow<unlimited_type,nonThrowing> {
144 protected:
145     void WaitAndThrow(int /*cnt*/, const char * /*name*/) {
146         utils::ConcurrencyTracker ct;
147         WaitUntilConcurrencyPeaks();
148     }
149 };
150 
151 template<>
152 class WaitThrow<unlimited_type,isThrowing> {
153 protected:
154     void WaitAndThrow(int /*cnt*/, const char * /*name*/) {
155         utils::ConcurrencyTracker ct;
156         WaitUntilConcurrencyPeaks();
157         ThrowTestException(1);
158     }
159 };
160 
161 void
162 ResetGlobals(bool throwException = true, bool flog = false) {
163     nExceptions = 0;
164     g_TGCCancelled = 0;
165     ResetEhGlobals(throwException, flog);
166 }
167 
168 // -------input_node body ------------------
169 template <class OutputType, TestNodeTypeEnum TType>
170 class test_input_body : WaitThrow<serial_type, TType> {
171     using WaitThrow<serial_type, TType>::WaitAndThrow;
172     std::atomic<int> *my_current_val;
173     int my_mult;
174 public:
175     test_input_body(std::atomic<int> &my_cnt, int multiplier = 1) : my_current_val(&my_cnt), my_mult(multiplier) {
176         // INFO("- --------- - - -   constructed " << (size_t)(my_current_val) << "\n");
177     }
178 
179     OutputType operator()(tbb::flow_control& fc) {
180         UPDATE_COUNTS();
181         OutputType ret = OutputType(my_mult * ++(*my_current_val));
182         // TODO revamp: reconsider logging for the tests.
183 
184         // The following line is known to cause double frees. Therefore, commenting out frequent
185         // calls to INFO() macro.
186 
187         // INFO("xx(" << (size_t)(my_current_val) << ") ret == " << (int)ret << "\n");
188         if(*my_current_val > g_NumItems) {
189             // INFO(" ------ End of the line!\n");
190             *my_current_val = g_NumItems;
191             fc.stop();
192             return OutputType();
193         }
194         WaitAndThrow((int)ret,"test_input_body");
195         return ret;
196     }
197 
198     int count_value() { return (int)*my_current_val; }
199 };
200 
201 template <TestNodeTypeEnum TType>
202 class test_input_body<tbb::flow::continue_msg, TType> : WaitThrow<serial_type, TType> {
203     using WaitThrow<serial_type, TType>::WaitAndThrow;
204     std::atomic<int> *my_current_val;
205 public:
206     test_input_body(std::atomic<int> &my_cnt) : my_current_val(&my_cnt) { }
207 
208     tbb::flow::continue_msg operator()( tbb::flow_control & fc) {
209         UPDATE_COUNTS();
210         int outint = ++(*my_current_val);
211         if(*my_current_val > g_NumItems) {
212             *my_current_val = g_NumItems;
213             fc.stop();
214             return tbb::flow::continue_msg();
215         }
216         WaitAndThrow(outint,"test_input_body");
217         return tbb::flow::continue_msg();
218     }
219 
220     int count_value() { return (int)*my_current_val; }
221 };
222 
223 // -------{function/continue}_node body ------------------
224 template<class InputType, class OutputType, TestNodeTypeEnum T, size_t Conc>
225 class absorber_body : WaitThrow<Conc,T> {
226     using WaitThrow<Conc,T>::WaitAndThrow;
227     std::atomic<int> *my_count;
228 public:
229     absorber_body(std::atomic<int> &my_cnt) : my_count(&my_cnt) { }
230     OutputType operator()(const InputType &/*p_in*/) {
231         UPDATE_COUNTS();
232         int out = ++(*my_count);
233         WaitAndThrow(out,"absorber_body");
234         return OutputType();
235     }
236     int count_value() { return *my_count; }
237 };
238 
239 // -------multifunction_node body ------------------
240 
241 // helper classes
242 template<int N,class PortsType>
243 struct IssueOutput {
244     typedef typename std::tuple_element<N-1,PortsType>::type::output_type my_type;
245 
246     static void issue_tuple_element( PortsType &my_ports) {
247         CHECK_MESSAGE( (std::get<N-1>(my_ports).try_put(my_type())), "Error putting to successor");
248         IssueOutput<N-1,PortsType>::issue_tuple_element(my_ports);
249     }
250 };
251 
252 template<class PortsType>
253 struct IssueOutput<1,PortsType> {
254     typedef typename std::tuple_element<0,PortsType>::type::output_type my_type;
255 
256     static void issue_tuple_element( PortsType &my_ports) {
257         CHECK_MESSAGE( (std::get<0>(my_ports).try_put(my_type())), "Error putting to successor");
258     }
259 };
260 
261 template<class InputType, class OutputTupleType, TestNodeTypeEnum T, size_t Conc>
262 class multifunction_node_body : WaitThrow<Conc,T> {
263     using WaitThrow<Conc,T>::WaitAndThrow;
264     static const int N = std::tuple_size<OutputTupleType>::value;
265     typedef typename tbb::flow::multifunction_node<InputType,OutputTupleType> NodeType;
266     typedef typename NodeType::output_ports_type PortsType;
267     std::atomic<int> *my_count;
268 public:
269     multifunction_node_body(std::atomic<int> &my_cnt) : my_count(&my_cnt) { }
270     void operator()(const InputType& /*in*/, PortsType &my_ports) {
271         UPDATE_COUNTS();
272         int out = ++(*my_count);
273         WaitAndThrow(out,"multifunction_node_body");
274         // issue an item to each output port.
275         IssueOutput<N,PortsType>::issue_tuple_element(my_ports);
276     }
277 
278     int count_value() { return *my_count; }
279 };
280 
281 // --------- body to sort items in sequencer_node
282 template<class BufferItemType>
283 struct sequencer_body {
284     size_t operator()(const BufferItemType &s) {
285         CHECK_MESSAGE( (s), "sequencer item out of range (== 0)");
286         return size_t(s) - 1;
287     }
288 };
289 
290 // --------- body to compare the "priorities" of objects for priority_queue_node  five priority levels 0-4.
291 template<class T>
292 struct myLess {
293     bool operator()(const T &t1, const T &t2) {
294         return (int(t1) % 5) < (int(t2) % 5);
295     }
296 };
297 
298 // --------- type for < comparison in priority_queue_node.
299 template<class ItemType>
300 struct less_body {
301     bool operator()(const ItemType &lhs, const ItemType &rhs) {
302         return (int(lhs) % 3) < (int(rhs) % 3);
303     }
304 };
305 
306 // --------- tag methods for tag_matching join_node
307 template<typename TT>
308 class tag_func {
309     TT my_mult;
310 public:
311     tag_func(TT multiplier) : my_mult(multiplier) { }
312     // operator() will return [0 .. Count)
313     tbb::flow::tag_value operator()( TT v) {
314         tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult);
315         return t;
316     }
317 };
318 
319 // --------- Input body for split_node test.
320 template <class OutputTuple, TestNodeTypeEnum TType>
321 class tuple_test_input_body : WaitThrow<serial_type, TType> {
322     typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
323     typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
324     using WaitThrow<serial_type, TType>::WaitAndThrow;
325     std::atomic<int> *my_current_val;
326 public:
327     tuple_test_input_body(std::atomic<int> &my_cnt) : my_current_val(&my_cnt) { }
328 
329     OutputTuple operator()(tbb::flow_control& fc) {
330         UPDATE_COUNTS();
331         int ival = ++(*my_current_val);
332         if(*my_current_val > g_NumItems) {
333             *my_current_val = g_NumItems;  // jam the final value; we assert on it later.
334             fc.stop();
335             return OutputTuple();
336         }
337         WaitAndThrow(ival,"tuple_test_input_body");
338         return OutputTuple(ItemType0(ival),ItemType1(ival));
339     }
340 
341     int count_value() { return (int)*my_current_val; }
342 };
343 
344 // ------- end of node bodies
345 
346 // input_node is only-serial.  input_node can throw, or the function_node can throw.
347 // graph being tested is
348 //
349 //      input_node+---+parallel function_node
350 //
351 //    After each run the graph is reset(), to test the reset functionality.
352 //
353 
354 
355 template<class ItemType, TestNodeTypeEnum inpThrowType, TestNodeTypeEnum absorbThrowType>
356 void run_one_input_node_test(bool throwException, bool flog) {
357     typedef test_input_body<ItemType,inpThrowType> src_body_type;
358     typedef absorber_body<ItemType, tbb::flow::continue_msg, absorbThrowType, unlimited_type> parallel_absorb_body_type;
359     std::atomic<int> input_body_count;
360     std::atomic<int> absorber_body_count;
361     input_body_count = 0;
362     absorber_body_count = 0;
363 
364     tbb::flow::graph g;
365 
366     g_Master = std::this_thread::get_id();
367 
368 #if USE_TASK_SCHEDULER_OBSERVER
369     eh_test_observer o;
370     o.observe(true);
371 #endif
372 
373     tbb::flow::input_node<ItemType> sn(g, src_body_type(input_body_count));
374     parallel_absorb_body_type ab2(absorber_body_count);
375     tbb::flow::function_node<ItemType> parallel_fn(g,tbb::flow::unlimited,ab2);
376     make_edge(sn, parallel_fn);
377     for(int runcnt = 0; runcnt < 2; ++runcnt) {
378         ResetGlobals(throwException,flog);
379         if(throwException) {
380             TRY();
381                 sn.activate();
382                 g.wait_for_all();
383             CATCH_AND_ASSERT();
384         }
385         else {
386             TRY();
387                 sn.activate();
388                 g.wait_for_all();
389             CATCH_AND_FAIL();
390         }
391 
392         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
393         int src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value();
394         int sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value();
395         if(throwException) {
396             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception flag in flow::graph not set");
397             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "canceled flag not set");
398             CHECK_MESSAGE( (src_cnt <= g_NumItems), "Too many input_node items emitted");
399             CHECK_MESSAGE( (sink_cnt <= src_cnt), "Too many input_node items received");
400         }
401         else {
402             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
403             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
404             CHECK_MESSAGE( (src_cnt == g_NumItems), "Incorrect # input_node items emitted");
405             CHECK_MESSAGE( (sink_cnt == src_cnt), "Incorrect # input_node items received");
406         }
407         g.reset();  // resets the body of the input_node and the absorb_nodes.
408         input_body_count = 0;
409         absorber_body_count = 0;
410         CHECK_MESSAGE( (!g.exception_thrown()), "Reset didn't clear exception_thrown()");
411         CHECK_MESSAGE( (!g.is_cancelled()), "Reset didn't clear is_cancelled()");
412         src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value();
413         sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value();
414         CHECK_MESSAGE( (src_cnt == 0), "input_node count not reset");
415         CHECK_MESSAGE( (sink_cnt == 0), "sink_node count not reset");
416     }
417 #if USE_TASK_SCHEDULER_OBSERVER
418     o.observe(false);
419 #endif
420 }  // run_one_input_node_test
421 
422 
423 template<class ItemType, TestNodeTypeEnum inpThrowType, TestNodeTypeEnum absorbThrowType>
424 void run_input_node_test() {
425     run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(false,false);
426     run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(true,false);
427     run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(true,true);
428 }  // run_input_node_test
429 
430 void test_input_node() {
431     INFO("Testing input_node\n");
432     CheckType<int>::check_type_counter = 0;
433     g_Wakeup_Msg = "input_node(1): Missed wakeup or machine is overloaded?";
434     run_input_node_test<CheckType<int>, isThrowing, nonThrowing>();
435     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
436     g_Wakeup_Msg = "input_node(2): Missed wakeup or machine is overloaded?";
437     run_input_node_test<int, isThrowing, nonThrowing>();
438     g_Wakeup_Msg = "input_node(3): Missed wakeup or machine is overloaded?";
439     run_input_node_test<int, nonThrowing, isThrowing>();
440     g_Wakeup_Msg = "input_node(4): Missed wakeup or machine is overloaded?";
441     run_input_node_test<int, isThrowing, isThrowing>();
442     g_Wakeup_Msg = "input_node(5): Missed wakeup or machine is overloaded?";
443     run_input_node_test<CheckType<int>, isThrowing, isThrowing>();
444     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
445     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
446 }
447 
448 // -------- utilities & types to test function_node and multifunction_node.
449 
450 // need to tell the template which node type I am using so it attaches successors correctly.
451 enum NodeFetchType { func_node_type, multifunc_node_type };
452 
453 template<class NodeType, class ItemType, int indx, NodeFetchType NFT>
454 struct AttachPoint;
455 
456 template<class NodeType, class ItemType, int indx>
457 struct AttachPoint<NodeType,ItemType,indx,multifunc_node_type> {
458     static tbb::flow::sender<ItemType> &GetSender(NodeType &n) {
459         return tbb::flow::output_port<indx>(n);
460     }
461 };
462 
463 template<class NodeType, class ItemType, int indx>
464 struct AttachPoint<NodeType,ItemType,indx,func_node_type> {
465     static tbb::flow::sender<ItemType> &GetSender(NodeType &n) {
466         return n;
467     }
468 };
469 
470 
471 // common template for running function_node, multifunction_node.  continue_node
472 // has different firing requirements, so it needs a different graph topology.
473 template<
474     class InputNodeType,
475     class InputNodeBodyType0,
476     class InputNodeBodyType1,
477     NodeFetchType NFT,
478     class TestNodeType,
479     class TestNodeBodyType,
480     class TypeToSink0,          // what kind of item are we sending to sink0
481     class TypeToSink1,          // what kind of item are we sending to sink1
482     class SinkNodeType0,        // will be same for function;
483     class SinkNodeType1,        // may differ for multifunction_node
484     class SinkNodeBodyType0,
485     class SinkNodeBodyType1,
486     size_t Conc
487     >
488 void
489 run_one_functype_node_test(bool throwException, bool flog, const char * /*name*/) {
490 
491     std::stringstream ss;
492     char *saved_msg = const_cast<char *>(g_Wakeup_Msg);
493     tbb::flow::graph g;
494 
495     std::atomic<int> input0_count;
496     std::atomic<int> input1_count;
497     std::atomic<int> sink0_count;
498     std::atomic<int> sink1_count;
499     std::atomic<int> test_count;
500     input0_count = input1_count = sink0_count = sink1_count = test_count = 0;
501 
502 #if USE_TASK_SCHEDULER_OBSERVER
503     eh_test_observer o;
504     o.observe(true);
505 #endif
506 
507     g_Master = std::this_thread::get_id();
508     InputNodeType input0(g, InputNodeBodyType0(input0_count));
509     InputNodeType input1(g, InputNodeBodyType1(input1_count));
510     TestNodeType node_to_test(g, Conc, TestNodeBodyType(test_count));
511     SinkNodeType0 sink0(g,tbb::flow::unlimited,SinkNodeBodyType0(sink0_count));
512     SinkNodeType1 sink1(g,tbb::flow::unlimited,SinkNodeBodyType1(sink1_count));
513     make_edge(input0, node_to_test);
514     make_edge(input1, node_to_test);
515     make_edge(AttachPoint<TestNodeType, TypeToSink0, 0, NFT>::GetSender(node_to_test), sink0);
516     make_edge(AttachPoint<TestNodeType, TypeToSink1, 1, NFT>::GetSender(node_to_test), sink1);
517 
518     for(int iter = 0; iter < 2; ++iter) {  // run, reset, run again
519         ss.clear();
520         ss << saved_msg << " iter=" << iter << ", threads=" << g_NumThreads << ", throw=" << (throwException ? "T" : "F") << ", flow=" << (flog ? "T" : "F");
521         g_Wakeup_Msg = ss.str().c_str();
522         ResetGlobals(throwException,flog);
523         if(throwException) {
524             TRY();
525                 input0.activate();
526                 input1.activate();
527                 g.wait_for_all();
528             CATCH_AND_ASSERT();
529         }
530         else {
531             TRY();
532                 input0.activate();
533                 input1.activate();
534                 g.wait_for_all();
535             CATCH_AND_FAIL();
536         }
537         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
538         int ib0_cnt = tbb::flow::copy_body<InputNodeBodyType0>(input0).count_value();
539         int ib1_cnt = tbb::flow::copy_body<InputNodeBodyType1>(input1).count_value();
540         int t_cnt   = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value();
541         int nb0_cnt = tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value();
542         int nb1_cnt = tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value();
543         if(throwException) {
544             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
545             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
546             CHECK_MESSAGE( (ib0_cnt + ib1_cnt <= 2*g_NumItems), "Too many items sent by inputs");
547             CHECK_MESSAGE( (ib0_cnt + ib1_cnt >= t_cnt), "Too many items received by test node");
548             CHECK_MESSAGE( (nb0_cnt + nb1_cnt <= t_cnt*2), "Too many items received by sink nodes");
549         }
550         else {
551             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
552             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
553             CHECK_MESSAGE( (ib0_cnt + ib1_cnt == 2*g_NumItems), "Missing invocations of input_nodes");
554             CHECK_MESSAGE( (t_cnt == 2*g_NumItems), "Not all items reached test node");
555             CHECK_MESSAGE( (nb0_cnt == 2*g_NumItems && nb1_cnt == 2*g_NumItems), "Missing items in absorbers");
556         }
557         g.reset();  // resets the body of the input_nodes, test_node and the absorb_nodes.
558         input0_count = input1_count = sink0_count = sink1_count = test_count = 0;
559         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType0>(input0).count_value()),"Reset input 0 failed");
560         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType1>(input1).count_value()),"Reset input 1 failed");
561         CHECK_MESSAGE( (0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value()),"Reset test_node failed");
562         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value()),"Reset sink 0 failed");
563         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value()),"Reset sink 1 failed");
564 
565         g_Wakeup_Msg = saved_msg;
566     }
567 #if USE_TASK_SCHEDULER_OBSERVER
568     o.observe(false);
569 #endif
570 }
571 
572 //  Test function_node
573 //
574 // graph being tested is
575 //
576 //         input_node -\                 /- parallel function_node
577 //                      \               /
578 //                       +function_node+
579 //                      /               \                                  x
580 //         input_node -/                 \- parallel function_node
581 //
582 //    After each run the graph is reset(), to test the reset functionality.
583 //
584 template<
585     TestNodeTypeEnum IType1,                          // does input node 1 throw?
586     TestNodeTypeEnum IType2,                          // does input node 2 throw?
587     class Item12,                                     // type of item passed between inputs and test node
588     TestNodeTypeEnum FType,                           // does function node throw?
589     class Item23,                                     // type passed from function_node to sink nodes
590     TestNodeTypeEnum NType1,                          // does sink node 1 throw?
591     TestNodeTypeEnum NType2,                          // does sink node 1 throw?
592     class NodePolicy,                                 // rejecting,queueing
593     size_t Conc                                       // is node concurrent? {serial | limited | unlimited}
594 >
595 void run_function_node_test() {
596 
597     typedef test_input_body<Item12,IType1> IBodyType1;
598     typedef test_input_body<Item12,IType2> IBodyType2;
599     typedef absorber_body<Item12, Item23, FType, Conc> TestBodyType;
600     typedef absorber_body<Item23,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1;
601     typedef absorber_body<Item23,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2;
602 
603     typedef tbb::flow::input_node<Item12> InputType;
604     typedef tbb::flow::function_node<Item12, Item23, NodePolicy> TestType;
605     typedef tbb::flow::function_node<Item23,tbb::flow::continue_msg> SnkType;
606 
607     for(int i = 0; i < 4; ++i ) {
608         if(i != 2) {  // doesn't make sense to flog a non-throwing test
609             bool doThrow = (i & 0x1) != 0;
610             bool doFlog = (i & 0x2) != 0;
611             run_one_functype_node_test<
612                 /*InputNodeType*/       InputType,
613                 /*InputNodeBodyType0*/  IBodyType1,
614                 /*InputNodeBodyType1*/  IBodyType2,
615                 /* NFT */               func_node_type,
616                 /*TestNodeType*/        TestType,
617                 /*TestNodeBodyType*/    TestBodyType,
618                 /*TypeToSink0 */        Item23,
619                 /*TypeToSink1 */        Item23,
620                 /*SinkNodeType0*/       SnkType,
621                 /*SinkNodeType1*/       SnkType,
622                 /*SinkNodeBodyType1*/   SinkBodyType1,
623                 /*SinkNodeBodyType2*/   SinkBodyType2,
624                 /*Conc*/                Conc>
625                     (doThrow,doFlog,"function_node");
626         }
627     }
628 }  // run_function_node_test
629 
630 void test_function_node() {
631     INFO("Testing function_node\n");
632     // serial rejecting
633     g_Wakeup_Msg = "function_node(1a): Missed wakeup or machine is overloaded?";
634     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
635     g_Wakeup_Msg = "function_node(1b): Missed wakeup or machine is overloaded?";
636     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
637     g_Wakeup_Msg = "function_node(1c): Missed wakeup or machine is overloaded?";
638     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
639 
640     // serial queueing
641     g_Wakeup_Msg = "function_node(2): Missed wakeup or machine is overloaded?";
642     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
643     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
644     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
645     CheckType<int>::check_type_counter = 0;
646     run_function_node_test<nonThrowing, nonThrowing, CheckType<int>, nonThrowing, CheckType<int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
647     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
648 
649     // unlimited parallel rejecting
650     g_Wakeup_Msg = "function_node(3): Missed wakeup or machine is overloaded?";
651     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
652     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
653     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
654 
655     // limited parallel rejecting
656     g_Wakeup_Msg = "function_node(4): Missed wakeup or machine is overloaded?";
657     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>();
658     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>();
659     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>();
660 
661     // limited parallel queueing
662     g_Wakeup_Msg = "function_node(5): Missed wakeup or machine is overloaded?";
663     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
664     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
665     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>();
666 
667     // everyone throwing
668     g_Wakeup_Msg = "function_node(6): Missed wakeup or machine is overloaded?";
669     run_function_node_test<isThrowing, isThrowing, int, isThrowing, int, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
670     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
671 }
672 
673 // ----------------------------------- multifunction_node ----------------------------------
674 //  Test multifunction_node.
675 //
676 // graph being tested is
677 //
678 //         input_node -\                      /- parallel function_node
679 //                      \                    /
680 //                       +multifunction_node+
681 //                      /                    \                                  x
682 //         input_node -/                      \- parallel function_node
683 //
684 //    After each run the graph is reset(), to test the reset functionality.  The
685 //    multifunction_node will put an item to each successor for every item
686 //    received.
687 //
688 template<
689     TestNodeTypeEnum IType0,                          // does input node 1 throw?
690     TestNodeTypeEnum IType1,                          // does input node 2 thorw?
691     class Item12,                                 // type of item passed between inputs and test node
692     TestNodeTypeEnum FType,                           // does multifunction node throw?
693     class ItemTuple,                              // tuple of types passed from multifunction_node to sink nodes
694     TestNodeTypeEnum NType1,                          // does sink node 1 throw?
695     TestNodeTypeEnum NType2,                          // does sink node 2 throw?
696     class  NodePolicy,                            // rejecting,queueing
697     size_t Conc                                   // is node concurrent? {serial | limited | unlimited}
698 >
699 void run_multifunction_node_test() {
700 
701     typedef typename std::tuple_element<0,ItemTuple>::type Item23Type0;
702     typedef typename std::tuple_element<1,ItemTuple>::type Item23Type1;
703     typedef test_input_body<Item12,IType0> IBodyType1;
704     typedef test_input_body<Item12,IType1> IBodyType2;
705     typedef multifunction_node_body<Item12, ItemTuple, FType, Conc> TestBodyType;
706     typedef absorber_body<Item23Type0,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1;
707     typedef absorber_body<Item23Type1,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2;
708 
709     typedef tbb::flow::input_node<Item12> InputType;
710     typedef tbb::flow::multifunction_node<Item12, ItemTuple, NodePolicy> TestType;
711     typedef tbb::flow::function_node<Item23Type0,tbb::flow::continue_msg> SnkType0;
712     typedef tbb::flow::function_node<Item23Type1,tbb::flow::continue_msg> SnkType1;
713 
714     for(int i = 0; i < 4; ++i ) {
715         if(i != 2) {  // doesn't make sense to flog a non-throwing test
716             bool doThrow = (i & 0x1) != 0;
717             bool doFlog = (i & 0x2) != 0;
718     run_one_functype_node_test<
719         /*InputNodeType*/       InputType,
720         /*InputNodeBodyType0*/  IBodyType1,
721         /*InputNodeBodyType1*/  IBodyType2,
722         /*NFT*/                 multifunc_node_type,
723         /*TestNodeType*/        TestType,
724         /*TestNodeBodyType*/    TestBodyType,
725         /*TypeToSink0*/         Item23Type0,
726         /*TypeToSink1*/         Item23Type1,
727         /*SinkNodeType0*/       SnkType0,
728         /*SinkNodeType1*/       SnkType1,
729         /*SinkNodeBodyType0*/   SinkBodyType1,
730         /*SinkNodeBodyType1*/   SinkBodyType2,
731         /*Conc*/                Conc>
732             (doThrow,doFlog,"multifunction_node");
733         }
734     }
735 }  // run_multifunction_node_test
736 
737 void test_multifunction_node() {
738     INFO("Testing multifunction_node\n");
739     g_Wakeup_Msg = "multifunction_node(input throws,rejecting,serial): Missed wakeup or machine is overloaded?";
740     // serial rejecting
741     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,float>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
742     g_Wakeup_Msg = "multifunction_node(test throws,rejecting,serial): Missed wakeup or machine is overloaded?";
743     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
744     g_Wakeup_Msg = "multifunction_node(sink throws,rejecting,serial): Missed wakeup or machine is overloaded?";
745     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
746 
747     g_Wakeup_Msg = "multifunction_node(2): Missed wakeup or machine is overloaded?";
748     // serial queueing
749     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
750     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
751     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
752     CheckType<int>::check_type_counter = 0;
753     run_multifunction_node_test<nonThrowing, nonThrowing, CheckType<int>, nonThrowing, std::tuple<CheckType<int>, CheckType<int> >, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
754     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
755 
756     g_Wakeup_Msg = "multifunction_node(3): Missed wakeup or machine is overloaded?";
757     // unlimited parallel rejecting
758     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
759     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
760     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
761 
762     g_Wakeup_Msg = "multifunction_node(4): Missed wakeup or machine is overloaded?";
763     // limited parallel rejecting
764     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>();
765     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>();
766     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>();
767 
768     g_Wakeup_Msg = "multifunction_node(5): Missed wakeup or machine is overloaded?";
769     // limited parallel queueing
770     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
771     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
772     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>();
773 
774     g_Wakeup_Msg = "multifunction_node(6): Missed wakeup or machine is overloaded?";
775     // everyone throwing
776     run_multifunction_node_test<isThrowing, isThrowing, int, isThrowing, std::tuple<int,int>, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
777     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
778 }
779 
780 //
781 // Continue node has T predecessors.  when it receives messages (continue_msg) on T predecessors
782 // it executes the body of the node, and forwards a continue_msg to its successors.
783 // However many predecessors the continue_node has, that's how many continue_msgs it receives
784 // on input before forwarding a message.
785 //
786 // The graph will look like
787 //
788 //                                          +broadcast_node+
789 //                                         /                \             ___
790 //       input_node+------>+broadcast_node+                  +continue_node+--->+absorber
791 //                                         \                /
792 //                                          +broadcast_node+
793 //
794 // The continue_node has unlimited parallelism, no input buffering, and broadcasts to successors.
795 // The absorber is parallel, so each item emitted by the input will result in one thread
796 // spinning.  So for N threads we pass N-1 continue_messages, then spin wait and then throw if
797 // we are allowed to.
798 
799 template < class InputNodeType, class InputNodeBodyType, class TTestNodeType, class TestNodeBodyType,
800         class SinkNodeType, class SinkNodeBodyType>
801 void run_one_continue_node_test (bool throwException, bool flog) {
802     tbb::flow::graph g;
803 
804     std::atomic<int> input_count;
805     std::atomic<int> test_count;
806     std::atomic<int> sink_count;
807     input_count = test_count = sink_count = 0;
808 #if USE_TASK_SCHEDULER_OBSERVER
809     eh_test_observer o;
810     o.observe(true);
811 #endif
812     g_Master = std::this_thread::get_id();
813     InputNodeType input(g, InputNodeBodyType(input_count));
814     TTestNodeType node_to_test(g, TestNodeBodyType(test_count));
815     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
816     tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g), b2(g), b3(g);
817     make_edge(input, b1);
818     make_edge(b1,b2);
819     make_edge(b1,b3);
820     make_edge(b2,node_to_test);
821     make_edge(b3,node_to_test);
822     make_edge(node_to_test, sink);
823     for(int iter = 0; iter < 2; ++iter) {
824         ResetGlobals(throwException,flog);
825         if(throwException) {
826             TRY();
827                 input.activate();
828                 g.wait_for_all();
829             CATCH_AND_ASSERT();
830         }
831         else {
832             TRY();
833                 input.activate();
834                 g.wait_for_all();
835             CATCH_AND_FAIL();
836         }
837         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
838         int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
839         int t_cnt   = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value();
840         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
841         if(throwException) {
842             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
843             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
844             CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
845             CHECK_MESSAGE( (ib_cnt >= t_cnt), "Too many items received by test node");
846             CHECK_MESSAGE( (nb_cnt <= t_cnt), "Too many items received by sink nodes");
847         }
848         else {
849             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
850             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
851             CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
852             CHECK_MESSAGE( (t_cnt == g_NumItems), "Not all items reached test node");
853             CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
854         }
855         g.reset();  // resets the body of the input_nodes, test_node and the absorb_nodes.
856         input_count = test_count = sink_count = 0;
857         CHECK_MESSAGE( (0 == (int)test_count), "Atomic wasn't reset properly");
858         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
859         CHECK_MESSAGE( (0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value()),"Reset test_node failed");
860         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
861     }
862 #if USE_TASK_SCHEDULER_OBSERVER
863     o.observe(false);
864 #endif
865 }
866 
867 template<
868     class ItemType,
869     TestNodeTypeEnum IType,   // does input node throw?
870     TestNodeTypeEnum CType,   // does continue_node throw?
871     TestNodeTypeEnum AType>    // does absorber throw
872 void run_continue_node_test() {
873     typedef test_input_body<tbb::flow::continue_msg,IType> IBodyType;
874     typedef absorber_body<tbb::flow::continue_msg,ItemType,CType,unlimited_type> ContBodyType;
875     typedef absorber_body<ItemType,tbb::flow::continue_msg, AType, unlimited_type> SinkBodyType;
876 
877     typedef tbb::flow::input_node<tbb::flow::continue_msg> InputType;
878     typedef tbb::flow::continue_node<ItemType> TestType;
879     typedef tbb::flow::function_node<ItemType,tbb::flow::continue_msg> SnkType;
880 
881     for(int i = 0; i < 4; ++i ) {
882         if(i == 2) continue;  // don't run (false,true); it doesn't make sense.
883         bool doThrow = (i & 0x1) != 0;
884         bool doFlog = (i & 0x2) != 0;
885         run_one_continue_node_test<
886             /*InputNodeType*/       InputType,
887             /*InputNodeBodyType*/   IBodyType,
888             /*TestNodeType*/        TestType,
889             /*TestNodeBodyType*/    ContBodyType,
890             /*SinkNodeType*/        SnkType,
891             /*SinkNodeBodyType*/    SinkBodyType>
892             (doThrow,doFlog);
893     }
894 }
895 
896 //
897 void test_continue_node() {
898     INFO("Testing continue_node\n");
899     g_Wakeup_Msg = "buffer_node(non,is,non): Missed wakeup or machine is overloaded?";
900     run_continue_node_test<int,nonThrowing,isThrowing,nonThrowing>();
901     g_Wakeup_Msg = "buffer_node(non,non,is): Missed wakeup or machine is overloaded?";
902     run_continue_node_test<int,nonThrowing,nonThrowing,isThrowing>();
903     g_Wakeup_Msg = "buffer_node(is,non,non): Missed wakeup or machine is overloaded?";
904     run_continue_node_test<int,isThrowing,nonThrowing,nonThrowing>();
905     g_Wakeup_Msg = "buffer_node(is,is,is): Missed wakeup or machine is overloaded?";
906     run_continue_node_test<int,isThrowing,isThrowing,isThrowing>();
907     CheckType<double>::check_type_counter = 0;
908     run_continue_node_test<CheckType<double>,isThrowing,isThrowing,isThrowing>();
909     CHECK_MESSAGE( (!CheckType<double>::check_type_counter), "Dropped objects in continue_node test");
910     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
911 }
912 
913 // ---------- buffer_node queue_node overwrite_node --------------
914 
915 template<
916     class BufferItemType,       //
917     class InputNodeType,
918     class InputNodeBodyType,
919     class TestNodeType,
920     class SinkNodeType,
921     class SinkNodeBodyType >
922 void run_one_buffer_node_test(bool throwException,bool flog) {
923     tbb::flow::graph g;
924 
925     std::atomic<int> input_count;
926     std::atomic<int> sink_count;
927     input_count = sink_count = 0;
928 #if USE_TASK_SCHEDULER_OBSERVER
929     eh_test_observer o;
930     o.observe(true);
931 #endif
932     g_Master = std::this_thread::get_id();
933     InputNodeType input(g, InputNodeBodyType(input_count));
934     TestNodeType node_to_test(g);
935     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
936     make_edge(input,node_to_test);
937     make_edge(node_to_test, sink);
938     for(int iter = 0; iter < 2; ++iter) {
939         ResetGlobals(throwException,flog);
940         if(throwException) {
941             TRY();
942                 input.activate();
943                 g.wait_for_all();
944             CATCH_AND_ASSERT();
945         }
946         else {
947             TRY();
948                 input.activate();
949                 g.wait_for_all();
950             CATCH_AND_FAIL();
951         }
952         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
953         int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
954         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
955         if(throwException) {
956             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
957             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
958             CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
959             CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
960         }
961         else {
962             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
963             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
964             CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
965             CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
966         }
967         if(iter == 0) {
968             remove_edge(node_to_test, sink);
969             node_to_test.try_put(BufferItemType());
970             g.wait_for_all();
971             g.reset();
972             input_count = sink_count = 0;
973             BufferItemType tmp;
974             CHECK_MESSAGE( (!node_to_test.try_get(tmp)), "node not empty");
975             make_edge(node_to_test, sink);
976             g.wait_for_all();
977         }
978         else {
979             g.reset();
980             input_count = sink_count = 0;
981         }
982         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
983         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
984     }
985 
986 #if USE_TASK_SCHEDULER_OBSERVER
987     o.observe(false);
988 #endif
989 }
990 template<class BufferItemType,
991          TestNodeTypeEnum InputThrowType,
992          TestNodeTypeEnum SinkThrowType>
993 void run_buffer_queue_and_overwrite_node_test() {
994     typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
995     typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
996 
997     typedef tbb::flow::input_node<BufferItemType> InputType;
998     typedef tbb::flow::buffer_node<BufferItemType> BufType;
999     typedef tbb::flow::queue_node<BufferItemType>  QueType;
1000     typedef tbb::flow::overwrite_node<BufferItemType>  OvrType;
1001     typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
1002 
1003     for(int i = 0; i < 4; ++i) {
1004         if(i == 2) continue;  // no need to test flog w/o throws
1005         bool throwException = (i & 0x1) != 0;
1006         bool doFlog = (i & 0x2) != 0;
1007         run_one_buffer_node_test<
1008             /* class BufferItemType*/     BufferItemType,
1009             /*class InputNodeType*/       InputType,
1010             /*class InputNodeBodyType*/   InputBodyType,
1011             /*class TestNodeType*/        BufType,
1012             /*class SinkNodeType*/        SnkType,
1013             /*class SinkNodeBodyType*/    SinkBodyType
1014             >(throwException, doFlog);
1015         run_one_buffer_node_test<
1016             /* class BufferItemType*/     BufferItemType,
1017             /*class InputNodeType*/       InputType,
1018             /*class InputNodeBodyType*/   InputBodyType,
1019             /*class TestNodeType*/        QueType,
1020             /*class SinkNodeType*/        SnkType,
1021             /*class SinkNodeBodyType*/    SinkBodyType
1022             >(throwException, doFlog);
1023         run_one_buffer_node_test<
1024             /* class BufferItemType*/     BufferItemType,
1025             /*class InputNodeType*/       InputType,
1026             /*class InputNodeBodyType*/   InputBodyType,
1027             /*class TestNodeType*/        OvrType,
1028             /*class SinkNodeType*/        SnkType,
1029             /*class SinkNodeBodyType*/    SinkBodyType
1030             >(throwException, doFlog);
1031     }
1032 }
1033 
1034 void test_buffer_queue_and_overwrite_node() {
1035     INFO("Testing buffer_node, queue_node and overwrite_node\n");
1036     g_Wakeup_Msg = "buffer, queue, overwrite(is,non): Missed wakeup or machine is overloaded?";
1037     run_buffer_queue_and_overwrite_node_test<int,isThrowing,nonThrowing>();
1038     g_Wakeup_Msg = "buffer, queue, overwrite(non,is): Missed wakeup or machine is overloaded?";
1039     run_buffer_queue_and_overwrite_node_test<int,nonThrowing,isThrowing>();
1040     g_Wakeup_Msg = "buffer, queue, overwrite(is,is): Missed wakeup or machine is overloaded?";
1041     run_buffer_queue_and_overwrite_node_test<int,isThrowing,isThrowing>();
1042     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1043 }
1044 
1045 // ---------- sequencer_node -------------------------
1046 
1047 
1048 template<
1049     class BufferItemType,       //
1050     class InputNodeType,
1051     class InputNodeBodyType,
1052     class TestNodeType,
1053     class SeqBodyType,
1054     class SinkNodeType,
1055     class SinkNodeBodyType >
1056 void run_one_sequencer_node_test(bool throwException,bool flog) {
1057     tbb::flow::graph g;
1058 
1059     std::atomic<int> input_count;
1060     std::atomic<int> sink_count;
1061     input_count = sink_count = 0;
1062 #if USE_TASK_SCHEDULER_OBSERVER
1063     eh_test_observer o;
1064     o.observe(true);
1065 #endif
1066     g_Master = std::this_thread::get_id();
1067     InputNodeType input(g, InputNodeBodyType(input_count));
1068     TestNodeType node_to_test(g,SeqBodyType());
1069     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1070     make_edge(input,node_to_test);
1071     make_edge(node_to_test, sink);
1072     for(int iter = 0; iter < 2; ++iter) {
1073         ResetGlobals(throwException,flog);
1074         if(throwException) {
1075             TRY();
1076                 input.activate();
1077                 g.wait_for_all();
1078             CATCH_AND_ASSERT();
1079         }
1080         else {
1081             TRY();
1082                 input.activate();
1083                 g.wait_for_all();
1084             CATCH_AND_FAIL();
1085         }
1086         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1087         int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
1088         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
1089         if(throwException) {
1090             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1091             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1092             CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
1093             CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
1094         }
1095         else {
1096             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1097             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1098             CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
1099             CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
1100         }
1101         if(iter == 0) {
1102             remove_edge(node_to_test, sink);
1103             node_to_test.try_put(BufferItemType(g_NumItems + 1));
1104             node_to_test.try_put(BufferItemType(1));
1105             g.wait_for_all();
1106             g.reset();
1107             input_count = sink_count = 0;
1108             make_edge(node_to_test, sink);
1109             g.wait_for_all();
1110         }
1111         else {
1112             g.reset();
1113             input_count = sink_count = 0;
1114         }
1115         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
1116         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
1117     }
1118 
1119 #if USE_TASK_SCHEDULER_OBSERVER
1120     o.observe(false);
1121 #endif
1122 }
1123 
1124 template<class BufferItemType,
1125          TestNodeTypeEnum InputThrowType,
1126          TestNodeTypeEnum SinkThrowType>
1127 void run_sequencer_node_test() {
1128     typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
1129     typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1130     typedef sequencer_body<BufferItemType> SeqBodyType;
1131 
1132     typedef tbb::flow::input_node<BufferItemType> InputType;
1133     typedef tbb::flow::sequencer_node<BufferItemType>  SeqType;
1134     typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
1135 
1136     for(int i = 0; i < 4; ++i) {
1137         if(i == 2) continue;  // no need to test flog w/o throws
1138         bool throwException = (i & 0x1) != 0;
1139         bool doFlog = (i & 0x2) != 0;
1140         run_one_sequencer_node_test<
1141             /* class BufferItemType*/     BufferItemType,
1142             /*class InputNodeType*/       InputType,
1143             /*class InputNodeBodyType*/   InputBodyType,
1144             /*class TestNodeType*/        SeqType,
1145             /*class SeqBodyType*/         SeqBodyType,
1146             /*class SinkNodeType*/        SnkType,
1147             /*class SinkNodeBodyType*/    SinkBodyType
1148             >(throwException, doFlog);
1149     }
1150 }
1151 
1152 
1153 
1154 void test_sequencer_node() {
1155     INFO("Testing sequencer_node\n");
1156     g_Wakeup_Msg = "sequencer_node(is,non): Missed wakeup or machine is overloaded?";
1157     run_sequencer_node_test<int, isThrowing,nonThrowing>();
1158     CheckType<int>::check_type_counter = 0;
1159     g_Wakeup_Msg = "sequencer_node(non,is): Missed wakeup or machine is overloaded?";
1160     run_sequencer_node_test<CheckType<int>, nonThrowing,isThrowing>();
1161     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped objects in sequencer_node test");
1162     g_Wakeup_Msg = "sequencer_node(is,is): Missed wakeup or machine is overloaded?";
1163     run_sequencer_node_test<int, isThrowing,isThrowing>();
1164     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1165 }
1166 
1167 // ------------ priority_queue_node ------------------
1168 
1169 template<
1170     class BufferItemType,
1171     class InputNodeType,
1172     class InputNodeBodyType,
1173     class TestNodeType,
1174     class SinkNodeType,
1175     class SinkNodeBodyType >
1176 void run_one_priority_queue_node_test(bool throwException,bool flog) {
1177     tbb::flow::graph g;
1178 
1179     std::atomic<int> input_count;
1180     std::atomic<int> sink_count;
1181     input_count = sink_count = 0;
1182 #if USE_TASK_SCHEDULER_OBSERVER
1183     eh_test_observer o;
1184     o.observe(true);
1185 #endif
1186     g_Master = std::this_thread::get_id();
1187     InputNodeType input(g, InputNodeBodyType(input_count));
1188 
1189     TestNodeType node_to_test(g);
1190 
1191     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1192 
1193     make_edge(input,node_to_test);
1194     make_edge(node_to_test, sink);
1195     for(int iter = 0; iter < 2; ++iter) {
1196         ResetGlobals(throwException,flog);
1197         if(throwException) {
1198             TRY();
1199                 input.activate();
1200                 g.wait_for_all();
1201             CATCH_AND_ASSERT();
1202         }
1203         else {
1204             TRY();
1205                 input.activate();
1206                 g.wait_for_all();
1207             CATCH_AND_FAIL();
1208         }
1209         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1210         int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
1211         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
1212         if(throwException) {
1213             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1214             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1215             CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
1216             CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
1217         }
1218         else {
1219             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1220             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1221             CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
1222             CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
1223         }
1224         if(iter == 0) {
1225             remove_edge(node_to_test, sink);
1226             node_to_test.try_put(BufferItemType(g_NumItems + 1));
1227             node_to_test.try_put(BufferItemType(g_NumItems + 2));
1228             node_to_test.try_put(BufferItemType());
1229             g.wait_for_all();
1230             g.reset();
1231             input_count = sink_count = 0;
1232             make_edge(node_to_test, sink);
1233             g.wait_for_all();
1234         }
1235         else {
1236             g.reset();
1237             input_count = sink_count = 0;
1238         }
1239         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
1240         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
1241     }
1242 
1243 #if USE_TASK_SCHEDULER_OBSERVER
1244     o.observe(false);
1245 #endif
1246 }
1247 
1248 template<class BufferItemType,
1249          TestNodeTypeEnum InputThrowType,
1250          TestNodeTypeEnum SinkThrowType>
1251 void run_priority_queue_node_test() {
1252     typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
1253     typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1254     typedef less_body<BufferItemType> LessBodyType;
1255 
1256     typedef tbb::flow::input_node<BufferItemType> InputType;
1257     typedef tbb::flow::priority_queue_node<BufferItemType,LessBodyType>  PrqType;
1258     typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
1259 
1260     for(int i = 0; i < 4; ++i) {
1261         if(i == 2) continue;  // no need to test flog w/o throws
1262         bool throwException = (i & 0x1) != 0;
1263         bool doFlog = (i & 0x2) != 0;
1264         run_one_priority_queue_node_test<
1265             /* class BufferItemType*/     BufferItemType,
1266             /*class InputNodeType*/       InputType,
1267             /*class InputNodeBodyType*/   InputBodyType,
1268             /*class TestNodeType*/        PrqType,
1269             /*class SinkNodeType*/        SnkType,
1270             /*class SinkNodeBodyType*/    SinkBodyType
1271             >(throwException, doFlog);
1272     }
1273 }
1274 
1275 void test_priority_queue_node() {
1276     INFO("Testing priority_queue_node\n");
1277     g_Wakeup_Msg = "priority_queue_node(is,non): Missed wakeup or machine is overloaded?";
1278     run_priority_queue_node_test<int, isThrowing,nonThrowing>();
1279     CheckType<int>::check_type_counter = 0;
1280     g_Wakeup_Msg = "priority_queue_node(non,is): Missed wakeup or machine is overloaded?";
1281     run_priority_queue_node_test<CheckType<int>, nonThrowing,isThrowing>();
1282     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped objects in priority_queue_node test");
1283     g_Wakeup_Msg = "priority_queue_node(is,is): Missed wakeup or machine is overloaded?";
1284     run_priority_queue_node_test<int, isThrowing,isThrowing>();
1285     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1286 }
1287 
1288 // ------------------- join_node ----------------
1289 template<class JP> struct graph_policy_name{
1290     static const char* name() {return "unknown"; }
1291 };
1292 template<> struct graph_policy_name<tbb::flow::queueing>  {
1293     static const char* name() {return "queueing"; }
1294 };
1295 template<> struct graph_policy_name<tbb::flow::reserving> {
1296     static const char* name() {return "reserving"; }
1297 };
1298 template<> struct graph_policy_name<tbb::flow::tag_matching> {
1299     static const char* name() {return "tag_matching"; }
1300 };
1301 
1302 
1303 template<
1304     class JP,
1305     class OutputTuple,
1306     class InputType0,
1307     class InputBodyType0,
1308     class InputType1,
1309     class InputBodyType1,
1310     class TestJoinType,
1311     class SinkType,
1312     class SinkBodyType
1313     >
1314 struct run_one_join_node_test {
1315     run_one_join_node_test() {}
1316     static void execute_test(bool throwException,bool flog) {
1317         typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
1318         typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
1319 
1320         tbb::flow::graph g;
1321         std::atomic<int>input0_count;
1322         std::atomic<int>input1_count;
1323         std::atomic<int>sink_count;
1324         input0_count = input1_count = sink_count = 0;
1325 #if USE_TASK_SCHEDULER_OBSERVER
1326         eh_test_observer o;
1327         o.observe(true);
1328 #endif
1329         g_Master = std::this_thread::get_id();
1330         InputType0 input0(g, InputBodyType0(input0_count));
1331         InputType1 input1(g, InputBodyType1(input1_count));
1332         TestJoinType node_to_test(g);
1333         SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
1334         make_edge(input0,tbb::flow::input_port<0>(node_to_test));
1335         make_edge(input1,tbb::flow::input_port<1>(node_to_test));
1336         make_edge(node_to_test, sink);
1337         for(int iter = 0; iter < 2; ++iter) {
1338             ResetGlobals(throwException,flog);
1339             if(throwException) {
1340                 TRY();
1341                     input0.activate();
1342                     input1.activate();
1343                     g.wait_for_all();
1344                 CATCH_AND_ASSERT();
1345             }
1346             else {
1347                 TRY();
1348                     input0.activate();
1349                     input1.activate();
1350                     g.wait_for_all();
1351                 CATCH_AND_FAIL();
1352             }
1353             bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1354             int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value();
1355             int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value();
1356             int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1357             if(throwException) {
1358                 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1359                 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1360                 CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs");
1361                 CHECK_MESSAGE( (nb_cnt <= ((ib0_cnt < ib1_cnt) ? ib0_cnt : ib1_cnt)), "Too many items received by sink nodes");
1362             }
1363             else {
1364                 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1365                 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1366                 if(ib0_cnt != g_NumItems) {
1367                 //     INFO("throwException == %s\n" << (throwException ? "true" : "false"));
1368                 //     INFO("iter == " << iter << "\n");
1369                 //     INFO("ib0_cnt == " << ib0_cnt << "\n");
1370                 //     INFO("g_NumItems == " << g_NumItems << "\n");
1371                 }
1372                 CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0");  // this one
1373                 CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1");
1374                 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
1375             }
1376             if(iter == 0) {
1377                 remove_edge(node_to_test, sink);
1378                 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 1));
1379                 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
1380                 g.wait_for_all();
1381                 g.reset();
1382                 input0_count = input1_count = sink_count = 0;
1383                 make_edge(node_to_test, sink);
1384                 g.wait_for_all();
1385             }
1386             else {
1387                 g.wait_for_all();
1388                 g.reset();
1389                 input0_count = input1_count = sink_count = 0;
1390             }
1391             CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed");
1392             CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed");
1393             nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1394             CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed");
1395         }
1396 
1397 #if USE_TASK_SCHEDULER_OBSERVER
1398         o.observe(false);
1399 #endif
1400     }
1401 };  // run_one_join_node_test
1402 
1403 template<
1404     class OutputTuple,
1405     class InputType0,
1406     class InputBodyType0,
1407     class InputType1,
1408     class InputBodyType1,
1409     class TestJoinType,
1410     class SinkType,
1411     class SinkBodyType
1412     >
1413 struct run_one_join_node_test<
1414         tbb::flow::tag_matching,
1415         OutputTuple,
1416         InputType0,
1417         InputBodyType0,
1418         InputType1,
1419         InputBodyType1,
1420         TestJoinType,
1421         SinkType,
1422         SinkBodyType
1423     > {
1424     run_one_join_node_test() {}
1425     static void execute_test(bool throwException,bool flog) {
1426         typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
1427         typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
1428 
1429         tbb::flow::graph g;
1430 
1431         std::atomic<int>input0_count;
1432         std::atomic<int>input1_count;
1433         std::atomic<int>sink_count;
1434         input0_count = input1_count = sink_count = 0;
1435 #if USE_TASK_SCHEDULER_OBSERVER
1436         eh_test_observer o;
1437         o.observe(true);
1438 #endif
1439         g_Master = std::this_thread::get_id();
1440         InputType0 input0(g, InputBodyType0(input0_count, 2));
1441         InputType1 input1(g, InputBodyType1(input1_count, 3));
1442         TestJoinType node_to_test(g, tag_func<ItemType0>(ItemType0(2)), tag_func<ItemType1>(ItemType1(3)));
1443         SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
1444         make_edge(input0,tbb::flow::input_port<0>(node_to_test));
1445         make_edge(input1,tbb::flow::input_port<1>(node_to_test));
1446         make_edge(node_to_test, sink);
1447         for(int iter = 0; iter < 2; ++iter) {
1448             ResetGlobals(throwException,flog);
1449             if(throwException) {
1450                 TRY();
1451                     input0.activate();
1452                     input1.activate();
1453                     g.wait_for_all();
1454                 CATCH_AND_ASSERT();
1455             }
1456             else {
1457                 TRY();
1458                     input0.activate();
1459                     input1.activate();
1460                     g.wait_for_all();
1461                 CATCH_AND_FAIL();
1462             }
1463             bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1464             int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value();
1465             int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value();
1466             int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1467             if(throwException) {
1468                 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1469                 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1470                 CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs");
1471                 CHECK_MESSAGE( (nb_cnt <= ((ib0_cnt < ib1_cnt) ? ib0_cnt : ib1_cnt)), "Too many items received by sink nodes");
1472             }
1473             else {
1474                 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1475                 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1476                 CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0");
1477                 CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1");
1478                 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
1479             }
1480             if(iter == 0) {
1481                 remove_edge(node_to_test, sink);
1482                 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4));
1483                 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
1484                 g.wait_for_all();   // have to wait for the graph to stop again....
1485                 g.reset();  // resets the body of the input_nodes, test_node and the absorb_nodes.
1486                 input0_count = input1_count = sink_count = 0;
1487                 make_edge(node_to_test, sink);
1488                 g.wait_for_all();   // have to wait for the graph to stop again....
1489             }
1490             else {
1491                 g.wait_for_all();
1492                 g.reset();
1493                 input0_count = input1_count = sink_count = 0;
1494             }
1495             CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed");
1496             CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed");
1497             nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1498             CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed");
1499         }
1500 
1501 #if USE_TASK_SCHEDULER_OBSERVER
1502         o.observe(false);
1503 #endif
1504     }
1505 };  // run_one_join_node_test<tag_matching>
1506 
1507 template<class JP, class OutputTuple,
1508              TestNodeTypeEnum InputThrowType,
1509              TestNodeTypeEnum SinkThrowType>
1510 void run_join_node_test() {
1511     typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
1512     typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
1513     typedef test_input_body<ItemType0,InputThrowType> InputBodyType0;
1514     typedef test_input_body<ItemType1,InputThrowType> InputBodyType1;
1515     typedef absorber_body<OutputTuple,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1516 
1517     typedef typename tbb::flow::input_node<ItemType0> InputType0;
1518     typedef typename tbb::flow::input_node<ItemType1> InputType1;
1519     typedef typename tbb::flow::join_node<OutputTuple,JP> TestJoinType;
1520     typedef typename tbb::flow::function_node<OutputTuple,tbb::flow::continue_msg> SinkType;
1521 
1522     for(int i = 0; i < 4; ++i) {
1523         if(2 == i) continue;
1524         bool throwException = (i & 0x1) != 0;
1525         bool doFlog = (i & 0x2) != 0;
1526         run_one_join_node_test<
1527              JP,
1528              OutputTuple,
1529              InputType0,
1530              InputBodyType0,
1531              InputType1,
1532              InputBodyType1,
1533              TestJoinType,
1534              SinkType,
1535              SinkBodyType>::execute_test(throwException,doFlog);
1536     }
1537 }
1538 
1539 template<class JP>
1540 void test_join_node() {
1541     INFO("Testing join_node<" << graph_policy_name<JP>::name() << ">\n");
1542     // only doing two-input joins
1543     g_Wakeup_Msg = "join(is,non): Missed wakeup or machine is overloaded?";
1544     run_join_node_test<JP, std::tuple<int,int>,  isThrowing, nonThrowing>();
1545     CheckType<int>::check_type_counter = 0;
1546     g_Wakeup_Msg = "join(non,is): Missed wakeup or machine is overloaded?";
1547     run_join_node_test<JP, std::tuple<CheckType<int>,int>, nonThrowing, isThrowing>();
1548     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped items in test");
1549     g_Wakeup_Msg = "join(is,is): Missed wakeup or machine is overloaded?";
1550     run_join_node_test<JP, std::tuple<int,int>,  isThrowing, isThrowing>();
1551     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1552 }
1553 
1554 // ------------------- limiter_node -------------
1555 
1556 template<
1557     class BufferItemType,       //
1558     class InputNodeType,
1559     class InputNodeBodyType,
1560     class TestNodeType,
1561     class SinkNodeType,
1562     class SinkNodeBodyType >
1563 void run_one_limiter_node_test(bool throwException,bool flog) {
1564     tbb::flow::graph g;
1565 
1566     std::atomic<int> input_count;
1567     std::atomic<int> sink_count;
1568     input_count = sink_count = 0;
1569 #if USE_TASK_SCHEDULER_OBSERVER
1570     eh_test_observer o;
1571     o.observe(true);
1572 #endif
1573     g_Master = std::this_thread::get_id();
1574     InputNodeType input(g, InputNodeBodyType(input_count));
1575     TestNodeType node_to_test(g,g_NumThreads + 1);
1576     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1577     make_edge(input,node_to_test);
1578     make_edge(node_to_test, sink);
1579     for(int iter = 0; iter < 2; ++iter) {
1580         ResetGlobals(throwException,flog);
1581         if(throwException) {
1582             TRY();
1583                 input.activate();
1584                 g.wait_for_all();
1585             CATCH_AND_ASSERT();
1586         }
1587         else {
1588             TRY();
1589                 input.activate();
1590                 g.wait_for_all();
1591             CATCH_AND_FAIL();
1592         }
1593         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1594         int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
1595         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
1596         if(throwException) {
1597             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1598             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1599             CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
1600             CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
1601         }
1602         else {
1603             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1604             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1605             // we stop after limiter's limit, which is g_NumThreads + 1.  The input_node
1606             // is invoked one extra time, filling its buffer, so its limit is g_NumThreads + 2.
1607             CHECK_MESSAGE( (ib_cnt == g_NumThreads + 2), "Missing invocations of input_node");
1608             CHECK_MESSAGE( (nb_cnt == g_NumThreads + 1), "Missing items in absorbers");
1609         }
1610         if(iter == 0) {
1611             remove_edge(node_to_test, sink);
1612             node_to_test.try_put(BufferItemType());
1613             node_to_test.try_put(BufferItemType());
1614             g.wait_for_all();
1615             g.reset();
1616             input_count = sink_count = 0;
1617             BufferItemType tmp;
1618             CHECK_MESSAGE( (!node_to_test.try_get(tmp)), "node not empty");
1619             make_edge(node_to_test, sink);
1620             g.wait_for_all();
1621         }
1622         else {
1623             g.reset();
1624             input_count = sink_count = 0;
1625         }
1626         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
1627         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
1628     }
1629 
1630 #if USE_TASK_SCHEDULER_OBSERVER
1631     o.observe(false);
1632 #endif
1633 }
1634 
1635 template<class BufferItemType,
1636          TestNodeTypeEnum InputThrowType,
1637          TestNodeTypeEnum SinkThrowType>
1638 void run_limiter_node_test() {
1639     typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
1640     typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1641 
1642     typedef tbb::flow::input_node<BufferItemType> InputType;
1643     typedef tbb::flow::limiter_node<BufferItemType>  LmtType;
1644     typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
1645 
1646     for(int i = 0; i < 4; ++i) {
1647         if(i == 2) continue;  // no need to test flog w/o throws
1648         bool throwException = (i & 0x1) != 0;
1649         bool doFlog = (i & 0x2) != 0;
1650         run_one_limiter_node_test<
1651             /* class BufferItemType*/     BufferItemType,
1652             /*class InputNodeType*/       InputType,
1653             /*class InputNodeBodyType*/   InputBodyType,
1654             /*class TestNodeType*/        LmtType,
1655             /*class SinkNodeType*/        SnkType,
1656             /*class SinkNodeBodyType*/    SinkBodyType
1657             >(throwException, doFlog);
1658     }
1659 }
1660 
1661 void test_limiter_node() {
1662     INFO("Testing limiter_node\n");
1663     g_Wakeup_Msg = "limiter_node(is,non): Missed wakeup or machine is overloaded?";
1664     run_limiter_node_test<int,isThrowing,nonThrowing>();
1665     g_Wakeup_Msg = "limiter_node(non,is): Missed wakeup or machine is overloaded?";
1666     run_limiter_node_test<int,nonThrowing,isThrowing>();
1667     g_Wakeup_Msg = "limiter_node(is,is): Missed wakeup or machine is overloaded?";
1668     run_limiter_node_test<int,isThrowing,isThrowing>();
1669     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1670 }
1671 
1672 // -------- split_node --------------------
1673 
1674 template<
1675     class InputTuple,
1676     class InputType,
1677     class InputBodyType,
1678     class TestSplitType,
1679     class SinkType0,
1680     class SinkBodyType0,
1681     class SinkType1,
1682     class SinkBodyType1>
1683 void run_one_split_node_test(bool throwException, bool flog) {
1684 
1685     tbb::flow::graph g;
1686 
1687     std::atomic<int> input_count;
1688     std::atomic<int> sink0_count;
1689     std::atomic<int> sink1_count;
1690     input_count = sink0_count = sink1_count = 0;
1691 #if USE_TASK_SCHEDULER_OBSERVER
1692     eh_test_observer o;
1693     o.observe(true);
1694 #endif
1695 
1696     g_Master = std::this_thread::get_id();
1697     InputType input(g, InputBodyType(input_count));
1698     TestSplitType node_to_test(g);
1699     SinkType0 sink0(g,tbb::flow::unlimited,SinkBodyType0(sink0_count));
1700     SinkType1 sink1(g,tbb::flow::unlimited,SinkBodyType1(sink1_count));
1701     make_edge(input, node_to_test);
1702     make_edge(tbb::flow::output_port<0>(node_to_test), sink0);
1703     make_edge(tbb::flow::output_port<1>(node_to_test), sink1);
1704 
1705     for(int iter = 0; iter < 2; ++iter) {  // run, reset, run again
1706         ResetGlobals(throwException,flog);
1707         if(throwException) {
1708             TRY();
1709                 input.activate();
1710                 g.wait_for_all();
1711             CATCH_AND_ASSERT();
1712         }
1713         else {
1714             TRY();
1715                 input.activate();
1716                 g.wait_for_all();
1717             CATCH_AND_FAIL();
1718         }
1719         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1720         int ib_cnt = tbb::flow::copy_body<InputBodyType>(input).count_value();
1721         int nb0_cnt = tbb::flow::copy_body<SinkBodyType0>(sink0).count_value();
1722         int nb1_cnt = tbb::flow::copy_body<SinkBodyType1>(sink1).count_value();
1723         if(throwException) {
1724             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1725             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1726             CHECK_MESSAGE( (ib_cnt <= 2*g_NumItems), "Too many items sent by input");
1727             CHECK_MESSAGE( (nb0_cnt + nb1_cnt <= ib_cnt*2), "Too many items received by sink nodes");
1728         }
1729         else {
1730             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1731             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1732             CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_nodes");
1733             CHECK_MESSAGE( (nb0_cnt == g_NumItems && nb1_cnt == g_NumItems), "Missing items in absorbers");
1734         }
1735         g.reset();  // resets the body of the input_nodes and the absorb_nodes.
1736         input_count = sink0_count = sink1_count = 0;
1737         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType>(input).count_value()),"Reset input failed");
1738         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType0>(sink0).count_value()),"Reset sink 0 failed");
1739         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType1>(sink1).count_value()),"Reset sink 1 failed");
1740     }
1741 #if USE_TASK_SCHEDULER_OBSERVER
1742     o.observe(false);
1743 #endif
1744 }
1745 
1746 template<class InputTuple,
1747              TestNodeTypeEnum InputThrowType,
1748              TestNodeTypeEnum SinkThrowType>
1749 void run_split_node_test() {
1750     typedef typename std::tuple_element<0,InputTuple>::type ItemType0;
1751     typedef typename std::tuple_element<1,InputTuple>::type ItemType1;
1752     typedef tuple_test_input_body<InputTuple,InputThrowType> InputBodyType;
1753     typedef absorber_body<ItemType0,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType0;
1754     typedef absorber_body<ItemType1,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType1;
1755 
1756     typedef typename tbb::flow::input_node<InputTuple> InputType;
1757     typedef typename tbb::flow::split_node<InputTuple> TestSplitType;
1758     typedef typename tbb::flow::function_node<ItemType0,tbb::flow::continue_msg> SinkType0;
1759     typedef typename tbb::flow::function_node<ItemType1,tbb::flow::continue_msg> SinkType1;
1760 
1761     for(int i = 0; i < 4; ++i) {
1762         if(2 == i) continue;
1763         bool throwException = (i & 0x1) != 0;
1764         bool doFlog = (i & 0x2) != 0;
1765         run_one_split_node_test<
1766             InputTuple,
1767             InputType,
1768             InputBodyType,
1769             TestSplitType,
1770             SinkType0,
1771             SinkBodyType0,
1772             SinkType1,
1773             SinkBodyType1>
1774                 (throwException,doFlog);
1775     }
1776 }
1777 
1778 void test_split_node() {
1779     INFO("Testing split_node\n");
1780     g_Wakeup_Msg = "split_node(is,non): Missed wakeup or machine is overloaded?";
1781     run_split_node_test<std::tuple<int,int>, isThrowing, nonThrowing>();
1782     g_Wakeup_Msg = "split_node(non,is): Missed wakeup or machine is overloaded?";
1783     run_split_node_test<std::tuple<int,int>, nonThrowing, isThrowing>();
1784     g_Wakeup_Msg = "split_node(is,is): Missed wakeup or machine is overloaded?";
1785     run_split_node_test<std::tuple<int,int>, isThrowing,  isThrowing>();
1786     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1787 }
1788 
1789 // --------- indexer_node ----------------------
1790 
1791 template < class InputTuple,
1792     class InputType0,
1793     class InputBodyType0,
1794     class InputType1,
1795     class InputBodyType1,
1796     class TestNodeType,
1797     class SinkType,
1798     class SinkBodyType>
1799 void run_one_indexer_node_test(bool throwException,bool flog) {
1800     typedef typename std::tuple_element<0,InputTuple>::type ItemType0;
1801     typedef typename std::tuple_element<1,InputTuple>::type ItemType1;
1802 
1803     tbb::flow::graph g;
1804 
1805     std::atomic<int> input0_count;
1806     std::atomic<int> input1_count;
1807     std::atomic<int> sink_count;
1808     input0_count = input1_count = sink_count = 0;
1809 #if USE_TASK_SCHEDULER_OBSERVER
1810     eh_test_observer o;
1811     o.observe(true);
1812 #endif
1813     g_Master = std::this_thread::get_id();
1814     InputType0 input0(g, InputBodyType0(input0_count));
1815     InputType1 input1(g, InputBodyType1(input1_count));
1816     TestNodeType node_to_test(g);
1817     SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
1818     make_edge(input0,tbb::flow::input_port<0>(node_to_test));
1819     make_edge(input1,tbb::flow::input_port<1>(node_to_test));
1820     make_edge(node_to_test, sink);
1821     for(int iter = 0; iter < 2; ++iter) {
1822         ResetGlobals(throwException,flog);
1823         if(throwException) {
1824             TRY();
1825                 input0.activate();
1826                 input1.activate();
1827                 g.wait_for_all();
1828             CATCH_AND_ASSERT();
1829         }
1830         else {
1831             TRY();
1832                 input0.activate();
1833                 input1.activate();
1834                 g.wait_for_all();
1835             CATCH_AND_FAIL();
1836         }
1837         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1838         int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value();
1839         int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value();
1840         int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1841         if(throwException) {
1842             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1843             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1844             CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs");
1845             CHECK_MESSAGE( (nb_cnt <= ib0_cnt + ib1_cnt), "Too many items received by sink nodes");
1846         }
1847         else {
1848             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1849             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1850             CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0");
1851             CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1");
1852             CHECK_MESSAGE( (nb_cnt == 2*g_NumItems), "Missing items in absorbers");
1853         }
1854         if(iter == 0) {
1855             remove_edge(node_to_test, sink);
1856             tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4));
1857             tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
1858             g.wait_for_all();
1859             g.reset();
1860             input0_count = input1_count = sink_count = 0;
1861             make_edge(node_to_test, sink);
1862             g.wait_for_all();
1863         }
1864         else {
1865             g.wait_for_all();
1866             g.reset();
1867             input0_count = input1_count = sink_count = 0;
1868         }
1869         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed");
1870         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed");
1871         nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1872         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed");
1873     }
1874 
1875 #if USE_TASK_SCHEDULER_OBSERVER
1876     o.observe(false);
1877 #endif
1878 }
1879 
1880 template<class InputTuple,
1881     TestNodeTypeEnum InputThrowType,
1882     TestNodeTypeEnum SinkThrowType>
1883 void run_indexer_node_test() {
1884     typedef typename std::tuple_element<0,InputTuple>::type ItemType0;
1885     typedef typename std::tuple_element<1,InputTuple>::type ItemType1;
1886     typedef test_input_body<ItemType0,InputThrowType> InputBodyType0;
1887     typedef test_input_body<ItemType1,InputThrowType> InputBodyType1;
1888     typedef typename tbb::flow::indexer_node<ItemType0, ItemType1> TestNodeType;
1889     typedef absorber_body<typename TestNodeType::output_type,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1890 
1891     typedef typename tbb::flow::input_node<ItemType0> InputType0;
1892     typedef typename tbb::flow::input_node<ItemType1> InputType1;
1893     typedef typename tbb::flow::function_node<typename TestNodeType::output_type,tbb::flow::continue_msg> SinkType;
1894 
1895     for(int i = 0; i < 4; ++i) {
1896         if(2 == i) continue;
1897         bool throwException = (i & 0x1) != 0;
1898         bool doFlog = (i & 0x2) != 0;
1899         run_one_indexer_node_test<
1900              InputTuple,
1901              InputType0,
1902              InputBodyType0,
1903              InputType1,
1904              InputBodyType1,
1905              TestNodeType,
1906              SinkType,
1907              SinkBodyType>(throwException,doFlog);
1908     }
1909 }
1910 
1911 void test_indexer_node() {
1912     INFO("Testing indexer_node\n");
1913     g_Wakeup_Msg = "indexer_node(is,non): Missed wakeup or machine is overloaded?";
1914     run_indexer_node_test<std::tuple<int,int>, isThrowing, nonThrowing>();
1915     g_Wakeup_Msg = "indexer_node(non,is): Missed wakeup or machine is overloaded?";
1916     run_indexer_node_test<std::tuple<int,int>, nonThrowing, isThrowing>();
1917     g_Wakeup_Msg = "indexer_node(is,is): Missed wakeup or machine is overloaded?";
1918     run_indexer_node_test<std::tuple<int,int>, isThrowing,  isThrowing>();
1919     g_Wakeup_Msg = g_Orig_Wakeup_Msg;;
1920 }
1921 
1922 ///////////////////////////////////////////////
1923 // whole-graph exception test
1924 
1925 class Foo {
1926 private:
1927     // std::vector<int>& m_vec;
1928     std::vector<int>* m_vec;
1929 public:
1930     Foo(std::vector<int>& vec) : m_vec(&vec) { }
1931     void operator() (tbb::flow::continue_msg) const {
1932         ++nExceptions;
1933         (void)m_vec->at(m_vec->size()); // Will throw out_of_range exception
1934         CHECK_MESSAGE( (false), "Exception not thrown by invalid access");
1935     }
1936 };
1937 
1938 // test from user ahelwer: http://software.intel.com/en-us/forums/showthread.php?t=103786
1939 // exception thrown in graph node, not caught in wait_for_all()
1940 void
1941 test_flow_graph_exception0() {
1942     // Initializes body
1943     std::vector<int> vec;
1944     vec.push_back(0);
1945     Foo f(vec);
1946     nExceptions = 0;
1947 
1948     // Construct graph and nodes
1949     tbb::flow::graph g;
1950     tbb::flow::broadcast_node<tbb::flow::continue_msg> start(g);
1951     tbb::flow::continue_node<tbb::flow::continue_msg> fooNode(g, f);
1952 
1953     // Construct edge
1954     tbb::flow::make_edge(start, fooNode);
1955 
1956     // Execute graph
1957     CHECK_MESSAGE( (!g.exception_thrown()), "exception_thrown flag already set");
1958     CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag already set");
1959     try {
1960         start.try_put(tbb::flow::continue_msg());
1961         g.wait_for_all();
1962         CHECK_MESSAGE( (false), "Exception not thrown");
1963     }
1964     catch(std::out_of_range& ex) {
1965         INFO("Exception: " << ex.what() << "(expected)\n");
1966     }
1967     catch(...) {
1968         INFO("Unknown exception caught (expected)\n");
1969     }
1970     CHECK_MESSAGE( (nExceptions > 0), "Exception caught, but no body signaled exception being thrown");
1971     nExceptions = 0;
1972     CHECK_MESSAGE( (g.exception_thrown()), "Exception not intercepted");
1973     // if exception set, cancellation also set.
1974     CHECK_MESSAGE( (g.is_cancelled()), "Exception cancellation not signaled");
1975     // in case we got an exception
1976     try {
1977         g.wait_for_all();  // context still signalled canceled, my_exception still set.
1978     }
1979     catch(...) {
1980         CHECK_MESSAGE( (false), "Second exception thrown but no task executing");
1981     }
1982     CHECK_MESSAGE( (nExceptions == 0), "body signaled exception being thrown, but no body executed");
1983     CHECK_MESSAGE( (!g.exception_thrown()), "exception_thrown flag not reset");
1984     CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag not reset");
1985 }
1986 
1987 void TestOneThreadNum(int nThread) {
1988     INFO("Testing " << nThread << "%d threads\n");
1989     g_NumItems = ((nThread > NUM_ITEMS) ? nThread *2 : NUM_ITEMS);
1990     g_NumThreads = nThread;
1991     tbb::task_arena arena(nThread);
1992 	arena.execute(
1993         [&]() {
1994             // whole-graph exception catch and rethrow test
1995             test_flow_graph_exception0();
1996             for(int i = 0; i < 4; ++i) {
1997                 g_ExceptionInMaster = (i & 1) != 0;
1998                 g_SolitaryException = (i & 2) != 0;
1999                 INFO("g_ExceptionInMaster == " << (g_ExceptionInMaster ? "T":"F")
2000                      << ", g_SolitaryException == " << (g_SolitaryException ? "T":"F") << "\n");
2001                 test_input_node();
2002                 test_function_node();
2003                 test_continue_node();  // also test broadcast_node
2004                 test_multifunction_node();
2005                 // single- and multi-item buffering nodes
2006                 test_buffer_queue_and_overwrite_node();
2007                 test_sequencer_node();
2008                 test_priority_queue_node();
2009 
2010                 // join_nodes
2011                 test_join_node<tbb::flow::queueing>();
2012                 test_join_node<tbb::flow::reserving>();
2013                 test_join_node<tbb::flow::tag_matching>();
2014 
2015                 test_limiter_node();
2016                 test_split_node();
2017                 // graph for write_once_node will be complicated by the fact the node will
2018                 // not do try_puts after it has been set.  To get parallelism of N we have
2019                 // to attach N successor nodes to the write_once (or play some similar game).
2020                 // test_write_once_node();
2021                 test_indexer_node();
2022             }
2023         }
2024     );
2025 }
2026 
2027 //! Test exceptions with parallelism
2028 //! \brief \ref error_guessing
2029 TEST_CASE("Testing several threads"){
2030     // reverse order of tests
2031     for(unsigned int nThread=utils::MaxThread; nThread >= utils::MinThread; --nThread) {
2032         TestOneThreadNum(nThread);
2033     }
2034 }
2035 
2036 #endif // TBB_USE_EXCEPTIONS
2037