1 /*
2 Copyright (c) 2005-2023 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 //! \file test_eh_flow_graph.cpp
18 //! \brief Test for [flow_graph.copy_body flow_graph.broadcast_node flow_graph.priority_queue_node flow_graph.indexer_node flow_graph.sequencer_node flow_graph.remove_edge flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification
19
20 #include "common/config.h"
21
22 #if USE_TASK_SCHEDULER_OBSERVER
23 #include "tbb/task_scheduler_observer.h"
24 #endif
25 #include "tbb/flow_graph.h"
26 #include "tbb/global_control.h"
27
28 #include "common/test.h"
29
30 #if TBB_USE_EXCEPTIONS
31
32 #include "common/utils.h"
33 #include "common/checktype.h"
34 #include "common/concurrency_tracker.h"
35
36 #if _MSC_VER
37 #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
38 #endif
39
40 #if __TBB_MSVC_UNREACHABLE_CODE_IGNORED
41 // Suppress "unreachable code" warning by VC++ 17.0-18.0 (VS 2012 or newer)
42 #pragma warning (disable: 4702)
43 #endif
44
45 // global task_scheduler_observer is an imperfect tool to find how many threads are really
46 // participating. That was the hope, but it counts the entries into the marketplace,
47 // not the arena.
48 // TODO: Consider using local task scheduler observer
49 // #define USE_TASK_SCHEDULER_OBSERVER 1
50
51 #include <iostream>
52 #include <sstream>
53 #include <vector>
54
55 #include "common/exception_handling.h"
56
57 #include <stdexcept>
58
59 #define NUM_ITEMS 15
60 int g_NumItems;
61
62 std::atomic<unsigned> nExceptions;
63 std::atomic<intptr_t> g_TGCCancelled;
64
65 enum TestNodeTypeEnum { nonThrowing, isThrowing };
66
67 static const size_t unlimited_type = 0;
68 static const size_t serial_type = 1;
69 static const size_t limited_type = 4;
70
71 template<TestNodeTypeEnum T> struct TestNodeTypeName;
nameTestNodeTypeName72 template<> struct TestNodeTypeName<nonThrowing> { static const char *name() { return "nonThrowing"; } };
nameTestNodeTypeName73 template<> struct TestNodeTypeName<isThrowing> { static const char *name() { return "isThrowing"; } };
74
75 template<size_t Conc> struct concurrencyName;
nameconcurrencyName76 template<> struct concurrencyName<serial_type>{ static const char *name() { return "serial"; } };
nameconcurrencyName77 template<> struct concurrencyName<unlimited_type>{ static const char *name() { return "unlimited"; } };
nameconcurrencyName78 template<> struct concurrencyName<limited_type>{ static const char *name() { return "limited"; } };
79
80 // Class that provides waiting and throwing behavior. If we are not throwing, do nothing
81 // If serial, we can't wait for concurrency to peak; we may be the bottleneck and will
82 // stop further processing. We will execute g_NumThreads + 10 times (the "10" is somewhat
83 // arbitrary, and just makes sure there are enough items in the graph to keep it flowing),
84 // If parallel or serial and throwing, use utils::ConcurrencyTracker to wait.
85
86 template<size_t Conc, TestNodeTypeEnum t = nonThrowing>
87 class WaitThrow;
88
89 template<>
90 class WaitThrow<serial_type,nonThrowing> {
91 protected:
WaitAndThrow(int cnt,const char *)92 void WaitAndThrow(int cnt, const char * /*name*/) {
93 if(cnt > g_NumThreads + 10) {
94 utils::ConcurrencyTracker ct;
95 WaitUntilConcurrencyPeaks();
96 }
97 }
98 };
99
100 template<>
101 class WaitThrow<serial_type,isThrowing> {
102 protected:
WaitAndThrow(int cnt,const char *)103 void WaitAndThrow(int cnt, const char * /*name*/) {
104 if(cnt > g_NumThreads + 10) {
105 utils::ConcurrencyTracker ct;
106 WaitUntilConcurrencyPeaks();
107 ThrowTestException(1);
108 }
109 }
110 };
111
112 // for nodes with limited concurrency, if that concurrency is < g_NumThreads, we need
113 // to make sure enough other nodes wait for concurrency to peak. If we are attached to
114 // N successors, for each item we pass to a successor, we will get N executions of the
115 // "absorbers" (because we broadcast to successors.) for an odd number of threads we
116 // need (g_NumThreads - limited + 1) / 2 items (that will give us one extra execution
117 // of an "absorber", but we can't change that without changing the behavior of the node.)
118 template<>
119 class WaitThrow<limited_type,nonThrowing> {
120 protected:
WaitAndThrow(int cnt,const char *)121 void WaitAndThrow(int cnt, const char * /*name*/) {
122 if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) {
123 return;
124 }
125 utils::ConcurrencyTracker ct;
126 WaitUntilConcurrencyPeaks();
127 }
128 };
129
130 template<>
131 class WaitThrow<limited_type,isThrowing> {
132 protected:
WaitAndThrow(int cnt,const char *)133 void WaitAndThrow(int cnt, const char * /*name*/) {
134 utils::ConcurrencyTracker ct;
135 if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) {
136 return;
137 }
138 WaitUntilConcurrencyPeaks();
139 ThrowTestException(1);
140 }
141 };
142
143 template<>
144 class WaitThrow<unlimited_type,nonThrowing> {
145 protected:
WaitAndThrow(int,const char *)146 void WaitAndThrow(int /*cnt*/, const char * /*name*/) {
147 utils::ConcurrencyTracker ct;
148 WaitUntilConcurrencyPeaks();
149 }
150 };
151
152 template<>
153 class WaitThrow<unlimited_type,isThrowing> {
154 protected:
WaitAndThrow(int,const char *)155 void WaitAndThrow(int /*cnt*/, const char * /*name*/) {
156 utils::ConcurrencyTracker ct;
157 WaitUntilConcurrencyPeaks();
158 ThrowTestException(1);
159 }
160 };
161
162 void
ResetGlobals(bool throwException=true,bool flog=false)163 ResetGlobals(bool throwException = true, bool flog = false) {
164 nExceptions = 0;
165 g_TGCCancelled = 0;
166 ResetEhGlobals(throwException, flog);
167 }
168
169 // -------input_node body ------------------
170 template <class OutputType, TestNodeTypeEnum TType>
171 class test_input_body : WaitThrow<serial_type, TType> {
172 using WaitThrow<serial_type, TType>::WaitAndThrow;
173 std::atomic<int> *my_current_val;
174 int my_mult;
175 public:
test_input_body(std::atomic<int> & my_cnt,int multiplier=1)176 test_input_body(std::atomic<int> &my_cnt, int multiplier = 1) : my_current_val(&my_cnt), my_mult(multiplier) {
177 // INFO("- --------- - - - constructed " << (size_t)(my_current_val) << "\n");
178 }
179
operator ()(tbb::flow_control & fc)180 OutputType operator()(tbb::flow_control& fc) {
181 UPDATE_COUNTS();
182 OutputType ret = OutputType(my_mult * ++(*my_current_val));
183 // TODO revamp: reconsider logging for the tests.
184
185 // The following line is known to cause double frees. Therefore, commenting out frequent
186 // calls to INFO() macro.
187
188 // INFO("xx(" << (size_t)(my_current_val) << ") ret == " << (int)ret << "\n");
189 if(*my_current_val > g_NumItems) {
190 // INFO(" ------ End of the line!\n");
191 *my_current_val = g_NumItems;
192 fc.stop();
193 return OutputType();
194 }
195 WaitAndThrow((int)ret,"test_input_body");
196 return ret;
197 }
198
count_value()199 int count_value() { return (int)*my_current_val; }
200 };
201
202 template <TestNodeTypeEnum TType>
203 class test_input_body<tbb::flow::continue_msg, TType> : WaitThrow<serial_type, TType> {
204 using WaitThrow<serial_type, TType>::WaitAndThrow;
205 std::atomic<int> *my_current_val;
206 public:
test_input_body(std::atomic<int> & my_cnt)207 test_input_body(std::atomic<int> &my_cnt) : my_current_val(&my_cnt) { }
208
operator ()(tbb::flow_control & fc)209 tbb::flow::continue_msg operator()( tbb::flow_control & fc) {
210 UPDATE_COUNTS();
211 int outint = ++(*my_current_val);
212 if(*my_current_val > g_NumItems) {
213 *my_current_val = g_NumItems;
214 fc.stop();
215 return tbb::flow::continue_msg();
216 }
217 WaitAndThrow(outint,"test_input_body");
218 return tbb::flow::continue_msg();
219 }
220
count_value()221 int count_value() { return (int)*my_current_val; }
222 };
223
224 // -------{function/continue}_node body ------------------
225 template<class InputType, class OutputType, TestNodeTypeEnum T, size_t Conc>
226 class absorber_body : WaitThrow<Conc,T> {
227 using WaitThrow<Conc,T>::WaitAndThrow;
228 std::atomic<int> *my_count;
229 public:
absorber_body(std::atomic<int> & my_cnt)230 absorber_body(std::atomic<int> &my_cnt) : my_count(&my_cnt) { }
operator ()(const InputType &)231 OutputType operator()(const InputType &/*p_in*/) {
232 UPDATE_COUNTS();
233 int out = ++(*my_count);
234 WaitAndThrow(out,"absorber_body");
235 return OutputType();
236 }
count_value()237 int count_value() { return *my_count; }
238 };
239
240 // -------multifunction_node body ------------------
241
242 // helper classes
243 template<int N,class PortsType>
244 struct IssueOutput {
245 typedef typename std::tuple_element<N-1,PortsType>::type::output_type my_type;
246
issue_tuple_elementIssueOutput247 static void issue_tuple_element( PortsType &my_ports) {
248 CHECK_MESSAGE( (std::get<N-1>(my_ports).try_put(my_type())), "Error putting to successor");
249 IssueOutput<N-1,PortsType>::issue_tuple_element(my_ports);
250 }
251 };
252
253 template<class PortsType>
254 struct IssueOutput<1,PortsType> {
255 typedef typename std::tuple_element<0,PortsType>::type::output_type my_type;
256
issue_tuple_elementIssueOutput257 static void issue_tuple_element( PortsType &my_ports) {
258 CHECK_MESSAGE( (std::get<0>(my_ports).try_put(my_type())), "Error putting to successor");
259 }
260 };
261
262 template<class InputType, class OutputTupleType, TestNodeTypeEnum T, size_t Conc>
263 class multifunction_node_body : WaitThrow<Conc,T> {
264 using WaitThrow<Conc,T>::WaitAndThrow;
265 static const int N = std::tuple_size<OutputTupleType>::value;
266 typedef typename tbb::flow::multifunction_node<InputType,OutputTupleType> NodeType;
267 typedef typename NodeType::output_ports_type PortsType;
268 std::atomic<int> *my_count;
269 public:
multifunction_node_body(std::atomic<int> & my_cnt)270 multifunction_node_body(std::atomic<int> &my_cnt) : my_count(&my_cnt) { }
operator ()(const InputType &,PortsType & my_ports)271 void operator()(const InputType& /*in*/, PortsType &my_ports) {
272 UPDATE_COUNTS();
273 int out = ++(*my_count);
274 WaitAndThrow(out,"multifunction_node_body");
275 // issue an item to each output port.
276 IssueOutput<N,PortsType>::issue_tuple_element(my_ports);
277 }
278
count_value()279 int count_value() { return *my_count; }
280 };
281
282 // --------- body to sort items in sequencer_node
283 template<class BufferItemType>
284 struct sequencer_body {
operator ()sequencer_body285 size_t operator()(const BufferItemType &s) {
286 CHECK_MESSAGE( (s), "sequencer item out of range (== 0)");
287 return size_t(s) - 1;
288 }
289 };
290
291 // --------- type for < comparison in priority_queue_node.
292 template<class ItemType>
293 struct less_body {
operator ()less_body294 bool operator()(const ItemType &lhs, const ItemType &rhs) {
295 return (int(lhs) % 3) < (int(rhs) % 3);
296 }
297 };
298
299 // --------- tag methods for tag_matching join_node
300 template<typename TT>
301 class tag_func {
302 TT my_mult;
303 public:
tag_func(TT multiplier)304 tag_func(TT multiplier) : my_mult(multiplier) { }
305 // operator() will return [0 .. Count)
operator ()(TT v)306 tbb::flow::tag_value operator()( TT v) {
307 tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult);
308 return t;
309 }
310 };
311
312 // --------- Input body for split_node test.
313 template <class OutputTuple, TestNodeTypeEnum TType>
314 class tuple_test_input_body : WaitThrow<serial_type, TType> {
315 typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
316 typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
317 using WaitThrow<serial_type, TType>::WaitAndThrow;
318 std::atomic<int> *my_current_val;
319 public:
tuple_test_input_body(std::atomic<int> & my_cnt)320 tuple_test_input_body(std::atomic<int> &my_cnt) : my_current_val(&my_cnt) { }
321
operator ()(tbb::flow_control & fc)322 OutputTuple operator()(tbb::flow_control& fc) {
323 UPDATE_COUNTS();
324 int ival = ++(*my_current_val);
325 if(*my_current_val > g_NumItems) {
326 *my_current_val = g_NumItems; // jam the final value; we assert on it later.
327 fc.stop();
328 return OutputTuple();
329 }
330 WaitAndThrow(ival,"tuple_test_input_body");
331 return OutputTuple(ItemType0(ival),ItemType1(ival));
332 }
333
count_value()334 int count_value() { return (int)*my_current_val; }
335 };
336
337 // ------- end of node bodies
338
339 // input_node is only-serial. input_node can throw, or the function_node can throw.
340 // graph being tested is
341 //
342 // input_node+---+parallel function_node
343 //
344 // After each run the graph is reset(), to test the reset functionality.
345 //
346
347
348 template<class ItemType, TestNodeTypeEnum inpThrowType, TestNodeTypeEnum absorbThrowType>
run_one_input_node_test(bool throwException,bool flog)349 void run_one_input_node_test(bool throwException, bool flog) {
350 typedef test_input_body<ItemType,inpThrowType> src_body_type;
351 typedef absorber_body<ItemType, tbb::flow::continue_msg, absorbThrowType, unlimited_type> parallel_absorb_body_type;
352 std::atomic<int> input_body_count;
353 std::atomic<int> absorber_body_count;
354 input_body_count = 0;
355 absorber_body_count = 0;
356
357 tbb::flow::graph g;
358
359 g_Master = std::this_thread::get_id();
360
361 #if USE_TASK_SCHEDULER_OBSERVER
362 eh_test_observer o;
363 o.observe(true);
364 #endif
365
366 tbb::flow::input_node<ItemType> sn(g, src_body_type(input_body_count));
367 parallel_absorb_body_type ab2(absorber_body_count);
368 tbb::flow::function_node<ItemType> parallel_fn(g,tbb::flow::unlimited,ab2);
369 make_edge(sn, parallel_fn);
370 for(int runcnt = 0; runcnt < 2; ++runcnt) {
371 ResetGlobals(throwException,flog);
372 if(throwException) {
373 TRY();
374 sn.activate();
375 g.wait_for_all();
376 CATCH_AND_ASSERT();
377 }
378 else {
379 TRY();
380 sn.activate();
381 g.wait_for_all();
382 CATCH_AND_FAIL();
383 }
384
385 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
386 int src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value();
387 int sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value();
388 if(throwException) {
389 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception flag in flow::graph not set");
390 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "canceled flag not set");
391 CHECK_MESSAGE( (src_cnt <= g_NumItems), "Too many input_node items emitted");
392 CHECK_MESSAGE( (sink_cnt <= src_cnt), "Too many input_node items received");
393 }
394 else {
395 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
396 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
397 CHECK_MESSAGE( (src_cnt == g_NumItems), "Incorrect # input_node items emitted");
398 CHECK_MESSAGE( (sink_cnt == src_cnt), "Incorrect # input_node items received");
399 }
400 g.reset(); // resets the body of the input_node and the absorb_nodes.
401 input_body_count = 0;
402 absorber_body_count = 0;
403 CHECK_MESSAGE( (!g.exception_thrown()), "Reset didn't clear exception_thrown()");
404 CHECK_MESSAGE( (!g.is_cancelled()), "Reset didn't clear is_cancelled()");
405 src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value();
406 sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value();
407 CHECK_MESSAGE( (src_cnt == 0), "input_node count not reset");
408 CHECK_MESSAGE( (sink_cnt == 0), "sink_node count not reset");
409 }
410 #if USE_TASK_SCHEDULER_OBSERVER
411 o.observe(false);
412 #endif
413 } // run_one_input_node_test
414
415
416 template<class ItemType, TestNodeTypeEnum inpThrowType, TestNodeTypeEnum absorbThrowType>
run_input_node_test()417 void run_input_node_test() {
418 run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(false,false);
419 run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(true,false);
420 run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(true,true);
421 } // run_input_node_test
422
test_input_node()423 void test_input_node() {
424 INFO("Testing input_node\n");
425 CheckType<int>::check_type_counter = 0;
426 g_Wakeup_Msg = "input_node(1): Missed wakeup or machine is overloaded?";
427 run_input_node_test<CheckType<int>, isThrowing, nonThrowing>();
428 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
429 g_Wakeup_Msg = "input_node(2): Missed wakeup or machine is overloaded?";
430 run_input_node_test<int, isThrowing, nonThrowing>();
431 g_Wakeup_Msg = "input_node(3): Missed wakeup or machine is overloaded?";
432 run_input_node_test<int, nonThrowing, isThrowing>();
433 g_Wakeup_Msg = "input_node(4): Missed wakeup or machine is overloaded?";
434 run_input_node_test<int, isThrowing, isThrowing>();
435 g_Wakeup_Msg = "input_node(5): Missed wakeup or machine is overloaded?";
436 run_input_node_test<CheckType<int>, isThrowing, isThrowing>();
437 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
438 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
439 }
440
441 // -------- utilities & types to test function_node and multifunction_node.
442
443 // need to tell the template which node type I am using so it attaches successors correctly.
444 enum NodeFetchType { func_node_type, multifunc_node_type };
445
446 template<class NodeType, class ItemType, int indx, NodeFetchType NFT>
447 struct AttachPoint;
448
449 template<class NodeType, class ItemType, int indx>
450 struct AttachPoint<NodeType,ItemType,indx,multifunc_node_type> {
GetSenderAttachPoint451 static tbb::flow::sender<ItemType> &GetSender(NodeType &n) {
452 return tbb::flow::output_port<indx>(n);
453 }
454 };
455
456 template<class NodeType, class ItemType, int indx>
457 struct AttachPoint<NodeType,ItemType,indx,func_node_type> {
GetSenderAttachPoint458 static tbb::flow::sender<ItemType> &GetSender(NodeType &n) {
459 return n;
460 }
461 };
462
463
464 // common template for running function_node, multifunction_node. continue_node
465 // has different firing requirements, so it needs a different graph topology.
466 template<
467 class InputNodeType,
468 class InputNodeBodyType0,
469 class InputNodeBodyType1,
470 NodeFetchType NFT,
471 class TestNodeType,
472 class TestNodeBodyType,
473 class TypeToSink0, // what kind of item are we sending to sink0
474 class TypeToSink1, // what kind of item are we sending to sink1
475 class SinkNodeType0, // will be same for function;
476 class SinkNodeType1, // may differ for multifunction_node
477 class SinkNodeBodyType0,
478 class SinkNodeBodyType1,
479 size_t Conc
480 >
481 void
run_one_functype_node_test(bool throwException,bool flog,const char *)482 run_one_functype_node_test(bool throwException, bool flog, const char * /*name*/) {
483
484 std::stringstream ss;
485 char *saved_msg = const_cast<char *>(g_Wakeup_Msg);
486 tbb::flow::graph g;
487
488 std::atomic<int> input0_count;
489 std::atomic<int> input1_count;
490 std::atomic<int> sink0_count;
491 std::atomic<int> sink1_count;
492 std::atomic<int> test_count;
493 input0_count = input1_count = sink0_count = sink1_count = test_count = 0;
494
495 #if USE_TASK_SCHEDULER_OBSERVER
496 eh_test_observer o;
497 o.observe(true);
498 #endif
499
500 g_Master = std::this_thread::get_id();
501 InputNodeType input0(g, InputNodeBodyType0(input0_count));
502 InputNodeType input1(g, InputNodeBodyType1(input1_count));
503 TestNodeType node_to_test(g, Conc, TestNodeBodyType(test_count));
504 SinkNodeType0 sink0(g,tbb::flow::unlimited,SinkNodeBodyType0(sink0_count));
505 SinkNodeType1 sink1(g,tbb::flow::unlimited,SinkNodeBodyType1(sink1_count));
506 make_edge(input0, node_to_test);
507 make_edge(input1, node_to_test);
508 make_edge(AttachPoint<TestNodeType, TypeToSink0, 0, NFT>::GetSender(node_to_test), sink0);
509 make_edge(AttachPoint<TestNodeType, TypeToSink1, 1, NFT>::GetSender(node_to_test), sink1);
510
511 for(int iter = 0; iter < 2; ++iter) { // run, reset, run again
512 ss.clear();
513 ss << saved_msg << " iter=" << iter << ", threads=" << g_NumThreads << ", throw=" << (throwException ? "T" : "F") << ", flow=" << (flog ? "T" : "F");
514 g_Wakeup_Msg = ss.str().c_str();
515 ResetGlobals(throwException,flog);
516 if(throwException) {
517 TRY();
518 input0.activate();
519 input1.activate();
520 g.wait_for_all();
521 CATCH_AND_ASSERT();
522 }
523 else {
524 TRY();
525 input0.activate();
526 input1.activate();
527 g.wait_for_all();
528 CATCH_AND_FAIL();
529 }
530 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
531 int ib0_cnt = tbb::flow::copy_body<InputNodeBodyType0>(input0).count_value();
532 int ib1_cnt = tbb::flow::copy_body<InputNodeBodyType1>(input1).count_value();
533 int t_cnt = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value();
534 int nb0_cnt = tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value();
535 int nb1_cnt = tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value();
536 if(throwException) {
537 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
538 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
539 CHECK_MESSAGE( (ib0_cnt + ib1_cnt <= 2*g_NumItems), "Too many items sent by inputs");
540 CHECK_MESSAGE( (ib0_cnt + ib1_cnt >= t_cnt), "Too many items received by test node");
541 CHECK_MESSAGE( (nb0_cnt + nb1_cnt <= t_cnt*2), "Too many items received by sink nodes");
542 }
543 else {
544 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
545 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
546 CHECK_MESSAGE( (ib0_cnt + ib1_cnt == 2*g_NumItems), "Missing invocations of input_nodes");
547 CHECK_MESSAGE( (t_cnt == 2*g_NumItems), "Not all items reached test node");
548 CHECK_MESSAGE( (nb0_cnt == 2*g_NumItems && nb1_cnt == 2*g_NumItems), "Missing items in absorbers");
549 }
550 g.reset(); // resets the body of the input_nodes, test_node and the absorb_nodes.
551 input0_count = input1_count = sink0_count = sink1_count = test_count = 0;
552 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType0>(input0).count_value()),"Reset input 0 failed");
553 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType1>(input1).count_value()),"Reset input 1 failed");
554 CHECK_MESSAGE( (0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value()),"Reset test_node failed");
555 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value()),"Reset sink 0 failed");
556 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value()),"Reset sink 1 failed");
557
558 g_Wakeup_Msg = saved_msg;
559 }
560 #if USE_TASK_SCHEDULER_OBSERVER
561 o.observe(false);
562 #endif
563 }
564
565 // Test function_node
566 //
567 // graph being tested is
568 //
569 // input_node -\ /- parallel function_node
570 // \ /
571 // +function_node+
572 // / \ x
573 // input_node -/ \- parallel function_node
574 //
575 // After each run the graph is reset(), to test the reset functionality.
576 //
577 template<
578 TestNodeTypeEnum IType1, // does input node 1 throw?
579 TestNodeTypeEnum IType2, // does input node 2 throw?
580 class Item12, // type of item passed between inputs and test node
581 TestNodeTypeEnum FType, // does function node throw?
582 class Item23, // type passed from function_node to sink nodes
583 TestNodeTypeEnum NType1, // does sink node 1 throw?
584 TestNodeTypeEnum NType2, // does sink node 1 throw?
585 class NodePolicy, // rejecting,queueing
586 size_t Conc // is node concurrent? {serial | limited | unlimited}
587 >
run_function_node_test()588 void run_function_node_test() {
589
590 typedef test_input_body<Item12,IType1> IBodyType1;
591 typedef test_input_body<Item12,IType2> IBodyType2;
592 typedef absorber_body<Item12, Item23, FType, Conc> TestBodyType;
593 typedef absorber_body<Item23,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1;
594 typedef absorber_body<Item23,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2;
595
596 typedef tbb::flow::input_node<Item12> InputType;
597 typedef tbb::flow::function_node<Item12, Item23, NodePolicy> TestType;
598 typedef tbb::flow::function_node<Item23,tbb::flow::continue_msg> SnkType;
599
600 for(int i = 0; i < 4; ++i ) {
601 if(i != 2) { // doesn't make sense to flog a non-throwing test
602 bool doThrow = (i & 0x1) != 0;
603 bool doFlog = (i & 0x2) != 0;
604 run_one_functype_node_test<
605 /*InputNodeType*/ InputType,
606 /*InputNodeBodyType0*/ IBodyType1,
607 /*InputNodeBodyType1*/ IBodyType2,
608 /* NFT */ func_node_type,
609 /*TestNodeType*/ TestType,
610 /*TestNodeBodyType*/ TestBodyType,
611 /*TypeToSink0 */ Item23,
612 /*TypeToSink1 */ Item23,
613 /*SinkNodeType0*/ SnkType,
614 /*SinkNodeType1*/ SnkType,
615 /*SinkNodeBodyType1*/ SinkBodyType1,
616 /*SinkNodeBodyType2*/ SinkBodyType2,
617 /*Conc*/ Conc>
618 (doThrow,doFlog,"function_node");
619 }
620 }
621 } // run_function_node_test
622
test_function_node()623 void test_function_node() {
624 INFO("Testing function_node\n");
625 // serial rejecting
626 g_Wakeup_Msg = "function_node(1a): Missed wakeup or machine is overloaded?";
627 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
628 g_Wakeup_Msg = "function_node(1b): Missed wakeup or machine is overloaded?";
629 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
630 g_Wakeup_Msg = "function_node(1c): Missed wakeup or machine is overloaded?";
631 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
632
633 // serial queueing
634 g_Wakeup_Msg = "function_node(2): Missed wakeup or machine is overloaded?";
635 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
636 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
637 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
638 CheckType<int>::check_type_counter = 0;
639 run_function_node_test<nonThrowing, nonThrowing, CheckType<int>, nonThrowing, CheckType<int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
640 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
641
642 // unlimited parallel rejecting
643 g_Wakeup_Msg = "function_node(3): Missed wakeup or machine is overloaded?";
644 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
645 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
646 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
647
648 // limited parallel rejecting
649 g_Wakeup_Msg = "function_node(4): Missed wakeup or machine is overloaded?";
650 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>();
651 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>();
652 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>();
653
654 // limited parallel queueing
655 g_Wakeup_Msg = "function_node(5): Missed wakeup or machine is overloaded?";
656 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
657 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
658 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>();
659
660 // everyone throwing
661 g_Wakeup_Msg = "function_node(6): Missed wakeup or machine is overloaded?";
662 run_function_node_test<isThrowing, isThrowing, int, isThrowing, int, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
663 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
664 }
665
666 // ----------------------------------- multifunction_node ----------------------------------
667 // Test multifunction_node.
668 //
669 // graph being tested is
670 //
671 // input_node -\ /- parallel function_node
672 // \ /
673 // +multifunction_node+
674 // / \ x
675 // input_node -/ \- parallel function_node
676 //
677 // After each run the graph is reset(), to test the reset functionality. The
678 // multifunction_node will put an item to each successor for every item
679 // received.
680 //
681 template<
682 TestNodeTypeEnum IType0, // does input node 1 throw?
683 TestNodeTypeEnum IType1, // does input node 2 thorw?
684 class Item12, // type of item passed between inputs and test node
685 TestNodeTypeEnum FType, // does multifunction node throw?
686 class ItemTuple, // tuple of types passed from multifunction_node to sink nodes
687 TestNodeTypeEnum NType1, // does sink node 1 throw?
688 TestNodeTypeEnum NType2, // does sink node 2 throw?
689 class NodePolicy, // rejecting,queueing
690 size_t Conc // is node concurrent? {serial | limited | unlimited}
691 >
run_multifunction_node_test()692 void run_multifunction_node_test() {
693
694 typedef typename std::tuple_element<0,ItemTuple>::type Item23Type0;
695 typedef typename std::tuple_element<1,ItemTuple>::type Item23Type1;
696 typedef test_input_body<Item12,IType0> IBodyType1;
697 typedef test_input_body<Item12,IType1> IBodyType2;
698 typedef multifunction_node_body<Item12, ItemTuple, FType, Conc> TestBodyType;
699 typedef absorber_body<Item23Type0,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1;
700 typedef absorber_body<Item23Type1,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2;
701
702 typedef tbb::flow::input_node<Item12> InputType;
703 typedef tbb::flow::multifunction_node<Item12, ItemTuple, NodePolicy> TestType;
704 typedef tbb::flow::function_node<Item23Type0,tbb::flow::continue_msg> SnkType0;
705 typedef tbb::flow::function_node<Item23Type1,tbb::flow::continue_msg> SnkType1;
706
707 for(int i = 0; i < 4; ++i ) {
708 if(i != 2) { // doesn't make sense to flog a non-throwing test
709 bool doThrow = (i & 0x1) != 0;
710 bool doFlog = (i & 0x2) != 0;
711 run_one_functype_node_test<
712 /*InputNodeType*/ InputType,
713 /*InputNodeBodyType0*/ IBodyType1,
714 /*InputNodeBodyType1*/ IBodyType2,
715 /*NFT*/ multifunc_node_type,
716 /*TestNodeType*/ TestType,
717 /*TestNodeBodyType*/ TestBodyType,
718 /*TypeToSink0*/ Item23Type0,
719 /*TypeToSink1*/ Item23Type1,
720 /*SinkNodeType0*/ SnkType0,
721 /*SinkNodeType1*/ SnkType1,
722 /*SinkNodeBodyType0*/ SinkBodyType1,
723 /*SinkNodeBodyType1*/ SinkBodyType2,
724 /*Conc*/ Conc>
725 (doThrow,doFlog,"multifunction_node");
726 }
727 }
728 } // run_multifunction_node_test
729
test_multifunction_node()730 void test_multifunction_node() {
731 INFO("Testing multifunction_node\n");
732 g_Wakeup_Msg = "multifunction_node(input throws,rejecting,serial): Missed wakeup or machine is overloaded?";
733 // serial rejecting
734 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,float>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
735 g_Wakeup_Msg = "multifunction_node(test throws,rejecting,serial): Missed wakeup or machine is overloaded?";
736 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
737 g_Wakeup_Msg = "multifunction_node(sink throws,rejecting,serial): Missed wakeup or machine is overloaded?";
738 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
739
740 g_Wakeup_Msg = "multifunction_node(2): Missed wakeup or machine is overloaded?";
741 // serial queueing
742 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
743 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
744 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
745 CheckType<int>::check_type_counter = 0;
746 run_multifunction_node_test<nonThrowing, nonThrowing, CheckType<int>, nonThrowing, std::tuple<CheckType<int>, CheckType<int> >, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
747 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
748
749 g_Wakeup_Msg = "multifunction_node(3): Missed wakeup or machine is overloaded?";
750 // unlimited parallel rejecting
751 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
752 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
753 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
754
755 g_Wakeup_Msg = "multifunction_node(4): Missed wakeup or machine is overloaded?";
756 // limited parallel rejecting
757 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>();
758 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>();
759 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>();
760
761 g_Wakeup_Msg = "multifunction_node(5): Missed wakeup or machine is overloaded?";
762 // limited parallel queueing
763 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
764 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
765 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>();
766
767 g_Wakeup_Msg = "multifunction_node(6): Missed wakeup or machine is overloaded?";
768 // everyone throwing
769 run_multifunction_node_test<isThrowing, isThrowing, int, isThrowing, std::tuple<int,int>, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
770 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
771 }
772
773 //
774 // Continue node has T predecessors. when it receives messages (continue_msg) on T predecessors
775 // it executes the body of the node, and forwards a continue_msg to its successors.
776 // However many predecessors the continue_node has, that's how many continue_msgs it receives
777 // on input before forwarding a message.
778 //
779 // The graph will look like
780 //
781 // +broadcast_node+
782 // / \ ___
783 // input_node+------>+broadcast_node+ +continue_node+--->+absorber
784 // \ /
785 // +broadcast_node+
786 //
787 // The continue_node has unlimited parallelism, no input buffering, and broadcasts to successors.
788 // The absorber is parallel, so each item emitted by the input will result in one thread
789 // spinning. So for N threads we pass N-1 continue_messages, then spin wait and then throw if
790 // we are allowed to.
791
792 template < class InputNodeType, class InputNodeBodyType, class TTestNodeType, class TestNodeBodyType,
793 class SinkNodeType, class SinkNodeBodyType>
run_one_continue_node_test(bool throwException,bool flog)794 void run_one_continue_node_test (bool throwException, bool flog) {
795 tbb::flow::graph g;
796
797 std::atomic<int> input_count;
798 std::atomic<int> test_count;
799 std::atomic<int> sink_count;
800 input_count = test_count = sink_count = 0;
801 #if USE_TASK_SCHEDULER_OBSERVER
802 eh_test_observer o;
803 o.observe(true);
804 #endif
805 g_Master = std::this_thread::get_id();
806 InputNodeType input(g, InputNodeBodyType(input_count));
807 TTestNodeType node_to_test(g, TestNodeBodyType(test_count));
808 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
809 tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g), b2(g), b3(g);
810 make_edge(input, b1);
811 make_edge(b1,b2);
812 make_edge(b1,b3);
813 make_edge(b2,node_to_test);
814 make_edge(b3,node_to_test);
815 make_edge(node_to_test, sink);
816 for(int iter = 0; iter < 2; ++iter) {
817 ResetGlobals(throwException,flog);
818 if(throwException) {
819 TRY();
820 input.activate();
821 g.wait_for_all();
822 CATCH_AND_ASSERT();
823 }
824 else {
825 TRY();
826 input.activate();
827 g.wait_for_all();
828 CATCH_AND_FAIL();
829 }
830 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
831 int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
832 int t_cnt = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value();
833 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
834 if(throwException) {
835 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
836 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
837 CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
838 CHECK_MESSAGE( (ib_cnt >= t_cnt), "Too many items received by test node");
839 CHECK_MESSAGE( (nb_cnt <= t_cnt), "Too many items received by sink nodes");
840 }
841 else {
842 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
843 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
844 CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
845 CHECK_MESSAGE( (t_cnt == g_NumItems), "Not all items reached test node");
846 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
847 }
848 g.reset(); // resets the body of the input_nodes, test_node and the absorb_nodes.
849 input_count = test_count = sink_count = 0;
850 CHECK_MESSAGE( (0 == (int)test_count), "Atomic wasn't reset properly");
851 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
852 CHECK_MESSAGE( (0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value()),"Reset test_node failed");
853 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
854 }
855 #if USE_TASK_SCHEDULER_OBSERVER
856 o.observe(false);
857 #endif
858 }
859
860 template<
861 class ItemType,
862 TestNodeTypeEnum IType, // does input node throw?
863 TestNodeTypeEnum CType, // does continue_node throw?
864 TestNodeTypeEnum AType> // does absorber throw
run_continue_node_test()865 void run_continue_node_test() {
866 typedef test_input_body<tbb::flow::continue_msg,IType> IBodyType;
867 typedef absorber_body<tbb::flow::continue_msg,ItemType,CType,unlimited_type> ContBodyType;
868 typedef absorber_body<ItemType,tbb::flow::continue_msg, AType, unlimited_type> SinkBodyType;
869
870 typedef tbb::flow::input_node<tbb::flow::continue_msg> InputType;
871 typedef tbb::flow::continue_node<ItemType> TestType;
872 typedef tbb::flow::function_node<ItemType,tbb::flow::continue_msg> SnkType;
873
874 for(int i = 0; i < 4; ++i ) {
875 if(i == 2) continue; // don't run (false,true); it doesn't make sense.
876 bool doThrow = (i & 0x1) != 0;
877 bool doFlog = (i & 0x2) != 0;
878 run_one_continue_node_test<
879 /*InputNodeType*/ InputType,
880 /*InputNodeBodyType*/ IBodyType,
881 /*TestNodeType*/ TestType,
882 /*TestNodeBodyType*/ ContBodyType,
883 /*SinkNodeType*/ SnkType,
884 /*SinkNodeBodyType*/ SinkBodyType>
885 (doThrow,doFlog);
886 }
887 }
888
889 //
test_continue_node()890 void test_continue_node() {
891 INFO("Testing continue_node\n");
892 g_Wakeup_Msg = "buffer_node(non,is,non): Missed wakeup or machine is overloaded?";
893 run_continue_node_test<int,nonThrowing,isThrowing,nonThrowing>();
894 g_Wakeup_Msg = "buffer_node(non,non,is): Missed wakeup or machine is overloaded?";
895 run_continue_node_test<int,nonThrowing,nonThrowing,isThrowing>();
896 g_Wakeup_Msg = "buffer_node(is,non,non): Missed wakeup or machine is overloaded?";
897 run_continue_node_test<int,isThrowing,nonThrowing,nonThrowing>();
898 g_Wakeup_Msg = "buffer_node(is,is,is): Missed wakeup or machine is overloaded?";
899 run_continue_node_test<int,isThrowing,isThrowing,isThrowing>();
900 CheckType<double>::check_type_counter = 0;
901 run_continue_node_test<CheckType<double>,isThrowing,isThrowing,isThrowing>();
902 CHECK_MESSAGE( (!CheckType<double>::check_type_counter), "Dropped objects in continue_node test");
903 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
904 }
905
906 // ---------- buffer_node queue_node overwrite_node --------------
907
908 template<
909 class BufferItemType, //
910 class InputNodeType,
911 class InputNodeBodyType,
912 class TestNodeType,
913 class SinkNodeType,
914 class SinkNodeBodyType >
run_one_buffer_node_test(bool throwException,bool flog)915 void run_one_buffer_node_test(bool throwException,bool flog) {
916 tbb::flow::graph g;
917
918 std::atomic<int> input_count;
919 std::atomic<int> sink_count;
920 input_count = sink_count = 0;
921 #if USE_TASK_SCHEDULER_OBSERVER
922 eh_test_observer o;
923 o.observe(true);
924 #endif
925 g_Master = std::this_thread::get_id();
926 InputNodeType input(g, InputNodeBodyType(input_count));
927 TestNodeType node_to_test(g);
928 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
929 make_edge(input,node_to_test);
930 make_edge(node_to_test, sink);
931 for(int iter = 0; iter < 2; ++iter) {
932 ResetGlobals(throwException,flog);
933 if(throwException) {
934 TRY();
935 input.activate();
936 g.wait_for_all();
937 CATCH_AND_ASSERT();
938 }
939 else {
940 TRY();
941 input.activate();
942 g.wait_for_all();
943 CATCH_AND_FAIL();
944 }
945 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
946 int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
947 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
948 if(throwException) {
949 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
950 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
951 CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
952 CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
953 }
954 else {
955 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
956 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
957 CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
958 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
959 }
960 if(iter == 0) {
961 remove_edge(node_to_test, sink);
962 node_to_test.try_put(BufferItemType());
963 g.wait_for_all();
964 g.reset();
965 input_count = sink_count = 0;
966 BufferItemType tmp;
967 CHECK_MESSAGE( (!node_to_test.try_get(tmp)), "node not empty");
968 make_edge(node_to_test, sink);
969 g.wait_for_all();
970 }
971 else {
972 g.reset();
973 input_count = sink_count = 0;
974 }
975 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
976 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
977 }
978
979 #if USE_TASK_SCHEDULER_OBSERVER
980 o.observe(false);
981 #endif
982 }
983 template<class BufferItemType,
984 TestNodeTypeEnum InputThrowType,
985 TestNodeTypeEnum SinkThrowType>
run_buffer_queue_and_overwrite_node_test()986 void run_buffer_queue_and_overwrite_node_test() {
987 typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
988 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
989
990 typedef tbb::flow::input_node<BufferItemType> InputType;
991 typedef tbb::flow::buffer_node<BufferItemType> BufType;
992 typedef tbb::flow::queue_node<BufferItemType> QueType;
993 typedef tbb::flow::overwrite_node<BufferItemType> OvrType;
994 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
995
996 for(int i = 0; i < 4; ++i) {
997 if(i == 2) continue; // no need to test flog w/o throws
998 bool throwException = (i & 0x1) != 0;
999 bool doFlog = (i & 0x2) != 0;
1000 run_one_buffer_node_test<
1001 /* class BufferItemType*/ BufferItemType,
1002 /*class InputNodeType*/ InputType,
1003 /*class InputNodeBodyType*/ InputBodyType,
1004 /*class TestNodeType*/ BufType,
1005 /*class SinkNodeType*/ SnkType,
1006 /*class SinkNodeBodyType*/ SinkBodyType
1007 >(throwException, doFlog);
1008 run_one_buffer_node_test<
1009 /* class BufferItemType*/ BufferItemType,
1010 /*class InputNodeType*/ InputType,
1011 /*class InputNodeBodyType*/ InputBodyType,
1012 /*class TestNodeType*/ QueType,
1013 /*class SinkNodeType*/ SnkType,
1014 /*class SinkNodeBodyType*/ SinkBodyType
1015 >(throwException, doFlog);
1016 run_one_buffer_node_test<
1017 /* class BufferItemType*/ BufferItemType,
1018 /*class InputNodeType*/ InputType,
1019 /*class InputNodeBodyType*/ InputBodyType,
1020 /*class TestNodeType*/ OvrType,
1021 /*class SinkNodeType*/ SnkType,
1022 /*class SinkNodeBodyType*/ SinkBodyType
1023 >(throwException, doFlog);
1024 }
1025 }
1026
test_buffer_queue_and_overwrite_node()1027 void test_buffer_queue_and_overwrite_node() {
1028 INFO("Testing buffer_node, queue_node and overwrite_node\n");
1029 g_Wakeup_Msg = "buffer, queue, overwrite(is,non): Missed wakeup or machine is overloaded?";
1030 run_buffer_queue_and_overwrite_node_test<int,isThrowing,nonThrowing>();
1031 g_Wakeup_Msg = "buffer, queue, overwrite(non,is): Missed wakeup or machine is overloaded?";
1032 run_buffer_queue_and_overwrite_node_test<int,nonThrowing,isThrowing>();
1033 g_Wakeup_Msg = "buffer, queue, overwrite(is,is): Missed wakeup or machine is overloaded?";
1034 run_buffer_queue_and_overwrite_node_test<int,isThrowing,isThrowing>();
1035 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1036 }
1037
1038 // ---------- sequencer_node -------------------------
1039
1040
1041 template<
1042 class BufferItemType, //
1043 class InputNodeType,
1044 class InputNodeBodyType,
1045 class TestNodeType,
1046 class SeqBodyType,
1047 class SinkNodeType,
1048 class SinkNodeBodyType >
run_one_sequencer_node_test(bool throwException,bool flog)1049 void run_one_sequencer_node_test(bool throwException,bool flog) {
1050 tbb::flow::graph g;
1051
1052 std::atomic<int> input_count;
1053 std::atomic<int> sink_count;
1054 input_count = sink_count = 0;
1055 #if USE_TASK_SCHEDULER_OBSERVER
1056 eh_test_observer o;
1057 o.observe(true);
1058 #endif
1059 g_Master = std::this_thread::get_id();
1060 InputNodeType input(g, InputNodeBodyType(input_count));
1061 TestNodeType node_to_test(g,SeqBodyType());
1062 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1063 make_edge(input,node_to_test);
1064 make_edge(node_to_test, sink);
1065 for(int iter = 0; iter < 2; ++iter) {
1066 ResetGlobals(throwException,flog);
1067 if(throwException) {
1068 TRY();
1069 input.activate();
1070 g.wait_for_all();
1071 CATCH_AND_ASSERT();
1072 }
1073 else {
1074 TRY();
1075 input.activate();
1076 g.wait_for_all();
1077 CATCH_AND_FAIL();
1078 }
1079 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1080 int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
1081 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
1082 if(throwException) {
1083 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1084 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1085 CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
1086 CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
1087 }
1088 else {
1089 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1090 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1091 CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
1092 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
1093 }
1094 if(iter == 0) {
1095 remove_edge(node_to_test, sink);
1096 node_to_test.try_put(BufferItemType(g_NumItems + 1));
1097 node_to_test.try_put(BufferItemType(1));
1098 g.wait_for_all();
1099 g.reset();
1100 input_count = sink_count = 0;
1101 make_edge(node_to_test, sink);
1102 g.wait_for_all();
1103 }
1104 else {
1105 g.reset();
1106 input_count = sink_count = 0;
1107 }
1108 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
1109 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
1110 }
1111
1112 #if USE_TASK_SCHEDULER_OBSERVER
1113 o.observe(false);
1114 #endif
1115 }
1116
1117 template<class BufferItemType,
1118 TestNodeTypeEnum InputThrowType,
1119 TestNodeTypeEnum SinkThrowType>
run_sequencer_node_test()1120 void run_sequencer_node_test() {
1121 typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
1122 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1123 typedef sequencer_body<BufferItemType> SeqBodyType;
1124
1125 typedef tbb::flow::input_node<BufferItemType> InputType;
1126 typedef tbb::flow::sequencer_node<BufferItemType> SeqType;
1127 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
1128
1129 for(int i = 0; i < 4; ++i) {
1130 if(i == 2) continue; // no need to test flog w/o throws
1131 bool throwException = (i & 0x1) != 0;
1132 bool doFlog = (i & 0x2) != 0;
1133 run_one_sequencer_node_test<
1134 /* class BufferItemType*/ BufferItemType,
1135 /*class InputNodeType*/ InputType,
1136 /*class InputNodeBodyType*/ InputBodyType,
1137 /*class TestNodeType*/ SeqType,
1138 /*class SeqBodyType*/ SeqBodyType,
1139 /*class SinkNodeType*/ SnkType,
1140 /*class SinkNodeBodyType*/ SinkBodyType
1141 >(throwException, doFlog);
1142 }
1143 }
1144
1145
1146
test_sequencer_node()1147 void test_sequencer_node() {
1148 INFO("Testing sequencer_node\n");
1149 g_Wakeup_Msg = "sequencer_node(is,non): Missed wakeup or machine is overloaded?";
1150 run_sequencer_node_test<int, isThrowing,nonThrowing>();
1151 CheckType<int>::check_type_counter = 0;
1152 g_Wakeup_Msg = "sequencer_node(non,is): Missed wakeup or machine is overloaded?";
1153 run_sequencer_node_test<CheckType<int>, nonThrowing,isThrowing>();
1154 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped objects in sequencer_node test");
1155 g_Wakeup_Msg = "sequencer_node(is,is): Missed wakeup or machine is overloaded?";
1156 run_sequencer_node_test<int, isThrowing,isThrowing>();
1157 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1158 }
1159
1160 // ------------ priority_queue_node ------------------
1161
1162 template<
1163 class BufferItemType,
1164 class InputNodeType,
1165 class InputNodeBodyType,
1166 class TestNodeType,
1167 class SinkNodeType,
1168 class SinkNodeBodyType >
run_one_priority_queue_node_test(bool throwException,bool flog)1169 void run_one_priority_queue_node_test(bool throwException,bool flog) {
1170 tbb::flow::graph g;
1171
1172 std::atomic<int> input_count;
1173 std::atomic<int> sink_count;
1174 input_count = sink_count = 0;
1175 #if USE_TASK_SCHEDULER_OBSERVER
1176 eh_test_observer o;
1177 o.observe(true);
1178 #endif
1179 g_Master = std::this_thread::get_id();
1180 InputNodeType input(g, InputNodeBodyType(input_count));
1181
1182 TestNodeType node_to_test(g);
1183
1184 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1185
1186 make_edge(input,node_to_test);
1187 make_edge(node_to_test, sink);
1188 for(int iter = 0; iter < 2; ++iter) {
1189 ResetGlobals(throwException,flog);
1190 if(throwException) {
1191 TRY();
1192 input.activate();
1193 g.wait_for_all();
1194 CATCH_AND_ASSERT();
1195 }
1196 else {
1197 TRY();
1198 input.activate();
1199 g.wait_for_all();
1200 CATCH_AND_FAIL();
1201 }
1202 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1203 int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
1204 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
1205 if(throwException) {
1206 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1207 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1208 CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
1209 CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
1210 }
1211 else {
1212 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1213 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1214 CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
1215 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
1216 }
1217 if(iter == 0) {
1218 remove_edge(node_to_test, sink);
1219 node_to_test.try_put(BufferItemType(g_NumItems + 1));
1220 node_to_test.try_put(BufferItemType(g_NumItems + 2));
1221 node_to_test.try_put(BufferItemType());
1222 g.wait_for_all();
1223 g.reset();
1224 input_count = sink_count = 0;
1225 make_edge(node_to_test, sink);
1226 g.wait_for_all();
1227 }
1228 else {
1229 g.reset();
1230 input_count = sink_count = 0;
1231 }
1232 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
1233 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
1234 }
1235
1236 #if USE_TASK_SCHEDULER_OBSERVER
1237 o.observe(false);
1238 #endif
1239 }
1240
1241 template<class BufferItemType,
1242 TestNodeTypeEnum InputThrowType,
1243 TestNodeTypeEnum SinkThrowType>
run_priority_queue_node_test()1244 void run_priority_queue_node_test() {
1245 typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
1246 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1247 typedef less_body<BufferItemType> LessBodyType;
1248
1249 typedef tbb::flow::input_node<BufferItemType> InputType;
1250 typedef tbb::flow::priority_queue_node<BufferItemType,LessBodyType> PrqType;
1251 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
1252
1253 for(int i = 0; i < 4; ++i) {
1254 if(i == 2) continue; // no need to test flog w/o throws
1255 bool throwException = (i & 0x1) != 0;
1256 bool doFlog = (i & 0x2) != 0;
1257 run_one_priority_queue_node_test<
1258 /* class BufferItemType*/ BufferItemType,
1259 /*class InputNodeType*/ InputType,
1260 /*class InputNodeBodyType*/ InputBodyType,
1261 /*class TestNodeType*/ PrqType,
1262 /*class SinkNodeType*/ SnkType,
1263 /*class SinkNodeBodyType*/ SinkBodyType
1264 >(throwException, doFlog);
1265 }
1266 }
1267
test_priority_queue_node()1268 void test_priority_queue_node() {
1269 INFO("Testing priority_queue_node\n");
1270 g_Wakeup_Msg = "priority_queue_node(is,non): Missed wakeup or machine is overloaded?";
1271 run_priority_queue_node_test<int, isThrowing,nonThrowing>();
1272 CheckType<int>::check_type_counter = 0;
1273 g_Wakeup_Msg = "priority_queue_node(non,is): Missed wakeup or machine is overloaded?";
1274 run_priority_queue_node_test<CheckType<int>, nonThrowing,isThrowing>();
1275 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped objects in priority_queue_node test");
1276 g_Wakeup_Msg = "priority_queue_node(is,is): Missed wakeup or machine is overloaded?";
1277 run_priority_queue_node_test<int, isThrowing,isThrowing>();
1278 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1279 }
1280
1281 // ------------------- join_node ----------------
1282 template<class JP> struct graph_policy_name{
namegraph_policy_name1283 static const char* name() {return "unknown"; }
1284 };
1285 template<> struct graph_policy_name<tbb::flow::queueing> {
namegraph_policy_name1286 static const char* name() {return "queueing"; }
1287 };
1288 template<> struct graph_policy_name<tbb::flow::reserving> {
namegraph_policy_name1289 static const char* name() {return "reserving"; }
1290 };
1291 template<> struct graph_policy_name<tbb::flow::tag_matching> {
namegraph_policy_name1292 static const char* name() {return "tag_matching"; }
1293 };
1294
1295
1296 template<
1297 class JP,
1298 class OutputTuple,
1299 class InputType0,
1300 class InputBodyType0,
1301 class InputType1,
1302 class InputBodyType1,
1303 class TestJoinType,
1304 class SinkType,
1305 class SinkBodyType
1306 >
1307 struct run_one_join_node_test {
run_one_join_node_testrun_one_join_node_test1308 run_one_join_node_test() {}
execute_testrun_one_join_node_test1309 static void execute_test(bool throwException,bool flog) {
1310 typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
1311 typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
1312
1313 tbb::flow::graph g;
1314 std::atomic<int>input0_count;
1315 std::atomic<int>input1_count;
1316 std::atomic<int>sink_count;
1317 input0_count = input1_count = sink_count = 0;
1318 #if USE_TASK_SCHEDULER_OBSERVER
1319 eh_test_observer o;
1320 o.observe(true);
1321 #endif
1322 g_Master = std::this_thread::get_id();
1323 InputType0 input0(g, InputBodyType0(input0_count));
1324 InputType1 input1(g, InputBodyType1(input1_count));
1325 TestJoinType node_to_test(g);
1326 SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
1327 make_edge(input0,tbb::flow::input_port<0>(node_to_test));
1328 make_edge(input1,tbb::flow::input_port<1>(node_to_test));
1329 make_edge(node_to_test, sink);
1330 for(int iter = 0; iter < 2; ++iter) {
1331 ResetGlobals(throwException,flog);
1332 if(throwException) {
1333 TRY();
1334 input0.activate();
1335 input1.activate();
1336 g.wait_for_all();
1337 CATCH_AND_ASSERT();
1338 }
1339 else {
1340 TRY();
1341 input0.activate();
1342 input1.activate();
1343 g.wait_for_all();
1344 CATCH_AND_FAIL();
1345 }
1346 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1347 int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value();
1348 int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value();
1349 int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1350 if(throwException) {
1351 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1352 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1353 CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs");
1354 CHECK_MESSAGE( (nb_cnt <= ((ib0_cnt < ib1_cnt) ? ib0_cnt : ib1_cnt)), "Too many items received by sink nodes");
1355 }
1356 else {
1357 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1358 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1359 if(ib0_cnt != g_NumItems) {
1360 // INFO("throwException == %s\n" << (throwException ? "true" : "false"));
1361 // INFO("iter == " << iter << "\n");
1362 // INFO("ib0_cnt == " << ib0_cnt << "\n");
1363 // INFO("g_NumItems == " << g_NumItems << "\n");
1364 }
1365 CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0"); // this one
1366 CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1");
1367 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
1368 }
1369 if(iter == 0) {
1370 remove_edge(node_to_test, sink);
1371 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 1));
1372 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
1373 g.wait_for_all();
1374 g.reset();
1375 input0_count = input1_count = sink_count = 0;
1376 make_edge(node_to_test, sink);
1377 g.wait_for_all();
1378 }
1379 else {
1380 g.wait_for_all();
1381 g.reset();
1382 input0_count = input1_count = sink_count = 0;
1383 }
1384 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed");
1385 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed");
1386 nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1387 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed");
1388 }
1389
1390 #if USE_TASK_SCHEDULER_OBSERVER
1391 o.observe(false);
1392 #endif
1393 }
1394 }; // run_one_join_node_test
1395
1396 template<
1397 class OutputTuple,
1398 class InputType0,
1399 class InputBodyType0,
1400 class InputType1,
1401 class InputBodyType1,
1402 class TestJoinType,
1403 class SinkType,
1404 class SinkBodyType
1405 >
1406 struct run_one_join_node_test<
1407 tbb::flow::tag_matching,
1408 OutputTuple,
1409 InputType0,
1410 InputBodyType0,
1411 InputType1,
1412 InputBodyType1,
1413 TestJoinType,
1414 SinkType,
1415 SinkBodyType
1416 > {
run_one_join_node_testrun_one_join_node_test1417 run_one_join_node_test() {}
execute_testrun_one_join_node_test1418 static void execute_test(bool throwException,bool flog) {
1419 typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
1420 typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
1421
1422 tbb::flow::graph g;
1423
1424 std::atomic<int>input0_count;
1425 std::atomic<int>input1_count;
1426 std::atomic<int>sink_count;
1427 input0_count = input1_count = sink_count = 0;
1428 #if USE_TASK_SCHEDULER_OBSERVER
1429 eh_test_observer o;
1430 o.observe(true);
1431 #endif
1432 g_Master = std::this_thread::get_id();
1433 InputType0 input0(g, InputBodyType0(input0_count, 2));
1434 InputType1 input1(g, InputBodyType1(input1_count, 3));
1435 TestJoinType node_to_test(g, tag_func<ItemType0>(ItemType0(2)), tag_func<ItemType1>(ItemType1(3)));
1436 SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
1437 make_edge(input0,tbb::flow::input_port<0>(node_to_test));
1438 make_edge(input1,tbb::flow::input_port<1>(node_to_test));
1439 make_edge(node_to_test, sink);
1440 for(int iter = 0; iter < 2; ++iter) {
1441 ResetGlobals(throwException,flog);
1442 if(throwException) {
1443 TRY();
1444 input0.activate();
1445 input1.activate();
1446 g.wait_for_all();
1447 CATCH_AND_ASSERT();
1448 }
1449 else {
1450 TRY();
1451 input0.activate();
1452 input1.activate();
1453 g.wait_for_all();
1454 CATCH_AND_FAIL();
1455 }
1456 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1457 int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value();
1458 int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value();
1459 int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1460 if(throwException) {
1461 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1462 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1463 CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs");
1464 CHECK_MESSAGE( (nb_cnt <= ((ib0_cnt < ib1_cnt) ? ib0_cnt : ib1_cnt)), "Too many items received by sink nodes");
1465 }
1466 else {
1467 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1468 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1469 CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0");
1470 CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1");
1471 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
1472 }
1473 if(iter == 0) {
1474 remove_edge(node_to_test, sink);
1475 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4));
1476 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
1477 g.wait_for_all(); // have to wait for the graph to stop again....
1478 g.reset(); // resets the body of the input_nodes, test_node and the absorb_nodes.
1479 input0_count = input1_count = sink_count = 0;
1480 make_edge(node_to_test, sink);
1481 g.wait_for_all(); // have to wait for the graph to stop again....
1482 }
1483 else {
1484 g.wait_for_all();
1485 g.reset();
1486 input0_count = input1_count = sink_count = 0;
1487 }
1488 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed");
1489 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed");
1490 nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1491 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed");
1492 }
1493
1494 #if USE_TASK_SCHEDULER_OBSERVER
1495 o.observe(false);
1496 #endif
1497 }
1498 }; // run_one_join_node_test<tag_matching>
1499
1500 template<class JP, class OutputTuple,
1501 TestNodeTypeEnum InputThrowType,
1502 TestNodeTypeEnum SinkThrowType>
run_join_node_test()1503 void run_join_node_test() {
1504 typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
1505 typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
1506 typedef test_input_body<ItemType0,InputThrowType> InputBodyType0;
1507 typedef test_input_body<ItemType1,InputThrowType> InputBodyType1;
1508 typedef absorber_body<OutputTuple,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1509
1510 typedef typename tbb::flow::input_node<ItemType0> InputType0;
1511 typedef typename tbb::flow::input_node<ItemType1> InputType1;
1512 typedef typename tbb::flow::join_node<OutputTuple,JP> TestJoinType;
1513 typedef typename tbb::flow::function_node<OutputTuple,tbb::flow::continue_msg> SinkType;
1514
1515 for(int i = 0; i < 4; ++i) {
1516 if(2 == i) continue;
1517 bool throwException = (i & 0x1) != 0;
1518 bool doFlog = (i & 0x2) != 0;
1519 run_one_join_node_test<
1520 JP,
1521 OutputTuple,
1522 InputType0,
1523 InputBodyType0,
1524 InputType1,
1525 InputBodyType1,
1526 TestJoinType,
1527 SinkType,
1528 SinkBodyType>::execute_test(throwException,doFlog);
1529 }
1530 }
1531
1532 template<class JP>
test_join_node()1533 void test_join_node() {
1534 INFO("Testing join_node<" << graph_policy_name<JP>::name() << ">\n");
1535 // only doing two-input joins
1536 g_Wakeup_Msg = "join(is,non): Missed wakeup or machine is overloaded?";
1537 run_join_node_test<JP, std::tuple<int,int>, isThrowing, nonThrowing>();
1538 CheckType<int>::check_type_counter = 0;
1539 g_Wakeup_Msg = "join(non,is): Missed wakeup or machine is overloaded?";
1540 run_join_node_test<JP, std::tuple<CheckType<int>,int>, nonThrowing, isThrowing>();
1541 CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped items in test");
1542 g_Wakeup_Msg = "join(is,is): Missed wakeup or machine is overloaded?";
1543 run_join_node_test<JP, std::tuple<int,int>, isThrowing, isThrowing>();
1544 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1545 }
1546
1547 // ------------------- limiter_node -------------
1548
1549 template<
1550 class BufferItemType, //
1551 class InputNodeType,
1552 class InputNodeBodyType,
1553 class TestNodeType,
1554 class SinkNodeType,
1555 class SinkNodeBodyType >
run_one_limiter_node_test(bool throwException,bool flog)1556 void run_one_limiter_node_test(bool throwException,bool flog) {
1557 tbb::flow::graph g;
1558
1559 std::atomic<int> input_count;
1560 std::atomic<int> sink_count;
1561 input_count = sink_count = 0;
1562 #if USE_TASK_SCHEDULER_OBSERVER
1563 eh_test_observer o;
1564 o.observe(true);
1565 #endif
1566 g_Master = std::this_thread::get_id();
1567 InputNodeType input(g, InputNodeBodyType(input_count));
1568 TestNodeType node_to_test(g,g_NumThreads + 1);
1569 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1570 make_edge(input,node_to_test);
1571 make_edge(node_to_test, sink);
1572 for(int iter = 0; iter < 2; ++iter) {
1573 ResetGlobals(throwException,flog);
1574 if(throwException) {
1575 TRY();
1576 input.activate();
1577 g.wait_for_all();
1578 CATCH_AND_ASSERT();
1579 }
1580 else {
1581 TRY();
1582 input.activate();
1583 g.wait_for_all();
1584 CATCH_AND_FAIL();
1585 }
1586 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1587 int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
1588 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
1589 if(throwException) {
1590 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1591 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1592 CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
1593 CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
1594 }
1595 else {
1596 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1597 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1598 // we stop after limiter's limit, which is g_NumThreads + 1. The input_node
1599 // is invoked one extra time, filling its buffer, so its limit is g_NumThreads + 2.
1600 CHECK_MESSAGE( (ib_cnt == g_NumThreads + 2), "Missing invocations of input_node");
1601 CHECK_MESSAGE( (nb_cnt == g_NumThreads + 1), "Missing items in absorbers");
1602 }
1603 if(iter == 0) {
1604 remove_edge(node_to_test, sink);
1605 node_to_test.try_put(BufferItemType());
1606 node_to_test.try_put(BufferItemType());
1607 g.wait_for_all();
1608 g.reset();
1609 input_count = sink_count = 0;
1610 BufferItemType tmp;
1611 CHECK_MESSAGE( (!node_to_test.try_get(tmp)), "node not empty");
1612 make_edge(node_to_test, sink);
1613 g.wait_for_all();
1614 }
1615 else {
1616 g.reset();
1617 input_count = sink_count = 0;
1618 }
1619 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
1620 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
1621 }
1622
1623 #if USE_TASK_SCHEDULER_OBSERVER
1624 o.observe(false);
1625 #endif
1626 }
1627
1628 template<class BufferItemType,
1629 TestNodeTypeEnum InputThrowType,
1630 TestNodeTypeEnum SinkThrowType>
run_limiter_node_test()1631 void run_limiter_node_test() {
1632 typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
1633 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1634
1635 typedef tbb::flow::input_node<BufferItemType> InputType;
1636 typedef tbb::flow::limiter_node<BufferItemType> LmtType;
1637 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
1638
1639 for(int i = 0; i < 4; ++i) {
1640 if(i == 2) continue; // no need to test flog w/o throws
1641 bool throwException = (i & 0x1) != 0;
1642 bool doFlog = (i & 0x2) != 0;
1643 run_one_limiter_node_test<
1644 /* class BufferItemType*/ BufferItemType,
1645 /*class InputNodeType*/ InputType,
1646 /*class InputNodeBodyType*/ InputBodyType,
1647 /*class TestNodeType*/ LmtType,
1648 /*class SinkNodeType*/ SnkType,
1649 /*class SinkNodeBodyType*/ SinkBodyType
1650 >(throwException, doFlog);
1651 }
1652 }
1653
test_limiter_node()1654 void test_limiter_node() {
1655 INFO("Testing limiter_node\n");
1656 g_Wakeup_Msg = "limiter_node(is,non): Missed wakeup or machine is overloaded?";
1657 run_limiter_node_test<int,isThrowing,nonThrowing>();
1658 g_Wakeup_Msg = "limiter_node(non,is): Missed wakeup or machine is overloaded?";
1659 run_limiter_node_test<int,nonThrowing,isThrowing>();
1660 g_Wakeup_Msg = "limiter_node(is,is): Missed wakeup or machine is overloaded?";
1661 run_limiter_node_test<int,isThrowing,isThrowing>();
1662 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1663 }
1664
1665 // -------- split_node --------------------
1666
1667 template<
1668 class InputTuple,
1669 class InputType,
1670 class InputBodyType,
1671 class TestSplitType,
1672 class SinkType0,
1673 class SinkBodyType0,
1674 class SinkType1,
1675 class SinkBodyType1>
run_one_split_node_test(bool throwException,bool flog)1676 void run_one_split_node_test(bool throwException, bool flog) {
1677
1678 tbb::flow::graph g;
1679
1680 std::atomic<int> input_count;
1681 std::atomic<int> sink0_count;
1682 std::atomic<int> sink1_count;
1683 input_count = sink0_count = sink1_count = 0;
1684 #if USE_TASK_SCHEDULER_OBSERVER
1685 eh_test_observer o;
1686 o.observe(true);
1687 #endif
1688
1689 g_Master = std::this_thread::get_id();
1690 InputType input(g, InputBodyType(input_count));
1691 TestSplitType node_to_test(g);
1692 SinkType0 sink0(g,tbb::flow::unlimited,SinkBodyType0(sink0_count));
1693 SinkType1 sink1(g,tbb::flow::unlimited,SinkBodyType1(sink1_count));
1694 make_edge(input, node_to_test);
1695 make_edge(tbb::flow::output_port<0>(node_to_test), sink0);
1696 make_edge(tbb::flow::output_port<1>(node_to_test), sink1);
1697
1698 for(int iter = 0; iter < 2; ++iter) { // run, reset, run again
1699 ResetGlobals(throwException,flog);
1700 if(throwException) {
1701 TRY();
1702 input.activate();
1703 g.wait_for_all();
1704 CATCH_AND_ASSERT();
1705 }
1706 else {
1707 TRY();
1708 input.activate();
1709 g.wait_for_all();
1710 CATCH_AND_FAIL();
1711 }
1712 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1713 int ib_cnt = tbb::flow::copy_body<InputBodyType>(input).count_value();
1714 int nb0_cnt = tbb::flow::copy_body<SinkBodyType0>(sink0).count_value();
1715 int nb1_cnt = tbb::flow::copy_body<SinkBodyType1>(sink1).count_value();
1716 if(throwException) {
1717 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1718 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1719 CHECK_MESSAGE( (ib_cnt <= 2*g_NumItems), "Too many items sent by input");
1720 CHECK_MESSAGE( (nb0_cnt + nb1_cnt <= ib_cnt*2), "Too many items received by sink nodes");
1721 }
1722 else {
1723 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1724 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1725 CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_nodes");
1726 CHECK_MESSAGE( (nb0_cnt == g_NumItems && nb1_cnt == g_NumItems), "Missing items in absorbers");
1727 }
1728 g.reset(); // resets the body of the input_nodes and the absorb_nodes.
1729 input_count = sink0_count = sink1_count = 0;
1730 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType>(input).count_value()),"Reset input failed");
1731 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType0>(sink0).count_value()),"Reset sink 0 failed");
1732 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType1>(sink1).count_value()),"Reset sink 1 failed");
1733 }
1734 #if USE_TASK_SCHEDULER_OBSERVER
1735 o.observe(false);
1736 #endif
1737 }
1738
1739 template<class InputTuple,
1740 TestNodeTypeEnum InputThrowType,
1741 TestNodeTypeEnum SinkThrowType>
run_split_node_test()1742 void run_split_node_test() {
1743 typedef typename std::tuple_element<0,InputTuple>::type ItemType0;
1744 typedef typename std::tuple_element<1,InputTuple>::type ItemType1;
1745 typedef tuple_test_input_body<InputTuple,InputThrowType> InputBodyType;
1746 typedef absorber_body<ItemType0,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType0;
1747 typedef absorber_body<ItemType1,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType1;
1748
1749 typedef typename tbb::flow::input_node<InputTuple> InputType;
1750 typedef typename tbb::flow::split_node<InputTuple> TestSplitType;
1751 typedef typename tbb::flow::function_node<ItemType0,tbb::flow::continue_msg> SinkType0;
1752 typedef typename tbb::flow::function_node<ItemType1,tbb::flow::continue_msg> SinkType1;
1753
1754 for(int i = 0; i < 4; ++i) {
1755 if(2 == i) continue;
1756 bool throwException = (i & 0x1) != 0;
1757 bool doFlog = (i & 0x2) != 0;
1758 run_one_split_node_test<
1759 InputTuple,
1760 InputType,
1761 InputBodyType,
1762 TestSplitType,
1763 SinkType0,
1764 SinkBodyType0,
1765 SinkType1,
1766 SinkBodyType1>
1767 (throwException,doFlog);
1768 }
1769 }
1770
test_split_node()1771 void test_split_node() {
1772 INFO("Testing split_node\n");
1773 g_Wakeup_Msg = "split_node(is,non): Missed wakeup or machine is overloaded?";
1774 run_split_node_test<std::tuple<int,int>, isThrowing, nonThrowing>();
1775 g_Wakeup_Msg = "split_node(non,is): Missed wakeup or machine is overloaded?";
1776 run_split_node_test<std::tuple<int,int>, nonThrowing, isThrowing>();
1777 g_Wakeup_Msg = "split_node(is,is): Missed wakeup or machine is overloaded?";
1778 run_split_node_test<std::tuple<int,int>, isThrowing, isThrowing>();
1779 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1780 }
1781
1782 // --------- indexer_node ----------------------
1783
1784 template < class InputTuple,
1785 class InputType0,
1786 class InputBodyType0,
1787 class InputType1,
1788 class InputBodyType1,
1789 class TestNodeType,
1790 class SinkType,
1791 class SinkBodyType>
run_one_indexer_node_test(bool throwException,bool flog)1792 void run_one_indexer_node_test(bool throwException,bool flog) {
1793 typedef typename std::tuple_element<0,InputTuple>::type ItemType0;
1794 typedef typename std::tuple_element<1,InputTuple>::type ItemType1;
1795
1796 tbb::flow::graph g;
1797
1798 std::atomic<int> input0_count;
1799 std::atomic<int> input1_count;
1800 std::atomic<int> sink_count;
1801 input0_count = input1_count = sink_count = 0;
1802 #if USE_TASK_SCHEDULER_OBSERVER
1803 eh_test_observer o;
1804 o.observe(true);
1805 #endif
1806 g_Master = std::this_thread::get_id();
1807 InputType0 input0(g, InputBodyType0(input0_count));
1808 InputType1 input1(g, InputBodyType1(input1_count));
1809 TestNodeType node_to_test(g);
1810 SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
1811 make_edge(input0,tbb::flow::input_port<0>(node_to_test));
1812 make_edge(input1,tbb::flow::input_port<1>(node_to_test));
1813 make_edge(node_to_test, sink);
1814 for(int iter = 0; iter < 2; ++iter) {
1815 ResetGlobals(throwException,flog);
1816 if(throwException) {
1817 TRY();
1818 input0.activate();
1819 input1.activate();
1820 g.wait_for_all();
1821 CATCH_AND_ASSERT();
1822 }
1823 else {
1824 TRY();
1825 input0.activate();
1826 input1.activate();
1827 g.wait_for_all();
1828 CATCH_AND_FAIL();
1829 }
1830 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1831 int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value();
1832 int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value();
1833 int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1834 if(throwException) {
1835 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
1836 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
1837 CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs");
1838 CHECK_MESSAGE( (nb_cnt <= ib0_cnt + ib1_cnt), "Too many items received by sink nodes");
1839 }
1840 else {
1841 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
1842 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
1843 CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0");
1844 CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1");
1845 CHECK_MESSAGE( (nb_cnt == 2*g_NumItems), "Missing items in absorbers");
1846 }
1847 if(iter == 0) {
1848 remove_edge(node_to_test, sink);
1849 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4));
1850 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
1851 g.wait_for_all();
1852 g.reset();
1853 input0_count = input1_count = sink_count = 0;
1854 make_edge(node_to_test, sink);
1855 g.wait_for_all();
1856 }
1857 else {
1858 g.wait_for_all();
1859 g.reset();
1860 input0_count = input1_count = sink_count = 0;
1861 }
1862 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed");
1863 CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed");
1864 nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1865 CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed");
1866 }
1867
1868 #if USE_TASK_SCHEDULER_OBSERVER
1869 o.observe(false);
1870 #endif
1871 }
1872
1873 template<class InputTuple,
1874 TestNodeTypeEnum InputThrowType,
1875 TestNodeTypeEnum SinkThrowType>
run_indexer_node_test()1876 void run_indexer_node_test() {
1877 typedef typename std::tuple_element<0,InputTuple>::type ItemType0;
1878 typedef typename std::tuple_element<1,InputTuple>::type ItemType1;
1879 typedef test_input_body<ItemType0,InputThrowType> InputBodyType0;
1880 typedef test_input_body<ItemType1,InputThrowType> InputBodyType1;
1881 typedef typename tbb::flow::indexer_node<ItemType0, ItemType1> TestNodeType;
1882 typedef absorber_body<typename TestNodeType::output_type,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1883
1884 typedef typename tbb::flow::input_node<ItemType0> InputType0;
1885 typedef typename tbb::flow::input_node<ItemType1> InputType1;
1886 typedef typename tbb::flow::function_node<typename TestNodeType::output_type,tbb::flow::continue_msg> SinkType;
1887
1888 for(int i = 0; i < 4; ++i) {
1889 if(2 == i) continue;
1890 bool throwException = (i & 0x1) != 0;
1891 bool doFlog = (i & 0x2) != 0;
1892 run_one_indexer_node_test<
1893 InputTuple,
1894 InputType0,
1895 InputBodyType0,
1896 InputType1,
1897 InputBodyType1,
1898 TestNodeType,
1899 SinkType,
1900 SinkBodyType>(throwException,doFlog);
1901 }
1902 }
1903
test_indexer_node()1904 void test_indexer_node() {
1905 INFO("Testing indexer_node\n");
1906 g_Wakeup_Msg = "indexer_node(is,non): Missed wakeup or machine is overloaded?";
1907 run_indexer_node_test<std::tuple<int,int>, isThrowing, nonThrowing>();
1908 g_Wakeup_Msg = "indexer_node(non,is): Missed wakeup or machine is overloaded?";
1909 run_indexer_node_test<std::tuple<int,int>, nonThrowing, isThrowing>();
1910 g_Wakeup_Msg = "indexer_node(is,is): Missed wakeup or machine is overloaded?";
1911 run_indexer_node_test<std::tuple<int,int>, isThrowing, isThrowing>();
1912 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1913 }
1914
1915 ///////////////////////////////////////////////
1916 // whole-graph exception test
1917
1918 class Foo {
1919 private:
1920 // std::vector<int>& m_vec;
1921 std::vector<int>* m_vec;
1922 public:
Foo(std::vector<int> & vec)1923 Foo(std::vector<int>& vec) : m_vec(&vec) { }
operator ()(tbb::flow::continue_msg) const1924 void operator() (tbb::flow::continue_msg) const {
1925 ++nExceptions;
1926 (void)m_vec->at(m_vec->size()); // Will throw out_of_range exception
1927 CHECK_MESSAGE( (false), "Exception not thrown by invalid access");
1928 }
1929 };
1930
1931 // test from user ahelwer: https://community.intel.com/t5/Intel-oneAPI-Threading-Building/Exception-in-flow-graph-results-in-graph-wait-for-all-hanging/td-p/789352
1932 // exception thrown in graph node, not caught in wait_for_all()
1933 void
test_flow_graph_exception0()1934 test_flow_graph_exception0() {
1935 // Initializes body
1936 std::vector<int> vec;
1937 vec.push_back(0);
1938 Foo f(vec);
1939 nExceptions = 0;
1940
1941 // Construct graph and nodes
1942 tbb::flow::graph g;
1943 tbb::flow::broadcast_node<tbb::flow::continue_msg> start(g);
1944 tbb::flow::continue_node<tbb::flow::continue_msg> fooNode(g, f);
1945
1946 // Construct edge
1947 tbb::flow::make_edge(start, fooNode);
1948
1949 // Execute graph
1950 CHECK_MESSAGE( (!g.exception_thrown()), "exception_thrown flag already set");
1951 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag already set");
1952 try {
1953 start.try_put(tbb::flow::continue_msg());
1954 g.wait_for_all();
1955 CHECK_MESSAGE( (false), "Exception not thrown");
1956 }
1957 catch(std::out_of_range& ex) {
1958 INFO("Exception: " << ex.what() << "(expected)\n");
1959 }
1960 catch(...) {
1961 INFO("Unknown exception caught (expected)\n");
1962 }
1963 CHECK_MESSAGE( (nExceptions > 0), "Exception caught, but no body signaled exception being thrown");
1964 nExceptions = 0;
1965 CHECK_MESSAGE( (g.exception_thrown()), "Exception not intercepted");
1966 // if exception set, cancellation also set.
1967 CHECK_MESSAGE( (g.is_cancelled()), "Exception cancellation not signaled");
1968 // in case we got an exception
1969 try {
1970 g.wait_for_all(); // context still signalled canceled, my_exception still set.
1971 }
1972 catch(...) {
1973 CHECK_MESSAGE( (false), "Second exception thrown but no task executing");
1974 }
1975 CHECK_MESSAGE( (nExceptions == 0), "body signaled exception being thrown, but no body executed");
1976 CHECK_MESSAGE( (!g.exception_thrown()), "exception_thrown flag not reset");
1977 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag not reset");
1978 }
1979
TestOneThreadNum(int nThread)1980 void TestOneThreadNum(int nThread) {
1981 INFO("Testing " << nThread << "%d threads\n");
1982 g_NumItems = ((nThread > NUM_ITEMS) ? nThread *2 : NUM_ITEMS);
1983 g_NumThreads = nThread;
1984 tbb::task_arena arena(nThread);
1985 arena.execute(
1986 [&]() {
1987 // whole-graph exception catch and rethrow test
1988 test_flow_graph_exception0();
1989 for(int i = 0; i < 4; ++i) {
1990 g_ExceptionInMaster = (i & 1) != 0;
1991 g_SolitaryException = (i & 2) != 0;
1992 INFO("g_ExceptionInMaster == " << (g_ExceptionInMaster ? "T":"F")
1993 << ", g_SolitaryException == " << (g_SolitaryException ? "T":"F") << "\n");
1994 test_input_node();
1995 test_function_node();
1996 test_continue_node(); // also test broadcast_node
1997 test_multifunction_node();
1998 // single- and multi-item buffering nodes
1999 test_buffer_queue_and_overwrite_node();
2000 test_sequencer_node();
2001 test_priority_queue_node();
2002
2003 // join_nodes
2004 test_join_node<tbb::flow::queueing>();
2005 test_join_node<tbb::flow::reserving>();
2006 test_join_node<tbb::flow::tag_matching>();
2007
2008 test_limiter_node();
2009 test_split_node();
2010 // graph for write_once_node will be complicated by the fact the node will
2011 // not do try_puts after it has been set. To get parallelism of N we have
2012 // to attach N successor nodes to the write_once (or play some similar game).
2013 // test_write_once_node();
2014 test_indexer_node();
2015 }
2016 }
2017 );
2018 }
2019
2020 //! Test exceptions with parallelism
2021 //! \brief \ref error_guessing
2022 TEST_CASE("Testing several threads"){
2023 // reverse order of tests
2024 for(unsigned int nThread=utils::MaxThread; nThread >= utils::MinThread; --nThread) {
2025 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, nThread);
2026 TestOneThreadNum(nThread);
2027 }
2028 }
2029
2030 #endif // TBB_USE_EXCEPTIONS
2031