1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/config.h"
18 
19 #if _MSC_VER
20     #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
21     #if _MSC_VER==1700 && !defined(__INTEL_COMPILER)
22         // Suppress "unreachable code" warning by VC++ 17.0 (VS 2012)
23         #pragma warning (disable: 4702)
24     #endif
25 #endif
26 
27 // need these to get proper external names for private methods in library.
28 #include "tbb/spin_mutex.h"
29 #include "tbb/spin_rw_mutex.h"
30 #include "tbb/task_arena.h"
31 #include "tbb/task_group.h"
32 
33 #define private public
34 #define protected public
35 #include "tbb/flow_graph.h"
36 #undef protected
37 #undef private
38 
39 #include "common/test.h"
40 #include "common/utils.h"
41 #include "common/spin_barrier.h"
42 #include "common/graph_utils.h"
43 
44 #include <string> // merely prevents LNK2001 error to happen (on ICL+VC9 configurations)
45 
46 //! \file test_flow_graph_whitebox.cpp
47 //! \brief Test for [flow_graph.broadcast_node flow_graph.priority_queue_node flow_graph.indexer_node flow_graph.sequencer_node flow_graph.remove_edge flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification
48 
49 template<typename T>
50 struct receiverBody {
51     tbb::flow::continue_msg operator()(const T &/*in*/) {
52         return tbb::flow::continue_msg();
53     }
54 };
55 
56 // split_nodes cannot have predecessors
57 // they do not reject messages and always forward.
58 // they reject edge reversals from successors.
59 void TestSplitNode() {
60     typedef tbb::flow::split_node<std::tuple<int> > snode_type;
61     tbb::flow::graph g;
62     snode_type snode(g);
63     tbb::flow::function_node<int> rcvr(g,tbb::flow::unlimited, receiverBody<int>());
64     INFO("Testing split_node\n");
65     CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "Constructed split_node has successors");
66     // tbb::flow::output_port<0>(snode)
67     tbb::flow::make_edge(tbb::flow::output_port<0>(snode), rcvr);
68     CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after make_edge, split_node has no successor.");
69     snode.try_put(std::tuple<int>(1));
70     g.wait_for_all();
71     g.reset();
72     CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after reset(), split_node has no successor.");
73     g.reset(tbb::flow::rf_clear_edges);
74     CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "after reset(rf_clear_edges), split_node has a successor.");
75 }
76 
77 // buffering nodes cannot have predecessors
78 // they do not reject messages and always save or forward
79 // they allow edge reversals from successors
80 template< typename B >
81 void TestBufferingNode(const char * name) {
82     tbb::flow::graph g;
83     B bnode(g);
84     tbb::flow::function_node<int,int,tbb::flow::rejecting> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
85     INFO("Testing " << name << ":");
86     for(int icnt = 0; icnt < 2; icnt++) {
87         bool reverse_edge = (icnt & 0x2) != 0;
88         serial_fn_state0 = 0;  // reset to waiting state.
89         INFO(" make_edge");
90         tbb::flow::make_edge(bnode, fnode);
91         CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge");
92         std::thread t([&] {
93             INFO(" try_put");
94             bnode.try_put(1);  // will forward to the fnode
95             g.wait_for_all();
96         });
97         utils::SpinWaitWhileEq(serial_fn_state0, 0);
98         if(reverse_edge) {
99             INFO(" try_put2");
100             bnode.try_put(2);  // should reverse the edge
101             // waiting for the edge to reverse
102             utils::SpinWaitWhile([&] { return !bnode.my_successors.empty(); });
103         }
104         else {
105             CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message");
106         }
107         serial_fn_state0 = 0;  // release the function_node.
108         if(reverse_edge) {
109             // have to do a second release because the function_node will get the 2nd item
110             utils::SpinWaitWhileEq(serial_fn_state0, 0);
111             serial_fn_state0 = 0;  // release the function_node.
112         }
113         t.join();
114         INFO(" remove_edge");
115         tbb::flow::remove_edge(bnode, fnode);
116         CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge");
117     }
118     tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g);
119     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // will spawn a task
120     g.wait_for_all();
121     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join");
122     INFO(" reverse");
123     bnode.try_put(1);  // the edge should reverse
124     g.wait_for_all();
125     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
126     INFO(" reset()");
127     g.wait_for_all();
128     g.reset();  // should be in forward direction again
129     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()");
130     INFO(" remove_edge");
131     g.reset(tbb::flow::rf_clear_edges);
132     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)");
133     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // add edge again
134     // reverse edge by adding to buffer.
135     bnode.try_put(1);  // the edge should reverse
136     g.wait_for_all();
137     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
138     INFO(" remove_edge(reversed)");
139     g.reset(tbb::flow::rf_clear_edges);
140     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has no successor after reset()");
141     CHECK_MESSAGE( (tbb::flow::input_port<0>(jnode).my_predecessors.empty()), "predecessor not reset");
142     INFO("  done\n");
143     g.wait_for_all();
144 }
145 
146 // continue_node has only predecessor count
147 // they do not have predecessors, only the counts
148 // successor edges cannot be reversed
149 void TestContinueNode() {
150     tbb::flow::graph g;
151     tbb::flow::function_node<int> fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
152     tbb::flow::continue_node<int> cnode(g, /*number_of_predecessors*/ 1,
153                                         serial_continue_body<int>(serial_continue_state0));
154     tbb::flow::function_node<int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state1));
155     tbb::flow::make_edge(fnode0, cnode);
156     tbb::flow::make_edge(cnode, fnode1);
157     INFO("Testing continue_node:");
158     for( int icnt = 0; icnt < 2; ++icnt ) {
159         INFO( " initial" << icnt);
160         CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor addition didn't increment count");
161         CHECK_MESSAGE( (!cnode.successors().empty()), "successors empty though we added one");
162         CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect");
163         serial_continue_state0 = 0;
164         serial_fn_state0 = 0;
165         serial_fn_state1 = 0;
166 
167         std::thread t([&] {
168             fnode0.try_put(1);  // start the first function node.
169             if(icnt == 0) {  // first time through, let the continue_node fire
170                 INFO(" firing");
171                 fnode0.try_put(1);  // second message
172                 g.wait_for_all();
173 
174                 // try a try_get()
175                 {
176                     int i;
177                     CHECK_MESSAGE( (!cnode.try_get(i)), "try_get not rejected");
178                 }
179 
180                 INFO(" reset");
181                 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)");
182                 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)");
183                 g.reset();  // should still be the same
184                 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (after reset)" );
185                 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (after reset)");
186             }
187             else {  // we're going to see if the rf_clear_edges resets things.
188                 g.wait_for_all();
189                 INFO(" reset(rf_clear_edges)");
190                 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)" );
191                 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)" );
192                 g.reset(tbb::flow::rf_clear_edges);  // should be in forward direction again
193                 CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect after reset(rf_clear_edges)" );
194                 CHECK_MESSAGE( (cnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)" );
195                 CHECK_MESSAGE( (cnode.my_predecessor_count == cnode.my_initial_predecessor_count), "predecessor count not reset" );
196             }
197         });
198 
199         utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for the first message to arrive in function_node
200         // Now the body of function_node 0 is executing.
201         serial_fn_state0 = 0;  // release the node
202         if (icnt == 0) {
203             // wait for node to count the message (or for the node body to execute, which would be wrong)
204             utils::SpinWaitWhile([&] {
205                 return serial_continue_state0 == 0 && cnode.my_current_count == 0;
206             });
207             CHECK_MESSAGE( (serial_continue_state0 == 0), "Improperly released continue_node");
208             CHECK_MESSAGE( (cnode.my_current_count == 1), "state of continue_receiver incorrect");
209 
210             utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for the second message to arrive in function_node
211             // Now the body of function_node 0 is executing.
212             serial_fn_state0 = 0;  // release the node
213 
214             utils::SpinWaitWhileEq(serial_continue_state0, 0); // waiting for continue_node to start
215             CHECK_MESSAGE( (cnode.my_current_count == 0), " my_current_count not reset before body of continue_node started");
216             serial_continue_state0 = 0;  // release the continue_node
217 
218             utils::SpinWaitWhileEq(serial_fn_state1, 0); // wait for the successor function_node to enter body
219             serial_fn_state1 = 0;  // release successor function_node.
220         }
221 
222         t.join();
223     }
224 
225     INFO(" done\n");
226 
227 }
228 
229 // function_node has predecessors and successors
230 // try_get() rejects
231 // successor edges cannot be reversed
232 // predecessors will reverse (only rejecting will reverse)
233 void TestFunctionNode() {
234     tbb::flow::graph g;
235     tbb::flow::queue_node<int> qnode0(g);
236     tbb::flow::function_node<int,int,   tbb::flow::rejecting > fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
237     tbb::flow::function_node<int,int/*, tbb::flow::queueing*/> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
238 
239     tbb::flow::queue_node<int> qnode1(g);
240 
241     tbb::flow::make_edge(fnode0, qnode1);
242     tbb::flow::make_edge(qnode0, fnode0);
243 
244     serial_fn_state0 = 2;  // just let it go
245     qnode0.try_put(1);
246     g.wait_for_all();
247     int ii;
248     CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed" );
249     tbb::flow::remove_edge(qnode0, fnode0);
250     tbb::flow::remove_edge(fnode0, qnode1);
251 
252     tbb::flow::make_edge(fnode1, qnode1);
253     tbb::flow::make_edge(qnode0, fnode1);
254 
255     serial_fn_state0 = 2;  // just let it go
256     qnode0.try_put(1);
257     g.wait_for_all();
258     CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed" );
259     tbb::flow::remove_edge(qnode0, fnode1);
260     tbb::flow::remove_edge(fnode1, qnode1);
261 
262     // rejecting
263     serial_fn_state0 = 0;
264     std::thread t([&] {
265         g.reset(); // attach to the current arena
266         tbb::flow::make_edge(fnode0, qnode1);
267         tbb::flow::make_edge(qnode0, fnode0); // TODO: invesigate why it always creates a forwarding task
268         INFO("Testing rejecting function_node:");
269         CHECK_MESSAGE( (!fnode0.my_queue), "node should have no queue");
270         CHECK_MESSAGE( (!fnode0.my_successors.empty()), "successor edge not added");
271         qnode0.try_put(1);
272         qnode0.try_put(2);   // rejecting node should reject, reverse.
273         g.wait_for_all();
274     });
275     utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start
276     utils::SpinWaitWhile([&] { return fnode0.my_predecessors.empty(); });
277     serial_fn_state0 = 2;   // release function_node body.
278     t.join();
279     INFO(" reset");
280     g.reset();  // should reverse the edge from the input to the function node.
281     CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()");
282     CHECK_MESSAGE( (fnode0.my_predecessors.empty()), "predecessor not reversed");
283     tbb::flow::remove_edge(qnode0, fnode0);
284     tbb::flow::remove_edge(fnode0, qnode1);
285     INFO("\n");
286 
287     // queueing
288     tbb::flow::make_edge(fnode1, qnode1);
289     INFO("Testing queueing function_node:");
290     CHECK_MESSAGE( (fnode1.my_queue), "node should have no queue");
291     CHECK_MESSAGE( (!fnode1.my_successors.empty()), "successor edge not added");
292     INFO(" add_pred");
293     CHECK_MESSAGE( (fnode1.register_predecessor(qnode0)), "Cannot register as predecessor");
294     CHECK_MESSAGE( (!fnode1.my_predecessors.empty()), "Missing predecessor");
295     INFO(" reset");
296     g.wait_for_all();
297     g.reset();  // should reverse the edge from the input to the function node.
298     CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()");
299     CHECK_MESSAGE( (fnode1.my_predecessors.empty()), "predecessor not reversed");
300     tbb::flow::remove_edge(qnode0, fnode1);
301     tbb::flow::remove_edge(fnode1, qnode1);
302     INFO("\n");
303 
304     serial_fn_state0 = 0;  // make the function_node wait
305     std::thread t2([&] {
306         g.reset(); // attach to the current arena
307 
308         tbb::flow::make_edge(qnode0, fnode0); // TODO: invesigate why it always creates a forwarding task
309 
310         INFO(" start_func");
311         qnode0.try_put(1);
312         // now if we put an item to the queues the edges to the function_node will reverse.
313         INFO(" put_node(2)");
314         qnode0.try_put(2);   // start queue node.
315         g.wait_for_all();
316     });
317     utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start
318     // wait for the edges to reverse
319     utils::SpinWaitWhile([&] { return fnode0.my_predecessors.empty(); });
320     g.my_context->cancel_group_execution();
321     // release the function_node
322     serial_fn_state0 = 2;
323     t2.join();
324     CHECK_MESSAGE( (!fnode0.my_predecessors.empty() && qnode0.my_successors.empty()), "function_node edge not reversed");
325     g.reset(tbb::flow::rf_clear_edges);
326     CHECK_MESSAGE( (fnode0.my_predecessors.empty() && qnode0.my_successors.empty()), "function_node edge not removed");
327     CHECK_MESSAGE( (fnode0.my_successors.empty()), "successor to fnode not removed");
328     INFO(" done\n");
329 }
330 
331 template<typename TT>
332 class tag_func {
333     TT my_mult;
334 public:
335     tag_func(TT multiplier) : my_mult(multiplier) { }
336     // operator() will return [0 .. Count)
337     tbb::flow::tag_value operator()( TT v) {
338         tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult);
339         return t;
340     }
341 };
342 
343 template<typename JNODE_TYPE>
344 void
345 TestSimpleSuccessorArc(const char *name) {
346     tbb::flow::graph g;
347     {
348         INFO("Join<" << name << "> successor test ");
349         tbb::flow::join_node<std::tuple<int>, JNODE_TYPE> qj(g);
350         tbb::flow::broadcast_node<std::tuple<int> > bnode(g);
351         tbb::flow::make_edge(qj, bnode);
352         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking");
353         g.reset();
354         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()");
355         g.reset(tbb::flow::rf_clear_edges);
356         CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)");
357     }
358 }
359 
360 template<>
361 void
362 TestSimpleSuccessorArc<tbb::flow::tag_matching>(const char *name) {
363     tbb::flow::graph g;
364     {
365         INFO("Join<" << name << "> successor test ");
366         typedef std::tuple<int,int> my_tuple;
367         tbb::flow::join_node<my_tuple, tbb::flow::tag_matching> qj(g,
368                                                                    tag_func<int>(1),
369                                                                    tag_func<int>(1)
370         );
371         tbb::flow::broadcast_node<my_tuple > bnode(g);
372         tbb::flow::make_edge(qj, bnode);
373         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking");
374         g.reset();
375         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()");
376         g.reset(tbb::flow::rf_clear_edges);
377         CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)");
378     }
379 }
380 
381 void
382 TestJoinNode() {
383     tbb::flow::graph g;
384 
385     TestSimpleSuccessorArc<tbb::flow::queueing>("queueing");
386     TestSimpleSuccessorArc<tbb::flow::reserving>("reserving");
387     TestSimpleSuccessorArc<tbb::flow::tag_matching>("tag_matching");
388 
389     // queueing and tagging join nodes have input queues, so the input ports do not reverse.
390     INFO(" reserving preds");
391     {
392         tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> rj(g);
393         tbb::flow::queue_node<int> q0(g);
394         tbb::flow::queue_node<int> q1(g);
395         tbb::flow::make_edge(q0,tbb::flow::input_port<0>(rj));
396         tbb::flow::make_edge(q1,tbb::flow::input_port<1>(rj));
397         q0.try_put(1);
398         g.wait_for_all();  // quiesce
399         CHECK_MESSAGE( (!(tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port missing predecessor");
400         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred");
401         g.reset();
402         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
403         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
404         q1.try_put(2);
405         g.wait_for_all();  // quiesce
406         CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor");
407         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred");
408         g.reset();
409         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
410         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
411         // should reset predecessors just as regular reset.
412         q1.try_put(3);
413         g.wait_for_all();  // quiesce
414         CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor");
415         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred");
416         g.reset(tbb::flow::rf_clear_edges);
417         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
418         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
419         CHECK_MESSAGE( (q0.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
420         CHECK_MESSAGE( (q1.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
421     }
422     INFO(" done\n");
423 }
424 
425 template <typename DecrementerType>
426 struct limiter_node_type {
427     using type = tbb::flow::limiter_node<int, DecrementerType>;
428     using dtype = DecrementerType;
429 };
430 
431 template <>
432 struct limiter_node_type<void> {
433     using type = tbb::flow::limiter_node<int>;
434     using dtype = tbb::flow::continue_msg;
435 };
436 
437 template <typename DType>
438 struct DecrementerHelper {
439     template <typename Decrementer>
440     static void check(Decrementer&) {}
441     static DType makeDType() {
442         return DType(1);
443     }
444 };
445 
446 template <>
447 struct DecrementerHelper<tbb::flow::continue_msg> {
448     template <typename Decrementer>
449     static void check(Decrementer& decrementer) {
450         auto& d = static_cast<tbb::detail::d1::continue_receiver&>(decrementer);
451         CHECK_MESSAGE(d.my_predecessor_count == 0, "error in pred count");
452         CHECK_MESSAGE(d.my_initial_predecessor_count == 0, "error in initial pred count");
453         CHECK_MESSAGE(d.my_current_count == 0, "error in current count");
454     }
455     static tbb::flow::continue_msg makeDType() {
456         return tbb::flow::continue_msg();
457     }
458 };
459 
460 template <typename DecrementerType>
461 void TestLimiterNode() {
462     int out_int{};
463     tbb::flow::graph g;
464     using dtype = typename limiter_node_type<DecrementerType>::dtype;
465     typename limiter_node_type<DecrementerType>::type ln(g,1);
466     INFO("Testing limiter_node: preds and succs");
467     DecrementerHelper<dtype>::check(ln.decrementer());
468     CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold");
469     tbb::flow::queue_node<int> inq(g);
470     tbb::flow::queue_node<int> outq(g);
471     tbb::flow::broadcast_node<dtype> bn(g);
472 
473     tbb::flow::make_edge(inq,ln);
474     tbb::flow::make_edge(ln,outq);
475     tbb::flow::make_edge(bn,ln.decrementer());
476 
477     g.wait_for_all();
478     CHECK_MESSAGE( (!(ln.my_successors.empty())),"successors empty after make_edge");
479     CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed");
480     inq.try_put(1);
481     g.wait_for_all();
482     CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 1), "limiter_node didn't pass first value");
483     CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed");
484     inq.try_put(2);
485     g.wait_for_all();
486     CHECK_MESSAGE( (!outq.try_get(out_int)), "limiter_node incorrectly passed second input");
487     CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge to limiter_node not reversed");
488     bn.try_put(DecrementerHelper<dtype>::makeDType());
489     g.wait_for_all();
490     CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 2), "limiter_node didn't pass second value");
491     g.wait_for_all();
492     CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge was reversed(after try_get())");
493     g.reset();
494     CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge not reset");
495     inq.try_put(3);
496     g.wait_for_all();
497     CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 3), "limiter_node didn't pass third value");
498 
499     INFO(" rf_clear_edges");
500     // currently the limiter_node will not pass another message
501     g.reset(tbb::flow::rf_clear_edges);
502     DecrementerHelper<dtype>::check(ln.decrementer());
503     CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold");
504     CHECK_MESSAGE( (ln.my_predecessors.empty()), "preds not reset(rf_clear_edges)");
505     CHECK_MESSAGE( (ln.my_successors.empty()), "preds not reset(rf_clear_edges)");
506     CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)");
507     CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)");
508     CHECK_MESSAGE( (bn.my_successors.empty()), "control edge not removed on reset(rf_clear_edges)");
509     tbb::flow::make_edge(inq,ln);
510     tbb::flow::make_edge(ln,outq);
511     inq.try_put(4);
512     inq.try_put(5);
513     g.wait_for_all();
514     CHECK_MESSAGE( (outq.try_get(out_int)),"missing output after reset(rf_clear_edges)");
515     CHECK_MESSAGE( (out_int == 4), "input incorrect (4)");
516     bn.try_put(DecrementerHelper<dtype>::makeDType());
517     g.wait_for_all();
518     CHECK_MESSAGE( (!outq.try_get(out_int)),"second output incorrectly passed (rf_clear_edges)");
519     INFO(" done\n");
520 }
521 
522 template<typename MF_TYPE>
523 struct mf_body {
524     std::atomic<int>& my_flag;
525     mf_body(std::atomic<int>& flag) : my_flag(flag) { }
526     void operator()(const int& in, typename MF_TYPE::output_ports_type& outports) {
527         if(my_flag == 0) {
528             my_flag = 1;
529 
530             utils::SpinWaitWhileEq(my_flag, 1);
531         }
532 
533         if (in & 0x1)
534             std::get<1>(outports).try_put(in);
535         else
536             std::get<0>(outports).try_put(in);
537     }
538 };
539 
540 template<typename P, typename T>
541 struct test_reversal;
542 template<typename T>
543 struct test_reversal<tbb::flow::queueing, T> {
544     test_reversal() { INFO("<queueing>"); }
545     // queueing node will not reverse.
546     bool operator()(T& node) const { return node.my_predecessors.empty(); }
547 };
548 
549 template<typename T>
550 struct test_reversal<tbb::flow::rejecting, T> {
551     test_reversal() { INFO("<rejecting>"); }
552     bool operator()(T& node) const { return !node.my_predecessors.empty(); }
553 };
554 
555 template<typename P>
556 void TestMultifunctionNode() {
557     typedef tbb::flow::multifunction_node<int, std::tuple<int, int>, P> multinode_type;
558     INFO("Testing multifunction_node");
559     test_reversal<P,multinode_type> my_test;
560     INFO(":");
561     tbb::flow::graph g;
562     multinode_type mf(g, tbb::flow::serial, mf_body<multinode_type>(serial_fn_state0));
563     tbb::flow::queue_node<int> qin(g);
564     tbb::flow::queue_node<int> qodd_out(g);
565     tbb::flow::queue_node<int> qeven_out(g);
566     tbb::flow::make_edge(qin,mf);
567     tbb::flow::make_edge(tbb::flow::output_port<0>(mf), qeven_out);
568     tbb::flow::make_edge(tbb::flow::output_port<1>(mf), qodd_out);
569     g.wait_for_all();
570     for (int ii = 0; ii < 2 ; ++ii) {
571         serial_fn_state0 = 0;
572         /* if(ii == 0) REMARK(" reset preds"); else REMARK(" 2nd");*/
573         std::thread t([&] {
574             g.reset(); // attach to the current arena
575             qin.try_put(0);
576             qin.try_put(1);
577             g.wait_for_all();
578         });
579         // wait for node to be active
580         utils::SpinWaitWhileEq(serial_fn_state0, 0);
581         utils::SpinWaitWhile( [&] { return !my_test(mf); });
582         g.my_context->cancel_group_execution();
583         // release node
584         serial_fn_state0 = 2;
585         t.join();
586         CHECK_MESSAGE( (my_test(mf)), "fail cancel group test");
587         if( ii == 1) {
588             INFO(" rf_clear_edges");
589             g.reset(tbb::flow::rf_clear_edges);
590             CHECK_MESSAGE( (tbb::flow::output_port<0>(mf).my_successors.empty()), "output_port<0> not reset (rf_clear_edges)");
591             CHECK_MESSAGE( (tbb::flow::output_port<1>(mf).my_successors.empty()), "output_port<1> not reset (rf_clear_edges)");
592         }
593         else
594         {
595             g.reset();
596         }
597         CHECK_MESSAGE( (mf.my_predecessors.empty()), "edge didn't reset");
598         CHECK_MESSAGE( ((ii == 0 && !qin.my_successors.empty()) || (ii == 1 && qin.my_successors.empty())), "edge didn't reset");
599     }
600     INFO(" done\n");
601 }
602 
603 // indexer_node is like a broadcast_node, in that none of its inputs reverse, and it
604 // never allows a successor to reverse its edge, so we only need test the successors.
605 void
606 TestIndexerNode() {
607     tbb::flow::graph g;
608     typedef tbb::flow::indexer_node< int, int > indexernode_type;
609     indexernode_type inode(g);
610     INFO("Testing indexer_node:");
611     tbb::flow::queue_node<indexernode_type::output_type> qout(g);
612     tbb::flow::make_edge(inode,qout);
613     g.wait_for_all();
614     CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing");
615     g.reset();
616     CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing after reset");
617     g.reset(tbb::flow::rf_clear_edges);
618     CHECK_MESSAGE( (inode.my_successors.empty()), "successor of indexer_node not removed by reset(rf_clear_edges)");
619     INFO(" done\n");
620 }
621 
622 template<typename Node>
623 void
624 TestScalarNode(const char *name) {
625     tbb::flow::graph g;
626     Node on(g);
627     tbb::flow::queue_node<int> qout(g);
628     INFO("Testing " << name << ":");
629     tbb::flow::make_edge(on,qout);
630     g.wait_for_all();
631     CHECK_MESSAGE( (!on.my_successors.empty()), "edge not added");
632     g.reset();
633     CHECK_MESSAGE( (!on.my_successors.empty()), "edge improperly removed");
634     g.reset(tbb::flow::rf_clear_edges);
635     CHECK_MESSAGE( (on.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
636     INFO(" done\n");
637 }
638 
639 struct seq_body {
640     size_t operator()(const int &in) {
641         return size_t(in / 3);
642     }
643 };
644 
645 // sequencer_node behaves like a queueing node, but requires a different constructor.
646 void TestSequencerNode() {
647     tbb::flow::graph g;
648     tbb::flow::sequencer_node<int> bnode(g, seq_body());
649     INFO("Testing sequencer_node:");
650     tbb::flow::function_node<int> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
651     INFO("Testing sequencer_node:");
652     serial_fn_state0 = 0;  // reset to waiting state.
653     INFO(" make_edge");
654     tbb::flow::make_edge(bnode, fnode);
655     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge" );
656     INFO(" try_put");
657     std::thread t([&]{
658         bnode.try_put(0);  // will forward to the fnode
659         g.wait_for_all();
660     });
661     // wait for the function_node to fire up
662     utils::SpinWaitWhileEq(serial_fn_state0, 0);
663     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message" );
664     serial_fn_state0 = 0;       // release the function node
665     t.join();
666 
667     INFO(" remove_edge");
668     tbb::flow::remove_edge(bnode, fnode);
669     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge");
670     tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g);
671     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // will spawn a task
672     g.wait_for_all();
673     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join");
674     INFO(" reverse");
675     bnode.try_put(3);  // the edge should reverse
676     g.wait_for_all();
677     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
678     INFO(" reset()");
679     g.wait_for_all();
680     g.reset();  // should be in forward direction again
681     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()");
682     INFO(" remove_edge");
683     g.reset(tbb::flow::rf_clear_edges);  // should be in forward direction again
684     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)");
685     CHECK_MESSAGE( (fnode.my_predecessors.empty()), "buffering node reversed after reset(rf_clear_edges)");
686     INFO("  done\n");
687     g.wait_for_all();
688 }
689 
690 struct snode_body {
691     int max_cnt;
692     int my_cnt;
693     snode_body(const int& in) : max_cnt(in) { my_cnt = 0; }
694     int operator()(tbb::flow_control& fc) {
695         if (max_cnt <= my_cnt++) {
696             fc.stop();
697             return int();
698         }
699         return my_cnt;
700     }
701 };
702 
703 void TestInputNode() {
704     tbb::flow::graph g;
705     tbb::flow::input_node<int> in(g, snode_body(4));
706     INFO("Testing input_node:");
707     tbb::flow::queue_node<int> qin(g);
708     tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> jn(g);
709     tbb::flow::queue_node<std::tuple<int,int> > qout(g);
710 
711     INFO(" make_edges");
712     tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn));
713     tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
714     tbb::flow::make_edge(jn,qout);
715     CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after make_edge");
716     g.wait_for_all();
717     g.reset();
718     CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after reset");
719     g.wait_for_all();
720     g.reset(tbb::flow::rf_clear_edges);
721     CHECK_MESSAGE( (in.my_successors.empty()), "input node has successor after reset(rf_clear_edges)");
722     tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn));
723     tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
724     tbb::flow::make_edge(jn,qout);
725     g.wait_for_all();
726     INFO(" activate");
727     in.activate();  // will forward to the fnode
728     INFO(" wait1");
729     g.wait_for_all();
730     CHECK_MESSAGE( (in.my_successors.empty()), "input node has no successor after forwarding message");
731     g.reset();
732     CHECK_MESSAGE( (!in.my_successors.empty()), "input_node has no successors after reset");
733     CHECK_MESSAGE( (tbb::flow::input_port<0>(jn).my_predecessors.empty()), "successor of input_node has pred after reset.");
734     INFO(" done\n");
735 }
736 
737 //! Test buffering nodes
738 //! \brief \ref error_guessing
739 TEST_CASE("Test buffering nodes"){
740     unsigned int MinThread = utils::MinThread;
741     if(MinThread < 3) MinThread = 3;
742     tbb::task_arena arena(MinThread);
743 	arena.execute(
744         [&]() {
745             // tests presume at least three threads
746             TestBufferingNode< tbb::flow::buffer_node<int> >("buffer_node");
747             TestBufferingNode< tbb::flow::priority_queue_node<int> >("priority_queue_node");
748             TestBufferingNode< tbb::flow::queue_node<int> >("queue_node");
749         }
750 	);
751 }
752 
753 //! Test sequencer_node
754 //! \brief \ref error_guessing
755 TEST_CASE("Test sequencer node"){
756     TestSequencerNode();
757 }
758 
759 TEST_SUITE("Test multifunction node") {
760     //! Test multifunction_node with rejecting policy
761     //! \brief \ref error_guessing
762     TEST_CASE("with rejecting policy"){
763         TestMultifunctionNode<tbb::flow::rejecting>();
764     }
765 
766     //! Test multifunction_node with queueing policy
767     //! \brief \ref error_guessing
768     TEST_CASE("with queueing policy") {
769         TestMultifunctionNode<tbb::flow::queueing>();
770     }
771 }
772 
773 //! Test input_node
774 //! \brief \ref error_guessing
775 TEST_CASE("Test input node"){
776     TestInputNode();
777 }
778 
779 //! Test continue_node
780 //! \brief \ref error_guessing
781 TEST_CASE("Test continue node"){
782     TestContinueNode();
783 }
784 
785 //! Test function_node
786 //! \brief \ref error_guessing
787 TEST_CASE("Test function node" * doctest::may_fail()){
788     TestFunctionNode();
789 }
790 
791 //! Test join_node
792 //! \brief \ref error_guessing
793 TEST_CASE("Test join node"){
794     TestJoinNode();
795 }
796 
797 //! Test limiter_node
798 //! \brief \ref error_guessing
799 TEST_CASE("Test limiter node"){
800     TestLimiterNode<void>();
801     TestLimiterNode<int>();
802     TestLimiterNode<tbb::flow::continue_msg>();
803 }
804 
805 //! Test indexer_node
806 //! \brief \ref error_guessing
807 TEST_CASE("Test indexer node"){
808     TestIndexerNode();
809 }
810 
811 //! Test split_node
812 //! \brief \ref error_guessing
813 TEST_CASE("Test split node"){
814     TestSplitNode();
815 }
816 
817 //! Test broadcast, overwrite, write_once nodes
818 //! \brief \ref error_guessing
819 TEST_CASE("Test scalar node"){
820     TestScalarNode<tbb::flow::broadcast_node<int> >("broadcast_node");
821     TestScalarNode<tbb::flow::overwrite_node<int> >("overwrite_node");
822     TestScalarNode<tbb::flow::write_once_node<int> >("write_once_node");
823 }
824 
825 //! try_get in inactive graph
826 //! \brief \ref error_guessing
827 TEST_CASE("try_get in inactive graph"){
828     tbb::flow::graph g;
829 
830     tbb::flow::input_node<int> src(g, [&](tbb::flow_control& fc) { fc.stop(); return 0;});
831     deactivate_graph(g);
832 
833     int tmp = -1;
834     CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed");
835 
836     src.activate();
837     tmp = -1;
838     CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed");
839 }
840 
841 //! Test make_edge in inactive graph
842 //! \brief \ref error_guessing
843 TEST_CASE("Test make_edge in inactive graph"){
844     tbb::flow::graph g;
845 
846     tbb::flow::continue_node<int> c(g, [](const tbb::flow::continue_msg&){ return 1; });
847 
848     tbb::flow::function_node<int, int> f(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
849 
850     c.try_put(tbb::flow::continue_msg());
851     g.wait_for_all();
852 
853     deactivate_graph(g);
854 
855     make_edge(c, f);
856 }
857 
858 //! Test make_edge from overwrite_node in inactive graph
859 //! \brief \ref error_guessing
860 TEST_CASE("Test make_edge from overwrite_node in inactive graph"){
861     tbb::flow::graph g;
862 
863     tbb::flow::queue_node<int> q(g);
864 
865     tbb::flow::overwrite_node<int> on(g);
866 
867     on.try_put(1);
868     g.wait_for_all();
869 
870     deactivate_graph(g);
871 
872     make_edge(on, q);
873 
874     int tmp = -1;
875     CHECK_MESSAGE((q.try_get(tmp) == false), "Message should not be passed on");
876 }
877 
878 //! Test iterators directly
879 //! \brief \ref error_guessing
880 TEST_CASE("graph_iterator details"){
881     tbb::flow::graph g;
882     const tbb::flow::graph cg;
883 
884     tbb::flow::graph::iterator b = g.begin();
885     tbb::flow::graph::iterator b2 = g.begin();
886     ++b2;
887     // Cast to a volatile pointer to workaround self assignment warnings from some compilers.
888     tbb::flow::graph::iterator* volatile b2_ptr = &b2;
889     b2 = *b2_ptr;
890     b = b2;
891     CHECK_MESSAGE((b == b2), "Assignment should make iterators equal");
892 }
893 
894 //! const graph
895 //! \brief \ref error_guessing
896 TEST_CASE("const graph"){
897     using namespace tbb::flow;
898 
899     const graph g;
900     CHECK_MESSAGE((g.cbegin() == g.cend()), "Starting graph is empty");
901     CHECK_MESSAGE((g.begin() == g.end()), "Starting graph is empty");
902 
903     graph g2;
904     CHECK_MESSAGE((g2.begin() == g2.end()), "Starting graph is empty");
905 }
906 
907 //! Send message to continue_node while graph is inactive
908 //! \brief \ref error_guessing
909 TEST_CASE("Send message to continue_node while graph is inactive") {
910     using namespace tbb::flow;
911 
912     graph g;
913 
914     continue_node<int> c(g, [](const continue_msg&){ return 1; });
915     buffer_node<int> b(g);
916 
917     make_edge(c, b);
918 
919     deactivate_graph(g);
920 
921     c.try_put(continue_msg());
922     g.wait_for_all();
923 
924     int tmp = -1;
925     CHECK_MESSAGE((b.try_get(tmp) == false), "Message should not arrive");
926     CHECK_MESSAGE((tmp == -1), "Value should not be altered");
927 }
928 
929 
930 //! Bypass of a successor's message in a node with lightweight policy
931 //! \brief \ref error_guessing
932 TEST_CASE("Bypass of a successor's message in a node with lightweight policy") {
933     using namespace tbb::flow;
934 
935     graph g;
936 
937     auto body = [](const int&v)->int { return v * 2; };
938     function_node<int, int, lightweight> f1(g, unlimited, body);
939 
940     auto body2 = [](const int&v)->int {return v / 2;};
941     function_node<int, int> f2(g, unlimited, body2);
942 
943     buffer_node<int> b(g);
944 
945     make_edge(f1, f2);
946     make_edge(f2, b);
947 
948     f1.try_put(1);
949     g.wait_for_all();
950 
951     int tmp = -1;
952     CHECK_MESSAGE((b.try_get(tmp) == true), "Functional nodes can work in succession");
953     CHECK_MESSAGE((tmp == 1), "Value should not be altered");
954 }
955