151c0b2f7Stbbdev /*
2b15aabb3Stbbdev     Copyright (c) 2005-2021 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
1751c0b2f7Stbbdev #include "common/config.h"
1851c0b2f7Stbbdev 
1951c0b2f7Stbbdev #if _MSC_VER
2051c0b2f7Stbbdev     #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
2151c0b2f7Stbbdev     #if _MSC_VER==1700 && !defined(__INTEL_COMPILER)
2251c0b2f7Stbbdev         // Suppress "unreachable code" warning by VC++ 17.0 (VS 2012)
2351c0b2f7Stbbdev         #pragma warning (disable: 4702)
2451c0b2f7Stbbdev     #endif
2551c0b2f7Stbbdev #endif
2651c0b2f7Stbbdev 
2751c0b2f7Stbbdev // need these to get proper external names for private methods in library.
2851c0b2f7Stbbdev #include "tbb/spin_mutex.h"
2951c0b2f7Stbbdev #include "tbb/spin_rw_mutex.h"
3051c0b2f7Stbbdev #include "tbb/task_arena.h"
3151c0b2f7Stbbdev #include "tbb/task_group.h"
3251c0b2f7Stbbdev 
3351c0b2f7Stbbdev #define private public
3451c0b2f7Stbbdev #define protected public
3551c0b2f7Stbbdev #include "tbb/flow_graph.h"
3651c0b2f7Stbbdev #undef protected
3751c0b2f7Stbbdev #undef private
3851c0b2f7Stbbdev 
3951c0b2f7Stbbdev #include "common/test.h"
4051c0b2f7Stbbdev #include "common/utils.h"
41478de5b1Stbbdev #include "common/spin_barrier.h"
4251c0b2f7Stbbdev #include "common/graph_utils.h"
4351c0b2f7Stbbdev 
4451c0b2f7Stbbdev #include <string> // merely prevents LNK2001 error to happen (on ICL+VC9 configurations)
4551c0b2f7Stbbdev 
4651c0b2f7Stbbdev //! \file test_flow_graph_whitebox.cpp
4751c0b2f7Stbbdev //! \brief Test for [flow_graph.broadcast_node flow_graph.priority_queue_node flow_graph.indexer_node flow_graph.sequencer_node flow_graph.remove_edge flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification
4851c0b2f7Stbbdev 
4951c0b2f7Stbbdev template<typename T>
5051c0b2f7Stbbdev struct receiverBody {
operator ()receiverBody5151c0b2f7Stbbdev     tbb::flow::continue_msg operator()(const T &/*in*/) {
5251c0b2f7Stbbdev         return tbb::flow::continue_msg();
5351c0b2f7Stbbdev     }
5451c0b2f7Stbbdev };
5551c0b2f7Stbbdev 
5651c0b2f7Stbbdev // split_nodes cannot have predecessors
5751c0b2f7Stbbdev // they do not reject messages and always forward.
5851c0b2f7Stbbdev // they reject edge reversals from successors.
TestSplitNode()5951c0b2f7Stbbdev void TestSplitNode() {
6051c0b2f7Stbbdev     typedef tbb::flow::split_node<std::tuple<int> > snode_type;
6151c0b2f7Stbbdev     tbb::flow::graph g;
6251c0b2f7Stbbdev     snode_type snode(g);
6351c0b2f7Stbbdev     tbb::flow::function_node<int> rcvr(g,tbb::flow::unlimited, receiverBody<int>());
6451c0b2f7Stbbdev     INFO("Testing split_node\n");
6551c0b2f7Stbbdev     CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "Constructed split_node has successors");
6651c0b2f7Stbbdev     // tbb::flow::output_port<0>(snode)
6751c0b2f7Stbbdev     tbb::flow::make_edge(tbb::flow::output_port<0>(snode), rcvr);
6851c0b2f7Stbbdev     CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after make_edge, split_node has no successor.");
6951c0b2f7Stbbdev     snode.try_put(std::tuple<int>(1));
7051c0b2f7Stbbdev     g.wait_for_all();
7151c0b2f7Stbbdev     g.reset();
7251c0b2f7Stbbdev     CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after reset(), split_node has no successor.");
7351c0b2f7Stbbdev     g.reset(tbb::flow::rf_clear_edges);
7451c0b2f7Stbbdev     CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "after reset(rf_clear_edges), split_node has a successor.");
7551c0b2f7Stbbdev }
7651c0b2f7Stbbdev 
7751c0b2f7Stbbdev // buffering nodes cannot have predecessors
7851c0b2f7Stbbdev // they do not reject messages and always save or forward
7951c0b2f7Stbbdev // they allow edge reversals from successors
8051c0b2f7Stbbdev template< typename B >
TestBufferingNode(const char * name)8151c0b2f7Stbbdev void TestBufferingNode(const char * name) {
8251c0b2f7Stbbdev     tbb::flow::graph g;
8351c0b2f7Stbbdev     B bnode(g);
8451c0b2f7Stbbdev     tbb::flow::function_node<int,int,tbb::flow::rejecting> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
8551c0b2f7Stbbdev     INFO("Testing " << name << ":");
8651c0b2f7Stbbdev     for(int icnt = 0; icnt < 2; icnt++) {
8751c0b2f7Stbbdev         bool reverse_edge = (icnt & 0x2) != 0;
8851c0b2f7Stbbdev         serial_fn_state0 = 0;  // reset to waiting state.
8951c0b2f7Stbbdev         INFO(" make_edge");
9051c0b2f7Stbbdev         tbb::flow::make_edge(bnode, fnode);
9151c0b2f7Stbbdev         CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge");
92478de5b1Stbbdev         std::thread t([&] {
9351c0b2f7Stbbdev             INFO(" try_put");
9451c0b2f7Stbbdev             bnode.try_put(1);  // will forward to the fnode
95478de5b1Stbbdev             g.wait_for_all();
96478de5b1Stbbdev         });
97478de5b1Stbbdev         utils::SpinWaitWhileEq(serial_fn_state0, 0);
9851c0b2f7Stbbdev         if(reverse_edge) {
9951c0b2f7Stbbdev             INFO(" try_put2");
100478de5b1Stbbdev             bnode.try_put(2);  // should reverse the edge
101478de5b1Stbbdev             // waiting for the edge to reverse
102478de5b1Stbbdev             utils::SpinWaitWhile([&] { return !bnode.my_successors.empty(); });
10351c0b2f7Stbbdev         }
10451c0b2f7Stbbdev         else {
10551c0b2f7Stbbdev             CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message");
10651c0b2f7Stbbdev         }
10751c0b2f7Stbbdev         serial_fn_state0 = 0;  // release the function_node.
10851c0b2f7Stbbdev         if(reverse_edge) {
10951c0b2f7Stbbdev             // have to do a second release because the function_node will get the 2nd item
110478de5b1Stbbdev             utils::SpinWaitWhileEq(serial_fn_state0, 0);
11151c0b2f7Stbbdev             serial_fn_state0 = 0;  // release the function_node.
11251c0b2f7Stbbdev         }
113478de5b1Stbbdev         t.join();
11451c0b2f7Stbbdev         INFO(" remove_edge");
11551c0b2f7Stbbdev         tbb::flow::remove_edge(bnode, fnode);
11651c0b2f7Stbbdev         CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge");
11751c0b2f7Stbbdev     }
11851c0b2f7Stbbdev     tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g);
11951c0b2f7Stbbdev     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // will spawn a task
12051c0b2f7Stbbdev     g.wait_for_all();
12151c0b2f7Stbbdev     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join");
12251c0b2f7Stbbdev     INFO(" reverse");
12351c0b2f7Stbbdev     bnode.try_put(1);  // the edge should reverse
12451c0b2f7Stbbdev     g.wait_for_all();
12551c0b2f7Stbbdev     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
12651c0b2f7Stbbdev     INFO(" reset()");
12751c0b2f7Stbbdev     g.wait_for_all();
12851c0b2f7Stbbdev     g.reset();  // should be in forward direction again
12951c0b2f7Stbbdev     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()");
13051c0b2f7Stbbdev     INFO(" remove_edge");
13151c0b2f7Stbbdev     g.reset(tbb::flow::rf_clear_edges);
13251c0b2f7Stbbdev     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)");
13351c0b2f7Stbbdev     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // add edge again
13451c0b2f7Stbbdev     // reverse edge by adding to buffer.
13551c0b2f7Stbbdev     bnode.try_put(1);  // the edge should reverse
13651c0b2f7Stbbdev     g.wait_for_all();
13751c0b2f7Stbbdev     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
13851c0b2f7Stbbdev     INFO(" remove_edge(reversed)");
13951c0b2f7Stbbdev     g.reset(tbb::flow::rf_clear_edges);
14051c0b2f7Stbbdev     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has no successor after reset()");
14151c0b2f7Stbbdev     CHECK_MESSAGE( (tbb::flow::input_port<0>(jnode).my_predecessors.empty()), "predecessor not reset");
14251c0b2f7Stbbdev     INFO("  done\n");
14351c0b2f7Stbbdev     g.wait_for_all();
14451c0b2f7Stbbdev }
14551c0b2f7Stbbdev 
14651c0b2f7Stbbdev // continue_node has only predecessor count
14751c0b2f7Stbbdev // they do not have predecessors, only the counts
14851c0b2f7Stbbdev // successor edges cannot be reversed
TestContinueNode()14951c0b2f7Stbbdev void TestContinueNode() {
15051c0b2f7Stbbdev     tbb::flow::graph g;
15151c0b2f7Stbbdev     tbb::flow::function_node<int> fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
152478de5b1Stbbdev     tbb::flow::continue_node<int> cnode(g, /*number_of_predecessors*/ 1,
153478de5b1Stbbdev                                         serial_continue_body<int>(serial_continue_state0));
15451c0b2f7Stbbdev     tbb::flow::function_node<int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state1));
15551c0b2f7Stbbdev     tbb::flow::make_edge(fnode0, cnode);
15651c0b2f7Stbbdev     tbb::flow::make_edge(cnode, fnode1);
15751c0b2f7Stbbdev     INFO("Testing continue_node:");
15851c0b2f7Stbbdev     for( int icnt = 0; icnt < 2; ++icnt ) {
15951c0b2f7Stbbdev         INFO( " initial" << icnt);
16051c0b2f7Stbbdev         CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor addition didn't increment count");
16151c0b2f7Stbbdev         CHECK_MESSAGE( (!cnode.successors().empty()), "successors empty though we added one");
16251c0b2f7Stbbdev         CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect");
16351c0b2f7Stbbdev         serial_continue_state0 = 0;
16451c0b2f7Stbbdev         serial_fn_state0 = 0;
16551c0b2f7Stbbdev         serial_fn_state1 = 0;
16651c0b2f7Stbbdev 
167478de5b1Stbbdev         std::thread t([&] {
16851c0b2f7Stbbdev             fnode0.try_put(1);  // start the first function node.
16951c0b2f7Stbbdev             if(icnt == 0) {  // first time through, let the continue_node fire
17051c0b2f7Stbbdev                 INFO(" firing");
17151c0b2f7Stbbdev                 fnode0.try_put(1);  // second message
17251c0b2f7Stbbdev                 g.wait_for_all();
17351c0b2f7Stbbdev 
17451c0b2f7Stbbdev                 // try a try_get()
17551c0b2f7Stbbdev                 {
17651c0b2f7Stbbdev                     int i;
17751c0b2f7Stbbdev                     CHECK_MESSAGE( (!cnode.try_get(i)), "try_get not rejected");
17851c0b2f7Stbbdev                 }
17951c0b2f7Stbbdev 
18051c0b2f7Stbbdev                 INFO(" reset");
18151c0b2f7Stbbdev                 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)");
18251c0b2f7Stbbdev                 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)");
18351c0b2f7Stbbdev                 g.reset();  // should still be the same
18451c0b2f7Stbbdev                 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (after reset)" );
18551c0b2f7Stbbdev                 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (after reset)");
18651c0b2f7Stbbdev             }
18751c0b2f7Stbbdev             else {  // we're going to see if the rf_clear_edges resets things.
18851c0b2f7Stbbdev                 g.wait_for_all();
18951c0b2f7Stbbdev                 INFO(" reset(rf_clear_edges)");
19051c0b2f7Stbbdev                 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)" );
19151c0b2f7Stbbdev                 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)" );
19251c0b2f7Stbbdev                 g.reset(tbb::flow::rf_clear_edges);  // should be in forward direction again
19351c0b2f7Stbbdev                 CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect after reset(rf_clear_edges)" );
19451c0b2f7Stbbdev                 CHECK_MESSAGE( (cnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)" );
19551c0b2f7Stbbdev                 CHECK_MESSAGE( (cnode.my_predecessor_count == cnode.my_initial_predecessor_count), "predecessor count not reset" );
19651c0b2f7Stbbdev             }
197478de5b1Stbbdev         });
198478de5b1Stbbdev 
199478de5b1Stbbdev         utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for the first message to arrive in function_node
200478de5b1Stbbdev         // Now the body of function_node 0 is executing.
201478de5b1Stbbdev         serial_fn_state0 = 0;  // release the node
202478de5b1Stbbdev         if (icnt == 0) {
203478de5b1Stbbdev             // wait for node to count the message (or for the node body to execute, which would be wrong)
204478de5b1Stbbdev             utils::SpinWaitWhile([&] {
20559ac78faSAlex                 tbb::spin_mutex::scoped_lock l(cnode.my_mutex);
206478de5b1Stbbdev                 return serial_continue_state0 == 0 && cnode.my_current_count == 0;
207478de5b1Stbbdev             });
208478de5b1Stbbdev             CHECK_MESSAGE( (serial_continue_state0 == 0), "Improperly released continue_node");
209478de5b1Stbbdev             CHECK_MESSAGE( (cnode.my_current_count == 1), "state of continue_receiver incorrect");
210478de5b1Stbbdev 
211478de5b1Stbbdev             utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for the second message to arrive in function_node
212478de5b1Stbbdev             // Now the body of function_node 0 is executing.
213478de5b1Stbbdev             serial_fn_state0 = 0;  // release the node
214478de5b1Stbbdev 
215478de5b1Stbbdev             utils::SpinWaitWhileEq(serial_continue_state0, 0); // waiting for continue_node to start
216478de5b1Stbbdev             CHECK_MESSAGE( (cnode.my_current_count == 0), " my_current_count not reset before body of continue_node started");
217478de5b1Stbbdev             serial_continue_state0 = 0;  // release the continue_node
218478de5b1Stbbdev 
219478de5b1Stbbdev             utils::SpinWaitWhileEq(serial_fn_state1, 0); // wait for the successor function_node to enter body
220478de5b1Stbbdev             serial_fn_state1 = 0;  // release successor function_node.
221478de5b1Stbbdev         }
222478de5b1Stbbdev 
223478de5b1Stbbdev         t.join();
22451c0b2f7Stbbdev     }
22551c0b2f7Stbbdev 
22651c0b2f7Stbbdev     INFO(" done\n");
22751c0b2f7Stbbdev 
22851c0b2f7Stbbdev }
22951c0b2f7Stbbdev 
23051c0b2f7Stbbdev // function_node has predecessors and successors
23151c0b2f7Stbbdev // try_get() rejects
23251c0b2f7Stbbdev // successor edges cannot be reversed
23351c0b2f7Stbbdev // predecessors will reverse (only rejecting will reverse)
TestFunctionNode()23451c0b2f7Stbbdev void TestFunctionNode() {
23551c0b2f7Stbbdev     tbb::flow::graph g;
23651c0b2f7Stbbdev     tbb::flow::queue_node<int> qnode0(g);
23751c0b2f7Stbbdev     tbb::flow::function_node<int,int,   tbb::flow::rejecting > fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
238478de5b1Stbbdev     tbb::flow::function_node<int,int/*, tbb::flow::queueing*/> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
23951c0b2f7Stbbdev 
24051c0b2f7Stbbdev     tbb::flow::queue_node<int> qnode1(g);
24151c0b2f7Stbbdev 
24251c0b2f7Stbbdev     tbb::flow::make_edge(fnode0, qnode1);
24351c0b2f7Stbbdev     tbb::flow::make_edge(qnode0, fnode0);
24451c0b2f7Stbbdev 
24551c0b2f7Stbbdev     serial_fn_state0 = 2;  // just let it go
24651c0b2f7Stbbdev     qnode0.try_put(1);
24751c0b2f7Stbbdev     g.wait_for_all();
24851c0b2f7Stbbdev     int ii;
24951c0b2f7Stbbdev     CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed" );
25051c0b2f7Stbbdev     tbb::flow::remove_edge(qnode0, fnode0);
25151c0b2f7Stbbdev     tbb::flow::remove_edge(fnode0, qnode1);
25251c0b2f7Stbbdev 
25351c0b2f7Stbbdev     tbb::flow::make_edge(fnode1, qnode1);
25451c0b2f7Stbbdev     tbb::flow::make_edge(qnode0, fnode1);
25551c0b2f7Stbbdev 
25651c0b2f7Stbbdev     serial_fn_state0 = 2;  // just let it go
25751c0b2f7Stbbdev     qnode0.try_put(1);
25851c0b2f7Stbbdev     g.wait_for_all();
25951c0b2f7Stbbdev     CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed" );
26051c0b2f7Stbbdev     tbb::flow::remove_edge(qnode0, fnode1);
26151c0b2f7Stbbdev     tbb::flow::remove_edge(fnode1, qnode1);
26251c0b2f7Stbbdev 
26351c0b2f7Stbbdev     // rejecting
26451c0b2f7Stbbdev     serial_fn_state0 = 0;
26559ac78faSAlex     std::atomic<bool> rejected{ false };
2668b6f831cStbbdev     std::thread t([&] {
2678b6f831cStbbdev         g.reset(); // attach to the current arena
26851c0b2f7Stbbdev         tbb::flow::make_edge(fnode0, qnode1);
2698b6f831cStbbdev         tbb::flow::make_edge(qnode0, fnode0); // TODO: invesigate why it always creates a forwarding task
27051c0b2f7Stbbdev         INFO("Testing rejecting function_node:");
27151c0b2f7Stbbdev         CHECK_MESSAGE( (!fnode0.my_queue), "node should have no queue");
27251c0b2f7Stbbdev         CHECK_MESSAGE( (!fnode0.my_successors.empty()), "successor edge not added");
27351c0b2f7Stbbdev         qnode0.try_put(1);
27451c0b2f7Stbbdev         qnode0.try_put(2);   // rejecting node should reject, reverse.
27559ac78faSAlex         rejected = true;
27651c0b2f7Stbbdev         g.wait_for_all();
277478de5b1Stbbdev     });
278478de5b1Stbbdev     utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start
27959ac78faSAlex     utils::SpinWaitWhileEq(rejected, false);
280*a080baf9SAlex     // TODO: the assest below is not stable due to the logical race between try_put(1)
281*a080baf9SAlex     // try_put(2) and wait_for_all.
282*a080baf9SAlex     // Additionally, empty() cannot be called concurrently due to null_mutex used in implementation
283*a080baf9SAlex     // CHECK(fnode0.my_predecessors.empty() == false);
284478de5b1Stbbdev     serial_fn_state0 = 2;   // release function_node body.
285478de5b1Stbbdev     t.join();
28651c0b2f7Stbbdev     INFO(" reset");
28751c0b2f7Stbbdev     g.reset();  // should reverse the edge from the input to the function node.
28851c0b2f7Stbbdev     CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()");
28951c0b2f7Stbbdev     CHECK_MESSAGE( (fnode0.my_predecessors.empty()), "predecessor not reversed");
29051c0b2f7Stbbdev     tbb::flow::remove_edge(qnode0, fnode0);
29151c0b2f7Stbbdev     tbb::flow::remove_edge(fnode0, qnode1);
29251c0b2f7Stbbdev     INFO("\n");
29351c0b2f7Stbbdev 
29451c0b2f7Stbbdev     // queueing
29551c0b2f7Stbbdev     tbb::flow::make_edge(fnode1, qnode1);
29651c0b2f7Stbbdev     INFO("Testing queueing function_node:");
29751c0b2f7Stbbdev     CHECK_MESSAGE( (fnode1.my_queue), "node should have no queue");
29851c0b2f7Stbbdev     CHECK_MESSAGE( (!fnode1.my_successors.empty()), "successor edge not added");
29951c0b2f7Stbbdev     INFO(" add_pred");
30051c0b2f7Stbbdev     CHECK_MESSAGE( (fnode1.register_predecessor(qnode0)), "Cannot register as predecessor");
30151c0b2f7Stbbdev     CHECK_MESSAGE( (!fnode1.my_predecessors.empty()), "Missing predecessor");
30251c0b2f7Stbbdev     INFO(" reset");
30351c0b2f7Stbbdev     g.wait_for_all();
30451c0b2f7Stbbdev     g.reset();  // should reverse the edge from the input to the function node.
30551c0b2f7Stbbdev     CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()");
30651c0b2f7Stbbdev     CHECK_MESSAGE( (fnode1.my_predecessors.empty()), "predecessor not reversed");
30751c0b2f7Stbbdev     tbb::flow::remove_edge(qnode0, fnode1);
30851c0b2f7Stbbdev     tbb::flow::remove_edge(fnode1, qnode1);
30951c0b2f7Stbbdev     INFO("\n");
31051c0b2f7Stbbdev 
31151c0b2f7Stbbdev     serial_fn_state0 = 0;  // make the function_node wait
31259ac78faSAlex     rejected = false;
313478de5b1Stbbdev     std::thread t2([&] {
314be66b3a1Stbbdev         g.reset(); // attach to the current arena
3158b6f831cStbbdev 
3168b6f831cStbbdev         tbb::flow::make_edge(qnode0, fnode0); // TODO: invesigate why it always creates a forwarding task
3178b6f831cStbbdev 
31851c0b2f7Stbbdev         INFO(" start_func");
31951c0b2f7Stbbdev         qnode0.try_put(1);
32051c0b2f7Stbbdev         // now if we put an item to the queues the edges to the function_node will reverse.
32151c0b2f7Stbbdev         INFO(" put_node(2)");
32251c0b2f7Stbbdev         qnode0.try_put(2);   // start queue node.
32359ac78faSAlex         rejected = true;
324478de5b1Stbbdev         g.wait_for_all();
325478de5b1Stbbdev     });
326478de5b1Stbbdev     utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start
32751c0b2f7Stbbdev     // wait for the edges to reverse
32859ac78faSAlex     utils::SpinWaitWhileEq(rejected, false);
329*a080baf9SAlex     // TODO: the assest below is not stable due to the logical race between try_put(1)
330*a080baf9SAlex     // try_put(2) and wait_for_all.
331*a080baf9SAlex     // Additionally, empty() cannot be called concurrently due to null_mutex used in implementation
332*a080baf9SAlex     // CHECK(fnode0.my_predecessors.empty() == false);
33351c0b2f7Stbbdev     g.my_context->cancel_group_execution();
33451c0b2f7Stbbdev     // release the function_node
33551c0b2f7Stbbdev     serial_fn_state0 = 2;
336478de5b1Stbbdev     t2.join();
33751c0b2f7Stbbdev     g.reset(tbb::flow::rf_clear_edges);
33851c0b2f7Stbbdev     CHECK_MESSAGE( (fnode0.my_predecessors.empty() && qnode0.my_successors.empty()), "function_node edge not removed");
33951c0b2f7Stbbdev     CHECK_MESSAGE( (fnode0.my_successors.empty()), "successor to fnode not removed");
34051c0b2f7Stbbdev     INFO(" done\n");
34151c0b2f7Stbbdev }
34251c0b2f7Stbbdev 
34351c0b2f7Stbbdev template<typename TT>
34451c0b2f7Stbbdev class tag_func {
34551c0b2f7Stbbdev     TT my_mult;
34651c0b2f7Stbbdev public:
tag_func(TT multiplier)34751c0b2f7Stbbdev     tag_func(TT multiplier) : my_mult(multiplier) { }
34851c0b2f7Stbbdev     // operator() will return [0 .. Count)
operator ()(TT v)34951c0b2f7Stbbdev     tbb::flow::tag_value operator()( TT v) {
35051c0b2f7Stbbdev         tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult);
35151c0b2f7Stbbdev         return t;
35251c0b2f7Stbbdev     }
35351c0b2f7Stbbdev };
35451c0b2f7Stbbdev 
35551c0b2f7Stbbdev template<typename JNODE_TYPE>
35651c0b2f7Stbbdev void
TestSimpleSuccessorArc(const char * name)35751c0b2f7Stbbdev TestSimpleSuccessorArc(const char *name) {
35851c0b2f7Stbbdev     tbb::flow::graph g;
35951c0b2f7Stbbdev     {
36051c0b2f7Stbbdev         INFO("Join<" << name << "> successor test ");
36151c0b2f7Stbbdev         tbb::flow::join_node<std::tuple<int>, JNODE_TYPE> qj(g);
36251c0b2f7Stbbdev         tbb::flow::broadcast_node<std::tuple<int> > bnode(g);
36351c0b2f7Stbbdev         tbb::flow::make_edge(qj, bnode);
36451c0b2f7Stbbdev         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking");
36551c0b2f7Stbbdev         g.reset();
36651c0b2f7Stbbdev         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()");
36751c0b2f7Stbbdev         g.reset(tbb::flow::rf_clear_edges);
36851c0b2f7Stbbdev         CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)");
36951c0b2f7Stbbdev     }
37051c0b2f7Stbbdev }
37151c0b2f7Stbbdev 
37251c0b2f7Stbbdev template<>
37351c0b2f7Stbbdev void
TestSimpleSuccessorArc(const char * name)37451c0b2f7Stbbdev TestSimpleSuccessorArc<tbb::flow::tag_matching>(const char *name) {
37551c0b2f7Stbbdev     tbb::flow::graph g;
37651c0b2f7Stbbdev     {
37751c0b2f7Stbbdev         INFO("Join<" << name << "> successor test ");
37851c0b2f7Stbbdev         typedef std::tuple<int,int> my_tuple;
37951c0b2f7Stbbdev         tbb::flow::join_node<my_tuple, tbb::flow::tag_matching> qj(g,
38051c0b2f7Stbbdev                                                                    tag_func<int>(1),
38151c0b2f7Stbbdev                                                                    tag_func<int>(1)
38251c0b2f7Stbbdev         );
38351c0b2f7Stbbdev         tbb::flow::broadcast_node<my_tuple > bnode(g);
38451c0b2f7Stbbdev         tbb::flow::make_edge(qj, bnode);
38551c0b2f7Stbbdev         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking");
38651c0b2f7Stbbdev         g.reset();
38751c0b2f7Stbbdev         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()");
38851c0b2f7Stbbdev         g.reset(tbb::flow::rf_clear_edges);
38951c0b2f7Stbbdev         CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)");
39051c0b2f7Stbbdev     }
39151c0b2f7Stbbdev }
39251c0b2f7Stbbdev 
39351c0b2f7Stbbdev void
TestJoinNode()39451c0b2f7Stbbdev TestJoinNode() {
39551c0b2f7Stbbdev     tbb::flow::graph g;
39651c0b2f7Stbbdev 
39751c0b2f7Stbbdev     TestSimpleSuccessorArc<tbb::flow::queueing>("queueing");
39851c0b2f7Stbbdev     TestSimpleSuccessorArc<tbb::flow::reserving>("reserving");
39951c0b2f7Stbbdev     TestSimpleSuccessorArc<tbb::flow::tag_matching>("tag_matching");
40051c0b2f7Stbbdev 
40151c0b2f7Stbbdev     // queueing and tagging join nodes have input queues, so the input ports do not reverse.
40251c0b2f7Stbbdev     INFO(" reserving preds");
40351c0b2f7Stbbdev     {
40451c0b2f7Stbbdev         tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> rj(g);
40551c0b2f7Stbbdev         tbb::flow::queue_node<int> q0(g);
40651c0b2f7Stbbdev         tbb::flow::queue_node<int> q1(g);
40751c0b2f7Stbbdev         tbb::flow::make_edge(q0,tbb::flow::input_port<0>(rj));
40851c0b2f7Stbbdev         tbb::flow::make_edge(q1,tbb::flow::input_port<1>(rj));
40951c0b2f7Stbbdev         q0.try_put(1);
41051c0b2f7Stbbdev         g.wait_for_all();  // quiesce
41151c0b2f7Stbbdev         CHECK_MESSAGE( (!(tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port missing predecessor");
41251c0b2f7Stbbdev         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred");
41351c0b2f7Stbbdev         g.reset();
41451c0b2f7Stbbdev         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
41551c0b2f7Stbbdev         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
41651c0b2f7Stbbdev         q1.try_put(2);
41751c0b2f7Stbbdev         g.wait_for_all();  // quiesce
41851c0b2f7Stbbdev         CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor");
41951c0b2f7Stbbdev         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred");
42051c0b2f7Stbbdev         g.reset();
42151c0b2f7Stbbdev         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
42251c0b2f7Stbbdev         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
42351c0b2f7Stbbdev         // should reset predecessors just as regular reset.
42451c0b2f7Stbbdev         q1.try_put(3);
42551c0b2f7Stbbdev         g.wait_for_all();  // quiesce
42651c0b2f7Stbbdev         CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor");
42751c0b2f7Stbbdev         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred");
42851c0b2f7Stbbdev         g.reset(tbb::flow::rf_clear_edges);
42951c0b2f7Stbbdev         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
43051c0b2f7Stbbdev         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
43151c0b2f7Stbbdev         CHECK_MESSAGE( (q0.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
43251c0b2f7Stbbdev         CHECK_MESSAGE( (q1.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
43351c0b2f7Stbbdev     }
43451c0b2f7Stbbdev     INFO(" done\n");
43551c0b2f7Stbbdev }
43651c0b2f7Stbbdev 
43751c0b2f7Stbbdev template <typename DecrementerType>
43851c0b2f7Stbbdev struct limiter_node_type {
43951c0b2f7Stbbdev     using type = tbb::flow::limiter_node<int, DecrementerType>;
44051c0b2f7Stbbdev     using dtype = DecrementerType;
44151c0b2f7Stbbdev };
44251c0b2f7Stbbdev 
44351c0b2f7Stbbdev template <>
44451c0b2f7Stbbdev struct limiter_node_type<void> {
44551c0b2f7Stbbdev     using type = tbb::flow::limiter_node<int>;
44651c0b2f7Stbbdev     using dtype = tbb::flow::continue_msg;
44751c0b2f7Stbbdev };
44851c0b2f7Stbbdev 
//! Per-decrementer-type helper for the limiter_node test (generic case).
/** \tparam DType  decrement message type; must be constructible from the integer 1. */
template <typename DType>
struct DecrementerHelper {
    //! Nothing to verify for a generic decrementer; the argument is ignored.
    template <typename Decrementer>
    static void check(Decrementer&) {}

    //! Builds the message that is sent to the decrementer: DType(1).
    static DType makeDType() {
        return DType(1);
    }
};
45751c0b2f7Stbbdev 
45851c0b2f7Stbbdev template <>
45951c0b2f7Stbbdev struct DecrementerHelper<tbb::flow::continue_msg> {
46051c0b2f7Stbbdev     template <typename Decrementer>
checkDecrementerHelper46149e08aacStbbdev     static void check(Decrementer& decrementer) {
46249e08aacStbbdev         auto& d = static_cast<tbb::detail::d1::continue_receiver&>(decrementer);
46351c0b2f7Stbbdev         CHECK_MESSAGE(d.my_predecessor_count == 0, "error in pred count");
46451c0b2f7Stbbdev         CHECK_MESSAGE(d.my_initial_predecessor_count == 0, "error in initial pred count");
46551c0b2f7Stbbdev         CHECK_MESSAGE(d.my_current_count == 0, "error in current count");
46651c0b2f7Stbbdev     }
makeDTypeDecrementerHelper46751c0b2f7Stbbdev     static tbb::flow::continue_msg makeDType() {
46851c0b2f7Stbbdev         return tbb::flow::continue_msg();
46951c0b2f7Stbbdev     }
47051c0b2f7Stbbdev };
47151c0b2f7Stbbdev 
47251c0b2f7Stbbdev template <typename DecrementerType>
TestLimiterNode()47351c0b2f7Stbbdev void TestLimiterNode() {
47451c0b2f7Stbbdev     int out_int{};
47551c0b2f7Stbbdev     tbb::flow::graph g;
47651c0b2f7Stbbdev     using dtype = typename limiter_node_type<DecrementerType>::dtype;
47751c0b2f7Stbbdev     typename limiter_node_type<DecrementerType>::type ln(g,1);
47851c0b2f7Stbbdev     INFO("Testing limiter_node: preds and succs");
47949e08aacStbbdev     DecrementerHelper<dtype>::check(ln.decrementer());
48051c0b2f7Stbbdev     CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold");
48151c0b2f7Stbbdev     tbb::flow::queue_node<int> inq(g);
48251c0b2f7Stbbdev     tbb::flow::queue_node<int> outq(g);
48351c0b2f7Stbbdev     tbb::flow::broadcast_node<dtype> bn(g);
48451c0b2f7Stbbdev 
48551c0b2f7Stbbdev     tbb::flow::make_edge(inq,ln);
48651c0b2f7Stbbdev     tbb::flow::make_edge(ln,outq);
48749e08aacStbbdev     tbb::flow::make_edge(bn,ln.decrementer());
48851c0b2f7Stbbdev 
48951c0b2f7Stbbdev     g.wait_for_all();
49051c0b2f7Stbbdev     CHECK_MESSAGE( (!(ln.my_successors.empty())),"successors empty after make_edge");
49151c0b2f7Stbbdev     CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed");
49251c0b2f7Stbbdev     inq.try_put(1);
49351c0b2f7Stbbdev     g.wait_for_all();
49451c0b2f7Stbbdev     CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 1), "limiter_node didn't pass first value");
49551c0b2f7Stbbdev     CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed");
49651c0b2f7Stbbdev     inq.try_put(2);
49751c0b2f7Stbbdev     g.wait_for_all();
49851c0b2f7Stbbdev     CHECK_MESSAGE( (!outq.try_get(out_int)), "limiter_node incorrectly passed second input");
49951c0b2f7Stbbdev     CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge to limiter_node not reversed");
50051c0b2f7Stbbdev     bn.try_put(DecrementerHelper<dtype>::makeDType());
50151c0b2f7Stbbdev     g.wait_for_all();
50251c0b2f7Stbbdev     CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 2), "limiter_node didn't pass second value");
50351c0b2f7Stbbdev     g.wait_for_all();
50451c0b2f7Stbbdev     CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge was reversed(after try_get())");
50551c0b2f7Stbbdev     g.reset();
50651c0b2f7Stbbdev     CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge not reset");
50751c0b2f7Stbbdev     inq.try_put(3);
50851c0b2f7Stbbdev     g.wait_for_all();
50951c0b2f7Stbbdev     CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 3), "limiter_node didn't pass third value");
51051c0b2f7Stbbdev 
51151c0b2f7Stbbdev     INFO(" rf_clear_edges");
51251c0b2f7Stbbdev     // currently the limiter_node will not pass another message
51351c0b2f7Stbbdev     g.reset(tbb::flow::rf_clear_edges);
51449e08aacStbbdev     DecrementerHelper<dtype>::check(ln.decrementer());
51551c0b2f7Stbbdev     CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold");
51651c0b2f7Stbbdev     CHECK_MESSAGE( (ln.my_predecessors.empty()), "preds not reset(rf_clear_edges)");
51751c0b2f7Stbbdev     CHECK_MESSAGE( (ln.my_successors.empty()), "preds not reset(rf_clear_edges)");
51851c0b2f7Stbbdev     CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)");
51951c0b2f7Stbbdev     CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)");
52051c0b2f7Stbbdev     CHECK_MESSAGE( (bn.my_successors.empty()), "control edge not removed on reset(rf_clear_edges)");
52151c0b2f7Stbbdev     tbb::flow::make_edge(inq,ln);
52251c0b2f7Stbbdev     tbb::flow::make_edge(ln,outq);
52351c0b2f7Stbbdev     inq.try_put(4);
52451c0b2f7Stbbdev     inq.try_put(5);
52551c0b2f7Stbbdev     g.wait_for_all();
52651c0b2f7Stbbdev     CHECK_MESSAGE( (outq.try_get(out_int)),"missing output after reset(rf_clear_edges)");
52751c0b2f7Stbbdev     CHECK_MESSAGE( (out_int == 4), "input incorrect (4)");
52851c0b2f7Stbbdev     bn.try_put(DecrementerHelper<dtype>::makeDType());
52951c0b2f7Stbbdev     g.wait_for_all();
53051c0b2f7Stbbdev     CHECK_MESSAGE( (!outq.try_get(out_int)),"second output incorrectly passed (rf_clear_edges)");
53151c0b2f7Stbbdev     INFO(" done\n");
53251c0b2f7Stbbdev }
53351c0b2f7Stbbdev 
53451c0b2f7Stbbdev template<typename MF_TYPE>
53551c0b2f7Stbbdev struct mf_body {
536478de5b1Stbbdev     std::atomic<int>& my_flag;
mf_bodymf_body537478de5b1Stbbdev     mf_body(std::atomic<int>& flag) : my_flag(flag) { }
operator ()mf_body53851c0b2f7Stbbdev     void operator()(const int& in, typename MF_TYPE::output_ports_type& outports) {
539478de5b1Stbbdev         if(my_flag == 0) {
540478de5b1Stbbdev             my_flag = 1;
541478de5b1Stbbdev 
542478de5b1Stbbdev             utils::SpinWaitWhileEq(my_flag, 1);
54351c0b2f7Stbbdev         }
54451c0b2f7Stbbdev 
545478de5b1Stbbdev         if (in & 0x1)
546478de5b1Stbbdev             std::get<1>(outports).try_put(in);
547478de5b1Stbbdev         else
548478de5b1Stbbdev             std::get<0>(outports).try_put(in);
54951c0b2f7Stbbdev     }
55051c0b2f7Stbbdev };
55151c0b2f7Stbbdev 
55251c0b2f7Stbbdev template<typename P, typename T>
55351c0b2f7Stbbdev struct test_reversal;
55451c0b2f7Stbbdev template<typename T>
55551c0b2f7Stbbdev struct test_reversal<tbb::flow::queueing, T> {
test_reversaltest_reversal55651c0b2f7Stbbdev     test_reversal() { INFO("<queueing>"); }
55751c0b2f7Stbbdev     // queueing node will not reverse.
operator ()test_reversal558478de5b1Stbbdev     bool operator()(T& node) const { return node.my_predecessors.empty(); }
55951c0b2f7Stbbdev };
56051c0b2f7Stbbdev 
56151c0b2f7Stbbdev template<typename T>
56251c0b2f7Stbbdev struct test_reversal<tbb::flow::rejecting, T> {
test_reversaltest_reversal56351c0b2f7Stbbdev     test_reversal() { INFO("<rejecting>"); }
operator ()test_reversal564478de5b1Stbbdev     bool operator()(T& node) const { return !node.my_predecessors.empty(); }
56551c0b2f7Stbbdev };
56651c0b2f7Stbbdev 
56751c0b2f7Stbbdev template<typename P>
TestMultifunctionNode()568478de5b1Stbbdev void TestMultifunctionNode() {
56951c0b2f7Stbbdev     typedef tbb::flow::multifunction_node<int, std::tuple<int, int>, P> multinode_type;
57051c0b2f7Stbbdev     INFO("Testing multifunction_node");
57151c0b2f7Stbbdev     test_reversal<P,multinode_type> my_test;
57251c0b2f7Stbbdev     INFO(":");
57351c0b2f7Stbbdev     tbb::flow::graph g;
57451c0b2f7Stbbdev     multinode_type mf(g, tbb::flow::serial, mf_body<multinode_type>(serial_fn_state0));
57551c0b2f7Stbbdev     tbb::flow::queue_node<int> qin(g);
57651c0b2f7Stbbdev     tbb::flow::queue_node<int> qodd_out(g);
57751c0b2f7Stbbdev     tbb::flow::queue_node<int> qeven_out(g);
57851c0b2f7Stbbdev     tbb::flow::make_edge(qin,mf);
57951c0b2f7Stbbdev     tbb::flow::make_edge(tbb::flow::output_port<0>(mf), qeven_out);
58051c0b2f7Stbbdev     tbb::flow::make_edge(tbb::flow::output_port<1>(mf), qodd_out);
58151c0b2f7Stbbdev     g.wait_for_all();
58251c0b2f7Stbbdev     for (int ii = 0; ii < 2 ; ++ii) {
583*a080baf9SAlex         std::atomic<bool> submitted{ false };
58451c0b2f7Stbbdev         serial_fn_state0 = 0;
58551c0b2f7Stbbdev         /* if(ii == 0) REMARK(" reset preds"); else REMARK(" 2nd");*/
586478de5b1Stbbdev         std::thread t([&] {
587be66b3a1Stbbdev             g.reset(); // attach to the current arena
58851c0b2f7Stbbdev             qin.try_put(0);
58951c0b2f7Stbbdev             qin.try_put(1);
590*a080baf9SAlex             submitted = true;
591478de5b1Stbbdev             g.wait_for_all();
592478de5b1Stbbdev         });
593478de5b1Stbbdev         // wait for node to be active
594478de5b1Stbbdev         utils::SpinWaitWhileEq(serial_fn_state0, 0);
595*a080baf9SAlex         utils::SpinWaitWhileEq(submitted, false);
59651c0b2f7Stbbdev         g.my_context->cancel_group_execution();
59751c0b2f7Stbbdev         // release node
59851c0b2f7Stbbdev         serial_fn_state0 = 2;
599478de5b1Stbbdev         t.join();
600*a080baf9SAlex         // The rejection test cannot guarantee the state of predecessors cache.
601*a080baf9SAlex         if (!std::is_same<P, tbb::flow::rejecting>::value) {
60251c0b2f7Stbbdev             CHECK_MESSAGE((my_test(mf)), "fail cancel group test");
603*a080baf9SAlex         }
60451c0b2f7Stbbdev         if( ii == 1) {
60551c0b2f7Stbbdev             INFO(" rf_clear_edges");
60651c0b2f7Stbbdev             g.reset(tbb::flow::rf_clear_edges);
60751c0b2f7Stbbdev             CHECK_MESSAGE( (tbb::flow::output_port<0>(mf).my_successors.empty()), "output_port<0> not reset (rf_clear_edges)");
60851c0b2f7Stbbdev             CHECK_MESSAGE( (tbb::flow::output_port<1>(mf).my_successors.empty()), "output_port<1> not reset (rf_clear_edges)");
60951c0b2f7Stbbdev         }
61051c0b2f7Stbbdev         else
61151c0b2f7Stbbdev         {
61251c0b2f7Stbbdev             g.reset();
61351c0b2f7Stbbdev         }
61451c0b2f7Stbbdev         CHECK_MESSAGE( (mf.my_predecessors.empty()), "edge didn't reset");
61551c0b2f7Stbbdev         CHECK_MESSAGE( ((ii == 0 && !qin.my_successors.empty()) || (ii == 1 && qin.my_successors.empty())), "edge didn't reset");
61651c0b2f7Stbbdev     }
61751c0b2f7Stbbdev     INFO(" done\n");
61851c0b2f7Stbbdev }
61951c0b2f7Stbbdev 
62051c0b2f7Stbbdev // indexer_node is like a broadcast_node, in that none of its inputs reverse, and it
62151c0b2f7Stbbdev // never allows a successor to reverse its edge, so we only need test the successors.
62251c0b2f7Stbbdev void
TestIndexerNode()62351c0b2f7Stbbdev TestIndexerNode() {
62451c0b2f7Stbbdev     tbb::flow::graph g;
62551c0b2f7Stbbdev     typedef tbb::flow::indexer_node< int, int > indexernode_type;
62651c0b2f7Stbbdev     indexernode_type inode(g);
62751c0b2f7Stbbdev     INFO("Testing indexer_node:");
62851c0b2f7Stbbdev     tbb::flow::queue_node<indexernode_type::output_type> qout(g);
62951c0b2f7Stbbdev     tbb::flow::make_edge(inode,qout);
63051c0b2f7Stbbdev     g.wait_for_all();
63151c0b2f7Stbbdev     CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing");
63251c0b2f7Stbbdev     g.reset();
63351c0b2f7Stbbdev     CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing after reset");
63451c0b2f7Stbbdev     g.reset(tbb::flow::rf_clear_edges);
63551c0b2f7Stbbdev     CHECK_MESSAGE( (inode.my_successors.empty()), "successor of indexer_node not removed by reset(rf_clear_edges)");
63651c0b2f7Stbbdev     INFO(" done\n");
63751c0b2f7Stbbdev }
63851c0b2f7Stbbdev 
63951c0b2f7Stbbdev template<typename Node>
64051c0b2f7Stbbdev void
TestScalarNode(const char * name)64151c0b2f7Stbbdev TestScalarNode(const char *name) {
64251c0b2f7Stbbdev     tbb::flow::graph g;
64351c0b2f7Stbbdev     Node on(g);
64451c0b2f7Stbbdev     tbb::flow::queue_node<int> qout(g);
64551c0b2f7Stbbdev     INFO("Testing " << name << ":");
64651c0b2f7Stbbdev     tbb::flow::make_edge(on,qout);
64751c0b2f7Stbbdev     g.wait_for_all();
64851c0b2f7Stbbdev     CHECK_MESSAGE( (!on.my_successors.empty()), "edge not added");
64951c0b2f7Stbbdev     g.reset();
65051c0b2f7Stbbdev     CHECK_MESSAGE( (!on.my_successors.empty()), "edge improperly removed");
65151c0b2f7Stbbdev     g.reset(tbb::flow::rf_clear_edges);
65251c0b2f7Stbbdev     CHECK_MESSAGE( (on.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
65351c0b2f7Stbbdev     INFO(" done\n");
65451c0b2f7Stbbdev }
65551c0b2f7Stbbdev 
// Functor passed to the sequencer_node constructor: maps an int to the result of its
// integer division by 3, converted to size_t.
struct seq_body {
    size_t operator()(const int &in) {
        const int third = in / 3;
        return static_cast<size_t>(third);
    }
};
66151c0b2f7Stbbdev 
66251c0b2f7Stbbdev // sequencer_node behaves like a queueing node, but requires a different constructor.
TestSequencerNode()663478de5b1Stbbdev void TestSequencerNode() {
66451c0b2f7Stbbdev     tbb::flow::graph g;
66551c0b2f7Stbbdev     tbb::flow::sequencer_node<int> bnode(g, seq_body());
66651c0b2f7Stbbdev     INFO("Testing sequencer_node:");
66751c0b2f7Stbbdev     tbb::flow::function_node<int> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
66851c0b2f7Stbbdev     INFO("Testing sequencer_node:");
66951c0b2f7Stbbdev     serial_fn_state0 = 0;  // reset to waiting state.
67051c0b2f7Stbbdev     INFO(" make_edge");
67151c0b2f7Stbbdev     tbb::flow::make_edge(bnode, fnode);
67251c0b2f7Stbbdev     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge" );
67351c0b2f7Stbbdev     INFO(" try_put");
674478de5b1Stbbdev     std::thread t([&]{
67551c0b2f7Stbbdev         bnode.try_put(0);  // will forward to the fnode
67651c0b2f7Stbbdev         g.wait_for_all();
677478de5b1Stbbdev     });
678478de5b1Stbbdev     // wait for the function_node to fire up
679478de5b1Stbbdev     utils::SpinWaitWhileEq(serial_fn_state0, 0);
680478de5b1Stbbdev     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message" );
681478de5b1Stbbdev     serial_fn_state0 = 0;       // release the function node
682478de5b1Stbbdev     t.join();
683478de5b1Stbbdev 
68451c0b2f7Stbbdev     INFO(" remove_edge");
68551c0b2f7Stbbdev     tbb::flow::remove_edge(bnode, fnode);
68651c0b2f7Stbbdev     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge");
68751c0b2f7Stbbdev     tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g);
68851c0b2f7Stbbdev     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // will spawn a task
68951c0b2f7Stbbdev     g.wait_for_all();
69051c0b2f7Stbbdev     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join");
69151c0b2f7Stbbdev     INFO(" reverse");
69251c0b2f7Stbbdev     bnode.try_put(3);  // the edge should reverse
69351c0b2f7Stbbdev     g.wait_for_all();
69451c0b2f7Stbbdev     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
69551c0b2f7Stbbdev     INFO(" reset()");
69651c0b2f7Stbbdev     g.wait_for_all();
69751c0b2f7Stbbdev     g.reset();  // should be in forward direction again
69851c0b2f7Stbbdev     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()");
69951c0b2f7Stbbdev     INFO(" remove_edge");
70051c0b2f7Stbbdev     g.reset(tbb::flow::rf_clear_edges);  // should be in forward direction again
70151c0b2f7Stbbdev     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)");
70251c0b2f7Stbbdev     CHECK_MESSAGE( (fnode.my_predecessors.empty()), "buffering node reversed after reset(rf_clear_edges)");
70351c0b2f7Stbbdev     INFO("  done\n");
70451c0b2f7Stbbdev     g.wait_for_all();
70551c0b2f7Stbbdev }
70651c0b2f7Stbbdev 
70751c0b2f7Stbbdev struct snode_body {
70851c0b2f7Stbbdev     int max_cnt;
70951c0b2f7Stbbdev     int my_cnt;
snode_bodysnode_body71051c0b2f7Stbbdev     snode_body(const int& in) : max_cnt(in) { my_cnt = 0; }
operator ()snode_body71151c0b2f7Stbbdev     int operator()(tbb::flow_control& fc) {
71251c0b2f7Stbbdev         if (max_cnt <= my_cnt++) {
71351c0b2f7Stbbdev             fc.stop();
71451c0b2f7Stbbdev             return int();
71551c0b2f7Stbbdev         }
71651c0b2f7Stbbdev         return my_cnt;
71751c0b2f7Stbbdev     }
71851c0b2f7Stbbdev };
71951c0b2f7Stbbdev 
TestInputNode()720478de5b1Stbbdev void TestInputNode() {
72151c0b2f7Stbbdev     tbb::flow::graph g;
72251c0b2f7Stbbdev     tbb::flow::input_node<int> in(g, snode_body(4));
72351c0b2f7Stbbdev     INFO("Testing input_node:");
72451c0b2f7Stbbdev     tbb::flow::queue_node<int> qin(g);
72551c0b2f7Stbbdev     tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> jn(g);
72651c0b2f7Stbbdev     tbb::flow::queue_node<std::tuple<int,int> > qout(g);
72751c0b2f7Stbbdev 
72851c0b2f7Stbbdev     INFO(" make_edges");
72951c0b2f7Stbbdev     tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn));
73051c0b2f7Stbbdev     tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
73151c0b2f7Stbbdev     tbb::flow::make_edge(jn,qout);
73251c0b2f7Stbbdev     CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after make_edge");
73351c0b2f7Stbbdev     g.wait_for_all();
73451c0b2f7Stbbdev     g.reset();
73551c0b2f7Stbbdev     CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after reset");
73651c0b2f7Stbbdev     g.wait_for_all();
73751c0b2f7Stbbdev     g.reset(tbb::flow::rf_clear_edges);
73851c0b2f7Stbbdev     CHECK_MESSAGE( (in.my_successors.empty()), "input node has successor after reset(rf_clear_edges)");
73951c0b2f7Stbbdev     tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn));
74051c0b2f7Stbbdev     tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
74151c0b2f7Stbbdev     tbb::flow::make_edge(jn,qout);
74251c0b2f7Stbbdev     g.wait_for_all();
74351c0b2f7Stbbdev     INFO(" activate");
74451c0b2f7Stbbdev     in.activate();  // will forward to the fnode
74551c0b2f7Stbbdev     INFO(" wait1");
74651c0b2f7Stbbdev     g.wait_for_all();
747478de5b1Stbbdev     CHECK_MESSAGE( (in.my_successors.empty()), "input node has no successor after forwarding message");
74851c0b2f7Stbbdev     g.reset();
74951c0b2f7Stbbdev     CHECK_MESSAGE( (!in.my_successors.empty()), "input_node has no successors after reset");
75051c0b2f7Stbbdev     CHECK_MESSAGE( (tbb::flow::input_port<0>(jn).my_predecessors.empty()), "successor of input_node has pred after reset.");
75151c0b2f7Stbbdev     INFO(" done\n");
75251c0b2f7Stbbdev }
75351c0b2f7Stbbdev 
75451c0b2f7Stbbdev //! Test buffering nodes
75551c0b2f7Stbbdev //! \brief \ref error_guessing
75651c0b2f7Stbbdev TEST_CASE("Test buffering nodes"){
75751c0b2f7Stbbdev     unsigned int MinThread = utils::MinThread;
75851c0b2f7Stbbdev     if(MinThread < 3) MinThread = 3;
75951c0b2f7Stbbdev     tbb::task_arena arena(MinThread);
76051c0b2f7Stbbdev 	arena.execute(
__anon8a33875c0902() 76151c0b2f7Stbbdev         [&]() {
76251c0b2f7Stbbdev             // tests presume at least three threads
76351c0b2f7Stbbdev             TestBufferingNode< tbb::flow::buffer_node<int> >("buffer_node");
76451c0b2f7Stbbdev             TestBufferingNode< tbb::flow::priority_queue_node<int> >("priority_queue_node");
76551c0b2f7Stbbdev             TestBufferingNode< tbb::flow::queue_node<int> >("queue_node");
76651c0b2f7Stbbdev         }
76751c0b2f7Stbbdev 	);
76851c0b2f7Stbbdev }
76951c0b2f7Stbbdev 
77051c0b2f7Stbbdev //! Test sequencer_node
77151c0b2f7Stbbdev //! \brief \ref error_guessing
77251c0b2f7Stbbdev TEST_CASE("Test sequencer node"){
77351c0b2f7Stbbdev     TestSequencerNode();
77451c0b2f7Stbbdev }
77551c0b2f7Stbbdev 
77651c0b2f7Stbbdev TEST_SUITE("Test multifunction node") {
77751c0b2f7Stbbdev     //! Test multifunction_node with rejecting policy
77851c0b2f7Stbbdev     //! \brief \ref error_guessing
77951c0b2f7Stbbdev     TEST_CASE("with rejecting policy"){
78051c0b2f7Stbbdev         TestMultifunctionNode<tbb::flow::rejecting>();
78151c0b2f7Stbbdev     }
78251c0b2f7Stbbdev 
78351c0b2f7Stbbdev     //! Test multifunction_node with queueing policy
78451c0b2f7Stbbdev     //! \brief \ref error_guessing
78551c0b2f7Stbbdev     TEST_CASE("with queueing policy") {
78651c0b2f7Stbbdev         TestMultifunctionNode<tbb::flow::queueing>();
78751c0b2f7Stbbdev     }
78851c0b2f7Stbbdev }
78951c0b2f7Stbbdev 
79051c0b2f7Stbbdev //! Test input_node
79151c0b2f7Stbbdev //! \brief \ref error_guessing
79251c0b2f7Stbbdev TEST_CASE("Test input node"){
79351c0b2f7Stbbdev     TestInputNode();
79451c0b2f7Stbbdev }
79551c0b2f7Stbbdev 
79651c0b2f7Stbbdev //! Test continue_node
79751c0b2f7Stbbdev //! \brief \ref error_guessing
79851c0b2f7Stbbdev TEST_CASE("Test continue node"){
79951c0b2f7Stbbdev     TestContinueNode();
80051c0b2f7Stbbdev }
80151c0b2f7Stbbdev 
80251c0b2f7Stbbdev //! Test function_node
80351c0b2f7Stbbdev //! \brief \ref error_guessing
may_fail()80451c0b2f7Stbbdev TEST_CASE("Test function node" * doctest::may_fail()){
80551c0b2f7Stbbdev     TestFunctionNode();
80651c0b2f7Stbbdev }
80751c0b2f7Stbbdev 
80851c0b2f7Stbbdev //! Test join_node
80951c0b2f7Stbbdev //! \brief \ref error_guessing
81051c0b2f7Stbbdev TEST_CASE("Test join node"){
81151c0b2f7Stbbdev     TestJoinNode();
81251c0b2f7Stbbdev }
81351c0b2f7Stbbdev 
81451c0b2f7Stbbdev //! Test limiter_node
81551c0b2f7Stbbdev //! \brief \ref error_guessing
81651c0b2f7Stbbdev TEST_CASE("Test limiter node"){
81751c0b2f7Stbbdev     TestLimiterNode<void>();
81851c0b2f7Stbbdev     TestLimiterNode<int>();
81951c0b2f7Stbbdev     TestLimiterNode<tbb::flow::continue_msg>();
82051c0b2f7Stbbdev }
82151c0b2f7Stbbdev 
82251c0b2f7Stbbdev //! Test indexer_node
82351c0b2f7Stbbdev //! \brief \ref error_guessing
82451c0b2f7Stbbdev TEST_CASE("Test indexer node"){
82551c0b2f7Stbbdev     TestIndexerNode();
82651c0b2f7Stbbdev }
82751c0b2f7Stbbdev 
82851c0b2f7Stbbdev //! Test split_node
82951c0b2f7Stbbdev //! \brief \ref error_guessing
83051c0b2f7Stbbdev TEST_CASE("Test split node"){
83151c0b2f7Stbbdev     TestSplitNode();
83251c0b2f7Stbbdev }
83351c0b2f7Stbbdev 
83451c0b2f7Stbbdev //! Test broadcast, overwrite, write_once nodes
83551c0b2f7Stbbdev //! \brief \ref error_guessing
83651c0b2f7Stbbdev TEST_CASE("Test scalar node"){
83751c0b2f7Stbbdev     TestScalarNode<tbb::flow::broadcast_node<int> >("broadcast_node");
83851c0b2f7Stbbdev     TestScalarNode<tbb::flow::overwrite_node<int> >("overwrite_node");
83951c0b2f7Stbbdev     TestScalarNode<tbb::flow::write_once_node<int> >("write_once_node");
84051c0b2f7Stbbdev }
84151c0b2f7Stbbdev 
84251c0b2f7Stbbdev //! try_get in inactive graph
84351c0b2f7Stbbdev //! \brief \ref error_guessing
84451c0b2f7Stbbdev TEST_CASE("try_get in inactive graph"){
84551c0b2f7Stbbdev     tbb::flow::graph g;
84651c0b2f7Stbbdev 
__anon8a33875c0a02(tbb::flow_control& fc) 847478de5b1Stbbdev     tbb::flow::input_node<int> src(g, [&](tbb::flow_control& fc) { fc.stop(); return 0;});
84851c0b2f7Stbbdev     deactivate_graph(g);
84951c0b2f7Stbbdev 
85051c0b2f7Stbbdev     int tmp = -1;
85151c0b2f7Stbbdev     CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed");
85251c0b2f7Stbbdev 
85351c0b2f7Stbbdev     src.activate();
85451c0b2f7Stbbdev     tmp = -1;
85551c0b2f7Stbbdev     CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed");
85651c0b2f7Stbbdev }
85751c0b2f7Stbbdev 
85851c0b2f7Stbbdev //! Test make_edge in inactive graph
85951c0b2f7Stbbdev //! \brief \ref error_guessing
86051c0b2f7Stbbdev TEST_CASE("Test make_edge in inactive graph"){
86151c0b2f7Stbbdev     tbb::flow::graph g;
86251c0b2f7Stbbdev 
__anon8a33875c0b02(const tbb::flow::continue_msg&)86351c0b2f7Stbbdev     tbb::flow::continue_node<int> c(g, [](const tbb::flow::continue_msg&){ return 1; });
86451c0b2f7Stbbdev 
86551c0b2f7Stbbdev     tbb::flow::function_node<int, int> f(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
86651c0b2f7Stbbdev 
86751c0b2f7Stbbdev     c.try_put(tbb::flow::continue_msg());
86851c0b2f7Stbbdev     g.wait_for_all();
86951c0b2f7Stbbdev 
87051c0b2f7Stbbdev     deactivate_graph(g);
87151c0b2f7Stbbdev 
87251c0b2f7Stbbdev     make_edge(c, f);
87351c0b2f7Stbbdev }
87451c0b2f7Stbbdev 
87551c0b2f7Stbbdev //! Test make_edge from overwrite_node in inactive graph
87651c0b2f7Stbbdev //! \brief \ref error_guessing
87751c0b2f7Stbbdev TEST_CASE("Test make_edge from overwrite_node in inactive graph"){
87851c0b2f7Stbbdev     tbb::flow::graph g;
87951c0b2f7Stbbdev 
88051c0b2f7Stbbdev     tbb::flow::queue_node<int> q(g);
88151c0b2f7Stbbdev 
88251c0b2f7Stbbdev     tbb::flow::overwrite_node<int> on(g);
88351c0b2f7Stbbdev 
88451c0b2f7Stbbdev     on.try_put(1);
88551c0b2f7Stbbdev     g.wait_for_all();
88651c0b2f7Stbbdev 
88751c0b2f7Stbbdev     deactivate_graph(g);
88851c0b2f7Stbbdev 
88951c0b2f7Stbbdev     make_edge(on, q);
89051c0b2f7Stbbdev 
89151c0b2f7Stbbdev     int tmp = -1;
89251c0b2f7Stbbdev     CHECK_MESSAGE((q.try_get(tmp) == false), "Message should not be passed on");
89351c0b2f7Stbbdev }
89451c0b2f7Stbbdev 
89551c0b2f7Stbbdev //! Test iterators directly
89651c0b2f7Stbbdev //! \brief \ref error_guessing
89751c0b2f7Stbbdev TEST_CASE("graph_iterator details"){
89851c0b2f7Stbbdev     tbb::flow::graph g;
89951c0b2f7Stbbdev     const tbb::flow::graph cg;
90051c0b2f7Stbbdev 
90151c0b2f7Stbbdev     tbb::flow::graph::iterator b = g.begin();
90251c0b2f7Stbbdev     tbb::flow::graph::iterator b2 = g.begin();
90351c0b2f7Stbbdev     ++b2;
90451c0b2f7Stbbdev     // Cast to a volatile pointer to workaround self assignment warnings from some compilers.
90551c0b2f7Stbbdev     tbb::flow::graph::iterator* volatile b2_ptr = &b2;
90651c0b2f7Stbbdev     b2 = *b2_ptr;
90751c0b2f7Stbbdev     b = b2;
90851c0b2f7Stbbdev     CHECK_MESSAGE((b == b2), "Assignment should make iterators equal");
90951c0b2f7Stbbdev }
91051c0b2f7Stbbdev 
91151c0b2f7Stbbdev //! const graph
91251c0b2f7Stbbdev //! \brief \ref error_guessing
91351c0b2f7Stbbdev TEST_CASE("const graph"){
91451c0b2f7Stbbdev     using namespace tbb::flow;
91551c0b2f7Stbbdev 
91651c0b2f7Stbbdev     const graph g;
91751c0b2f7Stbbdev     CHECK_MESSAGE((g.cbegin() == g.cend()), "Starting graph is empty");
91851c0b2f7Stbbdev     CHECK_MESSAGE((g.begin() == g.end()), "Starting graph is empty");
91951c0b2f7Stbbdev 
92051c0b2f7Stbbdev     graph g2;
92151c0b2f7Stbbdev     CHECK_MESSAGE((g2.begin() == g2.end()), "Starting graph is empty");
92251c0b2f7Stbbdev }
92351c0b2f7Stbbdev 
92451c0b2f7Stbbdev //! Send message to continue_node while graph is inactive
92551c0b2f7Stbbdev //! \brief \ref error_guessing
92651c0b2f7Stbbdev TEST_CASE("Send message to continue_node while graph is inactive") {
92751c0b2f7Stbbdev     using namespace tbb::flow;
92851c0b2f7Stbbdev 
92951c0b2f7Stbbdev     graph g;
93051c0b2f7Stbbdev 
__anon8a33875c0c02(const continue_msg&)93151c0b2f7Stbbdev     continue_node<int> c(g, [](const continue_msg&){ return 1; });
93251c0b2f7Stbbdev     buffer_node<int> b(g);
93351c0b2f7Stbbdev 
93451c0b2f7Stbbdev     make_edge(c, b);
93551c0b2f7Stbbdev 
93651c0b2f7Stbbdev     deactivate_graph(g);
93751c0b2f7Stbbdev 
93851c0b2f7Stbbdev     c.try_put(continue_msg());
93951c0b2f7Stbbdev     g.wait_for_all();
94051c0b2f7Stbbdev 
94151c0b2f7Stbbdev     int tmp = -1;
94251c0b2f7Stbbdev     CHECK_MESSAGE((b.try_get(tmp) == false), "Message should not arrive");
94351c0b2f7Stbbdev     CHECK_MESSAGE((tmp == -1), "Value should not be altered");
94451c0b2f7Stbbdev }
94551c0b2f7Stbbdev 
94651c0b2f7Stbbdev 
94751c0b2f7Stbbdev //! Bypass of a successor's message in a node with lightweight policy
94851c0b2f7Stbbdev //! \brief \ref error_guessing
94951c0b2f7Stbbdev TEST_CASE("Bypass of a successor's message in a node with lightweight policy") {
95051c0b2f7Stbbdev     using namespace tbb::flow;
95151c0b2f7Stbbdev 
95251c0b2f7Stbbdev     graph g;
95351c0b2f7Stbbdev 
__anon8a33875c0d02(const int&v)95451c0b2f7Stbbdev     auto body = [](const int&v)->int { return v * 2; };
95551c0b2f7Stbbdev     function_node<int, int, lightweight> f1(g, unlimited, body);
95651c0b2f7Stbbdev 
__anon8a33875c0e02(const int&v)95751c0b2f7Stbbdev     auto body2 = [](const int&v)->int {return v / 2;};
95851c0b2f7Stbbdev     function_node<int, int> f2(g, unlimited, body2);
95951c0b2f7Stbbdev 
96051c0b2f7Stbbdev     buffer_node<int> b(g);
96151c0b2f7Stbbdev 
96251c0b2f7Stbbdev     make_edge(f1, f2);
96351c0b2f7Stbbdev     make_edge(f2, b);
96451c0b2f7Stbbdev 
96551c0b2f7Stbbdev     f1.try_put(1);
96651c0b2f7Stbbdev     g.wait_for_all();
96751c0b2f7Stbbdev 
96851c0b2f7Stbbdev     int tmp = -1;
96951c0b2f7Stbbdev     CHECK_MESSAGE((b.try_get(tmp) == true), "Functional nodes can work in succession");
97051c0b2f7Stbbdev     CHECK_MESSAGE((tmp == 1), "Value should not be altered");
97151c0b2f7Stbbdev }
972