1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/config.h"
18 
19 #if _MSC_VER
20     #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
21     #if _MSC_VER==1700 && !defined(__INTEL_COMPILER)
22         // Suppress "unreachable code" warning by VC++ 17.0 (VS 2012)
23         #pragma warning (disable: 4702)
24     #endif
25 #endif
26 
27 // need these to get proper external names for private methods in library.
28 #include "tbb/spin_mutex.h"
29 #include "tbb/spin_rw_mutex.h"
30 #include "tbb/task_arena.h"
31 #include "tbb/task_group.h"
32 
33 #define private public
34 #define protected public
35 #include "tbb/flow_graph.h"
36 #undef protected
37 #undef private
38 
39 #include "common/test.h"
40 #include "common/utils.h"
41 #include "common/spin_barrier.h"
42 #include "common/graph_utils.h"
43 
44 #include <string> // merely prevents LNK2001 error to happen (on ICL+VC9 configurations)
45 
46 //! \file test_flow_graph_whitebox.cpp
47 //! \brief Test for [flow_graph.broadcast_node flow_graph.priority_queue_node flow_graph.indexer_node flow_graph.sequencer_node flow_graph.remove_edge flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification
48 
49 template<typename T>
50 struct receiverBody {
51     tbb::flow::continue_msg operator()(const T &/*in*/) {
52         return tbb::flow::continue_msg();
53     }
54 };
55 
56 // split_nodes cannot have predecessors
57 // they do not reject messages and always forward.
58 // they reject edge reversals from successors.
59 void TestSplitNode() {
60     typedef tbb::flow::split_node<std::tuple<int> > snode_type;
61     tbb::flow::graph g;
62     snode_type snode(g);
63     tbb::flow::function_node<int> rcvr(g,tbb::flow::unlimited, receiverBody<int>());
64     INFO("Testing split_node\n");
65     CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "Constructed split_node has successors");
66     // tbb::flow::output_port<0>(snode)
67     tbb::flow::make_edge(tbb::flow::output_port<0>(snode), rcvr);
68     CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after make_edge, split_node has no successor.");
69     snode.try_put(std::tuple<int>(1));
70     g.wait_for_all();
71     g.reset();
72     CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after reset(), split_node has no successor.");
73     g.reset(tbb::flow::rf_clear_edges);
74     CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "after reset(rf_clear_edges), split_node has a successor.");
75 }
76 
77 // buffering nodes cannot have predecessors
78 // they do not reject messages and always save or forward
79 // they allow edge reversals from successors
80 template< typename B >
81 void TestBufferingNode(const char * name) {
82     tbb::flow::graph g;
83     B bnode(g);
84     tbb::flow::function_node<int,int,tbb::flow::rejecting> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
85     INFO("Testing " << name << ":");
86     for(int icnt = 0; icnt < 2; icnt++) {
87         bool reverse_edge = (icnt & 0x2) != 0;
88         serial_fn_state0 = 0;  // reset to waiting state.
89         INFO(" make_edge");
90         tbb::flow::make_edge(bnode, fnode);
91         CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge");
92         std::thread t([&] {
93             INFO(" try_put");
94             bnode.try_put(1);  // will forward to the fnode
95             g.wait_for_all();
96         });
97         utils::SpinWaitWhileEq(serial_fn_state0, 0);
98         if(reverse_edge) {
99             INFO(" try_put2");
100             bnode.try_put(2);  // should reverse the edge
101             // waiting for the edge to reverse
102             utils::SpinWaitWhile([&] { return !bnode.my_successors.empty(); });
103         }
104         else {
105             CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message");
106         }
107         serial_fn_state0 = 0;  // release the function_node.
108         if(reverse_edge) {
109             // have to do a second release because the function_node will get the 2nd item
110             utils::SpinWaitWhileEq(serial_fn_state0, 0);
111             serial_fn_state0 = 0;  // release the function_node.
112         }
113         t.join();
114         INFO(" remove_edge");
115         tbb::flow::remove_edge(bnode, fnode);
116         CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge");
117     }
118     tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g);
119     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // will spawn a task
120     g.wait_for_all();
121     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join");
122     INFO(" reverse");
123     bnode.try_put(1);  // the edge should reverse
124     g.wait_for_all();
125     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
126     INFO(" reset()");
127     g.wait_for_all();
128     g.reset();  // should be in forward direction again
129     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()");
130     INFO(" remove_edge");
131     g.reset(tbb::flow::rf_clear_edges);
132     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)");
133     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // add edge again
134     // reverse edge by adding to buffer.
135     bnode.try_put(1);  // the edge should reverse
136     g.wait_for_all();
137     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
138     INFO(" remove_edge(reversed)");
139     g.reset(tbb::flow::rf_clear_edges);
140     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has no successor after reset()");
141     CHECK_MESSAGE( (tbb::flow::input_port<0>(jnode).my_predecessors.empty()), "predecessor not reset");
142     INFO("  done\n");
143     g.wait_for_all();
144 }
145 
146 // continue_node has only predecessor count
147 // they do not have predecessors, only the counts
148 // successor edges cannot be reversed
149 void TestContinueNode() {
150     tbb::flow::graph g;
151     tbb::flow::function_node<int> fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
152     tbb::flow::continue_node<int> cnode(g, /*number_of_predecessors*/ 1,
153                                         serial_continue_body<int>(serial_continue_state0));
154     tbb::flow::function_node<int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state1));
155     tbb::flow::make_edge(fnode0, cnode);
156     tbb::flow::make_edge(cnode, fnode1);
157     INFO("Testing continue_node:");
158     for( int icnt = 0; icnt < 2; ++icnt ) {
159         INFO( " initial" << icnt);
160         CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor addition didn't increment count");
161         CHECK_MESSAGE( (!cnode.successors().empty()), "successors empty though we added one");
162         CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect");
163         serial_continue_state0 = 0;
164         serial_fn_state0 = 0;
165         serial_fn_state1 = 0;
166 
167         std::thread t([&] {
168             fnode0.try_put(1);  // start the first function node.
169             if(icnt == 0) {  // first time through, let the continue_node fire
170                 INFO(" firing");
171                 fnode0.try_put(1);  // second message
172                 g.wait_for_all();
173 
174                 // try a try_get()
175                 {
176                     int i;
177                     CHECK_MESSAGE( (!cnode.try_get(i)), "try_get not rejected");
178                 }
179 
180                 INFO(" reset");
181                 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)");
182                 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)");
183                 g.reset();  // should still be the same
184                 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (after reset)" );
185                 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (after reset)");
186             }
187             else {  // we're going to see if the rf_clear_edges resets things.
188                 g.wait_for_all();
189                 INFO(" reset(rf_clear_edges)");
190                 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)" );
191                 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)" );
192                 g.reset(tbb::flow::rf_clear_edges);  // should be in forward direction again
193                 CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect after reset(rf_clear_edges)" );
194                 CHECK_MESSAGE( (cnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)" );
195                 CHECK_MESSAGE( (cnode.my_predecessor_count == cnode.my_initial_predecessor_count), "predecessor count not reset" );
196             }
197         });
198 
199         utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for the first message to arrive in function_node
200         // Now the body of function_node 0 is executing.
201         serial_fn_state0 = 0;  // release the node
202         if (icnt == 0) {
203             // wait for node to count the message (or for the node body to execute, which would be wrong)
204             utils::SpinWaitWhile([&] {
205                 return serial_continue_state0 == 0 && cnode.my_current_count == 0;
206             });
207             CHECK_MESSAGE( (serial_continue_state0 == 0), "Improperly released continue_node");
208             CHECK_MESSAGE( (cnode.my_current_count == 1), "state of continue_receiver incorrect");
209 
210             utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for the second message to arrive in function_node
211             // Now the body of function_node 0 is executing.
212             serial_fn_state0 = 0;  // release the node
213 
214             utils::SpinWaitWhileEq(serial_continue_state0, 0); // waiting for continue_node to start
215             CHECK_MESSAGE( (cnode.my_current_count == 0), " my_current_count not reset before body of continue_node started");
216             serial_continue_state0 = 0;  // release the continue_node
217 
218             utils::SpinWaitWhileEq(serial_fn_state1, 0); // wait for the successor function_node to enter body
219             serial_fn_state1 = 0;  // release successor function_node.
220         }
221 
222         t.join();
223     }
224 
225     INFO(" done\n");
226 
227 }
228 
229 // function_node has predecessors and successors
230 // try_get() rejects
231 // successor edges cannot be reversed
232 // predecessors will reverse (only rejecting will reverse)
233 void TestFunctionNode() {
234     tbb::flow::graph g;
235     tbb::flow::queue_node<int> qnode0(g);
236     tbb::flow::function_node<int,int,   tbb::flow::rejecting > fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
237     tbb::flow::function_node<int,int/*, tbb::flow::queueing*/> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
238 
239     tbb::flow::queue_node<int> qnode1(g);
240 
241     tbb::flow::make_edge(fnode0, qnode1);
242     tbb::flow::make_edge(qnode0, fnode0);
243 
244     serial_fn_state0 = 2;  // just let it go
245     qnode0.try_put(1);
246     g.wait_for_all();
247     int ii;
248     CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed" );
249     tbb::flow::remove_edge(qnode0, fnode0);
250     tbb::flow::remove_edge(fnode0, qnode1);
251 
252     tbb::flow::make_edge(fnode1, qnode1);
253     tbb::flow::make_edge(qnode0, fnode1);
254 
255     serial_fn_state0 = 2;  // just let it go
256     qnode0.try_put(1);
257     g.wait_for_all();
258     CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed" );
259     tbb::flow::remove_edge(qnode0, fnode1);
260     tbb::flow::remove_edge(fnode1, qnode1);
261 
262     // rejecting
263     serial_fn_state0 = 0;
264     tbb::flow::make_edge(fnode0, qnode1);
265     tbb::flow::make_edge(qnode0, fnode0);
266     INFO("Testing rejecting function_node:");
267     CHECK_MESSAGE( (!fnode0.my_queue), "node should have no queue");
268     CHECK_MESSAGE( (!fnode0.my_successors.empty()), "successor edge not added");
269     std::thread t([&] {
270         g.reset(); // attach to the current arena
271         qnode0.try_put(1);
272         qnode0.try_put(2);   // rejecting node should reject, reverse.
273         g.wait_for_all();
274     });
275     utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start
276     utils::SpinWaitWhile([&] { return fnode0.my_predecessors.empty(); });
277     serial_fn_state0 = 2;   // release function_node body.
278     t.join();
279     INFO(" reset");
280     g.reset();  // should reverse the edge from the input to the function node.
281     CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()");
282     CHECK_MESSAGE( (fnode0.my_predecessors.empty()), "predecessor not reversed");
283     tbb::flow::remove_edge(qnode0, fnode0);
284     tbb::flow::remove_edge(fnode0, qnode1);
285     INFO("\n");
286 
287     // queueing
288     tbb::flow::make_edge(fnode1, qnode1);
289     INFO("Testing queueing function_node:");
290     CHECK_MESSAGE( (fnode1.my_queue), "node should have no queue");
291     CHECK_MESSAGE( (!fnode1.my_successors.empty()), "successor edge not added");
292     INFO(" add_pred");
293     CHECK_MESSAGE( (fnode1.register_predecessor(qnode0)), "Cannot register as predecessor");
294     CHECK_MESSAGE( (!fnode1.my_predecessors.empty()), "Missing predecessor");
295     INFO(" reset");
296     g.wait_for_all();
297     g.reset();  // should reverse the edge from the input to the function node.
298     CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()");
299     CHECK_MESSAGE( (fnode1.my_predecessors.empty()), "predecessor not reversed");
300     tbb::flow::remove_edge(qnode0, fnode1);
301     tbb::flow::remove_edge(fnode1, qnode1);
302     INFO("\n");
303 
304     serial_fn_state0 = 0;  // make the function_node wait
305     tbb::flow::make_edge(qnode0, fnode0);
306 
307     std::thread t2([&] {
308         g.reset(); // attach to the current arena
309         INFO(" start_func");
310         qnode0.try_put(1);
311         // now if we put an item to the queues the edges to the function_node will reverse.
312         INFO(" put_node(2)");
313         qnode0.try_put(2);   // start queue node.
314         g.wait_for_all();
315     });
316     utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start
317     // wait for the edges to reverse
318     utils::SpinWaitWhile([&] { return fnode0.my_predecessors.empty(); });
319     g.my_context->cancel_group_execution();
320     // release the function_node
321     serial_fn_state0 = 2;
322     t2.join();
323     CHECK_MESSAGE( (!fnode0.my_predecessors.empty() && qnode0.my_successors.empty()), "function_node edge not reversed");
324     g.reset(tbb::flow::rf_clear_edges);
325     CHECK_MESSAGE( (fnode0.my_predecessors.empty() && qnode0.my_successors.empty()), "function_node edge not removed");
326     CHECK_MESSAGE( (fnode0.my_successors.empty()), "successor to fnode not removed");
327     INFO(" done\n");
328 }
329 
330 template<typename TT>
331 class tag_func {
332     TT my_mult;
333 public:
334     tag_func(TT multiplier) : my_mult(multiplier) { }
335     // operator() will return [0 .. Count)
336     tbb::flow::tag_value operator()( TT v) {
337         tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult);
338         return t;
339     }
340 };
341 
342 template<typename JNODE_TYPE>
343 void
344 TestSimpleSuccessorArc(const char *name) {
345     tbb::flow::graph g;
346     {
347         INFO("Join<" << name << "> successor test ");
348         tbb::flow::join_node<std::tuple<int>, JNODE_TYPE> qj(g);
349         tbb::flow::broadcast_node<std::tuple<int> > bnode(g);
350         tbb::flow::make_edge(qj, bnode);
351         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking");
352         g.reset();
353         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()");
354         g.reset(tbb::flow::rf_clear_edges);
355         CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)");
356     }
357 }
358 
359 template<>
360 void
361 TestSimpleSuccessorArc<tbb::flow::tag_matching>(const char *name) {
362     tbb::flow::graph g;
363     {
364         INFO("Join<" << name << "> successor test ");
365         typedef std::tuple<int,int> my_tuple;
366         tbb::flow::join_node<my_tuple, tbb::flow::tag_matching> qj(g,
367                                                                    tag_func<int>(1),
368                                                                    tag_func<int>(1)
369         );
370         tbb::flow::broadcast_node<my_tuple > bnode(g);
371         tbb::flow::make_edge(qj, bnode);
372         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking");
373         g.reset();
374         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()");
375         g.reset(tbb::flow::rf_clear_edges);
376         CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)");
377     }
378 }
379 
380 void
381 TestJoinNode() {
382     tbb::flow::graph g;
383 
384     TestSimpleSuccessorArc<tbb::flow::queueing>("queueing");
385     TestSimpleSuccessorArc<tbb::flow::reserving>("reserving");
386     TestSimpleSuccessorArc<tbb::flow::tag_matching>("tag_matching");
387 
388     // queueing and tagging join nodes have input queues, so the input ports do not reverse.
389     INFO(" reserving preds");
390     {
391         tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> rj(g);
392         tbb::flow::queue_node<int> q0(g);
393         tbb::flow::queue_node<int> q1(g);
394         tbb::flow::make_edge(q0,tbb::flow::input_port<0>(rj));
395         tbb::flow::make_edge(q1,tbb::flow::input_port<1>(rj));
396         q0.try_put(1);
397         g.wait_for_all();  // quiesce
398         CHECK_MESSAGE( (!(tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port missing predecessor");
399         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred");
400         g.reset();
401         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
402         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
403         q1.try_put(2);
404         g.wait_for_all();  // quiesce
405         CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor");
406         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred");
407         g.reset();
408         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
409         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
410         // should reset predecessors just as regular reset.
411         q1.try_put(3);
412         g.wait_for_all();  // quiesce
413         CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor");
414         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred");
415         g.reset(tbb::flow::rf_clear_edges);
416         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
417         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
418         CHECK_MESSAGE( (q0.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
419         CHECK_MESSAGE( (q1.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
420     }
421     INFO(" done\n");
422 }
423 
424 template <typename DecrementerType>
425 struct limiter_node_type {
426     using type = tbb::flow::limiter_node<int, DecrementerType>;
427     using dtype = DecrementerType;
428 };
429 
430 template <>
431 struct limiter_node_type<void> {
432     using type = tbb::flow::limiter_node<int>;
433     using dtype = tbb::flow::continue_msg;
434 };
435 
436 template <typename DType>
437 struct DecrementerHelper {
438     template <typename Decrementer>
439     static void check(Decrementer&) {}
440     static DType makeDType() {
441         return DType(1);
442     }
443 };
444 
445 template <>
446 struct DecrementerHelper<tbb::flow::continue_msg> {
447     template <typename Decrementer>
448     static void check(Decrementer& decrementer) {
449         auto& d = static_cast<tbb::detail::d1::continue_receiver&>(decrementer);
450         CHECK_MESSAGE(d.my_predecessor_count == 0, "error in pred count");
451         CHECK_MESSAGE(d.my_initial_predecessor_count == 0, "error in initial pred count");
452         CHECK_MESSAGE(d.my_current_count == 0, "error in current count");
453     }
454     static tbb::flow::continue_msg makeDType() {
455         return tbb::flow::continue_msg();
456     }
457 };
458 
459 template <typename DecrementerType>
460 void TestLimiterNode() {
461     int out_int{};
462     tbb::flow::graph g;
463     using dtype = typename limiter_node_type<DecrementerType>::dtype;
464     typename limiter_node_type<DecrementerType>::type ln(g,1);
465     INFO("Testing limiter_node: preds and succs");
466     DecrementerHelper<dtype>::check(ln.decrementer());
467     CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold");
468     tbb::flow::queue_node<int> inq(g);
469     tbb::flow::queue_node<int> outq(g);
470     tbb::flow::broadcast_node<dtype> bn(g);
471 
472     tbb::flow::make_edge(inq,ln);
473     tbb::flow::make_edge(ln,outq);
474     tbb::flow::make_edge(bn,ln.decrementer());
475 
476     g.wait_for_all();
477     CHECK_MESSAGE( (!(ln.my_successors.empty())),"successors empty after make_edge");
478     CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed");
479     inq.try_put(1);
480     g.wait_for_all();
481     CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 1), "limiter_node didn't pass first value");
482     CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed");
483     inq.try_put(2);
484     g.wait_for_all();
485     CHECK_MESSAGE( (!outq.try_get(out_int)), "limiter_node incorrectly passed second input");
486     CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge to limiter_node not reversed");
487     bn.try_put(DecrementerHelper<dtype>::makeDType());
488     g.wait_for_all();
489     CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 2), "limiter_node didn't pass second value");
490     g.wait_for_all();
491     CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge was reversed(after try_get())");
492     g.reset();
493     CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge not reset");
494     inq.try_put(3);
495     g.wait_for_all();
496     CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 3), "limiter_node didn't pass third value");
497 
498     INFO(" rf_clear_edges");
499     // currently the limiter_node will not pass another message
500     g.reset(tbb::flow::rf_clear_edges);
501     DecrementerHelper<dtype>::check(ln.decrementer());
502     CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold");
503     CHECK_MESSAGE( (ln.my_predecessors.empty()), "preds not reset(rf_clear_edges)");
504     CHECK_MESSAGE( (ln.my_successors.empty()), "preds not reset(rf_clear_edges)");
505     CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)");
506     CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)");
507     CHECK_MESSAGE( (bn.my_successors.empty()), "control edge not removed on reset(rf_clear_edges)");
508     tbb::flow::make_edge(inq,ln);
509     tbb::flow::make_edge(ln,outq);
510     inq.try_put(4);
511     inq.try_put(5);
512     g.wait_for_all();
513     CHECK_MESSAGE( (outq.try_get(out_int)),"missing output after reset(rf_clear_edges)");
514     CHECK_MESSAGE( (out_int == 4), "input incorrect (4)");
515     bn.try_put(DecrementerHelper<dtype>::makeDType());
516     g.wait_for_all();
517     CHECK_MESSAGE( (!outq.try_get(out_int)),"second output incorrectly passed (rf_clear_edges)");
518     INFO(" done\n");
519 }
520 
521 template<typename MF_TYPE>
522 struct mf_body {
523     std::atomic<int>& my_flag;
524     mf_body(std::atomic<int>& flag) : my_flag(flag) { }
525     void operator()(const int& in, typename MF_TYPE::output_ports_type& outports) {
526         if(my_flag == 0) {
527             my_flag = 1;
528 
529             utils::SpinWaitWhileEq(my_flag, 1);
530         }
531 
532         if (in & 0x1)
533             std::get<1>(outports).try_put(in);
534         else
535             std::get<0>(outports).try_put(in);
536     }
537 };
538 
539 template<typename P, typename T>
540 struct test_reversal;
541 template<typename T>
542 struct test_reversal<tbb::flow::queueing, T> {
543     test_reversal() { INFO("<queueing>"); }
544     // queueing node will not reverse.
545     bool operator()(T& node) const { return node.my_predecessors.empty(); }
546 };
547 
548 template<typename T>
549 struct test_reversal<tbb::flow::rejecting, T> {
550     test_reversal() { INFO("<rejecting>"); }
551     bool operator()(T& node) const { return !node.my_predecessors.empty(); }
552 };
553 
554 template<typename P>
555 void TestMultifunctionNode() {
556     typedef tbb::flow::multifunction_node<int, std::tuple<int, int>, P> multinode_type;
557     INFO("Testing multifunction_node");
558     test_reversal<P,multinode_type> my_test;
559     INFO(":");
560     tbb::flow::graph g;
561     multinode_type mf(g, tbb::flow::serial, mf_body<multinode_type>(serial_fn_state0));
562     tbb::flow::queue_node<int> qin(g);
563     tbb::flow::queue_node<int> qodd_out(g);
564     tbb::flow::queue_node<int> qeven_out(g);
565     tbb::flow::make_edge(qin,mf);
566     tbb::flow::make_edge(tbb::flow::output_port<0>(mf), qeven_out);
567     tbb::flow::make_edge(tbb::flow::output_port<1>(mf), qodd_out);
568     g.wait_for_all();
569     for (int ii = 0; ii < 2 ; ++ii) {
570         serial_fn_state0 = 0;
571         /* if(ii == 0) REMARK(" reset preds"); else REMARK(" 2nd");*/
572         std::thread t([&] {
573             g.reset(); // attach to the current arena
574             qin.try_put(0);
575             qin.try_put(1);
576             g.wait_for_all();
577         });
578         // wait for node to be active
579         utils::SpinWaitWhileEq(serial_fn_state0, 0);
580         utils::SpinWaitWhile( [&] { return !my_test(mf); });
581         g.my_context->cancel_group_execution();
582         // release node
583         serial_fn_state0 = 2;
584         t.join();
585         CHECK_MESSAGE( (my_test(mf)), "fail cancel group test");
586         if( ii == 1) {
587             INFO(" rf_clear_edges");
588             g.reset(tbb::flow::rf_clear_edges);
589             CHECK_MESSAGE( (tbb::flow::output_port<0>(mf).my_successors.empty()), "output_port<0> not reset (rf_clear_edges)");
590             CHECK_MESSAGE( (tbb::flow::output_port<1>(mf).my_successors.empty()), "output_port<1> not reset (rf_clear_edges)");
591         }
592         else
593         {
594             g.reset();
595         }
596         CHECK_MESSAGE( (mf.my_predecessors.empty()), "edge didn't reset");
597         CHECK_MESSAGE( ((ii == 0 && !qin.my_successors.empty()) || (ii == 1 && qin.my_successors.empty())), "edge didn't reset");
598     }
599     INFO(" done\n");
600 }
601 
602 // indexer_node is like a broadcast_node, in that none of its inputs reverse, and it
603 // never allows a successor to reverse its edge, so we only need test the successors.
604 void
605 TestIndexerNode() {
606     tbb::flow::graph g;
607     typedef tbb::flow::indexer_node< int, int > indexernode_type;
608     indexernode_type inode(g);
609     INFO("Testing indexer_node:");
610     tbb::flow::queue_node<indexernode_type::output_type> qout(g);
611     tbb::flow::make_edge(inode,qout);
612     g.wait_for_all();
613     CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing");
614     g.reset();
615     CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing after reset");
616     g.reset(tbb::flow::rf_clear_edges);
617     CHECK_MESSAGE( (inode.my_successors.empty()), "successor of indexer_node not removed by reset(rf_clear_edges)");
618     INFO(" done\n");
619 }
620 
621 template<typename Node>
622 void
623 TestScalarNode(const char *name) {
624     tbb::flow::graph g;
625     Node on(g);
626     tbb::flow::queue_node<int> qout(g);
627     INFO("Testing " << name << ":");
628     tbb::flow::make_edge(on,qout);
629     g.wait_for_all();
630     CHECK_MESSAGE( (!on.my_successors.empty()), "edge not added");
631     g.reset();
632     CHECK_MESSAGE( (!on.my_successors.empty()), "edge improperly removed");
633     g.reset(tbb::flow::rf_clear_edges);
634     CHECK_MESSAGE( (on.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
635     INFO(" done\n");
636 }
637 
638 struct seq_body {
639     size_t operator()(const int &in) {
640         return size_t(in / 3);
641     }
642 };
643 
644 // sequencer_node behaves like a queueing node, but requires a different constructor.
645 void TestSequencerNode() {
646     tbb::flow::graph g;
647     tbb::flow::sequencer_node<int> bnode(g, seq_body());
648     INFO("Testing sequencer_node:");
649     tbb::flow::function_node<int> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
650     INFO("Testing sequencer_node:");
651     serial_fn_state0 = 0;  // reset to waiting state.
652     INFO(" make_edge");
653     tbb::flow::make_edge(bnode, fnode);
654     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge" );
655     INFO(" try_put");
656     std::thread t([&]{
657         bnode.try_put(0);  // will forward to the fnode
658         g.wait_for_all();
659     });
660     // wait for the function_node to fire up
661     utils::SpinWaitWhileEq(serial_fn_state0, 0);
662     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message" );
663     serial_fn_state0 = 0;       // release the function node
664     t.join();
665 
666     INFO(" remove_edge");
667     tbb::flow::remove_edge(bnode, fnode);
668     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge");
669     tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g);
670     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // will spawn a task
671     g.wait_for_all();
672     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join");
673     INFO(" reverse");
674     bnode.try_put(3);  // the edge should reverse
675     g.wait_for_all();
676     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
677     INFO(" reset()");
678     g.wait_for_all();
679     g.reset();  // should be in forward direction again
680     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()");
681     INFO(" remove_edge");
682     g.reset(tbb::flow::rf_clear_edges);  // should be in forward direction again
683     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)");
684     CHECK_MESSAGE( (fnode.my_predecessors.empty()), "buffering node reversed after reset(rf_clear_edges)");
685     INFO("  done\n");
686     g.wait_for_all();
687 }
688 
689 struct snode_body {
690     int max_cnt;
691     int my_cnt;
692     snode_body(const int& in) : max_cnt(in) { my_cnt = 0; }
693     int operator()(tbb::flow_control& fc) {
694         if (max_cnt <= my_cnt++) {
695             fc.stop();
696             return int();
697         }
698         return my_cnt;
699     }
700 };
701 
702 void TestInputNode() {
703     tbb::flow::graph g;
704     tbb::flow::input_node<int> in(g, snode_body(4));
705     INFO("Testing input_node:");
706     tbb::flow::queue_node<int> qin(g);
707     tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> jn(g);
708     tbb::flow::queue_node<std::tuple<int,int> > qout(g);
709 
710     INFO(" make_edges");
711     tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn));
712     tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
713     tbb::flow::make_edge(jn,qout);
714     CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after make_edge");
715     g.wait_for_all();
716     g.reset();
717     CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after reset");
718     g.wait_for_all();
719     g.reset(tbb::flow::rf_clear_edges);
720     CHECK_MESSAGE( (in.my_successors.empty()), "input node has successor after reset(rf_clear_edges)");
721     tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn));
722     tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
723     tbb::flow::make_edge(jn,qout);
724     g.wait_for_all();
725     INFO(" activate");
726     in.activate();  // will forward to the fnode
727     INFO(" wait1");
728     g.wait_for_all();
729     CHECK_MESSAGE( (in.my_successors.empty()), "input node has no successor after forwarding message");
730     g.reset();
731     CHECK_MESSAGE( (!in.my_successors.empty()), "input_node has no successors after reset");
732     CHECK_MESSAGE( (tbb::flow::input_port<0>(jn).my_predecessors.empty()), "successor of input_node has pred after reset.");
733     INFO(" done\n");
734 }
735 
736 //! Test buffering nodes
737 //! \brief \ref error_guessing
738 TEST_CASE("Test buffering nodes"){
739     unsigned int MinThread = utils::MinThread;
740     if(MinThread < 3) MinThread = 3;
741     tbb::task_arena arena(MinThread);
742 	arena.execute(
743         [&]() {
744             // tests presume at least three threads
745             TestBufferingNode< tbb::flow::buffer_node<int> >("buffer_node");
746             TestBufferingNode< tbb::flow::priority_queue_node<int> >("priority_queue_node");
747             TestBufferingNode< tbb::flow::queue_node<int> >("queue_node");
748         }
749 	);
750 }
751 
752 //! Test sequencer_node
753 //! \brief \ref error_guessing
754 TEST_CASE("Test sequencer node"){
755     TestSequencerNode();
756 }
757 
758 TEST_SUITE("Test multifunction node") {
759     //! Test multifunction_node with rejecting policy
760     //! \brief \ref error_guessing
761     TEST_CASE("with rejecting policy"){
762         TestMultifunctionNode<tbb::flow::rejecting>();
763     }
764 
765     //! Test multifunction_node with queueing policy
766     //! \brief \ref error_guessing
767     TEST_CASE("with queueing policy") {
768         TestMultifunctionNode<tbb::flow::queueing>();
769     }
770 }
771 
772 //! Test input_node
773 //! \brief \ref error_guessing
774 TEST_CASE("Test input node"){
775     TestInputNode();
776 }
777 
778 //! Test continue_node
779 //! \brief \ref error_guessing
780 TEST_CASE("Test continue node"){
781     TestContinueNode();
782 }
783 
784 //! Test function_node
785 //! \brief \ref error_guessing
786 TEST_CASE("Test function node" * doctest::may_fail()){
787     TestFunctionNode();
788 }
789 
790 //! Test join_node
791 //! \brief \ref error_guessing
792 TEST_CASE("Test join node"){
793     TestJoinNode();
794 }
795 
796 //! Test limiter_node
797 //! \brief \ref error_guessing
798 TEST_CASE("Test limiter node"){
799     TestLimiterNode<void>();
800     TestLimiterNode<int>();
801     TestLimiterNode<tbb::flow::continue_msg>();
802 }
803 
804 //! Test indexer_node
805 //! \brief \ref error_guessing
806 TEST_CASE("Test indexer node"){
807     TestIndexerNode();
808 }
809 
810 //! Test split_node
811 //! \brief \ref error_guessing
812 TEST_CASE("Test split node"){
813     TestSplitNode();
814 }
815 
816 //! Test broadcast, overwrite, write_once nodes
817 //! \brief \ref error_guessing
818 TEST_CASE("Test scalar node"){
819     TestScalarNode<tbb::flow::broadcast_node<int> >("broadcast_node");
820     TestScalarNode<tbb::flow::overwrite_node<int> >("overwrite_node");
821     TestScalarNode<tbb::flow::write_once_node<int> >("write_once_node");
822 }
823 
824 //! try_get in inactive graph
825 //! \brief \ref error_guessing
826 TEST_CASE("try_get in inactive graph"){
827     tbb::flow::graph g;
828 
829     tbb::flow::input_node<int> src(g, [&](tbb::flow_control& fc) { fc.stop(); return 0;});
830     deactivate_graph(g);
831 
832     int tmp = -1;
833     CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed");
834 
835     src.activate();
836     tmp = -1;
837     CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed");
838 }
839 
840 //! Test make_edge in inactive graph
841 //! \brief \ref error_guessing
842 TEST_CASE("Test make_edge in inactive graph"){
843     tbb::flow::graph g;
844 
845     tbb::flow::continue_node<int> c(g, [](const tbb::flow::continue_msg&){ return 1; });
846 
847     tbb::flow::function_node<int, int> f(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
848 
849     c.try_put(tbb::flow::continue_msg());
850     g.wait_for_all();
851 
852     deactivate_graph(g);
853 
854     make_edge(c, f);
855 }
856 
857 //! Test make_edge from overwrite_node in inactive graph
858 //! \brief \ref error_guessing
859 TEST_CASE("Test make_edge from overwrite_node in inactive graph"){
860     tbb::flow::graph g;
861 
862     tbb::flow::queue_node<int> q(g);
863 
864     tbb::flow::overwrite_node<int> on(g);
865 
866     on.try_put(1);
867     g.wait_for_all();
868 
869     deactivate_graph(g);
870 
871     make_edge(on, q);
872 
873     int tmp = -1;
874     CHECK_MESSAGE((q.try_get(tmp) == false), "Message should not be passed on");
875 }
876 
877 //! Test iterators directly
878 //! \brief \ref error_guessing
879 TEST_CASE("graph_iterator details"){
880     tbb::flow::graph g;
881     const tbb::flow::graph cg;
882 
883     tbb::flow::graph::iterator b = g.begin();
884     tbb::flow::graph::iterator b2 = g.begin();
885     ++b2;
886     // Cast to a volatile pointer to workaround self assignment warnings from some compilers.
887     tbb::flow::graph::iterator* volatile b2_ptr = &b2;
888     b2 = *b2_ptr;
889     b = b2;
890     CHECK_MESSAGE((b == b2), "Assignment should make iterators equal");
891 }
892 
893 //! const graph
894 //! \brief \ref error_guessing
895 TEST_CASE("const graph"){
896     using namespace tbb::flow;
897 
898     const graph g;
899     CHECK_MESSAGE((g.cbegin() == g.cend()), "Starting graph is empty");
900     CHECK_MESSAGE((g.begin() == g.end()), "Starting graph is empty");
901 
902     graph g2;
903     CHECK_MESSAGE((g2.begin() == g2.end()), "Starting graph is empty");
904 }
905 
906 //! Send message to continue_node while graph is inactive
907 //! \brief \ref error_guessing
908 TEST_CASE("Send message to continue_node while graph is inactive") {
909     using namespace tbb::flow;
910 
911     graph g;
912 
913     continue_node<int> c(g, [](const continue_msg&){ return 1; });
914     buffer_node<int> b(g);
915 
916     make_edge(c, b);
917 
918     deactivate_graph(g);
919 
920     c.try_put(continue_msg());
921     g.wait_for_all();
922 
923     int tmp = -1;
924     CHECK_MESSAGE((b.try_get(tmp) == false), "Message should not arrive");
925     CHECK_MESSAGE((tmp == -1), "Value should not be altered");
926 }
927 
928 
929 //! Bypass of a successor's message in a node with lightweight policy
930 //! \brief \ref error_guessing
931 TEST_CASE("Bypass of a successor's message in a node with lightweight policy") {
932     using namespace tbb::flow;
933 
934     graph g;
935 
936     auto body = [](const int&v)->int { return v * 2; };
937     function_node<int, int, lightweight> f1(g, unlimited, body);
938 
939     auto body2 = [](const int&v)->int {return v / 2;};
940     function_node<int, int> f2(g, unlimited, body2);
941 
942     buffer_node<int> b(g);
943 
944     make_edge(f1, f2);
945     make_edge(f2, b);
946 
947     f1.try_put(1);
948     g.wait_for_all();
949 
950     int tmp = -1;
951     CHECK_MESSAGE((b.try_get(tmp) == true), "Functional nodes can work in succession");
952     CHECK_MESSAGE((tmp == 1), "Value should not be altered");
953 }
954 
955