1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/config.h"
18 
19 #if _MSC_VER
20     #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
21     #if _MSC_VER==1700 && !defined(__INTEL_COMPILER)
22         // Suppress "unreachable code" warning by VC++ 17.0 (VS 2012)
23         #pragma warning (disable: 4702)
24     #endif
25 #endif
26 
27 // need these to get proper external names for private methods in library.
28 #include "tbb/spin_mutex.h"
29 #include "tbb/spin_rw_mutex.h"
30 #include "tbb/task_arena.h"
31 #include "tbb/task_group.h"
32 
33 #define private public
34 #define protected public
35 #include "tbb/flow_graph.h"
36 #undef protected
37 #undef private
38 
39 #include "common/test.h"
40 #include "common/utils.h"
41 #include "common/spin_barrier.h"
42 #include "common/graph_utils.h"
43 
44 #include <string> // merely prevents LNK2001 error to happen (on ICL+VC9 configurations)
45 
46 //! \file test_flow_graph_whitebox.cpp
47 //! \brief Test for [flow_graph.broadcast_node flow_graph.priority_queue_node flow_graph.indexer_node flow_graph.sequencer_node flow_graph.remove_edge flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification
48 
49 template<typename T>
50 struct receiverBody {
51     tbb::flow::continue_msg operator()(const T &/*in*/) {
52         return tbb::flow::continue_msg();
53     }
54 };
55 
56 // split_nodes cannot have predecessors
57 // they do not reject messages and always forward.
58 // they reject edge reversals from successors.
59 void TestSplitNode() {
60     typedef tbb::flow::split_node<std::tuple<int> > snode_type;
61     tbb::flow::graph g;
62     snode_type snode(g);
63     tbb::flow::function_node<int> rcvr(g,tbb::flow::unlimited, receiverBody<int>());
64     INFO("Testing split_node\n");
65     CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "Constructed split_node has successors");
66     // tbb::flow::output_port<0>(snode)
67     tbb::flow::make_edge(tbb::flow::output_port<0>(snode), rcvr);
68     CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after make_edge, split_node has no successor.");
69     snode.try_put(std::tuple<int>(1));
70     g.wait_for_all();
71     g.reset();
72     CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after reset(), split_node has no successor.");
73     g.reset(tbb::flow::rf_clear_edges);
74     CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "after reset(rf_clear_edges), split_node has a successor.");
75 }
76 
77 // buffering nodes cannot have predecessors
78 // they do not reject messages and always save or forward
79 // they allow edge reversals from successors
80 template< typename B >
81 void TestBufferingNode(const char * name) {
82     tbb::flow::graph g;
83     B bnode(g);
84     tbb::flow::function_node<int,int,tbb::flow::rejecting> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
85     INFO("Testing " << name << ":");
86     for(int icnt = 0; icnt < 2; icnt++) {
87         bool reverse_edge = (icnt & 0x2) != 0;
88         serial_fn_state0 = 0;  // reset to waiting state.
89         INFO(" make_edge");
90         tbb::flow::make_edge(bnode, fnode);
91         CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge");
92         std::thread t([&] {
93             INFO(" try_put");
94             bnode.try_put(1);  // will forward to the fnode
95             g.wait_for_all();
96         });
97         utils::SpinWaitWhileEq(serial_fn_state0, 0);
98         if(reverse_edge) {
99             INFO(" try_put2");
100             bnode.try_put(2);  // should reverse the edge
101             // waiting for the edge to reverse
102             utils::SpinWaitWhile([&] { return !bnode.my_successors.empty(); });
103         }
104         else {
105             CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message");
106         }
107         serial_fn_state0 = 0;  // release the function_node.
108         if(reverse_edge) {
109             // have to do a second release because the function_node will get the 2nd item
110             utils::SpinWaitWhileEq(serial_fn_state0, 0);
111             serial_fn_state0 = 0;  // release the function_node.
112         }
113         t.join();
114         INFO(" remove_edge");
115         tbb::flow::remove_edge(bnode, fnode);
116         CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge");
117     }
118     tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g);
119     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // will spawn a task
120     g.wait_for_all();
121     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join");
122     INFO(" reverse");
123     bnode.try_put(1);  // the edge should reverse
124     g.wait_for_all();
125     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
126     INFO(" reset()");
127     g.wait_for_all();
128     g.reset();  // should be in forward direction again
129     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()");
130     INFO(" remove_edge");
131     g.reset(tbb::flow::rf_clear_edges);
132     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)");
133     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // add edge again
134     // reverse edge by adding to buffer.
135     bnode.try_put(1);  // the edge should reverse
136     g.wait_for_all();
137     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
138     INFO(" remove_edge(reversed)");
139     g.reset(tbb::flow::rf_clear_edges);
140     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has no successor after reset()");
141     CHECK_MESSAGE( (tbb::flow::input_port<0>(jnode).my_predecessors.empty()), "predecessor not reset");
142     INFO("  done\n");
143     g.wait_for_all();
144 }
145 
146 // continue_node has only predecessor count
147 // they do not have predecessors, only the counts
148 // successor edges cannot be reversed
149 void TestContinueNode() {
150     tbb::flow::graph g;
151     tbb::flow::function_node<int> fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
152     tbb::flow::continue_node<int> cnode(g, /*number_of_predecessors*/ 1,
153                                         serial_continue_body<int>(serial_continue_state0));
154     tbb::flow::function_node<int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state1));
155     tbb::flow::make_edge(fnode0, cnode);
156     tbb::flow::make_edge(cnode, fnode1);
157     INFO("Testing continue_node:");
158     for( int icnt = 0; icnt < 2; ++icnt ) {
159         INFO( " initial" << icnt);
160         CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor addition didn't increment count");
161         CHECK_MESSAGE( (!cnode.successors().empty()), "successors empty though we added one");
162         CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect");
163         serial_continue_state0 = 0;
164         serial_fn_state0 = 0;
165         serial_fn_state1 = 0;
166 
167         std::thread t([&] {
168             fnode0.try_put(1);  // start the first function node.
169             if(icnt == 0) {  // first time through, let the continue_node fire
170                 INFO(" firing");
171                 fnode0.try_put(1);  // second message
172                 g.wait_for_all();
173 
174                 // try a try_get()
175                 {
176                     int i;
177                     CHECK_MESSAGE( (!cnode.try_get(i)), "try_get not rejected");
178                 }
179 
180                 INFO(" reset");
181                 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)");
182                 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)");
183                 g.reset();  // should still be the same
184                 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (after reset)" );
185                 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (after reset)");
186             }
187             else {  // we're going to see if the rf_clear_edges resets things.
188                 g.wait_for_all();
189                 INFO(" reset(rf_clear_edges)");
190                 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)" );
191                 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)" );
192                 g.reset(tbb::flow::rf_clear_edges);  // should be in forward direction again
193                 CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect after reset(rf_clear_edges)" );
194                 CHECK_MESSAGE( (cnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)" );
195                 CHECK_MESSAGE( (cnode.my_predecessor_count == cnode.my_initial_predecessor_count), "predecessor count not reset" );
196             }
197         });
198 
199         utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for the first message to arrive in function_node
200         // Now the body of function_node 0 is executing.
201         serial_fn_state0 = 0;  // release the node
202         if (icnt == 0) {
203             // wait for node to count the message (or for the node body to execute, which would be wrong)
204             utils::SpinWaitWhile([&] {
205                 tbb::spin_mutex::scoped_lock l(cnode.my_mutex);
206                 return serial_continue_state0 == 0 && cnode.my_current_count == 0;
207             });
208             CHECK_MESSAGE( (serial_continue_state0 == 0), "Improperly released continue_node");
209             CHECK_MESSAGE( (cnode.my_current_count == 1), "state of continue_receiver incorrect");
210 
211             utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for the second message to arrive in function_node
212             // Now the body of function_node 0 is executing.
213             serial_fn_state0 = 0;  // release the node
214 
215             utils::SpinWaitWhileEq(serial_continue_state0, 0); // waiting for continue_node to start
216             CHECK_MESSAGE( (cnode.my_current_count == 0), " my_current_count not reset before body of continue_node started");
217             serial_continue_state0 = 0;  // release the continue_node
218 
219             utils::SpinWaitWhileEq(serial_fn_state1, 0); // wait for the successor function_node to enter body
220             serial_fn_state1 = 0;  // release successor function_node.
221         }
222 
223         t.join();
224     }
225 
226     INFO(" done\n");
227 
228 }
229 
230 // function_node has predecessors and successors
231 // try_get() rejects
232 // successor edges cannot be reversed
233 // predecessors will reverse (only rejecting will reverse)
234 void TestFunctionNode() {
235     tbb::flow::graph g;
236     tbb::flow::queue_node<int> qnode0(g);
237     tbb::flow::function_node<int,int,   tbb::flow::rejecting > fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
238     tbb::flow::function_node<int,int/*, tbb::flow::queueing*/> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
239 
240     tbb::flow::queue_node<int> qnode1(g);
241 
242     tbb::flow::make_edge(fnode0, qnode1);
243     tbb::flow::make_edge(qnode0, fnode0);
244 
245     serial_fn_state0 = 2;  // just let it go
246     qnode0.try_put(1);
247     g.wait_for_all();
248     int ii;
249     CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed" );
250     tbb::flow::remove_edge(qnode0, fnode0);
251     tbb::flow::remove_edge(fnode0, qnode1);
252 
253     tbb::flow::make_edge(fnode1, qnode1);
254     tbb::flow::make_edge(qnode0, fnode1);
255 
256     serial_fn_state0 = 2;  // just let it go
257     qnode0.try_put(1);
258     g.wait_for_all();
259     CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed" );
260     tbb::flow::remove_edge(qnode0, fnode1);
261     tbb::flow::remove_edge(fnode1, qnode1);
262 
263     // rejecting
264     serial_fn_state0 = 0;
265     std::atomic<bool> rejected{ false };
266     std::thread t([&] {
267         g.reset(); // attach to the current arena
268         tbb::flow::make_edge(fnode0, qnode1);
269         tbb::flow::make_edge(qnode0, fnode0); // TODO: invesigate why it always creates a forwarding task
270         INFO("Testing rejecting function_node:");
271         CHECK_MESSAGE( (!fnode0.my_queue), "node should have no queue");
272         CHECK_MESSAGE( (!fnode0.my_successors.empty()), "successor edge not added");
273         qnode0.try_put(1);
274         qnode0.try_put(2);   // rejecting node should reject, reverse.
275         rejected = true;
276         g.wait_for_all();
277     });
278     utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start
279     utils::SpinWaitWhileEq(rejected, false);
280     CHECK(fnode0.my_predecessors.empty() == false);
281     serial_fn_state0 = 2;   // release function_node body.
282     t.join();
283     INFO(" reset");
284     g.reset();  // should reverse the edge from the input to the function node.
285     CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()");
286     CHECK_MESSAGE( (fnode0.my_predecessors.empty()), "predecessor not reversed");
287     tbb::flow::remove_edge(qnode0, fnode0);
288     tbb::flow::remove_edge(fnode0, qnode1);
289     INFO("\n");
290 
291     // queueing
292     tbb::flow::make_edge(fnode1, qnode1);
293     INFO("Testing queueing function_node:");
294     CHECK_MESSAGE( (fnode1.my_queue), "node should have no queue");
295     CHECK_MESSAGE( (!fnode1.my_successors.empty()), "successor edge not added");
296     INFO(" add_pred");
297     CHECK_MESSAGE( (fnode1.register_predecessor(qnode0)), "Cannot register as predecessor");
298     CHECK_MESSAGE( (!fnode1.my_predecessors.empty()), "Missing predecessor");
299     INFO(" reset");
300     g.wait_for_all();
301     g.reset();  // should reverse the edge from the input to the function node.
302     CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()");
303     CHECK_MESSAGE( (fnode1.my_predecessors.empty()), "predecessor not reversed");
304     tbb::flow::remove_edge(qnode0, fnode1);
305     tbb::flow::remove_edge(fnode1, qnode1);
306     INFO("\n");
307 
308     serial_fn_state0 = 0;  // make the function_node wait
309     rejected = false;
310     std::thread t2([&] {
311         g.reset(); // attach to the current arena
312 
313         tbb::flow::make_edge(qnode0, fnode0); // TODO: invesigate why it always creates a forwarding task
314 
315         INFO(" start_func");
316         qnode0.try_put(1);
317         // now if we put an item to the queues the edges to the function_node will reverse.
318         INFO(" put_node(2)");
319         qnode0.try_put(2);   // start queue node.
320         rejected = true;
321         g.wait_for_all();
322     });
323     utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start
324     // wait for the edges to reverse
325     utils::SpinWaitWhileEq(rejected, false);
326     CHECK(fnode0.my_predecessors.empty() == false);
327     g.my_context->cancel_group_execution();
328     // release the function_node
329     serial_fn_state0 = 2;
330     t2.join();
331     CHECK_MESSAGE( (!fnode0.my_predecessors.empty() && qnode0.my_successors.empty()), "function_node edge not reversed");
332     g.reset(tbb::flow::rf_clear_edges);
333     CHECK_MESSAGE( (fnode0.my_predecessors.empty() && qnode0.my_successors.empty()), "function_node edge not removed");
334     CHECK_MESSAGE( (fnode0.my_successors.empty()), "successor to fnode not removed");
335     INFO(" done\n");
336 }
337 
338 template<typename TT>
339 class tag_func {
340     TT my_mult;
341 public:
342     tag_func(TT multiplier) : my_mult(multiplier) { }
343     // operator() will return [0 .. Count)
344     tbb::flow::tag_value operator()( TT v) {
345         tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult);
346         return t;
347     }
348 };
349 
350 template<typename JNODE_TYPE>
351 void
352 TestSimpleSuccessorArc(const char *name) {
353     tbb::flow::graph g;
354     {
355         INFO("Join<" << name << "> successor test ");
356         tbb::flow::join_node<std::tuple<int>, JNODE_TYPE> qj(g);
357         tbb::flow::broadcast_node<std::tuple<int> > bnode(g);
358         tbb::flow::make_edge(qj, bnode);
359         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking");
360         g.reset();
361         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()");
362         g.reset(tbb::flow::rf_clear_edges);
363         CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)");
364     }
365 }
366 
367 template<>
368 void
369 TestSimpleSuccessorArc<tbb::flow::tag_matching>(const char *name) {
370     tbb::flow::graph g;
371     {
372         INFO("Join<" << name << "> successor test ");
373         typedef std::tuple<int,int> my_tuple;
374         tbb::flow::join_node<my_tuple, tbb::flow::tag_matching> qj(g,
375                                                                    tag_func<int>(1),
376                                                                    tag_func<int>(1)
377         );
378         tbb::flow::broadcast_node<my_tuple > bnode(g);
379         tbb::flow::make_edge(qj, bnode);
380         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking");
381         g.reset();
382         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()");
383         g.reset(tbb::flow::rf_clear_edges);
384         CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)");
385     }
386 }
387 
388 void
389 TestJoinNode() {
390     tbb::flow::graph g;
391 
392     TestSimpleSuccessorArc<tbb::flow::queueing>("queueing");
393     TestSimpleSuccessorArc<tbb::flow::reserving>("reserving");
394     TestSimpleSuccessorArc<tbb::flow::tag_matching>("tag_matching");
395 
396     // queueing and tagging join nodes have input queues, so the input ports do not reverse.
397     INFO(" reserving preds");
398     {
399         tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> rj(g);
400         tbb::flow::queue_node<int> q0(g);
401         tbb::flow::queue_node<int> q1(g);
402         tbb::flow::make_edge(q0,tbb::flow::input_port<0>(rj));
403         tbb::flow::make_edge(q1,tbb::flow::input_port<1>(rj));
404         q0.try_put(1);
405         g.wait_for_all();  // quiesce
406         CHECK_MESSAGE( (!(tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port missing predecessor");
407         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred");
408         g.reset();
409         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
410         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
411         q1.try_put(2);
412         g.wait_for_all();  // quiesce
413         CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor");
414         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred");
415         g.reset();
416         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
417         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
418         // should reset predecessors just as regular reset.
419         q1.try_put(3);
420         g.wait_for_all();  // quiesce
421         CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor");
422         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred");
423         g.reset(tbb::flow::rf_clear_edges);
424         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
425         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
426         CHECK_MESSAGE( (q0.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
427         CHECK_MESSAGE( (q1.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
428     }
429     INFO(" done\n");
430 }
431 
432 template <typename DecrementerType>
433 struct limiter_node_type {
434     using type = tbb::flow::limiter_node<int, DecrementerType>;
435     using dtype = DecrementerType;
436 };
437 
438 template <>
439 struct limiter_node_type<void> {
440     using type = tbb::flow::limiter_node<int>;
441     using dtype = tbb::flow::continue_msg;
442 };
443 
444 template <typename DType>
445 struct DecrementerHelper {
446     template <typename Decrementer>
447     static void check(Decrementer&) {}
448     static DType makeDType() {
449         return DType(1);
450     }
451 };
452 
453 template <>
454 struct DecrementerHelper<tbb::flow::continue_msg> {
455     template <typename Decrementer>
456     static void check(Decrementer& decrementer) {
457         auto& d = static_cast<tbb::detail::d1::continue_receiver&>(decrementer);
458         CHECK_MESSAGE(d.my_predecessor_count == 0, "error in pred count");
459         CHECK_MESSAGE(d.my_initial_predecessor_count == 0, "error in initial pred count");
460         CHECK_MESSAGE(d.my_current_count == 0, "error in current count");
461     }
462     static tbb::flow::continue_msg makeDType() {
463         return tbb::flow::continue_msg();
464     }
465 };
466 
467 template <typename DecrementerType>
468 void TestLimiterNode() {
469     int out_int{};
470     tbb::flow::graph g;
471     using dtype = typename limiter_node_type<DecrementerType>::dtype;
472     typename limiter_node_type<DecrementerType>::type ln(g,1);
473     INFO("Testing limiter_node: preds and succs");
474     DecrementerHelper<dtype>::check(ln.decrementer());
475     CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold");
476     tbb::flow::queue_node<int> inq(g);
477     tbb::flow::queue_node<int> outq(g);
478     tbb::flow::broadcast_node<dtype> bn(g);
479 
480     tbb::flow::make_edge(inq,ln);
481     tbb::flow::make_edge(ln,outq);
482     tbb::flow::make_edge(bn,ln.decrementer());
483 
484     g.wait_for_all();
485     CHECK_MESSAGE( (!(ln.my_successors.empty())),"successors empty after make_edge");
486     CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed");
487     inq.try_put(1);
488     g.wait_for_all();
489     CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 1), "limiter_node didn't pass first value");
490     CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed");
491     inq.try_put(2);
492     g.wait_for_all();
493     CHECK_MESSAGE( (!outq.try_get(out_int)), "limiter_node incorrectly passed second input");
494     CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge to limiter_node not reversed");
495     bn.try_put(DecrementerHelper<dtype>::makeDType());
496     g.wait_for_all();
497     CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 2), "limiter_node didn't pass second value");
498     g.wait_for_all();
499     CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge was reversed(after try_get())");
500     g.reset();
501     CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge not reset");
502     inq.try_put(3);
503     g.wait_for_all();
504     CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 3), "limiter_node didn't pass third value");
505 
506     INFO(" rf_clear_edges");
507     // currently the limiter_node will not pass another message
508     g.reset(tbb::flow::rf_clear_edges);
509     DecrementerHelper<dtype>::check(ln.decrementer());
510     CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold");
511     CHECK_MESSAGE( (ln.my_predecessors.empty()), "preds not reset(rf_clear_edges)");
512     CHECK_MESSAGE( (ln.my_successors.empty()), "preds not reset(rf_clear_edges)");
513     CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)");
514     CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)");
515     CHECK_MESSAGE( (bn.my_successors.empty()), "control edge not removed on reset(rf_clear_edges)");
516     tbb::flow::make_edge(inq,ln);
517     tbb::flow::make_edge(ln,outq);
518     inq.try_put(4);
519     inq.try_put(5);
520     g.wait_for_all();
521     CHECK_MESSAGE( (outq.try_get(out_int)),"missing output after reset(rf_clear_edges)");
522     CHECK_MESSAGE( (out_int == 4), "input incorrect (4)");
523     bn.try_put(DecrementerHelper<dtype>::makeDType());
524     g.wait_for_all();
525     CHECK_MESSAGE( (!outq.try_get(out_int)),"second output incorrectly passed (rf_clear_edges)");
526     INFO(" done\n");
527 }
528 
529 template<typename MF_TYPE>
530 struct mf_body {
531     std::atomic<int>& my_flag;
532     mf_body(std::atomic<int>& flag) : my_flag(flag) { }
533     void operator()(const int& in, typename MF_TYPE::output_ports_type& outports) {
534         if(my_flag == 0) {
535             my_flag = 1;
536 
537             utils::SpinWaitWhileEq(my_flag, 1);
538         }
539 
540         if (in & 0x1)
541             std::get<1>(outports).try_put(in);
542         else
543             std::get<0>(outports).try_put(in);
544     }
545 };
546 
547 template<typename P, typename T>
548 struct test_reversal;
549 template<typename T>
550 struct test_reversal<tbb::flow::queueing, T> {
551     test_reversal() { INFO("<queueing>"); }
552     // queueing node will not reverse.
553     bool operator()(T& node) const { return node.my_predecessors.empty(); }
554 };
555 
556 template<typename T>
557 struct test_reversal<tbb::flow::rejecting, T> {
558     test_reversal() { INFO("<rejecting>"); }
559     bool operator()(T& node) const { return !node.my_predecessors.empty(); }
560 };
561 
562 template<typename P>
563 void TestMultifunctionNode() {
564     typedef tbb::flow::multifunction_node<int, std::tuple<int, int>, P> multinode_type;
565     INFO("Testing multifunction_node");
566     test_reversal<P,multinode_type> my_test;
567     INFO(":");
568     tbb::flow::graph g;
569     multinode_type mf(g, tbb::flow::serial, mf_body<multinode_type>(serial_fn_state0));
570     tbb::flow::queue_node<int> qin(g);
571     tbb::flow::queue_node<int> qodd_out(g);
572     tbb::flow::queue_node<int> qeven_out(g);
573     tbb::flow::make_edge(qin,mf);
574     tbb::flow::make_edge(tbb::flow::output_port<0>(mf), qeven_out);
575     tbb::flow::make_edge(tbb::flow::output_port<1>(mf), qodd_out);
576     g.wait_for_all();
577     for (int ii = 0; ii < 2 ; ++ii) {
578         serial_fn_state0 = 0;
579         /* if(ii == 0) REMARK(" reset preds"); else REMARK(" 2nd");*/
580         std::thread t([&] {
581             g.reset(); // attach to the current arena
582             qin.try_put(0);
583             qin.try_put(1);
584             g.wait_for_all();
585         });
586         // wait for node to be active
587         utils::SpinWaitWhileEq(serial_fn_state0, 0);
588         utils::SpinWaitWhile( [&] { return !my_test(mf); });
589         g.my_context->cancel_group_execution();
590         // release node
591         serial_fn_state0 = 2;
592         t.join();
593         CHECK_MESSAGE( (my_test(mf)), "fail cancel group test");
594         if( ii == 1) {
595             INFO(" rf_clear_edges");
596             g.reset(tbb::flow::rf_clear_edges);
597             CHECK_MESSAGE( (tbb::flow::output_port<0>(mf).my_successors.empty()), "output_port<0> not reset (rf_clear_edges)");
598             CHECK_MESSAGE( (tbb::flow::output_port<1>(mf).my_successors.empty()), "output_port<1> not reset (rf_clear_edges)");
599         }
600         else
601         {
602             g.reset();
603         }
604         CHECK_MESSAGE( (mf.my_predecessors.empty()), "edge didn't reset");
605         CHECK_MESSAGE( ((ii == 0 && !qin.my_successors.empty()) || (ii == 1 && qin.my_successors.empty())), "edge didn't reset");
606     }
607     INFO(" done\n");
608 }
609 
610 // indexer_node is like a broadcast_node, in that none of its inputs reverse, and it
611 // never allows a successor to reverse its edge, so we only need test the successors.
612 void
613 TestIndexerNode() {
614     tbb::flow::graph g;
615     typedef tbb::flow::indexer_node< int, int > indexernode_type;
616     indexernode_type inode(g);
617     INFO("Testing indexer_node:");
618     tbb::flow::queue_node<indexernode_type::output_type> qout(g);
619     tbb::flow::make_edge(inode,qout);
620     g.wait_for_all();
621     CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing");
622     g.reset();
623     CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing after reset");
624     g.reset(tbb::flow::rf_clear_edges);
625     CHECK_MESSAGE( (inode.my_successors.empty()), "successor of indexer_node not removed by reset(rf_clear_edges)");
626     INFO(" done\n");
627 }
628 
629 template<typename Node>
630 void
631 TestScalarNode(const char *name) {
632     tbb::flow::graph g;
633     Node on(g);
634     tbb::flow::queue_node<int> qout(g);
635     INFO("Testing " << name << ":");
636     tbb::flow::make_edge(on,qout);
637     g.wait_for_all();
638     CHECK_MESSAGE( (!on.my_successors.empty()), "edge not added");
639     g.reset();
640     CHECK_MESSAGE( (!on.my_successors.empty()), "edge improperly removed");
641     g.reset(tbb::flow::rf_clear_edges);
642     CHECK_MESSAGE( (on.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
643     INFO(" done\n");
644 }
645 
646 struct seq_body {
647     size_t operator()(const int &in) {
648         return size_t(in / 3);
649     }
650 };
651 
652 // sequencer_node behaves like a queueing node, but requires a different constructor.
653 void TestSequencerNode() {
654     tbb::flow::graph g;
655     tbb::flow::sequencer_node<int> bnode(g, seq_body());
656     INFO("Testing sequencer_node:");
657     tbb::flow::function_node<int> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
658     INFO("Testing sequencer_node:");
659     serial_fn_state0 = 0;  // reset to waiting state.
660     INFO(" make_edge");
661     tbb::flow::make_edge(bnode, fnode);
662     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge" );
663     INFO(" try_put");
664     std::thread t([&]{
665         bnode.try_put(0);  // will forward to the fnode
666         g.wait_for_all();
667     });
668     // wait for the function_node to fire up
669     utils::SpinWaitWhileEq(serial_fn_state0, 0);
670     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message" );
671     serial_fn_state0 = 0;       // release the function node
672     t.join();
673 
674     INFO(" remove_edge");
675     tbb::flow::remove_edge(bnode, fnode);
676     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge");
677     tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g);
678     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // will spawn a task
679     g.wait_for_all();
680     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join");
681     INFO(" reverse");
682     bnode.try_put(3);  // the edge should reverse
683     g.wait_for_all();
684     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
685     INFO(" reset()");
686     g.wait_for_all();
687     g.reset();  // should be in forward direction again
688     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()");
689     INFO(" remove_edge");
690     g.reset(tbb::flow::rf_clear_edges);  // should be in forward direction again
691     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)");
692     CHECK_MESSAGE( (fnode.my_predecessors.empty()), "buffering node reversed after reset(rf_clear_edges)");
693     INFO("  done\n");
694     g.wait_for_all();
695 }
696 
697 struct snode_body {
698     int max_cnt;
699     int my_cnt;
700     snode_body(const int& in) : max_cnt(in) { my_cnt = 0; }
701     int operator()(tbb::flow_control& fc) {
702         if (max_cnt <= my_cnt++) {
703             fc.stop();
704             return int();
705         }
706         return my_cnt;
707     }
708 };
709 
710 void TestInputNode() {
711     tbb::flow::graph g;
712     tbb::flow::input_node<int> in(g, snode_body(4));
713     INFO("Testing input_node:");
714     tbb::flow::queue_node<int> qin(g);
715     tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> jn(g);
716     tbb::flow::queue_node<std::tuple<int,int> > qout(g);
717 
718     INFO(" make_edges");
719     tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn));
720     tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
721     tbb::flow::make_edge(jn,qout);
722     CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after make_edge");
723     g.wait_for_all();
724     g.reset();
725     CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after reset");
726     g.wait_for_all();
727     g.reset(tbb::flow::rf_clear_edges);
728     CHECK_MESSAGE( (in.my_successors.empty()), "input node has successor after reset(rf_clear_edges)");
729     tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn));
730     tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
731     tbb::flow::make_edge(jn,qout);
732     g.wait_for_all();
733     INFO(" activate");
734     in.activate();  // will forward to the fnode
735     INFO(" wait1");
736     g.wait_for_all();
737     CHECK_MESSAGE( (in.my_successors.empty()), "input node has no successor after forwarding message");
738     g.reset();
739     CHECK_MESSAGE( (!in.my_successors.empty()), "input_node has no successors after reset");
740     CHECK_MESSAGE( (tbb::flow::input_port<0>(jn).my_predecessors.empty()), "successor of input_node has pred after reset.");
741     INFO(" done\n");
742 }
743 
744 //! Test buffering nodes
745 //! \brief \ref error_guessing
746 TEST_CASE("Test buffering nodes"){
747     unsigned int MinThread = utils::MinThread;
748     if(MinThread < 3) MinThread = 3;
749     tbb::task_arena arena(MinThread);
750 	arena.execute(
751         [&]() {
752             // tests presume at least three threads
753             TestBufferingNode< tbb::flow::buffer_node<int> >("buffer_node");
754             TestBufferingNode< tbb::flow::priority_queue_node<int> >("priority_queue_node");
755             TestBufferingNode< tbb::flow::queue_node<int> >("queue_node");
756         }
757 	);
758 }
759 
760 //! Test sequencer_node
761 //! \brief \ref error_guessing
762 TEST_CASE("Test sequencer node"){
763     TestSequencerNode();
764 }
765 
766 TEST_SUITE("Test multifunction node") {
767     //! Test multifunction_node with rejecting policy
768     //! \brief \ref error_guessing
769     TEST_CASE("with rejecting policy"){
770         TestMultifunctionNode<tbb::flow::rejecting>();
771     }
772 
773     //! Test multifunction_node with queueing policy
774     //! \brief \ref error_guessing
775     TEST_CASE("with queueing policy") {
776         TestMultifunctionNode<tbb::flow::queueing>();
777     }
778 }
779 
780 //! Test input_node
781 //! \brief \ref error_guessing
782 TEST_CASE("Test input node"){
783     TestInputNode();
784 }
785 
786 //! Test continue_node
787 //! \brief \ref error_guessing
788 TEST_CASE("Test continue node"){
789     TestContinueNode();
790 }
791 
792 //! Test function_node
793 //! \brief \ref error_guessing
794 TEST_CASE("Test function node" * doctest::may_fail()){
795     TestFunctionNode();
796 }
797 
798 //! Test join_node
799 //! \brief \ref error_guessing
800 TEST_CASE("Test join node"){
801     TestJoinNode();
802 }
803 
804 //! Test limiter_node
805 //! \brief \ref error_guessing
806 TEST_CASE("Test limiter node"){
807     TestLimiterNode<void>();
808     TestLimiterNode<int>();
809     TestLimiterNode<tbb::flow::continue_msg>();
810 }
811 
812 //! Test indexer_node
813 //! \brief \ref error_guessing
814 TEST_CASE("Test indexer node"){
815     TestIndexerNode();
816 }
817 
818 //! Test split_node
819 //! \brief \ref error_guessing
820 TEST_CASE("Test split node"){
821     TestSplitNode();
822 }
823 
824 //! Test broadcast, overwrite, write_once nodes
825 //! \brief \ref error_guessing
826 TEST_CASE("Test scalar node"){
827     TestScalarNode<tbb::flow::broadcast_node<int> >("broadcast_node");
828     TestScalarNode<tbb::flow::overwrite_node<int> >("overwrite_node");
829     TestScalarNode<tbb::flow::write_once_node<int> >("write_once_node");
830 }
831 
832 //! try_get in inactive graph
833 //! \brief \ref error_guessing
834 TEST_CASE("try_get in inactive graph"){
835     tbb::flow::graph g;
836 
837     tbb::flow::input_node<int> src(g, [&](tbb::flow_control& fc) { fc.stop(); return 0;});
838     deactivate_graph(g);
839 
840     int tmp = -1;
841     CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed");
842 
843     src.activate();
844     tmp = -1;
845     CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed");
846 }
847 
848 //! Test make_edge in inactive graph
849 //! \brief \ref error_guessing
850 TEST_CASE("Test make_edge in inactive graph"){
851     tbb::flow::graph g;
852 
853     tbb::flow::continue_node<int> c(g, [](const tbb::flow::continue_msg&){ return 1; });
854 
855     tbb::flow::function_node<int, int> f(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
856 
857     c.try_put(tbb::flow::continue_msg());
858     g.wait_for_all();
859 
860     deactivate_graph(g);
861 
862     make_edge(c, f);
863 }
864 
865 //! Test make_edge from overwrite_node in inactive graph
866 //! \brief \ref error_guessing
867 TEST_CASE("Test make_edge from overwrite_node in inactive graph"){
868     tbb::flow::graph g;
869 
870     tbb::flow::queue_node<int> q(g);
871 
872     tbb::flow::overwrite_node<int> on(g);
873 
874     on.try_put(1);
875     g.wait_for_all();
876 
877     deactivate_graph(g);
878 
879     make_edge(on, q);
880 
881     int tmp = -1;
882     CHECK_MESSAGE((q.try_get(tmp) == false), "Message should not be passed on");
883 }
884 
885 //! Test iterators directly
886 //! \brief \ref error_guessing
887 TEST_CASE("graph_iterator details"){
888     tbb::flow::graph g;
889     const tbb::flow::graph cg;
890 
891     tbb::flow::graph::iterator b = g.begin();
892     tbb::flow::graph::iterator b2 = g.begin();
893     ++b2;
894     // Cast to a volatile pointer to workaround self assignment warnings from some compilers.
895     tbb::flow::graph::iterator* volatile b2_ptr = &b2;
896     b2 = *b2_ptr;
897     b = b2;
898     CHECK_MESSAGE((b == b2), "Assignment should make iterators equal");
899 }
900 
901 //! const graph
902 //! \brief \ref error_guessing
903 TEST_CASE("const graph"){
904     using namespace tbb::flow;
905 
906     const graph g;
907     CHECK_MESSAGE((g.cbegin() == g.cend()), "Starting graph is empty");
908     CHECK_MESSAGE((g.begin() == g.end()), "Starting graph is empty");
909 
910     graph g2;
911     CHECK_MESSAGE((g2.begin() == g2.end()), "Starting graph is empty");
912 }
913 
914 //! Send message to continue_node while graph is inactive
915 //! \brief \ref error_guessing
916 TEST_CASE("Send message to continue_node while graph is inactive") {
917     using namespace tbb::flow;
918 
919     graph g;
920 
921     continue_node<int> c(g, [](const continue_msg&){ return 1; });
922     buffer_node<int> b(g);
923 
924     make_edge(c, b);
925 
926     deactivate_graph(g);
927 
928     c.try_put(continue_msg());
929     g.wait_for_all();
930 
931     int tmp = -1;
932     CHECK_MESSAGE((b.try_get(tmp) == false), "Message should not arrive");
933     CHECK_MESSAGE((tmp == -1), "Value should not be altered");
934 }
935 
936 
937 //! Bypass of a successor's message in a node with lightweight policy
938 //! \brief \ref error_guessing
939 TEST_CASE("Bypass of a successor's message in a node with lightweight policy") {
940     using namespace tbb::flow;
941 
942     graph g;
943 
944     auto body = [](const int&v)->int { return v * 2; };
945     function_node<int, int, lightweight> f1(g, unlimited, body);
946 
947     auto body2 = [](const int&v)->int {return v / 2;};
948     function_node<int, int> f2(g, unlimited, body2);
949 
950     buffer_node<int> b(g);
951 
952     make_edge(f1, f2);
953     make_edge(f2, b);
954 
955     f1.try_put(1);
956     g.wait_for_all();
957 
958     int tmp = -1;
959     CHECK_MESSAGE((b.try_get(tmp) == true), "Functional nodes can work in succession");
960     CHECK_MESSAGE((tmp == 1), "Value should not be altered");
961 }
962