1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/config.h"
18 
19 #if _MSC_VER
20     #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
21     #if _MSC_VER==1700 && !defined(__INTEL_COMPILER)
22         // Suppress "unreachable code" warning by VC++ 17.0 (VS 2012)
23         #pragma warning (disable: 4702)
24     #endif
25 #endif
26 
// TODO revamp: move the parts that depend on __TBB_EXTRA_DEBUG into separate test(s). Enabling
// it in every test means we may be testing a product that differs from what is actually
// released.
30 #define __TBB_EXTRA_DEBUG 1
31 
32 // need these to get proper external names for private methods in library.
33 #include "tbb/spin_mutex.h"
34 #include "tbb/spin_rw_mutex.h"
35 #include "tbb/task_arena.h"
36 #include "tbb/task_group.h"
37 
38 #define private public
39 #define protected public
40 #include "tbb/flow_graph.h"
41 #undef protected
42 #undef private
43 
44 #include "common/test.h"
45 #include "common/utils.h"
46 #include "common/graph_utils.h"
47 
48 #include <string> // merely prevents LNK2001 error to happen (on ICL+VC9 configurations)
49 
50 
51 //! \file test_flow_graph_whitebox.cpp
52 //! \brief Test for [flow_graph.broadcast_node flow_graph.priority_queue_node flow_graph.indexer_node flow_graph.sequencer_node flow_graph.remove_edge flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification
53 
54 template<typename T>
55 struct receiverBody {
56     tbb::flow::continue_msg operator()(const T &/*in*/) {
57         return tbb::flow::continue_msg();
58     }
59 };
60 
61 // split_nodes cannot have predecessors
62 // they do not reject messages and always forward.
63 // they reject edge reversals from successors.
64 void TestSplitNode() {
65     typedef tbb::flow::split_node<std::tuple<int> > snode_type;
66     tbb::flow::graph g;
67     snode_type snode(g);
68     tbb::flow::function_node<int> rcvr(g,tbb::flow::unlimited, receiverBody<int>());
69     INFO("Testing split_node\n");
70     CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "Constructed split_node has successors");
71     // tbb::flow::output_port<0>(snode)
72     tbb::flow::make_edge(tbb::flow::output_port<0>(snode), rcvr);
73     CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after make_edge, split_node has no successor.");
74     snode.try_put(std::tuple<int>(1));
75     g.wait_for_all();
76     g.reset();
77     CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after reset(), split_node has no successor.");
78     g.reset(tbb::flow::rf_clear_edges);
79     CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "after reset(rf_clear_edges), split_node has a successor.");
80 }
81 
82 // buffering nodes cannot have predecessors
83 // they do not reject messages and always save or forward
84 // they allow edge reversals from successors
85 template< typename B >
86 void TestBufferingNode(const char * name) {
87     tbb::flow::graph g;
88     B                bnode(g);
89     tbb::flow::function_node<int,int,tbb::flow::rejecting> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
90     INFO("Testing " << name << ":");
91     for(int icnt = 0; icnt < 2; icnt++) {
92         bool reverse_edge = (icnt & 0x2) != 0;
93         serial_fn_state0 = 0;  // reset to waiting state.
94         INFO(" make_edge");
95         tbb::flow::make_edge(bnode, fnode);
96         CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge");
97         INFO(" try_put");
98         bnode.try_put(1);  // will forward to the fnode
99         BACKOFF_WAIT(serial_fn_state0 == 0, "Timed out waiting for first put");
100         if(reverse_edge) {
101             INFO(" try_put2");
102             bnode.try_put(2);  // will reverse the edge
103             // cannot do a wait_for_all here; the function_node is still executing
104             BACKOFF_WAIT(!bnode.my_successors.empty(), "Timed out waiting after 2nd put");
105             // at this point the only task running is the one for the function_node.
106             CHECK_MESSAGE( (bnode.my_successors.empty()), "successor not removed");
107         }
108         else {
109             CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message");
110         }
111         serial_fn_state0 = 0;  // release the function_node.
112         if(reverse_edge) {
113             // have to do a second release because the function_node will get the 2nd item
114             BACKOFF_WAIT( serial_fn_state0 == 0, "Timed out waiting after 2nd put");
115             serial_fn_state0 = 0;  // release the function_node.
116         }
117         g.wait_for_all();
118         INFO(" remove_edge");
119         tbb::flow::remove_edge(bnode, fnode);
120         CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge");
121     }
122     tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g);
123     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // will spawn a task
124     g.wait_for_all();
125     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join");
126     INFO(" reverse");
127     bnode.try_put(1);  // the edge should reverse
128     g.wait_for_all();
129     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
130     INFO(" reset()");
131     g.wait_for_all();
132     g.reset();  // should be in forward direction again
133     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()");
134     INFO(" remove_edge");
135     g.reset(tbb::flow::rf_clear_edges);
136     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)");
137     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // add edge again
138     // reverse edge by adding to buffer.
139     bnode.try_put(1);  // the edge should reverse
140     g.wait_for_all();
141     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
142     INFO(" remove_edge(reversed)");
143     g.reset(tbb::flow::rf_clear_edges);
144     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has no successor after reset()");
145     CHECK_MESSAGE( (tbb::flow::input_port<0>(jnode).my_predecessors.empty()), "predecessor not reset");
146     INFO("  done\n");
147     g.wait_for_all();
148 }
149 
150 // continue_node has only predecessor count
151 // they do not have predecessors, only the counts
152 // successor edges cannot be reversed
153 void TestContinueNode() {
154     tbb::flow::graph g;
155     tbb::flow::function_node<int> fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
156     tbb::flow::continue_node<int> cnode(g, 1, serial_continue_body<int>(serial_continue_state0));
157     tbb::flow::function_node<int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state1));
158     tbb::flow::make_edge(fnode0, cnode);
159     tbb::flow::make_edge(cnode, fnode1);
160     INFO("Testing continue_node:");
161     for( int icnt = 0; icnt < 2; ++icnt ) {
162         INFO( " initial" << icnt);
163         CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor addition didn't increment count");
164         CHECK_MESSAGE( (!cnode.successors().empty()), "successors empty though we added one");
165         CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect");
166         serial_continue_state0 = 0;
167         serial_fn_state0 = 0;
168         serial_fn_state1 = 0;
169 
170         fnode0.try_put(1);  // start the first function node.
171         BACKOFF_WAIT(!serial_fn_state0, "Timed out waiting for function_node to start");
172         // Now the body of function_node 0 is executing.
173         serial_fn_state0 = 0;  // release the node
174         // wait for node to count the message (or for the node body to execute, which would be wrong)
175         BACKOFF_WAIT(serial_continue_state0 == 0 && cnode.my_current_count == 0, "Timed out waiting for continue_state0 to change");
176         CHECK_MESSAGE( (serial_continue_state0 == 0), "Improperly released continue_node");
177         CHECK_MESSAGE( (cnode.my_current_count == 1), "state of continue_receiver incorrect");
178         if(icnt == 0) {  // first time through, let the continue_node fire
179             INFO(" firing");
180             fnode0.try_put(1);  // second message
181             BACKOFF_WAIT(serial_fn_state0 == 0, "timeout waiting for continue_body to execute");
182             // Now the body of function_node 0 is executing.
183             serial_fn_state0 = 0;  // release the node
184 
185             BACKOFF_WAIT(!serial_continue_state0,"continue_node didn't start");  // now we wait for the continue_node.
186             CHECK_MESSAGE( (cnode.my_current_count == 0), " my_current_count not reset before body of continue_node started");
187             serial_continue_state0 = 0;  // release the continue_node
188             BACKOFF_WAIT(!serial_fn_state1,"successor function_node didn't start");    // wait for the successor function_node to enter body
189             serial_fn_state1 = 0;  // release successor function_node.
190             g.wait_for_all();
191 
192             // try a try_get()
193             {
194                 int i;
195                 CHECK_MESSAGE( (!cnode.try_get(i)), "try_get not rejected");
196             }
197 
198             INFO(" reset");
199             CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)");
200             CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)");
201             g.reset();  // should still be the same
202             CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (after reset)" );
203             CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (after reset)");
204         }
205         else {  // we're going to see if the rf_clear_edges resets things.
206             g.wait_for_all();
207             INFO(" reset(rf_clear_edges)");
208             CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)");
209             CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)");
210             g.reset(tbb::flow::rf_clear_edges);  // should be in forward direction again
211             CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect after reset(rf_clear_edges)");
212             CHECK_MESSAGE( (cnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)");
213             CHECK_MESSAGE( (cnode.my_predecessor_count == cnode.my_initial_predecessor_count), "predecessor count not reset");
214         }
215     }
216 
217     INFO(" done\n");
218 
219 }
220 
221 // function_node has predecessors and successors
222 // try_get() rejects
223 // successor edges cannot be reversed
224 // predecessors will reverse (only rejecting will reverse)
225 void TestFunctionNode() {
226     tbb::flow::graph g;
227     tbb::flow::queue_node<int> qnode0(g);
228     tbb::flow::function_node<int,int, tbb::flow::rejecting > fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
229     // queueing function node
230     tbb::flow::function_node<int,int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
231 
232     tbb::flow::queue_node<int> qnode1(g);
233 
234     tbb::flow::make_edge(fnode0, qnode1);
235     tbb::flow::make_edge(qnode0, fnode0);
236 
237     serial_fn_state0 = 2;  // just let it go
238     // see if the darned thing will work....
239     qnode0.try_put(1);
240     g.wait_for_all();
241     int ii;
242     CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed");
243     tbb::flow::remove_edge(qnode0, fnode0);
244     tbb::flow::remove_edge(fnode0, qnode1);
245 
246     tbb::flow::make_edge(fnode1, qnode1);
247     tbb::flow::make_edge(qnode0, fnode1);
248 
249     serial_fn_state0 = 2;  // just let it go
250     // see if the darned thing will work....
251     qnode0.try_put(1);
252     g.wait_for_all();
253     CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed");
254     tbb::flow::remove_edge(qnode0, fnode1);
255     tbb::flow::remove_edge(fnode1, qnode1);
256 
257     // rejecting
258     serial_fn_state0 = 0;
259     tbb::flow::make_edge(fnode0, qnode1);
260     tbb::flow::make_edge(qnode0, fnode0);
261     INFO("Testing rejecting function_node:");
262     CHECK_MESSAGE( (!fnode0.my_queue), "node should have no queue");
263     CHECK_MESSAGE( (!fnode0.my_successors.empty()), "successor edge not added");
264     qnode0.try_put(1);
265     BACKOFF_WAIT(!serial_fn_state0,"rejecting function_node didn't start");
266     qnode0.try_put(2);   // rejecting node should reject, reverse.
267     BACKOFF_WAIT(fnode0.my_predecessors.empty(), "Missing predecessor ---");
268     serial_fn_state0 = 2;   // release function_node body.
269     g.wait_for_all();
270     INFO(" reset");
271     g.reset();  // should reverse the edge from the input to the function node.
272     CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()");
273     CHECK_MESSAGE( (fnode0.my_predecessors.empty()), "predecessor not reversed");
274     tbb::flow::remove_edge(qnode0, fnode0);
275     tbb::flow::remove_edge(fnode0, qnode1);
276     INFO("\n");
277 
278     // queueing
279     tbb::flow::make_edge(fnode1, qnode1);
280     INFO("Testing queueing function_node:");
281     CHECK_MESSAGE( (fnode1.my_queue), "node should have no queue");
282     CHECK_MESSAGE( (!fnode1.my_successors.empty()), "successor edge not added");
283     INFO(" add_pred");
284     CHECK_MESSAGE( (fnode1.register_predecessor(qnode0)), "Cannot register as predecessor");
285     CHECK_MESSAGE( (!fnode1.my_predecessors.empty()), "Missing predecessor");
286     INFO(" reset");
287     g.wait_for_all();
288     g.reset();  // should reverse the edge from the input to the function node.
289     CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()");
290     CHECK_MESSAGE( (fnode1.my_predecessors.empty()), "predecessor not reversed");
291     tbb::flow::remove_edge(qnode0, fnode1);
292     tbb::flow::remove_edge(fnode1, qnode1);
293     INFO("\n");
294 
295     serial_fn_state0 = 0;  // make the function_node wait
296     tbb::flow::make_edge(qnode0, fnode0);
297     INFO(" start_func");
298     qnode0.try_put(1);
299     BACKOFF_WAIT(serial_fn_state0 == 0, "Timed out waiting after 1st put");
300     // now if we put an item to the queues the edges to the function_node will reverse.
301     INFO(" put_node(2)");
302     qnode0.try_put(2);   // start queue node.
303     // wait for the edges to reverse
304     BACKOFF_WAIT(fnode0.my_predecessors.empty(), "Timed out waiting");
305     CHECK_MESSAGE( (!fnode0.my_predecessors.empty()), "function_node edge not reversed");
306     g.my_context->cancel_group_execution();
307     // release the function_node
308     serial_fn_state0 = 2;
309     g.wait_for_all();
310     CHECK_MESSAGE( (!fnode0.my_predecessors.empty() && qnode0.my_successors.empty()), "function_node edge not reversed");
311     g.reset(tbb::flow::rf_clear_edges);
312     CHECK_MESSAGE( (fnode0.my_predecessors.empty() && qnode0.my_successors.empty()), "function_node edge not removed");
313     CHECK_MESSAGE( (fnode0.my_successors.empty()), "successor to fnode not removed");
314     INFO(" done\n");
315 }
316 
317 template<typename TT>
318 class tag_func {
319     TT my_mult;
320 public:
321     tag_func(TT multiplier) : my_mult(multiplier) { }
322     // operator() will return [0 .. Count)
323     tbb::flow::tag_value operator()( TT v) {
324         tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult);
325         return t;
326     }
327 };
328 
329 template<typename JNODE_TYPE>
330 void
331 TestSimpleSuccessorArc(const char *name) {
332     tbb::flow::graph g;
333     {
334         INFO("Join<" << name << "> successor test ");
335         tbb::flow::join_node<std::tuple<int>, JNODE_TYPE> qj(g);
336         tbb::flow::broadcast_node<std::tuple<int> > bnode(g);
337         tbb::flow::make_edge(qj, bnode);
338         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking");
339         g.reset();
340         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()");
341         g.reset(tbb::flow::rf_clear_edges);
342         CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)");
343     }
344 }
345 
346 template<>
347 void
348 TestSimpleSuccessorArc<tbb::flow::tag_matching>(const char *name) {
349     tbb::flow::graph g;
350     {
351         INFO("Join<" << name << "> successor test ");
352         typedef std::tuple<int,int> my_tuple;
353         tbb::flow::join_node<my_tuple, tbb::flow::tag_matching> qj(g,
354                                                                    tag_func<int>(1),
355                                                                    tag_func<int>(1)
356         );
357         tbb::flow::broadcast_node<my_tuple > bnode(g);
358         tbb::flow::make_edge(qj, bnode);
359         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking");
360         g.reset();
361         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()");
362         g.reset(tbb::flow::rf_clear_edges);
363         CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)");
364     }
365 }
366 
367 void
368 TestJoinNode() {
369     tbb::flow::graph g;
370 
371     TestSimpleSuccessorArc<tbb::flow::queueing>("queueing");
372     TestSimpleSuccessorArc<tbb::flow::reserving>("reserving");
373     TestSimpleSuccessorArc<tbb::flow::tag_matching>("tag_matching");
374 
375     // queueing and tagging join nodes have input queues, so the input ports do not reverse.
376     INFO(" reserving preds");
377     {
378         tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> rj(g);
379         tbb::flow::queue_node<int> q0(g);
380         tbb::flow::queue_node<int> q1(g);
381         tbb::flow::make_edge(q0,tbb::flow::input_port<0>(rj));
382         tbb::flow::make_edge(q1,tbb::flow::input_port<1>(rj));
383         q0.try_put(1);
384         g.wait_for_all();  // quiesce
385         CHECK_MESSAGE( (!(tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port missing predecessor");
386         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred");
387         g.reset();
388         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
389         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
390         q1.try_put(2);
391         g.wait_for_all();  // quiesce
392         CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor");
393         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred");
394         g.reset();
395         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
396         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
397         // should reset predecessors just as regular reset.
398         q1.try_put(3);
399         g.wait_for_all();  // quiesce
400         CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor");
401         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred");
402         g.reset(tbb::flow::rf_clear_edges);
403         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
404         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
405         CHECK_MESSAGE( (q0.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
406         CHECK_MESSAGE( (q1.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
407     }
408     INFO(" done\n");
409 }
410 
411 template <typename DecrementerType>
412 struct limiter_node_type {
413     using type = tbb::flow::limiter_node<int, DecrementerType>;
414     using dtype = DecrementerType;
415 };
416 
417 template <>
418 struct limiter_node_type<void> {
419     using type = tbb::flow::limiter_node<int>;
420     using dtype = tbb::flow::continue_msg;
421 };
422 
// Helper for decrementer message types other than continue_msg.
template <typename DType>
struct DecrementerHelper {
    // Nothing to inspect for a generic decrementer; intentionally a no-op.
    template <typename Decrementer>
    static void check(Decrementer& /*decrementer*/) {
    }

    // Builds the decrement message: a DType constructed from the value 1.
    static auto makeDType() -> DType {
        return DType(1);
    }
};
431 
432 template <>
433 struct DecrementerHelper<tbb::flow::continue_msg> {
434     template <typename Decrementer>
435     static void check(Decrementer& decrementer) {
436         auto& d = static_cast<tbb::detail::d1::continue_receiver&>(decrementer);
437         CHECK_MESSAGE(d.my_predecessor_count == 0, "error in pred count");
438         CHECK_MESSAGE(d.my_initial_predecessor_count == 0, "error in initial pred count");
439         CHECK_MESSAGE(d.my_current_count == 0, "error in current count");
440     }
441     static tbb::flow::continue_msg makeDType() {
442         return tbb::flow::continue_msg();
443     }
444 };
445 
446 template <typename DecrementerType>
447 void TestLimiterNode() {
448     int out_int{};
449     tbb::flow::graph g;
450     using dtype = typename limiter_node_type<DecrementerType>::dtype;
451     typename limiter_node_type<DecrementerType>::type ln(g,1);
452     INFO("Testing limiter_node: preds and succs");
453     DecrementerHelper<dtype>::check(ln.decrementer());
454     CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold");
455     tbb::flow::queue_node<int> inq(g);
456     tbb::flow::queue_node<int> outq(g);
457     tbb::flow::broadcast_node<dtype> bn(g);
458 
459     tbb::flow::make_edge(inq,ln);
460     tbb::flow::make_edge(ln,outq);
461     tbb::flow::make_edge(bn,ln.decrementer());
462 
463     g.wait_for_all();
464     CHECK_MESSAGE( (!(ln.my_successors.empty())),"successors empty after make_edge");
465     CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed");
466     inq.try_put(1);
467     g.wait_for_all();
468     CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 1), "limiter_node didn't pass first value");
469     CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed");
470     inq.try_put(2);
471     g.wait_for_all();
472     CHECK_MESSAGE( (!outq.try_get(out_int)), "limiter_node incorrectly passed second input");
473     CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge to limiter_node not reversed");
474     bn.try_put(DecrementerHelper<dtype>::makeDType());
475     g.wait_for_all();
476     CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 2), "limiter_node didn't pass second value");
477     g.wait_for_all();
478     CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge was reversed(after try_get())");
479     g.reset();
480     CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge not reset");
481     inq.try_put(3);
482     g.wait_for_all();
483     CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 3), "limiter_node didn't pass third value");
484 
485     INFO(" rf_clear_edges");
486     // currently the limiter_node will not pass another message
487     g.reset(tbb::flow::rf_clear_edges);
488     DecrementerHelper<dtype>::check(ln.decrementer());
489     CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold");
490     CHECK_MESSAGE( (ln.my_predecessors.empty()), "preds not reset(rf_clear_edges)");
491     CHECK_MESSAGE( (ln.my_successors.empty()), "preds not reset(rf_clear_edges)");
492     CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)");
493     CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)");
494     CHECK_MESSAGE( (bn.my_successors.empty()), "control edge not removed on reset(rf_clear_edges)");
495     tbb::flow::make_edge(inq,ln);
496     tbb::flow::make_edge(ln,outq);
497     inq.try_put(4);
498     inq.try_put(5);
499     g.wait_for_all();
500     CHECK_MESSAGE( (outq.try_get(out_int)),"missing output after reset(rf_clear_edges)");
501     CHECK_MESSAGE( (out_int == 4), "input incorrect (4)");
502     bn.try_put(DecrementerHelper<dtype>::makeDType());
503     g.wait_for_all();
504     CHECK_MESSAGE( (!outq.try_get(out_int)),"second output incorrectly passed (rf_clear_edges)");
505     INFO(" done\n");
506 }
507 
508 template<typename MF_TYPE>
509 struct mf_body {
510     std::atomic<int> *_flag;
511     mf_body( std::atomic<int> &myatomic) : _flag(&myatomic) { }
512     void operator()( const int& in, typename MF_TYPE::output_ports_type &outports) {
513         if(_flag->load(std::memory_order_acquire) == 0) {
514             _flag->store(1, std::memory_order_release);
515             BACKOFF_WAIT(_flag->load(std::memory_order_acquire) == 1, "multifunction_node not released");
516         }
517 
518         if(in & 0x1) std::get<1>(outports).try_put(in);
519         else         std::get<0>(outports).try_put(in);
520     }
521 };
522 
523 template<typename P, typename T>
524 struct test_reversal;
525 template<typename T>
526 struct test_reversal<tbb::flow::queueing, T> {
527     test_reversal() { INFO("<queueing>"); }
528     // queueing node will not reverse.
529     bool operator()( T &node) { return node.my_predecessors.empty(); }
530 };
531 
532 template<typename T>
533 struct test_reversal<tbb::flow::rejecting, T> {
534     test_reversal() { INFO("<rejecting>"); }
535     bool operator()( T &node) { return !node.my_predecessors.empty(); }
536 };
537 
538 template<typename P>
539 void
540 TestMultifunctionNode() {
541     typedef tbb::flow::multifunction_node<int, std::tuple<int, int>, P> multinode_type;
542     INFO("Testing multifunction_node");
543     test_reversal<P,multinode_type> my_test;
544     INFO(":");
545     tbb::flow::graph g;
546     multinode_type mf(g, tbb::flow::serial, mf_body<multinode_type>(serial_fn_state0));
547     tbb::flow::queue_node<int> qin(g);
548     tbb::flow::queue_node<int> qodd_out(g);
549     tbb::flow::queue_node<int> qeven_out(g);
550     tbb::flow::make_edge(qin,mf);
551     tbb::flow::make_edge(tbb::flow::output_port<0>(mf), qeven_out);
552     tbb::flow::make_edge(tbb::flow::output_port<1>(mf), qodd_out);
553     g.wait_for_all();
554     for( int ii = 0; ii < 2 ; ++ii) {
555         serial_fn_state0 = 0;
556         /* if(ii == 0) REMARK(" reset preds"); else REMARK(" 2nd");*/
557         qin.try_put(0);
558         // wait for node to be active
559         BACKOFF_WAIT(serial_fn_state0 == 0, "timed out waiting for first put");
560         qin.try_put(1);
561         BACKOFF_WAIT((!my_test(mf)), "Timed out waiting");
562         CHECK_MESSAGE( (my_test(mf)), "fail second put test");
563         g.my_context->cancel_group_execution();
564         // release node
565         serial_fn_state0 = 2;
566         g.wait_for_all();
567         CHECK_MESSAGE( (my_test(mf)), "fail cancel group test");
568         if( ii == 1) {
569             INFO(" rf_clear_edges");
570             g.reset(tbb::flow::rf_clear_edges);
571             CHECK_MESSAGE( (tbb::flow::output_port<0>(mf).my_successors.empty()), "output_port<0> not reset (rf_clear_edges)");
572             CHECK_MESSAGE( (tbb::flow::output_port<1>(mf).my_successors.empty()), "output_port<1> not reset (rf_clear_edges)");
573         }
574         else
575         {
576             g.reset();
577         }
578         CHECK_MESSAGE( (mf.my_predecessors.empty()), "edge didn't reset");
579         CHECK_MESSAGE( ((ii == 0 && !qin.my_successors.empty()) || (ii == 1 && qin.my_successors.empty())), "edge didn't reset");
580     }
581     INFO(" done\n");
582 }
583 
584 // indexer_node is like a broadcast_node, in that none of its inputs reverse, and it
585 // never allows a successor to reverse its edge, so we only need test the successors.
586 void
587 TestIndexerNode() {
588     tbb::flow::graph g;
589     typedef tbb::flow::indexer_node< int, int > indexernode_type;
590     indexernode_type inode(g);
591     INFO("Testing indexer_node:");
592     tbb::flow::queue_node<indexernode_type::output_type> qout(g);
593     tbb::flow::make_edge(inode,qout);
594     g.wait_for_all();
595     CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing");
596     g.reset();
597     CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing after reset");
598     g.reset(tbb::flow::rf_clear_edges);
599     CHECK_MESSAGE( (inode.my_successors.empty()), "successor of indexer_node not removed by reset(rf_clear_edges)");
600     INFO(" done\n");
601 }
602 
603 template<typename Node>
604 void
605 TestScalarNode(const char *name) {
606     tbb::flow::graph g;
607     Node on(g);
608     tbb::flow::queue_node<int> qout(g);
609     INFO("Testing " << name << ":");
610     tbb::flow::make_edge(on,qout);
611     g.wait_for_all();
612     CHECK_MESSAGE( (!on.my_successors.empty()), "edge not added");
613     g.reset();
614     CHECK_MESSAGE( (!on.my_successors.empty()), "edge improperly removed");
615     g.reset(tbb::flow::rf_clear_edges);
616     CHECK_MESSAGE( (on.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
617     INFO(" done\n");
618 }
619 
// Sequencer functor: the sequence number of an item is its value divided by 3
// (integer division), converted to size_t.
struct seq_body {
    size_t operator()(const int &in) {
        const int sequence_index = in / 3;
        return static_cast<size_t>(sequence_index);
    }
};
625 
626 // sequencer_node behaves like a queueing node, but requires a different constructor.
627 void
628 TestSequencerNode() {
629     tbb::flow::graph g;
630     tbb::flow::sequencer_node<int> bnode(g, seq_body());
631     INFO("Testing sequencer_node:");
632     tbb::flow::function_node<int> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
633     INFO("Testing sequencer_node:");
634     serial_fn_state0 = 0;  // reset to waiting state.
635     INFO(" make_edge");
636     tbb::flow::make_edge(bnode, fnode);
637     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge");
638     INFO(" try_put");
639     bnode.try_put(0);  // will forward to the fnode
640     BACKOFF_WAIT( serial_fn_state0 == 0, "timeout waiting for function_node");  // wait for the function_node to fire up
641     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message");
642     serial_fn_state0 = 0;
643     g.wait_for_all();
644     INFO(" remove_edge");
645     tbb::flow::remove_edge(bnode, fnode);
646     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge");
647     tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g);
648     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // will spawn a task
649     g.wait_for_all();
650     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join");
651     INFO(" reverse");
652     bnode.try_put(3);  // the edge should reverse
653     g.wait_for_all();
654     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
655     INFO(" reset()");
656     g.wait_for_all();
657     g.reset();  // should be in forward direction again
658     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()");
659     INFO(" remove_edge");
660     g.reset(tbb::flow::rf_clear_edges);  // should be in forward direction again
661     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)");
662     CHECK_MESSAGE( (fnode.my_predecessors.empty()), "buffering node reversed after reset(rf_clear_edges)");
663     INFO("  done\n");
664     g.wait_for_all();
665 }
666 
667 struct snode_body {
668     int max_cnt;
669     int my_cnt;
670     snode_body( const int &in) : max_cnt(in) { my_cnt = 0; }
671     int operator()(tbb::flow_control &fc) {
672         if(max_cnt <= my_cnt++) {
673             fc.stop();
674             return int();
675         }
676         return my_cnt;
677     }
678 };
679 
680 void
681 TestInputNode() {
682     tbb::flow::graph g;
683     tbb::flow::input_node<int> in(g, snode_body(4));
684     INFO("Testing input_node:");
685     tbb::flow::queue_node<int> qin(g);
686     tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> jn(g);
687     tbb::flow::queue_node<std::tuple<int,int> > qout(g);
688 
689     INFO(" make_edges");
690     tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn));
691     tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
692     tbb::flow::make_edge(jn,qout);
693     CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after make_edge");
694     g.wait_for_all();
695     g.reset();
696     CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after reset");
697     g.wait_for_all();
698     g.reset(tbb::flow::rf_clear_edges);
699     CHECK_MESSAGE( (in.my_successors.empty()), "input node has successor after reset(rf_clear_edges)");
700     tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn));
701     tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
702     tbb::flow::make_edge(jn,qout);
703     g.wait_for_all();
704     INFO(" activate");
705     in.activate();  // will forward to the fnode
706     INFO(" wait1");
707     BACKOFF_WAIT( !in.my_successors.empty(), "Timed out waiting for edge to reverse");
708     CHECK_MESSAGE( (in.my_successors.empty()), "input node has no successor after forwarding message");
709 
710     g.wait_for_all();
711     g.reset();
712     CHECK_MESSAGE( (!in.my_successors.empty()), "input_node has no successors after reset");
713     CHECK_MESSAGE( (tbb::flow::input_port<0>(jn).my_predecessors.empty()), "successor of input_node has pred after reset.");
714     INFO(" done\n");
715 }
716 
717 //! Test buffering nodes
718 //! \brief \ref error_guessing
719 TEST_CASE("Test buffering nodes"){
720     unsigned int MinThread = utils::MinThread;
721     if(MinThread < 3) MinThread = 3;
722     tbb::task_arena arena(MinThread);
723 	arena.execute(
724         [&]() {
725             // tests presume at least three threads
726 
727             TestBufferingNode< tbb::flow::buffer_node<int> >("buffer_node");
728             TestBufferingNode< tbb::flow::priority_queue_node<int> >("priority_queue_node");
729             TestBufferingNode< tbb::flow::queue_node<int> >("queue_node");
730 
731         }
732 	);
733 }
734 
735 //! Test sequencer_node
736 //! \brief \ref error_guessing
737 TEST_CASE("Test sequencer node"){
738     TestSequencerNode();
739 }
740 
741 TEST_SUITE("Test multifunction node") {
742     //! Test multifunction_node with rejecting policy
743     //! \brief \ref error_guessing
744     TEST_CASE("with rejecting policy"){
745         TestMultifunctionNode<tbb::flow::rejecting>();
746     }
747 
748     //! Test multifunction_node with queueing policy
749     //! \brief \ref error_guessing
750     TEST_CASE("with queueing policy") {
751         TestMultifunctionNode<tbb::flow::queueing>();
752     }
753 }
754 
755 //! Test input_node
756 //! \brief \ref error_guessing
757 TEST_CASE("Test input node"){
758     TestInputNode();
759 }
760 
761 //! Test continue_node
762 //! \brief \ref error_guessing
763 TEST_CASE("Test continue node"){
764     TestContinueNode();
765 }
766 
767 //! Test function_node
768 //! \brief \ref error_guessing
769 TEST_CASE("Test function node" * doctest::may_fail()){
770     TestFunctionNode();
771 }
772 
773 //! Test join_node
774 //! \brief \ref error_guessing
775 TEST_CASE("Test join node"){
776     TestJoinNode();
777 }
778 
779 //! Test limiter_node
780 //! \brief \ref error_guessing
781 TEST_CASE("Test limiter node"){
782     TestLimiterNode<void>();
783     TestLimiterNode<int>();
784     TestLimiterNode<tbb::flow::continue_msg>();
785 }
786 
787 //! Test indexer_node
788 //! \brief \ref error_guessing
789 TEST_CASE("Test indexer node"){
790     TestIndexerNode();
791 }
792 
793 //! Test split_node
794 //! \brief \ref error_guessing
795 TEST_CASE("Test split node"){
796     TestSplitNode();
797 }
798 
799 //! Test broadcast, overwrite, write_once nodes
800 //! \brief \ref error_guessing
801 TEST_CASE("Test scalar node"){
802     TestScalarNode<tbb::flow::broadcast_node<int> >("broadcast_node");
803     TestScalarNode<tbb::flow::overwrite_node<int> >("overwrite_node");
804     TestScalarNode<tbb::flow::write_once_node<int> >("write_once_node");
805 }
806 
807 //! try_get in inactive graph
808 //! \brief \ref error_guessing
809 TEST_CASE("try_get in inactive graph"){
810     tbb::flow::graph g;
811 
812     tbb::flow::input_node<int> src(g, [&](tbb::flow_control& fc) -> bool { fc.stop(); return 0;});
813     deactivate_graph(g);
814 
815     int tmp = -1;
816     CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed");
817 
818     src.activate();
819     tmp = -1;
820     CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed");
821 }
822 
823 //! Test make_edge in inactive graph
824 //! \brief \ref error_guessing
825 TEST_CASE("Test make_edge in inactive graph"){
826     tbb::flow::graph g;
827 
828     tbb::flow::continue_node<int> c(g, [](const tbb::flow::continue_msg&){ return 1; });
829 
830     tbb::flow::function_node<int, int> f(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
831 
832     c.try_put(tbb::flow::continue_msg());
833     g.wait_for_all();
834 
835     deactivate_graph(g);
836 
837     make_edge(c, f);
838 }
839 
840 //! Test make_edge from overwrite_node in inactive graph
841 //! \brief \ref error_guessing
842 TEST_CASE("Test make_edge from overwrite_node in inactive graph"){
843     tbb::flow::graph g;
844 
845     tbb::flow::queue_node<int> q(g);
846 
847     tbb::flow::overwrite_node<int> on(g);
848 
849     on.try_put(1);
850     g.wait_for_all();
851 
852     deactivate_graph(g);
853 
854     make_edge(on, q);
855 
856     int tmp = -1;
857     CHECK_MESSAGE((q.try_get(tmp) == false), "Message should not be passed on");
858 }
859 
860 //! Test iterators directly
861 //! \brief \ref error_guessing
862 TEST_CASE("graph_iterator details"){
863     tbb::flow::graph g;
864     const tbb::flow::graph cg;
865 
866     tbb::flow::graph::iterator b = g.begin();
867     tbb::flow::graph::iterator b2 = g.begin();
868     ++b2;
869     // Cast to a volatile pointer to workaround self assignment warnings from some compilers.
870     tbb::flow::graph::iterator* volatile b2_ptr = &b2;
871     b2 = *b2_ptr;
872     b = b2;
873     CHECK_MESSAGE((b == b2), "Assignment should make iterators equal");
874 }
875 
876 //! const graph
877 //! \brief \ref error_guessing
878 TEST_CASE("const graph"){
879     using namespace tbb::flow;
880 
881     const graph g;
882     CHECK_MESSAGE((g.cbegin() == g.cend()), "Starting graph is empty");
883     CHECK_MESSAGE((g.begin() == g.end()), "Starting graph is empty");
884 
885     graph g2;
886     CHECK_MESSAGE((g2.begin() == g2.end()), "Starting graph is empty");
887 }
888 
889 //! Send message to continue_node while graph is inactive
890 //! \brief \ref error_guessing
891 TEST_CASE("Send message to continue_node while graph is inactive") {
892     using namespace tbb::flow;
893 
894     graph g;
895 
896     continue_node<int> c(g, [](const continue_msg&){ return 1; });
897     buffer_node<int> b(g);
898 
899     make_edge(c, b);
900 
901     deactivate_graph(g);
902 
903     c.try_put(continue_msg());
904     g.wait_for_all();
905 
906     int tmp = -1;
907     CHECK_MESSAGE((b.try_get(tmp) == false), "Message should not arrive");
908     CHECK_MESSAGE((tmp == -1), "Value should not be altered");
909 }
910 
911 
912 //! Bypass of a successor's message in a node with lightweight policy
913 //! \brief \ref error_guessing
914 TEST_CASE("Bypass of a successor's message in a node with lightweight policy") {
915     using namespace tbb::flow;
916 
917     graph g;
918 
919     auto body = [](const int&v)->int { return v * 2; };
920     function_node<int, int, lightweight> f1(g, unlimited, body);
921 
922     auto body2 = [](const int&v)->int {return v / 2;};
923     function_node<int, int> f2(g, unlimited, body2);
924 
925     buffer_node<int> b(g);
926 
927     make_edge(f1, f2);
928     make_edge(f2, b);
929 
930     f1.try_put(1);
931     g.wait_for_all();
932 
933     int tmp = -1;
934     CHECK_MESSAGE((b.try_get(tmp) == true), "Functional nodes can work in succession");
935     CHECK_MESSAGE((tmp == 1), "Value should not be altered");
936 }
937 
938