xref: /oneTBB/test/tbb/test_eh_flow_graph.cpp (revision 89b2e0e3)
151c0b2f7Stbbdev /*
2*89b2e0e3SOlga Malysheva     Copyright (c) 2005-2023 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
1751c0b2f7Stbbdev //! \file test_eh_flow_graph.cpp
1851c0b2f7Stbbdev //! \brief Test for [flow_graph.copy_body flow_graph.broadcast_node flow_graph.priority_queue_node flow_graph.indexer_node flow_graph.sequencer_node flow_graph.remove_edge flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification
1951c0b2f7Stbbdev 
2051c0b2f7Stbbdev #include "common/config.h"
2151c0b2f7Stbbdev 
2251c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
2351c0b2f7Stbbdev #include "tbb/task_scheduler_observer.h"
2451c0b2f7Stbbdev #endif
2551c0b2f7Stbbdev #include "tbb/flow_graph.h"
26552f342bSPavel #include "tbb/global_control.h"
2751c0b2f7Stbbdev 
2851c0b2f7Stbbdev #include "common/test.h"
2951c0b2f7Stbbdev 
3051c0b2f7Stbbdev #if TBB_USE_EXCEPTIONS
3151c0b2f7Stbbdev 
3251c0b2f7Stbbdev #include "common/utils.h"
3351c0b2f7Stbbdev #include "common/checktype.h"
3451c0b2f7Stbbdev #include "common/concurrency_tracker.h"
3551c0b2f7Stbbdev 
3651c0b2f7Stbbdev #if _MSC_VER
3751c0b2f7Stbbdev     #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
3851c0b2f7Stbbdev #endif
3951c0b2f7Stbbdev 
4051c0b2f7Stbbdev #if __TBB_MSVC_UNREACHABLE_CODE_IGNORED
4151c0b2f7Stbbdev     // Suppress "unreachable code" warning by VC++ 17.0-18.0 (VS 2012 or newer)
4251c0b2f7Stbbdev     #pragma warning (disable: 4702)
4351c0b2f7Stbbdev #endif
4451c0b2f7Stbbdev 
4551c0b2f7Stbbdev // global task_scheduler_observer is an imperfect tool to find how many threads are really
4651c0b2f7Stbbdev // participating.  That was the hope, but it counts the entries into the marketplace,
4751c0b2f7Stbbdev // not the arena.
4851c0b2f7Stbbdev // TODO: Consider using local task scheduler observer
4951c0b2f7Stbbdev // #define USE_TASK_SCHEDULER_OBSERVER 1
5051c0b2f7Stbbdev 
5151c0b2f7Stbbdev #include <iostream>
5251c0b2f7Stbbdev #include <sstream>
5351c0b2f7Stbbdev #include <vector>
5451c0b2f7Stbbdev 
5551c0b2f7Stbbdev #include "common/exception_handling.h"
5651c0b2f7Stbbdev 
5751c0b2f7Stbbdev #include <stdexcept>
5851c0b2f7Stbbdev 
// Compile-time item count.  NOTE(review): presumably the value assigned to
// g_NumItems by the test driver -- the assignment is not visible in this part of the file.
#define NUM_ITEMS 15

// Number of items every input body emits before it calls flow_control::stop().
int g_NumItems = 0;

// Bookkeeping counters; both are cleared by ResetGlobals() before each run.
std::atomic<unsigned> nExceptions{0};
std::atomic<intptr_t> g_TGCCancelled{0};
6451c0b2f7Stbbdev 
// Selects whether a node body under test throws (isThrowing) or never throws (nonThrowing).
enum TestNodeTypeEnum {
    nonThrowing,
    isThrowing
};

// Concurrency selectors.  They are used as the Conc template argument of the node bodies
// and are also passed as the concurrency argument of the node under test.
static constexpr size_t unlimited_type = 0;
static constexpr size_t serial_type = 1;
static constexpr size_t limited_type = 4;

// Maps a TestNodeTypeEnum value to a printable name.
template<TestNodeTypeEnum T> struct TestNodeTypeName;

template<>
struct TestNodeTypeName<nonThrowing> {
    static const char *name() {
        return "nonThrowing";
    }
};

template<>
struct TestNodeTypeName<isThrowing> {
    static const char *name() {
        return "isThrowing";
    }
};

// Maps a concurrency selector to a printable name.
template<size_t Conc> struct concurrencyName;

template<>
struct concurrencyName<serial_type> {
    static const char *name() {
        return "serial";
    }
};

template<>
struct concurrencyName<unlimited_type> {
    static const char *name() {
        return "unlimited";
    }
};

template<>
struct concurrencyName<limited_type> {
    static const char *name() {
        return "limited";
    }
};
7951c0b2f7Stbbdev 
// Class that provides waiting and throwing behavior.  A nonThrowing body only waits; an
// isThrowing body waits and then throws.
// If serial, we can't wait for concurrency to peak right away; we may be the bottleneck and
// would stop further processing.  So the first g_NumThreads + 10 executions pass straight
// through (the "10" is somewhat arbitrary, and just makes sure there are enough items in
// the graph to keep it flowing); only later executions wait.
// Whenever a body does wait (parallel, or serial past that point), it uses
// utils::ConcurrencyTracker.
8551c0b2f7Stbbdev 
8651c0b2f7Stbbdev template<size_t Conc, TestNodeTypeEnum t = nonThrowing>
8751c0b2f7Stbbdev class WaitThrow;
8851c0b2f7Stbbdev 
8951c0b2f7Stbbdev template<>
9051c0b2f7Stbbdev class WaitThrow<serial_type,nonThrowing> {
9151c0b2f7Stbbdev protected:
WaitAndThrow(int cnt,const char *)9251c0b2f7Stbbdev     void WaitAndThrow(int cnt, const char * /*name*/) {
9351c0b2f7Stbbdev         if(cnt > g_NumThreads + 10) {
9451c0b2f7Stbbdev             utils::ConcurrencyTracker ct;
9551c0b2f7Stbbdev             WaitUntilConcurrencyPeaks();
9651c0b2f7Stbbdev         }
9751c0b2f7Stbbdev     }
9851c0b2f7Stbbdev };
9951c0b2f7Stbbdev 
10051c0b2f7Stbbdev template<>
10151c0b2f7Stbbdev class WaitThrow<serial_type,isThrowing> {
10251c0b2f7Stbbdev protected:
WaitAndThrow(int cnt,const char *)10351c0b2f7Stbbdev     void WaitAndThrow(int cnt, const char * /*name*/) {
10451c0b2f7Stbbdev         if(cnt > g_NumThreads + 10) {
10551c0b2f7Stbbdev             utils::ConcurrencyTracker ct;
10651c0b2f7Stbbdev             WaitUntilConcurrencyPeaks();
10751c0b2f7Stbbdev             ThrowTestException(1);
10851c0b2f7Stbbdev         }
10951c0b2f7Stbbdev     }
11051c0b2f7Stbbdev };
11151c0b2f7Stbbdev 
11251c0b2f7Stbbdev // for nodes with limited concurrency, if that concurrency is < g_NumThreads, we need
11351c0b2f7Stbbdev // to make sure enough other nodes wait for concurrency to peak.  If we are attached to
11451c0b2f7Stbbdev // N successors, for each item we pass to a successor, we will get N executions of the
11551c0b2f7Stbbdev // "absorbers" (because we broadcast to successors.)  for an odd number of threads we
11651c0b2f7Stbbdev // need (g_NumThreads - limited + 1) / 2 items (that will give us one extra execution
11751c0b2f7Stbbdev // of an "absorber", but we can't change that without changing the behavior of the node.)
11851c0b2f7Stbbdev template<>
11951c0b2f7Stbbdev class WaitThrow<limited_type,nonThrowing> {
12051c0b2f7Stbbdev protected:
WaitAndThrow(int cnt,const char *)12151c0b2f7Stbbdev     void WaitAndThrow(int cnt, const char * /*name*/) {
12251c0b2f7Stbbdev         if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) {
12351c0b2f7Stbbdev             return;
12451c0b2f7Stbbdev         }
12551c0b2f7Stbbdev         utils::ConcurrencyTracker ct;
12651c0b2f7Stbbdev         WaitUntilConcurrencyPeaks();
12751c0b2f7Stbbdev     }
12851c0b2f7Stbbdev };
12951c0b2f7Stbbdev 
13051c0b2f7Stbbdev template<>
13151c0b2f7Stbbdev class WaitThrow<limited_type,isThrowing> {
13251c0b2f7Stbbdev protected:
WaitAndThrow(int cnt,const char *)13351c0b2f7Stbbdev     void WaitAndThrow(int cnt, const char * /*name*/) {
13451c0b2f7Stbbdev         utils::ConcurrencyTracker ct;
13551c0b2f7Stbbdev         if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) {
13651c0b2f7Stbbdev             return;
13751c0b2f7Stbbdev         }
13851c0b2f7Stbbdev         WaitUntilConcurrencyPeaks();
13951c0b2f7Stbbdev         ThrowTestException(1);
14051c0b2f7Stbbdev     }
14151c0b2f7Stbbdev };
14251c0b2f7Stbbdev 
14351c0b2f7Stbbdev template<>
14451c0b2f7Stbbdev class WaitThrow<unlimited_type,nonThrowing> {
14551c0b2f7Stbbdev protected:
WaitAndThrow(int,const char *)14651c0b2f7Stbbdev     void WaitAndThrow(int /*cnt*/, const char * /*name*/) {
14751c0b2f7Stbbdev         utils::ConcurrencyTracker ct;
14851c0b2f7Stbbdev         WaitUntilConcurrencyPeaks();
14951c0b2f7Stbbdev     }
15051c0b2f7Stbbdev };
15151c0b2f7Stbbdev 
15251c0b2f7Stbbdev template<>
15351c0b2f7Stbbdev class WaitThrow<unlimited_type,isThrowing> {
15451c0b2f7Stbbdev protected:
WaitAndThrow(int,const char *)15551c0b2f7Stbbdev     void WaitAndThrow(int /*cnt*/, const char * /*name*/) {
15651c0b2f7Stbbdev         utils::ConcurrencyTracker ct;
15751c0b2f7Stbbdev         WaitUntilConcurrencyPeaks();
15851c0b2f7Stbbdev         ThrowTestException(1);
15951c0b2f7Stbbdev     }
16051c0b2f7Stbbdev };
16151c0b2f7Stbbdev 
16251c0b2f7Stbbdev void
ResetGlobals(bool throwException=true,bool flog=false)16351c0b2f7Stbbdev ResetGlobals(bool throwException = true, bool flog = false) {
16451c0b2f7Stbbdev     nExceptions = 0;
16551c0b2f7Stbbdev     g_TGCCancelled = 0;
16651c0b2f7Stbbdev     ResetEhGlobals(throwException, flog);
16751c0b2f7Stbbdev }
16851c0b2f7Stbbdev 
16951c0b2f7Stbbdev // -------input_node body ------------------
17051c0b2f7Stbbdev template <class OutputType, TestNodeTypeEnum TType>
17151c0b2f7Stbbdev class test_input_body : WaitThrow<serial_type, TType> {
17251c0b2f7Stbbdev     using WaitThrow<serial_type, TType>::WaitAndThrow;
17351c0b2f7Stbbdev     std::atomic<int> *my_current_val;
17451c0b2f7Stbbdev     int my_mult;
17551c0b2f7Stbbdev public:
test_input_body(std::atomic<int> & my_cnt,int multiplier=1)17651c0b2f7Stbbdev     test_input_body(std::atomic<int> &my_cnt, int multiplier = 1) : my_current_val(&my_cnt), my_mult(multiplier) {
17751c0b2f7Stbbdev         // INFO("- --------- - - -   constructed " << (size_t)(my_current_val) << "\n");
17851c0b2f7Stbbdev     }
17951c0b2f7Stbbdev 
operator ()(tbb::flow_control & fc)18051c0b2f7Stbbdev     OutputType operator()(tbb::flow_control& fc) {
18151c0b2f7Stbbdev         UPDATE_COUNTS();
18251c0b2f7Stbbdev         OutputType ret = OutputType(my_mult * ++(*my_current_val));
18351c0b2f7Stbbdev         // TODO revamp: reconsider logging for the tests.
18451c0b2f7Stbbdev 
18551c0b2f7Stbbdev         // The following line is known to cause double frees. Therefore, commenting out frequent
18651c0b2f7Stbbdev         // calls to INFO() macro.
18751c0b2f7Stbbdev 
18851c0b2f7Stbbdev         // INFO("xx(" << (size_t)(my_current_val) << ") ret == " << (int)ret << "\n");
18951c0b2f7Stbbdev         if(*my_current_val > g_NumItems) {
19051c0b2f7Stbbdev             // INFO(" ------ End of the line!\n");
19151c0b2f7Stbbdev             *my_current_val = g_NumItems;
19251c0b2f7Stbbdev             fc.stop();
19351c0b2f7Stbbdev             return OutputType();
19451c0b2f7Stbbdev         }
19551c0b2f7Stbbdev         WaitAndThrow((int)ret,"test_input_body");
19651c0b2f7Stbbdev         return ret;
19751c0b2f7Stbbdev     }
19851c0b2f7Stbbdev 
count_value()19951c0b2f7Stbbdev     int count_value() { return (int)*my_current_val; }
20051c0b2f7Stbbdev };
20151c0b2f7Stbbdev 
20251c0b2f7Stbbdev template <TestNodeTypeEnum TType>
20351c0b2f7Stbbdev class test_input_body<tbb::flow::continue_msg, TType> : WaitThrow<serial_type, TType> {
20451c0b2f7Stbbdev     using WaitThrow<serial_type, TType>::WaitAndThrow;
20551c0b2f7Stbbdev     std::atomic<int> *my_current_val;
20651c0b2f7Stbbdev public:
test_input_body(std::atomic<int> & my_cnt)20751c0b2f7Stbbdev     test_input_body(std::atomic<int> &my_cnt) : my_current_val(&my_cnt) { }
20851c0b2f7Stbbdev 
operator ()(tbb::flow_control & fc)20951c0b2f7Stbbdev     tbb::flow::continue_msg operator()( tbb::flow_control & fc) {
21051c0b2f7Stbbdev         UPDATE_COUNTS();
21151c0b2f7Stbbdev         int outint = ++(*my_current_val);
21251c0b2f7Stbbdev         if(*my_current_val > g_NumItems) {
21351c0b2f7Stbbdev             *my_current_val = g_NumItems;
21451c0b2f7Stbbdev             fc.stop();
21551c0b2f7Stbbdev             return tbb::flow::continue_msg();
21651c0b2f7Stbbdev         }
21751c0b2f7Stbbdev         WaitAndThrow(outint,"test_input_body");
21851c0b2f7Stbbdev         return tbb::flow::continue_msg();
21951c0b2f7Stbbdev     }
22051c0b2f7Stbbdev 
count_value()22151c0b2f7Stbbdev     int count_value() { return (int)*my_current_val; }
22251c0b2f7Stbbdev };
22351c0b2f7Stbbdev 
22451c0b2f7Stbbdev // -------{function/continue}_node body ------------------
22551c0b2f7Stbbdev template<class InputType, class OutputType, TestNodeTypeEnum T, size_t Conc>
22651c0b2f7Stbbdev class absorber_body : WaitThrow<Conc,T> {
22751c0b2f7Stbbdev     using WaitThrow<Conc,T>::WaitAndThrow;
22851c0b2f7Stbbdev     std::atomic<int> *my_count;
22951c0b2f7Stbbdev public:
absorber_body(std::atomic<int> & my_cnt)23051c0b2f7Stbbdev     absorber_body(std::atomic<int> &my_cnt) : my_count(&my_cnt) { }
operator ()(const InputType &)23151c0b2f7Stbbdev     OutputType operator()(const InputType &/*p_in*/) {
23251c0b2f7Stbbdev         UPDATE_COUNTS();
23351c0b2f7Stbbdev         int out = ++(*my_count);
23451c0b2f7Stbbdev         WaitAndThrow(out,"absorber_body");
23551c0b2f7Stbbdev         return OutputType();
23651c0b2f7Stbbdev     }
count_value()23751c0b2f7Stbbdev     int count_value() { return *my_count; }
23851c0b2f7Stbbdev };
23951c0b2f7Stbbdev 
24051c0b2f7Stbbdev // -------multifunction_node body ------------------
24151c0b2f7Stbbdev 
24251c0b2f7Stbbdev // helper classes
// Recursive helper: puts one default-constructed item to output port N-1, then recurses
// to handle ports N-2 .. 0.
template<int N, class PortsType>
struct IssueOutput {
    using my_type = typename std::tuple_element<N-1, PortsType>::type::output_type;

    // Checks that the put to port N-1 is accepted before moving on to the lower ports.
    static void issue_tuple_element(PortsType &my_ports) {
        CHECK_MESSAGE( (std::get<N-1>(my_ports).try_put(my_type())), "Error putting to successor");
        IssueOutput<N-1, PortsType>::issue_tuple_element(my_ports);
    }
};
25251c0b2f7Stbbdev 
25351c0b2f7Stbbdev template<class PortsType>
25451c0b2f7Stbbdev struct IssueOutput<1,PortsType> {
25551c0b2f7Stbbdev     typedef typename std::tuple_element<0,PortsType>::type::output_type my_type;
25651c0b2f7Stbbdev 
issue_tuple_elementIssueOutput25751c0b2f7Stbbdev     static void issue_tuple_element( PortsType &my_ports) {
25851c0b2f7Stbbdev         CHECK_MESSAGE( (std::get<0>(my_ports).try_put(my_type())), "Error putting to successor");
25951c0b2f7Stbbdev     }
26051c0b2f7Stbbdev };
26151c0b2f7Stbbdev 
26251c0b2f7Stbbdev template<class InputType, class OutputTupleType, TestNodeTypeEnum T, size_t Conc>
26351c0b2f7Stbbdev class multifunction_node_body : WaitThrow<Conc,T> {
26451c0b2f7Stbbdev     using WaitThrow<Conc,T>::WaitAndThrow;
26551c0b2f7Stbbdev     static const int N = std::tuple_size<OutputTupleType>::value;
26651c0b2f7Stbbdev     typedef typename tbb::flow::multifunction_node<InputType,OutputTupleType> NodeType;
26751c0b2f7Stbbdev     typedef typename NodeType::output_ports_type PortsType;
26851c0b2f7Stbbdev     std::atomic<int> *my_count;
26951c0b2f7Stbbdev public:
multifunction_node_body(std::atomic<int> & my_cnt)27051c0b2f7Stbbdev     multifunction_node_body(std::atomic<int> &my_cnt) : my_count(&my_cnt) { }
operator ()(const InputType &,PortsType & my_ports)27151c0b2f7Stbbdev     void operator()(const InputType& /*in*/, PortsType &my_ports) {
27251c0b2f7Stbbdev         UPDATE_COUNTS();
27351c0b2f7Stbbdev         int out = ++(*my_count);
27451c0b2f7Stbbdev         WaitAndThrow(out,"multifunction_node_body");
27551c0b2f7Stbbdev         // issue an item to each output port.
27651c0b2f7Stbbdev         IssueOutput<N,PortsType>::issue_tuple_element(my_ports);
27751c0b2f7Stbbdev     }
27851c0b2f7Stbbdev 
count_value()27951c0b2f7Stbbdev     int count_value() { return *my_count; }
28051c0b2f7Stbbdev };
28151c0b2f7Stbbdev 
28251c0b2f7Stbbdev // --------- body to sort items in sequencer_node
// Sequencing functor: converts a 1-based item value into a 0-based sequence index.
template<class BufferItemType>
struct sequencer_body {
    // Checks that the item is non-zero, then returns its value minus one.
    size_t operator()(const BufferItemType &s) {
        CHECK_MESSAGE( (s), "sequencer item out of range (== 0)");
        const size_t one_based = size_t(s);
        return one_based - 1;
    }
};
29051c0b2f7Stbbdev 
29151c0b2f7Stbbdev // --------- type for < comparison in priority_queue_node.
// Strict-weak-ordering comparator that orders items by their value modulo 3.
//
// ItemType must be convertible to int.  Items whose remainders are equal compare as
// equivalent.  For negative values the remainder keeps the sign of the value (C++ `%`
// truncates toward zero).
template<class ItemType>
struct less_body {
    // Returns true when lhs has a smaller remainder modulo 3 than rhs.
    // const-qualified so the comparator can be invoked through a const object or
    // reference, as generic containers and algorithms may do.
    bool operator()(const ItemType &lhs, const ItemType &rhs) const {
        return (int(lhs) % 3) < (int(rhs) % 3);
    }
};
29851c0b2f7Stbbdev 
29951c0b2f7Stbbdev // --------- tag methods for tag_matching join_node
30051c0b2f7Stbbdev template<typename TT>
30151c0b2f7Stbbdev class tag_func {
30251c0b2f7Stbbdev     TT my_mult;
30351c0b2f7Stbbdev public:
tag_func(TT multiplier)30451c0b2f7Stbbdev     tag_func(TT multiplier) : my_mult(multiplier) { }
30551c0b2f7Stbbdev     // operator() will return [0 .. Count)
operator ()(TT v)30651c0b2f7Stbbdev     tbb::flow::tag_value operator()( TT v) {
30751c0b2f7Stbbdev         tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult);
30851c0b2f7Stbbdev         return t;
30951c0b2f7Stbbdev     }
31051c0b2f7Stbbdev };
31151c0b2f7Stbbdev 
31251c0b2f7Stbbdev // --------- Input body for split_node test.
31351c0b2f7Stbbdev template <class OutputTuple, TestNodeTypeEnum TType>
31451c0b2f7Stbbdev class tuple_test_input_body : WaitThrow<serial_type, TType> {
31551c0b2f7Stbbdev     typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
31651c0b2f7Stbbdev     typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
31751c0b2f7Stbbdev     using WaitThrow<serial_type, TType>::WaitAndThrow;
31851c0b2f7Stbbdev     std::atomic<int> *my_current_val;
31951c0b2f7Stbbdev public:
tuple_test_input_body(std::atomic<int> & my_cnt)32051c0b2f7Stbbdev     tuple_test_input_body(std::atomic<int> &my_cnt) : my_current_val(&my_cnt) { }
32151c0b2f7Stbbdev 
operator ()(tbb::flow_control & fc)32251c0b2f7Stbbdev     OutputTuple operator()(tbb::flow_control& fc) {
32351c0b2f7Stbbdev         UPDATE_COUNTS();
32451c0b2f7Stbbdev         int ival = ++(*my_current_val);
32551c0b2f7Stbbdev         if(*my_current_val > g_NumItems) {
32651c0b2f7Stbbdev             *my_current_val = g_NumItems;  // jam the final value; we assert on it later.
32751c0b2f7Stbbdev             fc.stop();
32851c0b2f7Stbbdev             return OutputTuple();
32951c0b2f7Stbbdev         }
33051c0b2f7Stbbdev         WaitAndThrow(ival,"tuple_test_input_body");
33151c0b2f7Stbbdev         return OutputTuple(ItemType0(ival),ItemType1(ival));
33251c0b2f7Stbbdev     }
33351c0b2f7Stbbdev 
count_value()33451c0b2f7Stbbdev     int count_value() { return (int)*my_current_val; }
33551c0b2f7Stbbdev };
33651c0b2f7Stbbdev 
33751c0b2f7Stbbdev // ------- end of node bodies
33851c0b2f7Stbbdev 
33951c0b2f7Stbbdev // input_node is only-serial.  input_node can throw, or the function_node can throw.
34051c0b2f7Stbbdev // graph being tested is
34151c0b2f7Stbbdev //
34251c0b2f7Stbbdev //      input_node+---+parallel function_node
34351c0b2f7Stbbdev //
34451c0b2f7Stbbdev //    After each run the graph is reset(), to test the reset functionality.
34551c0b2f7Stbbdev //
34651c0b2f7Stbbdev 
34751c0b2f7Stbbdev 
34851c0b2f7Stbbdev template<class ItemType, TestNodeTypeEnum inpThrowType, TestNodeTypeEnum absorbThrowType>
run_one_input_node_test(bool throwException,bool flog)34951c0b2f7Stbbdev void run_one_input_node_test(bool throwException, bool flog) {
35051c0b2f7Stbbdev     typedef test_input_body<ItemType,inpThrowType> src_body_type;
35151c0b2f7Stbbdev     typedef absorber_body<ItemType, tbb::flow::continue_msg, absorbThrowType, unlimited_type> parallel_absorb_body_type;
35251c0b2f7Stbbdev     std::atomic<int> input_body_count;
35351c0b2f7Stbbdev     std::atomic<int> absorber_body_count;
35451c0b2f7Stbbdev     input_body_count = 0;
35551c0b2f7Stbbdev     absorber_body_count = 0;
35651c0b2f7Stbbdev 
35751c0b2f7Stbbdev     tbb::flow::graph g;
35851c0b2f7Stbbdev 
35951c0b2f7Stbbdev     g_Master = std::this_thread::get_id();
36051c0b2f7Stbbdev 
36151c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
36251c0b2f7Stbbdev     eh_test_observer o;
36351c0b2f7Stbbdev     o.observe(true);
36451c0b2f7Stbbdev #endif
36551c0b2f7Stbbdev 
36651c0b2f7Stbbdev     tbb::flow::input_node<ItemType> sn(g, src_body_type(input_body_count));
36751c0b2f7Stbbdev     parallel_absorb_body_type ab2(absorber_body_count);
36851c0b2f7Stbbdev     tbb::flow::function_node<ItemType> parallel_fn(g,tbb::flow::unlimited,ab2);
36951c0b2f7Stbbdev     make_edge(sn, parallel_fn);
37051c0b2f7Stbbdev     for(int runcnt = 0; runcnt < 2; ++runcnt) {
37151c0b2f7Stbbdev         ResetGlobals(throwException,flog);
37251c0b2f7Stbbdev         if(throwException) {
37351c0b2f7Stbbdev             TRY();
37451c0b2f7Stbbdev                 sn.activate();
37551c0b2f7Stbbdev                 g.wait_for_all();
37651c0b2f7Stbbdev             CATCH_AND_ASSERT();
37751c0b2f7Stbbdev         }
37851c0b2f7Stbbdev         else {
37951c0b2f7Stbbdev             TRY();
38051c0b2f7Stbbdev                 sn.activate();
38151c0b2f7Stbbdev                 g.wait_for_all();
38251c0b2f7Stbbdev             CATCH_AND_FAIL();
38351c0b2f7Stbbdev         }
38451c0b2f7Stbbdev 
38551c0b2f7Stbbdev         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
38651c0b2f7Stbbdev         int src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value();
38751c0b2f7Stbbdev         int sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value();
38851c0b2f7Stbbdev         if(throwException) {
38951c0b2f7Stbbdev             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception flag in flow::graph not set");
39051c0b2f7Stbbdev             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "canceled flag not set");
39151c0b2f7Stbbdev             CHECK_MESSAGE( (src_cnt <= g_NumItems), "Too many input_node items emitted");
39251c0b2f7Stbbdev             CHECK_MESSAGE( (sink_cnt <= src_cnt), "Too many input_node items received");
39351c0b2f7Stbbdev         }
39451c0b2f7Stbbdev         else {
39551c0b2f7Stbbdev             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
39651c0b2f7Stbbdev             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
39751c0b2f7Stbbdev             CHECK_MESSAGE( (src_cnt == g_NumItems), "Incorrect # input_node items emitted");
39851c0b2f7Stbbdev             CHECK_MESSAGE( (sink_cnt == src_cnt), "Incorrect # input_node items received");
39951c0b2f7Stbbdev         }
40051c0b2f7Stbbdev         g.reset();  // resets the body of the input_node and the absorb_nodes.
40151c0b2f7Stbbdev         input_body_count = 0;
40251c0b2f7Stbbdev         absorber_body_count = 0;
40351c0b2f7Stbbdev         CHECK_MESSAGE( (!g.exception_thrown()), "Reset didn't clear exception_thrown()");
40451c0b2f7Stbbdev         CHECK_MESSAGE( (!g.is_cancelled()), "Reset didn't clear is_cancelled()");
40551c0b2f7Stbbdev         src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value();
40651c0b2f7Stbbdev         sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value();
40751c0b2f7Stbbdev         CHECK_MESSAGE( (src_cnt == 0), "input_node count not reset");
40851c0b2f7Stbbdev         CHECK_MESSAGE( (sink_cnt == 0), "sink_node count not reset");
40951c0b2f7Stbbdev     }
41051c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
41151c0b2f7Stbbdev     o.observe(false);
41251c0b2f7Stbbdev #endif
41351c0b2f7Stbbdev }  // run_one_input_node_test
41451c0b2f7Stbbdev 
41551c0b2f7Stbbdev 
41651c0b2f7Stbbdev template<class ItemType, TestNodeTypeEnum inpThrowType, TestNodeTypeEnum absorbThrowType>
run_input_node_test()41751c0b2f7Stbbdev void run_input_node_test() {
41851c0b2f7Stbbdev     run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(false,false);
41951c0b2f7Stbbdev     run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(true,false);
42051c0b2f7Stbbdev     run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(true,true);
42151c0b2f7Stbbdev }  // run_input_node_test
42251c0b2f7Stbbdev 
test_input_node()42351c0b2f7Stbbdev void test_input_node() {
42451c0b2f7Stbbdev     INFO("Testing input_node\n");
42551c0b2f7Stbbdev     CheckType<int>::check_type_counter = 0;
42651c0b2f7Stbbdev     g_Wakeup_Msg = "input_node(1): Missed wakeup or machine is overloaded?";
42751c0b2f7Stbbdev     run_input_node_test<CheckType<int>, isThrowing, nonThrowing>();
42851c0b2f7Stbbdev     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
42951c0b2f7Stbbdev     g_Wakeup_Msg = "input_node(2): Missed wakeup or machine is overloaded?";
43051c0b2f7Stbbdev     run_input_node_test<int, isThrowing, nonThrowing>();
43151c0b2f7Stbbdev     g_Wakeup_Msg = "input_node(3): Missed wakeup or machine is overloaded?";
43251c0b2f7Stbbdev     run_input_node_test<int, nonThrowing, isThrowing>();
43351c0b2f7Stbbdev     g_Wakeup_Msg = "input_node(4): Missed wakeup or machine is overloaded?";
43451c0b2f7Stbbdev     run_input_node_test<int, isThrowing, isThrowing>();
43551c0b2f7Stbbdev     g_Wakeup_Msg = "input_node(5): Missed wakeup or machine is overloaded?";
43651c0b2f7Stbbdev     run_input_node_test<CheckType<int>, isThrowing, isThrowing>();
43751c0b2f7Stbbdev     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
43851c0b2f7Stbbdev     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
43951c0b2f7Stbbdev }
44051c0b2f7Stbbdev 
44151c0b2f7Stbbdev // -------- utilities & types to test function_node and multifunction_node.
44251c0b2f7Stbbdev 
44351c0b2f7Stbbdev // need to tell the template which node type I am using so it attaches successors correctly.
44451c0b2f7Stbbdev enum NodeFetchType { func_node_type, multifunc_node_type };
44551c0b2f7Stbbdev 
44651c0b2f7Stbbdev template<class NodeType, class ItemType, int indx, NodeFetchType NFT>
44751c0b2f7Stbbdev struct AttachPoint;
44851c0b2f7Stbbdev 
44951c0b2f7Stbbdev template<class NodeType, class ItemType, int indx>
45051c0b2f7Stbbdev struct AttachPoint<NodeType,ItemType,indx,multifunc_node_type> {
GetSenderAttachPoint45151c0b2f7Stbbdev     static tbb::flow::sender<ItemType> &GetSender(NodeType &n) {
45251c0b2f7Stbbdev         return tbb::flow::output_port<indx>(n);
45351c0b2f7Stbbdev     }
45451c0b2f7Stbbdev };
45551c0b2f7Stbbdev 
45651c0b2f7Stbbdev template<class NodeType, class ItemType, int indx>
45751c0b2f7Stbbdev struct AttachPoint<NodeType,ItemType,indx,func_node_type> {
GetSenderAttachPoint45851c0b2f7Stbbdev     static tbb::flow::sender<ItemType> &GetSender(NodeType &n) {
45951c0b2f7Stbbdev         return n;
46051c0b2f7Stbbdev     }
46151c0b2f7Stbbdev };
46251c0b2f7Stbbdev 
46351c0b2f7Stbbdev 
46451c0b2f7Stbbdev // common template for running function_node, multifunction_node.  continue_node
46551c0b2f7Stbbdev // has different firing requirements, so it needs a different graph topology.
46651c0b2f7Stbbdev template<
46751c0b2f7Stbbdev     class InputNodeType,
46851c0b2f7Stbbdev     class InputNodeBodyType0,
46951c0b2f7Stbbdev     class InputNodeBodyType1,
47051c0b2f7Stbbdev     NodeFetchType NFT,
47151c0b2f7Stbbdev     class TestNodeType,
47251c0b2f7Stbbdev     class TestNodeBodyType,
47351c0b2f7Stbbdev     class TypeToSink0,          // what kind of item are we sending to sink0
47451c0b2f7Stbbdev     class TypeToSink1,          // what kind of item are we sending to sink1
47551c0b2f7Stbbdev     class SinkNodeType0,        // will be same for function;
47651c0b2f7Stbbdev     class SinkNodeType1,        // may differ for multifunction_node
47751c0b2f7Stbbdev     class SinkNodeBodyType0,
47851c0b2f7Stbbdev     class SinkNodeBodyType1,
47951c0b2f7Stbbdev     size_t Conc
48051c0b2f7Stbbdev     >
48151c0b2f7Stbbdev void
run_one_functype_node_test(bool throwException,bool flog,const char *)48251c0b2f7Stbbdev run_one_functype_node_test(bool throwException, bool flog, const char * /*name*/) {
48351c0b2f7Stbbdev 
48451c0b2f7Stbbdev     std::stringstream ss;
48551c0b2f7Stbbdev     char *saved_msg = const_cast<char *>(g_Wakeup_Msg);
48651c0b2f7Stbbdev     tbb::flow::graph g;
48751c0b2f7Stbbdev 
48851c0b2f7Stbbdev     std::atomic<int> input0_count;
48951c0b2f7Stbbdev     std::atomic<int> input1_count;
49051c0b2f7Stbbdev     std::atomic<int> sink0_count;
49151c0b2f7Stbbdev     std::atomic<int> sink1_count;
49251c0b2f7Stbbdev     std::atomic<int> test_count;
49351c0b2f7Stbbdev     input0_count = input1_count = sink0_count = sink1_count = test_count = 0;
49451c0b2f7Stbbdev 
49551c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
49651c0b2f7Stbbdev     eh_test_observer o;
49751c0b2f7Stbbdev     o.observe(true);
49851c0b2f7Stbbdev #endif
49951c0b2f7Stbbdev 
50051c0b2f7Stbbdev     g_Master = std::this_thread::get_id();
50151c0b2f7Stbbdev     InputNodeType input0(g, InputNodeBodyType0(input0_count));
50251c0b2f7Stbbdev     InputNodeType input1(g, InputNodeBodyType1(input1_count));
50351c0b2f7Stbbdev     TestNodeType node_to_test(g, Conc, TestNodeBodyType(test_count));
50451c0b2f7Stbbdev     SinkNodeType0 sink0(g,tbb::flow::unlimited,SinkNodeBodyType0(sink0_count));
50551c0b2f7Stbbdev     SinkNodeType1 sink1(g,tbb::flow::unlimited,SinkNodeBodyType1(sink1_count));
50651c0b2f7Stbbdev     make_edge(input0, node_to_test);
50751c0b2f7Stbbdev     make_edge(input1, node_to_test);
50851c0b2f7Stbbdev     make_edge(AttachPoint<TestNodeType, TypeToSink0, 0, NFT>::GetSender(node_to_test), sink0);
50951c0b2f7Stbbdev     make_edge(AttachPoint<TestNodeType, TypeToSink1, 1, NFT>::GetSender(node_to_test), sink1);
51051c0b2f7Stbbdev 
51151c0b2f7Stbbdev     for(int iter = 0; iter < 2; ++iter) {  // run, reset, run again
51251c0b2f7Stbbdev         ss.clear();
51351c0b2f7Stbbdev         ss << saved_msg << " iter=" << iter << ", threads=" << g_NumThreads << ", throw=" << (throwException ? "T" : "F") << ", flow=" << (flog ? "T" : "F");
51451c0b2f7Stbbdev         g_Wakeup_Msg = ss.str().c_str();
51551c0b2f7Stbbdev         ResetGlobals(throwException,flog);
51651c0b2f7Stbbdev         if(throwException) {
51751c0b2f7Stbbdev             TRY();
51851c0b2f7Stbbdev                 input0.activate();
51951c0b2f7Stbbdev                 input1.activate();
52051c0b2f7Stbbdev                 g.wait_for_all();
52151c0b2f7Stbbdev             CATCH_AND_ASSERT();
52251c0b2f7Stbbdev         }
52351c0b2f7Stbbdev         else {
52451c0b2f7Stbbdev             TRY();
52551c0b2f7Stbbdev                 input0.activate();
52651c0b2f7Stbbdev                 input1.activate();
52751c0b2f7Stbbdev                 g.wait_for_all();
52851c0b2f7Stbbdev             CATCH_AND_FAIL();
52951c0b2f7Stbbdev         }
53051c0b2f7Stbbdev         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
53151c0b2f7Stbbdev         int ib0_cnt = tbb::flow::copy_body<InputNodeBodyType0>(input0).count_value();
53251c0b2f7Stbbdev         int ib1_cnt = tbb::flow::copy_body<InputNodeBodyType1>(input1).count_value();
53351c0b2f7Stbbdev         int t_cnt   = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value();
53451c0b2f7Stbbdev         int nb0_cnt = tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value();
53551c0b2f7Stbbdev         int nb1_cnt = tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value();
53651c0b2f7Stbbdev         if(throwException) {
53751c0b2f7Stbbdev             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
53851c0b2f7Stbbdev             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
53951c0b2f7Stbbdev             CHECK_MESSAGE( (ib0_cnt + ib1_cnt <= 2*g_NumItems), "Too many items sent by inputs");
54051c0b2f7Stbbdev             CHECK_MESSAGE( (ib0_cnt + ib1_cnt >= t_cnt), "Too many items received by test node");
54151c0b2f7Stbbdev             CHECK_MESSAGE( (nb0_cnt + nb1_cnt <= t_cnt*2), "Too many items received by sink nodes");
54251c0b2f7Stbbdev         }
54351c0b2f7Stbbdev         else {
54451c0b2f7Stbbdev             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
54551c0b2f7Stbbdev             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
54651c0b2f7Stbbdev             CHECK_MESSAGE( (ib0_cnt + ib1_cnt == 2*g_NumItems), "Missing invocations of input_nodes");
54751c0b2f7Stbbdev             CHECK_MESSAGE( (t_cnt == 2*g_NumItems), "Not all items reached test node");
54851c0b2f7Stbbdev             CHECK_MESSAGE( (nb0_cnt == 2*g_NumItems && nb1_cnt == 2*g_NumItems), "Missing items in absorbers");
54951c0b2f7Stbbdev         }
55051c0b2f7Stbbdev         g.reset();  // resets the body of the input_nodes, test_node and the absorb_nodes.
55151c0b2f7Stbbdev         input0_count = input1_count = sink0_count = sink1_count = test_count = 0;
55251c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType0>(input0).count_value()),"Reset input 0 failed");
55351c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType1>(input1).count_value()),"Reset input 1 failed");
55451c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value()),"Reset test_node failed");
55551c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value()),"Reset sink 0 failed");
55651c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value()),"Reset sink 1 failed");
55751c0b2f7Stbbdev 
55851c0b2f7Stbbdev         g_Wakeup_Msg = saved_msg;
55951c0b2f7Stbbdev     }
56051c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
56151c0b2f7Stbbdev     o.observe(false);
56251c0b2f7Stbbdev #endif
56351c0b2f7Stbbdev }
56451c0b2f7Stbbdev 
56551c0b2f7Stbbdev //  Test function_node
56651c0b2f7Stbbdev //
56751c0b2f7Stbbdev // graph being tested is
56851c0b2f7Stbbdev //
56951c0b2f7Stbbdev //         input_node -\                 /- parallel function_node
57051c0b2f7Stbbdev //                      \               /
57151c0b2f7Stbbdev //                       +function_node+
57251c0b2f7Stbbdev //                      /               \                                  x
57351c0b2f7Stbbdev //         input_node -/                 \- parallel function_node
57451c0b2f7Stbbdev //
57551c0b2f7Stbbdev //    After each run the graph is reset(), to test the reset functionality.
57651c0b2f7Stbbdev //
57751c0b2f7Stbbdev template<
57851c0b2f7Stbbdev     TestNodeTypeEnum IType1,                          // does input node 1 throw?
57951c0b2f7Stbbdev     TestNodeTypeEnum IType2,                          // does input node 2 throw?
58051c0b2f7Stbbdev     class Item12,                                     // type of item passed between inputs and test node
58151c0b2f7Stbbdev     TestNodeTypeEnum FType,                           // does function node throw?
58251c0b2f7Stbbdev     class Item23,                                     // type passed from function_node to sink nodes
58351c0b2f7Stbbdev     TestNodeTypeEnum NType1,                          // does sink node 1 throw?
58451c0b2f7Stbbdev     TestNodeTypeEnum NType2,                          // does sink node 1 throw?
58551c0b2f7Stbbdev     class NodePolicy,                                 // rejecting,queueing
58651c0b2f7Stbbdev     size_t Conc                                       // is node concurrent? {serial | limited | unlimited}
58751c0b2f7Stbbdev >
run_function_node_test()58851c0b2f7Stbbdev void run_function_node_test() {
58951c0b2f7Stbbdev 
59051c0b2f7Stbbdev     typedef test_input_body<Item12,IType1> IBodyType1;
59151c0b2f7Stbbdev     typedef test_input_body<Item12,IType2> IBodyType2;
59251c0b2f7Stbbdev     typedef absorber_body<Item12, Item23, FType, Conc> TestBodyType;
59351c0b2f7Stbbdev     typedef absorber_body<Item23,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1;
59451c0b2f7Stbbdev     typedef absorber_body<Item23,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2;
59551c0b2f7Stbbdev 
59651c0b2f7Stbbdev     typedef tbb::flow::input_node<Item12> InputType;
59751c0b2f7Stbbdev     typedef tbb::flow::function_node<Item12, Item23, NodePolicy> TestType;
59851c0b2f7Stbbdev     typedef tbb::flow::function_node<Item23,tbb::flow::continue_msg> SnkType;
59951c0b2f7Stbbdev 
60051c0b2f7Stbbdev     for(int i = 0; i < 4; ++i ) {
60151c0b2f7Stbbdev         if(i != 2) {  // doesn't make sense to flog a non-throwing test
60251c0b2f7Stbbdev             bool doThrow = (i & 0x1) != 0;
60351c0b2f7Stbbdev             bool doFlog = (i & 0x2) != 0;
60451c0b2f7Stbbdev             run_one_functype_node_test<
60551c0b2f7Stbbdev                 /*InputNodeType*/       InputType,
60651c0b2f7Stbbdev                 /*InputNodeBodyType0*/  IBodyType1,
60751c0b2f7Stbbdev                 /*InputNodeBodyType1*/  IBodyType2,
60851c0b2f7Stbbdev                 /* NFT */               func_node_type,
60951c0b2f7Stbbdev                 /*TestNodeType*/        TestType,
61051c0b2f7Stbbdev                 /*TestNodeBodyType*/    TestBodyType,
61151c0b2f7Stbbdev                 /*TypeToSink0 */        Item23,
61251c0b2f7Stbbdev                 /*TypeToSink1 */        Item23,
61351c0b2f7Stbbdev                 /*SinkNodeType0*/       SnkType,
61451c0b2f7Stbbdev                 /*SinkNodeType1*/       SnkType,
61551c0b2f7Stbbdev                 /*SinkNodeBodyType1*/   SinkBodyType1,
61651c0b2f7Stbbdev                 /*SinkNodeBodyType2*/   SinkBodyType2,
61751c0b2f7Stbbdev                 /*Conc*/                Conc>
61851c0b2f7Stbbdev                     (doThrow,doFlog,"function_node");
61951c0b2f7Stbbdev         }
62051c0b2f7Stbbdev     }
62151c0b2f7Stbbdev }  // run_function_node_test
62251c0b2f7Stbbdev 
test_function_node()62351c0b2f7Stbbdev void test_function_node() {
62451c0b2f7Stbbdev     INFO("Testing function_node\n");
62551c0b2f7Stbbdev     // serial rejecting
62651c0b2f7Stbbdev     g_Wakeup_Msg = "function_node(1a): Missed wakeup or machine is overloaded?";
62751c0b2f7Stbbdev     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
62851c0b2f7Stbbdev     g_Wakeup_Msg = "function_node(1b): Missed wakeup or machine is overloaded?";
62951c0b2f7Stbbdev     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
63051c0b2f7Stbbdev     g_Wakeup_Msg = "function_node(1c): Missed wakeup or machine is overloaded?";
63151c0b2f7Stbbdev     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
63251c0b2f7Stbbdev 
63351c0b2f7Stbbdev     // serial queueing
63451c0b2f7Stbbdev     g_Wakeup_Msg = "function_node(2): Missed wakeup or machine is overloaded?";
63551c0b2f7Stbbdev     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
63651c0b2f7Stbbdev     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
63751c0b2f7Stbbdev     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
63851c0b2f7Stbbdev     CheckType<int>::check_type_counter = 0;
63951c0b2f7Stbbdev     run_function_node_test<nonThrowing, nonThrowing, CheckType<int>, nonThrowing, CheckType<int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
64051c0b2f7Stbbdev     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
64151c0b2f7Stbbdev 
64251c0b2f7Stbbdev     // unlimited parallel rejecting
64351c0b2f7Stbbdev     g_Wakeup_Msg = "function_node(3): Missed wakeup or machine is overloaded?";
64451c0b2f7Stbbdev     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
64551c0b2f7Stbbdev     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
64651c0b2f7Stbbdev     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
64751c0b2f7Stbbdev 
64851c0b2f7Stbbdev     // limited parallel rejecting
64951c0b2f7Stbbdev     g_Wakeup_Msg = "function_node(4): Missed wakeup or machine is overloaded?";
65051c0b2f7Stbbdev     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>();
65151c0b2f7Stbbdev     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>();
65251c0b2f7Stbbdev     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>();
65351c0b2f7Stbbdev 
65451c0b2f7Stbbdev     // limited parallel queueing
65551c0b2f7Stbbdev     g_Wakeup_Msg = "function_node(5): Missed wakeup or machine is overloaded?";
65651c0b2f7Stbbdev     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
65751c0b2f7Stbbdev     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
65851c0b2f7Stbbdev     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>();
65951c0b2f7Stbbdev 
66051c0b2f7Stbbdev     // everyone throwing
66151c0b2f7Stbbdev     g_Wakeup_Msg = "function_node(6): Missed wakeup or machine is overloaded?";
66251c0b2f7Stbbdev     run_function_node_test<isThrowing, isThrowing, int, isThrowing, int, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
66351c0b2f7Stbbdev     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
66451c0b2f7Stbbdev }
66551c0b2f7Stbbdev 
66651c0b2f7Stbbdev // ----------------------------------- multifunction_node ----------------------------------
66751c0b2f7Stbbdev //  Test multifunction_node.
66851c0b2f7Stbbdev //
66951c0b2f7Stbbdev // graph being tested is
67051c0b2f7Stbbdev //
67151c0b2f7Stbbdev //         input_node -\                      /- parallel function_node
67251c0b2f7Stbbdev //                      \                    /
67351c0b2f7Stbbdev //                       +multifunction_node+
67451c0b2f7Stbbdev //                      /                    \                                  x
67551c0b2f7Stbbdev //         input_node -/                      \- parallel function_node
67651c0b2f7Stbbdev //
67751c0b2f7Stbbdev //    After each run the graph is reset(), to test the reset functionality.  The
67851c0b2f7Stbbdev //    multifunction_node will put an item to each successor for every item
67951c0b2f7Stbbdev //    received.
68051c0b2f7Stbbdev //
68151c0b2f7Stbbdev template<
68251c0b2f7Stbbdev     TestNodeTypeEnum IType0,                          // does input node 1 throw?
68351c0b2f7Stbbdev     TestNodeTypeEnum IType1,                          // does input node 2 thorw?
68451c0b2f7Stbbdev     class Item12,                                 // type of item passed between inputs and test node
68551c0b2f7Stbbdev     TestNodeTypeEnum FType,                           // does multifunction node throw?
68651c0b2f7Stbbdev     class ItemTuple,                              // tuple of types passed from multifunction_node to sink nodes
68751c0b2f7Stbbdev     TestNodeTypeEnum NType1,                          // does sink node 1 throw?
68851c0b2f7Stbbdev     TestNodeTypeEnum NType2,                          // does sink node 2 throw?
68951c0b2f7Stbbdev     class  NodePolicy,                            // rejecting,queueing
69051c0b2f7Stbbdev     size_t Conc                                   // is node concurrent? {serial | limited | unlimited}
69151c0b2f7Stbbdev >
run_multifunction_node_test()69251c0b2f7Stbbdev void run_multifunction_node_test() {
69351c0b2f7Stbbdev 
69451c0b2f7Stbbdev     typedef typename std::tuple_element<0,ItemTuple>::type Item23Type0;
69551c0b2f7Stbbdev     typedef typename std::tuple_element<1,ItemTuple>::type Item23Type1;
69651c0b2f7Stbbdev     typedef test_input_body<Item12,IType0> IBodyType1;
69751c0b2f7Stbbdev     typedef test_input_body<Item12,IType1> IBodyType2;
69851c0b2f7Stbbdev     typedef multifunction_node_body<Item12, ItemTuple, FType, Conc> TestBodyType;
69951c0b2f7Stbbdev     typedef absorber_body<Item23Type0,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1;
70051c0b2f7Stbbdev     typedef absorber_body<Item23Type1,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2;
70151c0b2f7Stbbdev 
70251c0b2f7Stbbdev     typedef tbb::flow::input_node<Item12> InputType;
70351c0b2f7Stbbdev     typedef tbb::flow::multifunction_node<Item12, ItemTuple, NodePolicy> TestType;
70451c0b2f7Stbbdev     typedef tbb::flow::function_node<Item23Type0,tbb::flow::continue_msg> SnkType0;
70551c0b2f7Stbbdev     typedef tbb::flow::function_node<Item23Type1,tbb::flow::continue_msg> SnkType1;
70651c0b2f7Stbbdev 
70751c0b2f7Stbbdev     for(int i = 0; i < 4; ++i ) {
70851c0b2f7Stbbdev         if(i != 2) {  // doesn't make sense to flog a non-throwing test
70951c0b2f7Stbbdev             bool doThrow = (i & 0x1) != 0;
71051c0b2f7Stbbdev             bool doFlog = (i & 0x2) != 0;
71151c0b2f7Stbbdev     run_one_functype_node_test<
71251c0b2f7Stbbdev         /*InputNodeType*/       InputType,
71351c0b2f7Stbbdev         /*InputNodeBodyType0*/  IBodyType1,
71451c0b2f7Stbbdev         /*InputNodeBodyType1*/  IBodyType2,
71551c0b2f7Stbbdev         /*NFT*/                 multifunc_node_type,
71651c0b2f7Stbbdev         /*TestNodeType*/        TestType,
71751c0b2f7Stbbdev         /*TestNodeBodyType*/    TestBodyType,
71851c0b2f7Stbbdev         /*TypeToSink0*/         Item23Type0,
71951c0b2f7Stbbdev         /*TypeToSink1*/         Item23Type1,
72051c0b2f7Stbbdev         /*SinkNodeType0*/       SnkType0,
72151c0b2f7Stbbdev         /*SinkNodeType1*/       SnkType1,
72251c0b2f7Stbbdev         /*SinkNodeBodyType0*/   SinkBodyType1,
72351c0b2f7Stbbdev         /*SinkNodeBodyType1*/   SinkBodyType2,
72451c0b2f7Stbbdev         /*Conc*/                Conc>
72551c0b2f7Stbbdev             (doThrow,doFlog,"multifunction_node");
72651c0b2f7Stbbdev         }
72751c0b2f7Stbbdev     }
72851c0b2f7Stbbdev }  // run_multifunction_node_test
72951c0b2f7Stbbdev 
test_multifunction_node()73051c0b2f7Stbbdev void test_multifunction_node() {
73151c0b2f7Stbbdev     INFO("Testing multifunction_node\n");
73251c0b2f7Stbbdev     g_Wakeup_Msg = "multifunction_node(input throws,rejecting,serial): Missed wakeup or machine is overloaded?";
73351c0b2f7Stbbdev     // serial rejecting
73451c0b2f7Stbbdev     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,float>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
73551c0b2f7Stbbdev     g_Wakeup_Msg = "multifunction_node(test throws,rejecting,serial): Missed wakeup or machine is overloaded?";
73651c0b2f7Stbbdev     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
73751c0b2f7Stbbdev     g_Wakeup_Msg = "multifunction_node(sink throws,rejecting,serial): Missed wakeup or machine is overloaded?";
73851c0b2f7Stbbdev     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
73951c0b2f7Stbbdev 
74051c0b2f7Stbbdev     g_Wakeup_Msg = "multifunction_node(2): Missed wakeup or machine is overloaded?";
74151c0b2f7Stbbdev     // serial queueing
74251c0b2f7Stbbdev     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
74351c0b2f7Stbbdev     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
74451c0b2f7Stbbdev     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
74551c0b2f7Stbbdev     CheckType<int>::check_type_counter = 0;
74651c0b2f7Stbbdev     run_multifunction_node_test<nonThrowing, nonThrowing, CheckType<int>, nonThrowing, std::tuple<CheckType<int>, CheckType<int> >, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
74751c0b2f7Stbbdev     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
74851c0b2f7Stbbdev 
74951c0b2f7Stbbdev     g_Wakeup_Msg = "multifunction_node(3): Missed wakeup or machine is overloaded?";
75051c0b2f7Stbbdev     // unlimited parallel rejecting
75151c0b2f7Stbbdev     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
75251c0b2f7Stbbdev     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
75351c0b2f7Stbbdev     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
75451c0b2f7Stbbdev 
75551c0b2f7Stbbdev     g_Wakeup_Msg = "multifunction_node(4): Missed wakeup or machine is overloaded?";
75651c0b2f7Stbbdev     // limited parallel rejecting
75751c0b2f7Stbbdev     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>();
75851c0b2f7Stbbdev     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>();
75951c0b2f7Stbbdev     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>();
76051c0b2f7Stbbdev 
76151c0b2f7Stbbdev     g_Wakeup_Msg = "multifunction_node(5): Missed wakeup or machine is overloaded?";
76251c0b2f7Stbbdev     // limited parallel queueing
76351c0b2f7Stbbdev     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
76451c0b2f7Stbbdev     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
76551c0b2f7Stbbdev     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>();
76651c0b2f7Stbbdev 
76751c0b2f7Stbbdev     g_Wakeup_Msg = "multifunction_node(6): Missed wakeup or machine is overloaded?";
76851c0b2f7Stbbdev     // everyone throwing
76951c0b2f7Stbbdev     run_multifunction_node_test<isThrowing, isThrowing, int, isThrowing, std::tuple<int,int>, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
77051c0b2f7Stbbdev     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
77151c0b2f7Stbbdev }
77251c0b2f7Stbbdev 
77351c0b2f7Stbbdev //
77451c0b2f7Stbbdev // Continue node has T predecessors.  when it receives messages (continue_msg) on T predecessors
77551c0b2f7Stbbdev // it executes the body of the node, and forwards a continue_msg to its successors.
77651c0b2f7Stbbdev // However many predecessors the continue_node has, that's how many continue_msgs it receives
77751c0b2f7Stbbdev // on input before forwarding a message.
77851c0b2f7Stbbdev //
77951c0b2f7Stbbdev // The graph will look like
78051c0b2f7Stbbdev //
78151c0b2f7Stbbdev //                                          +broadcast_node+
78251c0b2f7Stbbdev //                                         /                \             ___
78351c0b2f7Stbbdev //       input_node+------>+broadcast_node+                  +continue_node+--->+absorber
78451c0b2f7Stbbdev //                                         \                /
78551c0b2f7Stbbdev //                                          +broadcast_node+
78651c0b2f7Stbbdev //
78751c0b2f7Stbbdev // The continue_node has unlimited parallelism, no input buffering, and broadcasts to successors.
78851c0b2f7Stbbdev // The absorber is parallel, so each item emitted by the input will result in one thread
78951c0b2f7Stbbdev // spinning.  So for N threads we pass N-1 continue_messages, then spin wait and then throw if
79051c0b2f7Stbbdev // we are allowed to.
79151c0b2f7Stbbdev 
79251c0b2f7Stbbdev template < class InputNodeType, class InputNodeBodyType, class TTestNodeType, class TestNodeBodyType,
79351c0b2f7Stbbdev         class SinkNodeType, class SinkNodeBodyType>
run_one_continue_node_test(bool throwException,bool flog)79451c0b2f7Stbbdev void run_one_continue_node_test (bool throwException, bool flog) {
79551c0b2f7Stbbdev     tbb::flow::graph g;
79651c0b2f7Stbbdev 
79751c0b2f7Stbbdev     std::atomic<int> input_count;
79851c0b2f7Stbbdev     std::atomic<int> test_count;
79951c0b2f7Stbbdev     std::atomic<int> sink_count;
80051c0b2f7Stbbdev     input_count = test_count = sink_count = 0;
80151c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
80251c0b2f7Stbbdev     eh_test_observer o;
80351c0b2f7Stbbdev     o.observe(true);
80451c0b2f7Stbbdev #endif
80551c0b2f7Stbbdev     g_Master = std::this_thread::get_id();
80651c0b2f7Stbbdev     InputNodeType input(g, InputNodeBodyType(input_count));
80751c0b2f7Stbbdev     TTestNodeType node_to_test(g, TestNodeBodyType(test_count));
80851c0b2f7Stbbdev     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
80951c0b2f7Stbbdev     tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g), b2(g), b3(g);
81051c0b2f7Stbbdev     make_edge(input, b1);
81151c0b2f7Stbbdev     make_edge(b1,b2);
81251c0b2f7Stbbdev     make_edge(b1,b3);
81351c0b2f7Stbbdev     make_edge(b2,node_to_test);
81451c0b2f7Stbbdev     make_edge(b3,node_to_test);
81551c0b2f7Stbbdev     make_edge(node_to_test, sink);
81651c0b2f7Stbbdev     for(int iter = 0; iter < 2; ++iter) {
81751c0b2f7Stbbdev         ResetGlobals(throwException,flog);
81851c0b2f7Stbbdev         if(throwException) {
81951c0b2f7Stbbdev             TRY();
82051c0b2f7Stbbdev                 input.activate();
82151c0b2f7Stbbdev                 g.wait_for_all();
82251c0b2f7Stbbdev             CATCH_AND_ASSERT();
82351c0b2f7Stbbdev         }
82451c0b2f7Stbbdev         else {
82551c0b2f7Stbbdev             TRY();
82651c0b2f7Stbbdev                 input.activate();
82751c0b2f7Stbbdev                 g.wait_for_all();
82851c0b2f7Stbbdev             CATCH_AND_FAIL();
82951c0b2f7Stbbdev         }
83051c0b2f7Stbbdev         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
83151c0b2f7Stbbdev         int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
83251c0b2f7Stbbdev         int t_cnt   = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value();
83351c0b2f7Stbbdev         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
83451c0b2f7Stbbdev         if(throwException) {
83551c0b2f7Stbbdev             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
83651c0b2f7Stbbdev             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
83751c0b2f7Stbbdev             CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
83851c0b2f7Stbbdev             CHECK_MESSAGE( (ib_cnt >= t_cnt), "Too many items received by test node");
83951c0b2f7Stbbdev             CHECK_MESSAGE( (nb_cnt <= t_cnt), "Too many items received by sink nodes");
84051c0b2f7Stbbdev         }
84151c0b2f7Stbbdev         else {
84251c0b2f7Stbbdev             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
84351c0b2f7Stbbdev             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
84451c0b2f7Stbbdev             CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
84551c0b2f7Stbbdev             CHECK_MESSAGE( (t_cnt == g_NumItems), "Not all items reached test node");
84651c0b2f7Stbbdev             CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
84751c0b2f7Stbbdev         }
84851c0b2f7Stbbdev         g.reset();  // resets the body of the input_nodes, test_node and the absorb_nodes.
84951c0b2f7Stbbdev         input_count = test_count = sink_count = 0;
85051c0b2f7Stbbdev         CHECK_MESSAGE( (0 == (int)test_count), "Atomic wasn't reset properly");
85151c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
85251c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value()),"Reset test_node failed");
85351c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
85451c0b2f7Stbbdev     }
85551c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
85651c0b2f7Stbbdev     o.observe(false);
85751c0b2f7Stbbdev #endif
85851c0b2f7Stbbdev }
85951c0b2f7Stbbdev 
86051c0b2f7Stbbdev template<
86151c0b2f7Stbbdev     class ItemType,
86251c0b2f7Stbbdev     TestNodeTypeEnum IType,   // does input node throw?
86351c0b2f7Stbbdev     TestNodeTypeEnum CType,   // does continue_node throw?
86451c0b2f7Stbbdev     TestNodeTypeEnum AType>    // does absorber throw
run_continue_node_test()86551c0b2f7Stbbdev void run_continue_node_test() {
86651c0b2f7Stbbdev     typedef test_input_body<tbb::flow::continue_msg,IType> IBodyType;
86751c0b2f7Stbbdev     typedef absorber_body<tbb::flow::continue_msg,ItemType,CType,unlimited_type> ContBodyType;
86851c0b2f7Stbbdev     typedef absorber_body<ItemType,tbb::flow::continue_msg, AType, unlimited_type> SinkBodyType;
86951c0b2f7Stbbdev 
87051c0b2f7Stbbdev     typedef tbb::flow::input_node<tbb::flow::continue_msg> InputType;
87151c0b2f7Stbbdev     typedef tbb::flow::continue_node<ItemType> TestType;
87251c0b2f7Stbbdev     typedef tbb::flow::function_node<ItemType,tbb::flow::continue_msg> SnkType;
87351c0b2f7Stbbdev 
87451c0b2f7Stbbdev     for(int i = 0; i < 4; ++i ) {
87551c0b2f7Stbbdev         if(i == 2) continue;  // don't run (false,true); it doesn't make sense.
87651c0b2f7Stbbdev         bool doThrow = (i & 0x1) != 0;
87751c0b2f7Stbbdev         bool doFlog = (i & 0x2) != 0;
87851c0b2f7Stbbdev         run_one_continue_node_test<
87951c0b2f7Stbbdev             /*InputNodeType*/       InputType,
88051c0b2f7Stbbdev             /*InputNodeBodyType*/   IBodyType,
88151c0b2f7Stbbdev             /*TestNodeType*/        TestType,
88251c0b2f7Stbbdev             /*TestNodeBodyType*/    ContBodyType,
88351c0b2f7Stbbdev             /*SinkNodeType*/        SnkType,
88451c0b2f7Stbbdev             /*SinkNodeBodyType*/    SinkBodyType>
88551c0b2f7Stbbdev             (doThrow,doFlog);
88651c0b2f7Stbbdev     }
88751c0b2f7Stbbdev }
88851c0b2f7Stbbdev 
88951c0b2f7Stbbdev //
test_continue_node()89051c0b2f7Stbbdev void test_continue_node() {
89151c0b2f7Stbbdev     INFO("Testing continue_node\n");
89251c0b2f7Stbbdev     g_Wakeup_Msg = "buffer_node(non,is,non): Missed wakeup or machine is overloaded?";
89351c0b2f7Stbbdev     run_continue_node_test<int,nonThrowing,isThrowing,nonThrowing>();
89451c0b2f7Stbbdev     g_Wakeup_Msg = "buffer_node(non,non,is): Missed wakeup or machine is overloaded?";
89551c0b2f7Stbbdev     run_continue_node_test<int,nonThrowing,nonThrowing,isThrowing>();
89651c0b2f7Stbbdev     g_Wakeup_Msg = "buffer_node(is,non,non): Missed wakeup or machine is overloaded?";
89751c0b2f7Stbbdev     run_continue_node_test<int,isThrowing,nonThrowing,nonThrowing>();
89851c0b2f7Stbbdev     g_Wakeup_Msg = "buffer_node(is,is,is): Missed wakeup or machine is overloaded?";
89951c0b2f7Stbbdev     run_continue_node_test<int,isThrowing,isThrowing,isThrowing>();
90051c0b2f7Stbbdev     CheckType<double>::check_type_counter = 0;
90151c0b2f7Stbbdev     run_continue_node_test<CheckType<double>,isThrowing,isThrowing,isThrowing>();
90251c0b2f7Stbbdev     CHECK_MESSAGE( (!CheckType<double>::check_type_counter), "Dropped objects in continue_node test");
90351c0b2f7Stbbdev     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
90451c0b2f7Stbbdev }
90551c0b2f7Stbbdev 
90651c0b2f7Stbbdev // ---------- buffer_node queue_node overwrite_node --------------
90751c0b2f7Stbbdev 
90851c0b2f7Stbbdev template<
90951c0b2f7Stbbdev     class BufferItemType,       //
91051c0b2f7Stbbdev     class InputNodeType,
91151c0b2f7Stbbdev     class InputNodeBodyType,
91251c0b2f7Stbbdev     class TestNodeType,
91351c0b2f7Stbbdev     class SinkNodeType,
91451c0b2f7Stbbdev     class SinkNodeBodyType >
run_one_buffer_node_test(bool throwException,bool flog)91551c0b2f7Stbbdev void run_one_buffer_node_test(bool throwException,bool flog) {
91651c0b2f7Stbbdev     tbb::flow::graph g;
91751c0b2f7Stbbdev 
91851c0b2f7Stbbdev     std::atomic<int> input_count;
91951c0b2f7Stbbdev     std::atomic<int> sink_count;
92051c0b2f7Stbbdev     input_count = sink_count = 0;
92151c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
92251c0b2f7Stbbdev     eh_test_observer o;
92351c0b2f7Stbbdev     o.observe(true);
92451c0b2f7Stbbdev #endif
92551c0b2f7Stbbdev     g_Master = std::this_thread::get_id();
92651c0b2f7Stbbdev     InputNodeType input(g, InputNodeBodyType(input_count));
92751c0b2f7Stbbdev     TestNodeType node_to_test(g);
92851c0b2f7Stbbdev     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
92951c0b2f7Stbbdev     make_edge(input,node_to_test);
93051c0b2f7Stbbdev     make_edge(node_to_test, sink);
93151c0b2f7Stbbdev     for(int iter = 0; iter < 2; ++iter) {
93251c0b2f7Stbbdev         ResetGlobals(throwException,flog);
93351c0b2f7Stbbdev         if(throwException) {
93451c0b2f7Stbbdev             TRY();
93551c0b2f7Stbbdev                 input.activate();
93651c0b2f7Stbbdev                 g.wait_for_all();
93751c0b2f7Stbbdev             CATCH_AND_ASSERT();
93851c0b2f7Stbbdev         }
93951c0b2f7Stbbdev         else {
94051c0b2f7Stbbdev             TRY();
94151c0b2f7Stbbdev                 input.activate();
94251c0b2f7Stbbdev                 g.wait_for_all();
94351c0b2f7Stbbdev             CATCH_AND_FAIL();
94451c0b2f7Stbbdev         }
94551c0b2f7Stbbdev         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
94651c0b2f7Stbbdev         int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
94751c0b2f7Stbbdev         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
94851c0b2f7Stbbdev         if(throwException) {
94951c0b2f7Stbbdev             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
95051c0b2f7Stbbdev             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
95151c0b2f7Stbbdev             CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
95251c0b2f7Stbbdev             CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
95351c0b2f7Stbbdev         }
95451c0b2f7Stbbdev         else {
95551c0b2f7Stbbdev             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
95651c0b2f7Stbbdev             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
95751c0b2f7Stbbdev             CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
95851c0b2f7Stbbdev             CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
95951c0b2f7Stbbdev         }
96051c0b2f7Stbbdev         if(iter == 0) {
96151c0b2f7Stbbdev             remove_edge(node_to_test, sink);
96251c0b2f7Stbbdev             node_to_test.try_put(BufferItemType());
96351c0b2f7Stbbdev             g.wait_for_all();
96451c0b2f7Stbbdev             g.reset();
96551c0b2f7Stbbdev             input_count = sink_count = 0;
96651c0b2f7Stbbdev             BufferItemType tmp;
96751c0b2f7Stbbdev             CHECK_MESSAGE( (!node_to_test.try_get(tmp)), "node not empty");
96851c0b2f7Stbbdev             make_edge(node_to_test, sink);
96951c0b2f7Stbbdev             g.wait_for_all();
97051c0b2f7Stbbdev         }
97151c0b2f7Stbbdev         else {
97251c0b2f7Stbbdev             g.reset();
97351c0b2f7Stbbdev             input_count = sink_count = 0;
97451c0b2f7Stbbdev         }
97551c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
97651c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
97751c0b2f7Stbbdev     }
97851c0b2f7Stbbdev 
97951c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
98051c0b2f7Stbbdev     o.observe(false);
98151c0b2f7Stbbdev #endif
98251c0b2f7Stbbdev }
98351c0b2f7Stbbdev template<class BufferItemType,
98451c0b2f7Stbbdev          TestNodeTypeEnum InputThrowType,
98551c0b2f7Stbbdev          TestNodeTypeEnum SinkThrowType>
run_buffer_queue_and_overwrite_node_test()98651c0b2f7Stbbdev void run_buffer_queue_and_overwrite_node_test() {
98751c0b2f7Stbbdev     typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
98851c0b2f7Stbbdev     typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
98951c0b2f7Stbbdev 
99051c0b2f7Stbbdev     typedef tbb::flow::input_node<BufferItemType> InputType;
99151c0b2f7Stbbdev     typedef tbb::flow::buffer_node<BufferItemType> BufType;
99251c0b2f7Stbbdev     typedef tbb::flow::queue_node<BufferItemType>  QueType;
99351c0b2f7Stbbdev     typedef tbb::flow::overwrite_node<BufferItemType>  OvrType;
99451c0b2f7Stbbdev     typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
99551c0b2f7Stbbdev 
99651c0b2f7Stbbdev     for(int i = 0; i < 4; ++i) {
99751c0b2f7Stbbdev         if(i == 2) continue;  // no need to test flog w/o throws
99851c0b2f7Stbbdev         bool throwException = (i & 0x1) != 0;
99951c0b2f7Stbbdev         bool doFlog = (i & 0x2) != 0;
100051c0b2f7Stbbdev         run_one_buffer_node_test<
100151c0b2f7Stbbdev             /* class BufferItemType*/     BufferItemType,
100251c0b2f7Stbbdev             /*class InputNodeType*/       InputType,
100351c0b2f7Stbbdev             /*class InputNodeBodyType*/   InputBodyType,
100451c0b2f7Stbbdev             /*class TestNodeType*/        BufType,
100551c0b2f7Stbbdev             /*class SinkNodeType*/        SnkType,
100651c0b2f7Stbbdev             /*class SinkNodeBodyType*/    SinkBodyType
100751c0b2f7Stbbdev             >(throwException, doFlog);
100851c0b2f7Stbbdev         run_one_buffer_node_test<
100951c0b2f7Stbbdev             /* class BufferItemType*/     BufferItemType,
101051c0b2f7Stbbdev             /*class InputNodeType*/       InputType,
101151c0b2f7Stbbdev             /*class InputNodeBodyType*/   InputBodyType,
101251c0b2f7Stbbdev             /*class TestNodeType*/        QueType,
101351c0b2f7Stbbdev             /*class SinkNodeType*/        SnkType,
101451c0b2f7Stbbdev             /*class SinkNodeBodyType*/    SinkBodyType
101551c0b2f7Stbbdev             >(throwException, doFlog);
101651c0b2f7Stbbdev         run_one_buffer_node_test<
101751c0b2f7Stbbdev             /* class BufferItemType*/     BufferItemType,
101851c0b2f7Stbbdev             /*class InputNodeType*/       InputType,
101951c0b2f7Stbbdev             /*class InputNodeBodyType*/   InputBodyType,
102051c0b2f7Stbbdev             /*class TestNodeType*/        OvrType,
102151c0b2f7Stbbdev             /*class SinkNodeType*/        SnkType,
102251c0b2f7Stbbdev             /*class SinkNodeBodyType*/    SinkBodyType
102351c0b2f7Stbbdev             >(throwException, doFlog);
102451c0b2f7Stbbdev     }
102551c0b2f7Stbbdev }
102651c0b2f7Stbbdev 
test_buffer_queue_and_overwrite_node()102751c0b2f7Stbbdev void test_buffer_queue_and_overwrite_node() {
102851c0b2f7Stbbdev     INFO("Testing buffer_node, queue_node and overwrite_node\n");
102951c0b2f7Stbbdev     g_Wakeup_Msg = "buffer, queue, overwrite(is,non): Missed wakeup or machine is overloaded?";
103051c0b2f7Stbbdev     run_buffer_queue_and_overwrite_node_test<int,isThrowing,nonThrowing>();
103151c0b2f7Stbbdev     g_Wakeup_Msg = "buffer, queue, overwrite(non,is): Missed wakeup or machine is overloaded?";
103251c0b2f7Stbbdev     run_buffer_queue_and_overwrite_node_test<int,nonThrowing,isThrowing>();
103351c0b2f7Stbbdev     g_Wakeup_Msg = "buffer, queue, overwrite(is,is): Missed wakeup or machine is overloaded?";
103451c0b2f7Stbbdev     run_buffer_queue_and_overwrite_node_test<int,isThrowing,isThrowing>();
103551c0b2f7Stbbdev     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
103651c0b2f7Stbbdev }
103751c0b2f7Stbbdev 
103851c0b2f7Stbbdev // ---------- sequencer_node -------------------------
103951c0b2f7Stbbdev 
104051c0b2f7Stbbdev 
104151c0b2f7Stbbdev template<
104251c0b2f7Stbbdev     class BufferItemType,       //
104351c0b2f7Stbbdev     class InputNodeType,
104451c0b2f7Stbbdev     class InputNodeBodyType,
104551c0b2f7Stbbdev     class TestNodeType,
104651c0b2f7Stbbdev     class SeqBodyType,
104751c0b2f7Stbbdev     class SinkNodeType,
104851c0b2f7Stbbdev     class SinkNodeBodyType >
run_one_sequencer_node_test(bool throwException,bool flog)104951c0b2f7Stbbdev void run_one_sequencer_node_test(bool throwException,bool flog) {
105051c0b2f7Stbbdev     tbb::flow::graph g;
105151c0b2f7Stbbdev 
105251c0b2f7Stbbdev     std::atomic<int> input_count;
105351c0b2f7Stbbdev     std::atomic<int> sink_count;
105451c0b2f7Stbbdev     input_count = sink_count = 0;
105551c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
105651c0b2f7Stbbdev     eh_test_observer o;
105751c0b2f7Stbbdev     o.observe(true);
105851c0b2f7Stbbdev #endif
105951c0b2f7Stbbdev     g_Master = std::this_thread::get_id();
106051c0b2f7Stbbdev     InputNodeType input(g, InputNodeBodyType(input_count));
106151c0b2f7Stbbdev     TestNodeType node_to_test(g,SeqBodyType());
106251c0b2f7Stbbdev     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
106351c0b2f7Stbbdev     make_edge(input,node_to_test);
106451c0b2f7Stbbdev     make_edge(node_to_test, sink);
106551c0b2f7Stbbdev     for(int iter = 0; iter < 2; ++iter) {
106651c0b2f7Stbbdev         ResetGlobals(throwException,flog);
106751c0b2f7Stbbdev         if(throwException) {
106851c0b2f7Stbbdev             TRY();
106951c0b2f7Stbbdev                 input.activate();
107051c0b2f7Stbbdev                 g.wait_for_all();
107151c0b2f7Stbbdev             CATCH_AND_ASSERT();
107251c0b2f7Stbbdev         }
107351c0b2f7Stbbdev         else {
107451c0b2f7Stbbdev             TRY();
107551c0b2f7Stbbdev                 input.activate();
107651c0b2f7Stbbdev                 g.wait_for_all();
107751c0b2f7Stbbdev             CATCH_AND_FAIL();
107851c0b2f7Stbbdev         }
107951c0b2f7Stbbdev         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
108051c0b2f7Stbbdev         int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
108151c0b2f7Stbbdev         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
108251c0b2f7Stbbdev         if(throwException) {
108351c0b2f7Stbbdev             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
108451c0b2f7Stbbdev             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
108551c0b2f7Stbbdev             CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
108651c0b2f7Stbbdev             CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
108751c0b2f7Stbbdev         }
108851c0b2f7Stbbdev         else {
108951c0b2f7Stbbdev             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
109051c0b2f7Stbbdev             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
109151c0b2f7Stbbdev             CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
109251c0b2f7Stbbdev             CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
109351c0b2f7Stbbdev         }
109451c0b2f7Stbbdev         if(iter == 0) {
109551c0b2f7Stbbdev             remove_edge(node_to_test, sink);
109651c0b2f7Stbbdev             node_to_test.try_put(BufferItemType(g_NumItems + 1));
109751c0b2f7Stbbdev             node_to_test.try_put(BufferItemType(1));
109851c0b2f7Stbbdev             g.wait_for_all();
109951c0b2f7Stbbdev             g.reset();
110051c0b2f7Stbbdev             input_count = sink_count = 0;
110151c0b2f7Stbbdev             make_edge(node_to_test, sink);
110251c0b2f7Stbbdev             g.wait_for_all();
110351c0b2f7Stbbdev         }
110451c0b2f7Stbbdev         else {
110551c0b2f7Stbbdev             g.reset();
110651c0b2f7Stbbdev             input_count = sink_count = 0;
110751c0b2f7Stbbdev         }
110851c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
110951c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
111051c0b2f7Stbbdev     }
111151c0b2f7Stbbdev 
111251c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
111351c0b2f7Stbbdev     o.observe(false);
111451c0b2f7Stbbdev #endif
111551c0b2f7Stbbdev }
111651c0b2f7Stbbdev 
111751c0b2f7Stbbdev template<class BufferItemType,
111851c0b2f7Stbbdev          TestNodeTypeEnum InputThrowType,
111951c0b2f7Stbbdev          TestNodeTypeEnum SinkThrowType>
run_sequencer_node_test()112051c0b2f7Stbbdev void run_sequencer_node_test() {
112151c0b2f7Stbbdev     typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
112251c0b2f7Stbbdev     typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
112351c0b2f7Stbbdev     typedef sequencer_body<BufferItemType> SeqBodyType;
112451c0b2f7Stbbdev 
112551c0b2f7Stbbdev     typedef tbb::flow::input_node<BufferItemType> InputType;
112651c0b2f7Stbbdev     typedef tbb::flow::sequencer_node<BufferItemType>  SeqType;
112751c0b2f7Stbbdev     typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
112851c0b2f7Stbbdev 
112951c0b2f7Stbbdev     for(int i = 0; i < 4; ++i) {
113051c0b2f7Stbbdev         if(i == 2) continue;  // no need to test flog w/o throws
113151c0b2f7Stbbdev         bool throwException = (i & 0x1) != 0;
113251c0b2f7Stbbdev         bool doFlog = (i & 0x2) != 0;
113351c0b2f7Stbbdev         run_one_sequencer_node_test<
113451c0b2f7Stbbdev             /* class BufferItemType*/     BufferItemType,
113551c0b2f7Stbbdev             /*class InputNodeType*/       InputType,
113651c0b2f7Stbbdev             /*class InputNodeBodyType*/   InputBodyType,
113751c0b2f7Stbbdev             /*class TestNodeType*/        SeqType,
113851c0b2f7Stbbdev             /*class SeqBodyType*/         SeqBodyType,
113951c0b2f7Stbbdev             /*class SinkNodeType*/        SnkType,
114051c0b2f7Stbbdev             /*class SinkNodeBodyType*/    SinkBodyType
114151c0b2f7Stbbdev             >(throwException, doFlog);
114251c0b2f7Stbbdev     }
114351c0b2f7Stbbdev }
114451c0b2f7Stbbdev 
114551c0b2f7Stbbdev 
114651c0b2f7Stbbdev 
test_sequencer_node()114751c0b2f7Stbbdev void test_sequencer_node() {
114851c0b2f7Stbbdev     INFO("Testing sequencer_node\n");
114951c0b2f7Stbbdev     g_Wakeup_Msg = "sequencer_node(is,non): Missed wakeup or machine is overloaded?";
115051c0b2f7Stbbdev     run_sequencer_node_test<int, isThrowing,nonThrowing>();
115151c0b2f7Stbbdev     CheckType<int>::check_type_counter = 0;
115251c0b2f7Stbbdev     g_Wakeup_Msg = "sequencer_node(non,is): Missed wakeup or machine is overloaded?";
115351c0b2f7Stbbdev     run_sequencer_node_test<CheckType<int>, nonThrowing,isThrowing>();
115451c0b2f7Stbbdev     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped objects in sequencer_node test");
115551c0b2f7Stbbdev     g_Wakeup_Msg = "sequencer_node(is,is): Missed wakeup or machine is overloaded?";
115651c0b2f7Stbbdev     run_sequencer_node_test<int, isThrowing,isThrowing>();
115751c0b2f7Stbbdev     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
115851c0b2f7Stbbdev }
115951c0b2f7Stbbdev 
116051c0b2f7Stbbdev // ------------ priority_queue_node ------------------
116151c0b2f7Stbbdev 
116251c0b2f7Stbbdev template<
116351c0b2f7Stbbdev     class BufferItemType,
116451c0b2f7Stbbdev     class InputNodeType,
116551c0b2f7Stbbdev     class InputNodeBodyType,
116651c0b2f7Stbbdev     class TestNodeType,
116751c0b2f7Stbbdev     class SinkNodeType,
116851c0b2f7Stbbdev     class SinkNodeBodyType >
run_one_priority_queue_node_test(bool throwException,bool flog)116951c0b2f7Stbbdev void run_one_priority_queue_node_test(bool throwException,bool flog) {
117051c0b2f7Stbbdev     tbb::flow::graph g;
117151c0b2f7Stbbdev 
117251c0b2f7Stbbdev     std::atomic<int> input_count;
117351c0b2f7Stbbdev     std::atomic<int> sink_count;
117451c0b2f7Stbbdev     input_count = sink_count = 0;
117551c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
117651c0b2f7Stbbdev     eh_test_observer o;
117751c0b2f7Stbbdev     o.observe(true);
117851c0b2f7Stbbdev #endif
117951c0b2f7Stbbdev     g_Master = std::this_thread::get_id();
118051c0b2f7Stbbdev     InputNodeType input(g, InputNodeBodyType(input_count));
118151c0b2f7Stbbdev 
118251c0b2f7Stbbdev     TestNodeType node_to_test(g);
118351c0b2f7Stbbdev 
118451c0b2f7Stbbdev     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
118551c0b2f7Stbbdev 
118651c0b2f7Stbbdev     make_edge(input,node_to_test);
118751c0b2f7Stbbdev     make_edge(node_to_test, sink);
118851c0b2f7Stbbdev     for(int iter = 0; iter < 2; ++iter) {
118951c0b2f7Stbbdev         ResetGlobals(throwException,flog);
119051c0b2f7Stbbdev         if(throwException) {
119151c0b2f7Stbbdev             TRY();
119251c0b2f7Stbbdev                 input.activate();
119351c0b2f7Stbbdev                 g.wait_for_all();
119451c0b2f7Stbbdev             CATCH_AND_ASSERT();
119551c0b2f7Stbbdev         }
119651c0b2f7Stbbdev         else {
119751c0b2f7Stbbdev             TRY();
119851c0b2f7Stbbdev                 input.activate();
119951c0b2f7Stbbdev                 g.wait_for_all();
120051c0b2f7Stbbdev             CATCH_AND_FAIL();
120151c0b2f7Stbbdev         }
120251c0b2f7Stbbdev         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
120351c0b2f7Stbbdev         int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
120451c0b2f7Stbbdev         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
120551c0b2f7Stbbdev         if(throwException) {
120651c0b2f7Stbbdev             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
120751c0b2f7Stbbdev             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
120851c0b2f7Stbbdev             CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
120951c0b2f7Stbbdev             CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
121051c0b2f7Stbbdev         }
121151c0b2f7Stbbdev         else {
121251c0b2f7Stbbdev             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
121351c0b2f7Stbbdev             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
121451c0b2f7Stbbdev             CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
121551c0b2f7Stbbdev             CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
121651c0b2f7Stbbdev         }
121751c0b2f7Stbbdev         if(iter == 0) {
121851c0b2f7Stbbdev             remove_edge(node_to_test, sink);
121951c0b2f7Stbbdev             node_to_test.try_put(BufferItemType(g_NumItems + 1));
122051c0b2f7Stbbdev             node_to_test.try_put(BufferItemType(g_NumItems + 2));
122151c0b2f7Stbbdev             node_to_test.try_put(BufferItemType());
122251c0b2f7Stbbdev             g.wait_for_all();
122351c0b2f7Stbbdev             g.reset();
122451c0b2f7Stbbdev             input_count = sink_count = 0;
122551c0b2f7Stbbdev             make_edge(node_to_test, sink);
122651c0b2f7Stbbdev             g.wait_for_all();
122751c0b2f7Stbbdev         }
122851c0b2f7Stbbdev         else {
122951c0b2f7Stbbdev             g.reset();
123051c0b2f7Stbbdev             input_count = sink_count = 0;
123151c0b2f7Stbbdev         }
123251c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
123351c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
123451c0b2f7Stbbdev     }
123551c0b2f7Stbbdev 
123651c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
123751c0b2f7Stbbdev     o.observe(false);
123851c0b2f7Stbbdev #endif
123951c0b2f7Stbbdev }
124051c0b2f7Stbbdev 
124151c0b2f7Stbbdev template<class BufferItemType,
124251c0b2f7Stbbdev          TestNodeTypeEnum InputThrowType,
124351c0b2f7Stbbdev          TestNodeTypeEnum SinkThrowType>
run_priority_queue_node_test()124451c0b2f7Stbbdev void run_priority_queue_node_test() {
124551c0b2f7Stbbdev     typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
124651c0b2f7Stbbdev     typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
124751c0b2f7Stbbdev     typedef less_body<BufferItemType> LessBodyType;
124851c0b2f7Stbbdev 
124951c0b2f7Stbbdev     typedef tbb::flow::input_node<BufferItemType> InputType;
125051c0b2f7Stbbdev     typedef tbb::flow::priority_queue_node<BufferItemType,LessBodyType>  PrqType;
125151c0b2f7Stbbdev     typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
125251c0b2f7Stbbdev 
125351c0b2f7Stbbdev     for(int i = 0; i < 4; ++i) {
125451c0b2f7Stbbdev         if(i == 2) continue;  // no need to test flog w/o throws
125551c0b2f7Stbbdev         bool throwException = (i & 0x1) != 0;
125651c0b2f7Stbbdev         bool doFlog = (i & 0x2) != 0;
125751c0b2f7Stbbdev         run_one_priority_queue_node_test<
125851c0b2f7Stbbdev             /* class BufferItemType*/     BufferItemType,
125951c0b2f7Stbbdev             /*class InputNodeType*/       InputType,
126051c0b2f7Stbbdev             /*class InputNodeBodyType*/   InputBodyType,
126151c0b2f7Stbbdev             /*class TestNodeType*/        PrqType,
126251c0b2f7Stbbdev             /*class SinkNodeType*/        SnkType,
126351c0b2f7Stbbdev             /*class SinkNodeBodyType*/    SinkBodyType
126451c0b2f7Stbbdev             >(throwException, doFlog);
126551c0b2f7Stbbdev     }
126651c0b2f7Stbbdev }
126751c0b2f7Stbbdev 
test_priority_queue_node()126851c0b2f7Stbbdev void test_priority_queue_node() {
126951c0b2f7Stbbdev     INFO("Testing priority_queue_node\n");
127051c0b2f7Stbbdev     g_Wakeup_Msg = "priority_queue_node(is,non): Missed wakeup or machine is overloaded?";
127151c0b2f7Stbbdev     run_priority_queue_node_test<int, isThrowing,nonThrowing>();
127251c0b2f7Stbbdev     CheckType<int>::check_type_counter = 0;
127351c0b2f7Stbbdev     g_Wakeup_Msg = "priority_queue_node(non,is): Missed wakeup or machine is overloaded?";
127451c0b2f7Stbbdev     run_priority_queue_node_test<CheckType<int>, nonThrowing,isThrowing>();
127551c0b2f7Stbbdev     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped objects in priority_queue_node test");
127651c0b2f7Stbbdev     g_Wakeup_Msg = "priority_queue_node(is,is): Missed wakeup or machine is overloaded?";
127751c0b2f7Stbbdev     run_priority_queue_node_test<int, isThrowing,isThrowing>();
127851c0b2f7Stbbdev     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
127951c0b2f7Stbbdev }
128051c0b2f7Stbbdev 
128151c0b2f7Stbbdev // ------------------- join_node ----------------
// Maps a join policy type JP to a printable name.
// This primary template is the fallback and reports "unknown".
template<class JP>
struct graph_policy_name {
    static const char* name() {
        return "unknown";
    }
};
128551c0b2f7Stbbdev template<> struct graph_policy_name<tbb::flow::queueing>  {
namegraph_policy_name128651c0b2f7Stbbdev     static const char* name() {return "queueing"; }
128751c0b2f7Stbbdev };
128851c0b2f7Stbbdev template<> struct graph_policy_name<tbb::flow::reserving> {
namegraph_policy_name128951c0b2f7Stbbdev     static const char* name() {return "reserving"; }
129051c0b2f7Stbbdev };
129151c0b2f7Stbbdev template<> struct graph_policy_name<tbb::flow::tag_matching> {
namegraph_policy_name129251c0b2f7Stbbdev     static const char* name() {return "tag_matching"; }
129351c0b2f7Stbbdev };
129451c0b2f7Stbbdev 
129551c0b2f7Stbbdev 
129651c0b2f7Stbbdev template<
129751c0b2f7Stbbdev     class JP,
129851c0b2f7Stbbdev     class OutputTuple,
129951c0b2f7Stbbdev     class InputType0,
130051c0b2f7Stbbdev     class InputBodyType0,
130151c0b2f7Stbbdev     class InputType1,
130251c0b2f7Stbbdev     class InputBodyType1,
130351c0b2f7Stbbdev     class TestJoinType,
130451c0b2f7Stbbdev     class SinkType,
130551c0b2f7Stbbdev     class SinkBodyType
130651c0b2f7Stbbdev     >
130751c0b2f7Stbbdev struct run_one_join_node_test {
run_one_join_node_testrun_one_join_node_test130851c0b2f7Stbbdev     run_one_join_node_test() {}
execute_testrun_one_join_node_test130951c0b2f7Stbbdev     static void execute_test(bool throwException,bool flog) {
131051c0b2f7Stbbdev         typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
131151c0b2f7Stbbdev         typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
131251c0b2f7Stbbdev 
131351c0b2f7Stbbdev         tbb::flow::graph g;
131451c0b2f7Stbbdev         std::atomic<int>input0_count;
131551c0b2f7Stbbdev         std::atomic<int>input1_count;
131651c0b2f7Stbbdev         std::atomic<int>sink_count;
131751c0b2f7Stbbdev         input0_count = input1_count = sink_count = 0;
131851c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
131951c0b2f7Stbbdev         eh_test_observer o;
132051c0b2f7Stbbdev         o.observe(true);
132151c0b2f7Stbbdev #endif
132251c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
132351c0b2f7Stbbdev         InputType0 input0(g, InputBodyType0(input0_count));
132451c0b2f7Stbbdev         InputType1 input1(g, InputBodyType1(input1_count));
132551c0b2f7Stbbdev         TestJoinType node_to_test(g);
132651c0b2f7Stbbdev         SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
132751c0b2f7Stbbdev         make_edge(input0,tbb::flow::input_port<0>(node_to_test));
132851c0b2f7Stbbdev         make_edge(input1,tbb::flow::input_port<1>(node_to_test));
132951c0b2f7Stbbdev         make_edge(node_to_test, sink);
133051c0b2f7Stbbdev         for(int iter = 0; iter < 2; ++iter) {
133151c0b2f7Stbbdev             ResetGlobals(throwException,flog);
133251c0b2f7Stbbdev             if(throwException) {
133351c0b2f7Stbbdev                 TRY();
133451c0b2f7Stbbdev                     input0.activate();
133551c0b2f7Stbbdev                     input1.activate();
133651c0b2f7Stbbdev                     g.wait_for_all();
133751c0b2f7Stbbdev                 CATCH_AND_ASSERT();
133851c0b2f7Stbbdev             }
133951c0b2f7Stbbdev             else {
134051c0b2f7Stbbdev                 TRY();
134151c0b2f7Stbbdev                     input0.activate();
134251c0b2f7Stbbdev                     input1.activate();
134351c0b2f7Stbbdev                     g.wait_for_all();
134451c0b2f7Stbbdev                 CATCH_AND_FAIL();
134551c0b2f7Stbbdev             }
134651c0b2f7Stbbdev             bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
134751c0b2f7Stbbdev             int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value();
134851c0b2f7Stbbdev             int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value();
134951c0b2f7Stbbdev             int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
135051c0b2f7Stbbdev             if(throwException) {
135151c0b2f7Stbbdev                 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
135251c0b2f7Stbbdev                 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
135351c0b2f7Stbbdev                 CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs");
135451c0b2f7Stbbdev                 CHECK_MESSAGE( (nb_cnt <= ((ib0_cnt < ib1_cnt) ? ib0_cnt : ib1_cnt)), "Too many items received by sink nodes");
135551c0b2f7Stbbdev             }
135651c0b2f7Stbbdev             else {
135751c0b2f7Stbbdev                 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
135851c0b2f7Stbbdev                 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
135951c0b2f7Stbbdev                 if(ib0_cnt != g_NumItems) {
136051c0b2f7Stbbdev                 //     INFO("throwException == %s\n" << (throwException ? "true" : "false"));
136151c0b2f7Stbbdev                 //     INFO("iter == " << iter << "\n");
136251c0b2f7Stbbdev                 //     INFO("ib0_cnt == " << ib0_cnt << "\n");
136351c0b2f7Stbbdev                 //     INFO("g_NumItems == " << g_NumItems << "\n");
136451c0b2f7Stbbdev                 }
136551c0b2f7Stbbdev                 CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0");  // this one
136651c0b2f7Stbbdev                 CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1");
136751c0b2f7Stbbdev                 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
136851c0b2f7Stbbdev             }
136951c0b2f7Stbbdev             if(iter == 0) {
137051c0b2f7Stbbdev                 remove_edge(node_to_test, sink);
137151c0b2f7Stbbdev                 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 1));
137251c0b2f7Stbbdev                 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
137351c0b2f7Stbbdev                 g.wait_for_all();
137451c0b2f7Stbbdev                 g.reset();
137551c0b2f7Stbbdev                 input0_count = input1_count = sink_count = 0;
137651c0b2f7Stbbdev                 make_edge(node_to_test, sink);
137751c0b2f7Stbbdev                 g.wait_for_all();
137851c0b2f7Stbbdev             }
137951c0b2f7Stbbdev             else {
138051c0b2f7Stbbdev                 g.wait_for_all();
138151c0b2f7Stbbdev                 g.reset();
138251c0b2f7Stbbdev                 input0_count = input1_count = sink_count = 0;
138351c0b2f7Stbbdev             }
138451c0b2f7Stbbdev             CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed");
138551c0b2f7Stbbdev             CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed");
138651c0b2f7Stbbdev             nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
138751c0b2f7Stbbdev             CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed");
138851c0b2f7Stbbdev         }
138951c0b2f7Stbbdev 
139051c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
139151c0b2f7Stbbdev         o.observe(false);
139251c0b2f7Stbbdev #endif
139351c0b2f7Stbbdev     }
139451c0b2f7Stbbdev };  // run_one_join_node_test
139551c0b2f7Stbbdev 
139651c0b2f7Stbbdev template<
139751c0b2f7Stbbdev     class OutputTuple,
139851c0b2f7Stbbdev     class InputType0,
139951c0b2f7Stbbdev     class InputBodyType0,
140051c0b2f7Stbbdev     class InputType1,
140151c0b2f7Stbbdev     class InputBodyType1,
140251c0b2f7Stbbdev     class TestJoinType,
140351c0b2f7Stbbdev     class SinkType,
140451c0b2f7Stbbdev     class SinkBodyType
140551c0b2f7Stbbdev     >
140651c0b2f7Stbbdev struct run_one_join_node_test<
140751c0b2f7Stbbdev         tbb::flow::tag_matching,
140851c0b2f7Stbbdev         OutputTuple,
140951c0b2f7Stbbdev         InputType0,
141051c0b2f7Stbbdev         InputBodyType0,
141151c0b2f7Stbbdev         InputType1,
141251c0b2f7Stbbdev         InputBodyType1,
141351c0b2f7Stbbdev         TestJoinType,
141451c0b2f7Stbbdev         SinkType,
141551c0b2f7Stbbdev         SinkBodyType
141651c0b2f7Stbbdev     > {
run_one_join_node_testrun_one_join_node_test141751c0b2f7Stbbdev     run_one_join_node_test() {}
execute_testrun_one_join_node_test141851c0b2f7Stbbdev     static void execute_test(bool throwException,bool flog) {
141951c0b2f7Stbbdev         typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
142051c0b2f7Stbbdev         typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
142151c0b2f7Stbbdev 
142251c0b2f7Stbbdev         tbb::flow::graph g;
142351c0b2f7Stbbdev 
142451c0b2f7Stbbdev         std::atomic<int>input0_count;
142551c0b2f7Stbbdev         std::atomic<int>input1_count;
142651c0b2f7Stbbdev         std::atomic<int>sink_count;
142751c0b2f7Stbbdev         input0_count = input1_count = sink_count = 0;
142851c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
142951c0b2f7Stbbdev         eh_test_observer o;
143051c0b2f7Stbbdev         o.observe(true);
143151c0b2f7Stbbdev #endif
143251c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
143351c0b2f7Stbbdev         InputType0 input0(g, InputBodyType0(input0_count, 2));
143451c0b2f7Stbbdev         InputType1 input1(g, InputBodyType1(input1_count, 3));
143551c0b2f7Stbbdev         TestJoinType node_to_test(g, tag_func<ItemType0>(ItemType0(2)), tag_func<ItemType1>(ItemType1(3)));
143651c0b2f7Stbbdev         SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
143751c0b2f7Stbbdev         make_edge(input0,tbb::flow::input_port<0>(node_to_test));
143851c0b2f7Stbbdev         make_edge(input1,tbb::flow::input_port<1>(node_to_test));
143951c0b2f7Stbbdev         make_edge(node_to_test, sink);
144051c0b2f7Stbbdev         for(int iter = 0; iter < 2; ++iter) {
144151c0b2f7Stbbdev             ResetGlobals(throwException,flog);
144251c0b2f7Stbbdev             if(throwException) {
144351c0b2f7Stbbdev                 TRY();
144451c0b2f7Stbbdev                     input0.activate();
144551c0b2f7Stbbdev                     input1.activate();
144651c0b2f7Stbbdev                     g.wait_for_all();
144751c0b2f7Stbbdev                 CATCH_AND_ASSERT();
144851c0b2f7Stbbdev             }
144951c0b2f7Stbbdev             else {
145051c0b2f7Stbbdev                 TRY();
145151c0b2f7Stbbdev                     input0.activate();
145251c0b2f7Stbbdev                     input1.activate();
145351c0b2f7Stbbdev                     g.wait_for_all();
145451c0b2f7Stbbdev                 CATCH_AND_FAIL();
145551c0b2f7Stbbdev             }
145651c0b2f7Stbbdev             bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
145751c0b2f7Stbbdev             int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value();
145851c0b2f7Stbbdev             int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value();
145951c0b2f7Stbbdev             int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
146051c0b2f7Stbbdev             if(throwException) {
146151c0b2f7Stbbdev                 CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
146251c0b2f7Stbbdev                 CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
146351c0b2f7Stbbdev                 CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs");
146451c0b2f7Stbbdev                 CHECK_MESSAGE( (nb_cnt <= ((ib0_cnt < ib1_cnt) ? ib0_cnt : ib1_cnt)), "Too many items received by sink nodes");
146551c0b2f7Stbbdev             }
146651c0b2f7Stbbdev             else {
146751c0b2f7Stbbdev                 CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
146851c0b2f7Stbbdev                 CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
146951c0b2f7Stbbdev                 CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0");
147051c0b2f7Stbbdev                 CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1");
147151c0b2f7Stbbdev                 CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
147251c0b2f7Stbbdev             }
147351c0b2f7Stbbdev             if(iter == 0) {
147451c0b2f7Stbbdev                 remove_edge(node_to_test, sink);
147551c0b2f7Stbbdev                 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4));
147651c0b2f7Stbbdev                 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
147751c0b2f7Stbbdev                 g.wait_for_all();   // have to wait for the graph to stop again....
147851c0b2f7Stbbdev                 g.reset();  // resets the body of the input_nodes, test_node and the absorb_nodes.
147951c0b2f7Stbbdev                 input0_count = input1_count = sink_count = 0;
148051c0b2f7Stbbdev                 make_edge(node_to_test, sink);
148151c0b2f7Stbbdev                 g.wait_for_all();   // have to wait for the graph to stop again....
148251c0b2f7Stbbdev             }
148351c0b2f7Stbbdev             else {
148451c0b2f7Stbbdev                 g.wait_for_all();
148551c0b2f7Stbbdev                 g.reset();
148651c0b2f7Stbbdev                 input0_count = input1_count = sink_count = 0;
148751c0b2f7Stbbdev             }
148851c0b2f7Stbbdev             CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed");
148951c0b2f7Stbbdev             CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed");
149051c0b2f7Stbbdev             nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
149151c0b2f7Stbbdev             CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed");
149251c0b2f7Stbbdev         }
149351c0b2f7Stbbdev 
149451c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
149551c0b2f7Stbbdev         o.observe(false);
149651c0b2f7Stbbdev #endif
149751c0b2f7Stbbdev     }
149851c0b2f7Stbbdev };  // run_one_join_node_test<tag_matching>
149951c0b2f7Stbbdev 
150051c0b2f7Stbbdev template<class JP, class OutputTuple,
150151c0b2f7Stbbdev              TestNodeTypeEnum InputThrowType,
150251c0b2f7Stbbdev              TestNodeTypeEnum SinkThrowType>
run_join_node_test()150351c0b2f7Stbbdev void run_join_node_test() {
150451c0b2f7Stbbdev     typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
150551c0b2f7Stbbdev     typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
150651c0b2f7Stbbdev     typedef test_input_body<ItemType0,InputThrowType> InputBodyType0;
150751c0b2f7Stbbdev     typedef test_input_body<ItemType1,InputThrowType> InputBodyType1;
150851c0b2f7Stbbdev     typedef absorber_body<OutputTuple,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
150951c0b2f7Stbbdev 
151051c0b2f7Stbbdev     typedef typename tbb::flow::input_node<ItemType0> InputType0;
151151c0b2f7Stbbdev     typedef typename tbb::flow::input_node<ItemType1> InputType1;
151251c0b2f7Stbbdev     typedef typename tbb::flow::join_node<OutputTuple,JP> TestJoinType;
151351c0b2f7Stbbdev     typedef typename tbb::flow::function_node<OutputTuple,tbb::flow::continue_msg> SinkType;
151451c0b2f7Stbbdev 
151551c0b2f7Stbbdev     for(int i = 0; i < 4; ++i) {
151651c0b2f7Stbbdev         if(2 == i) continue;
151751c0b2f7Stbbdev         bool throwException = (i & 0x1) != 0;
151851c0b2f7Stbbdev         bool doFlog = (i & 0x2) != 0;
151951c0b2f7Stbbdev         run_one_join_node_test<
152051c0b2f7Stbbdev              JP,
152151c0b2f7Stbbdev              OutputTuple,
152251c0b2f7Stbbdev              InputType0,
152351c0b2f7Stbbdev              InputBodyType0,
152451c0b2f7Stbbdev              InputType1,
152551c0b2f7Stbbdev              InputBodyType1,
152651c0b2f7Stbbdev              TestJoinType,
152751c0b2f7Stbbdev              SinkType,
152851c0b2f7Stbbdev              SinkBodyType>::execute_test(throwException,doFlog);
152951c0b2f7Stbbdev     }
153051c0b2f7Stbbdev }
153151c0b2f7Stbbdev 
153251c0b2f7Stbbdev template<class JP>
test_join_node()153351c0b2f7Stbbdev void test_join_node() {
153451c0b2f7Stbbdev     INFO("Testing join_node<" << graph_policy_name<JP>::name() << ">\n");
153551c0b2f7Stbbdev     // only doing two-input joins
153651c0b2f7Stbbdev     g_Wakeup_Msg = "join(is,non): Missed wakeup or machine is overloaded?";
153751c0b2f7Stbbdev     run_join_node_test<JP, std::tuple<int,int>,  isThrowing, nonThrowing>();
153851c0b2f7Stbbdev     CheckType<int>::check_type_counter = 0;
153951c0b2f7Stbbdev     g_Wakeup_Msg = "join(non,is): Missed wakeup or machine is overloaded?";
154051c0b2f7Stbbdev     run_join_node_test<JP, std::tuple<CheckType<int>,int>, nonThrowing, isThrowing>();
154151c0b2f7Stbbdev     CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped items in test");
154251c0b2f7Stbbdev     g_Wakeup_Msg = "join(is,is): Missed wakeup or machine is overloaded?";
154351c0b2f7Stbbdev     run_join_node_test<JP, std::tuple<int,int>,  isThrowing, isThrowing>();
154451c0b2f7Stbbdev     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
154551c0b2f7Stbbdev }
154651c0b2f7Stbbdev 
154751c0b2f7Stbbdev // ------------------- limiter_node -------------
154851c0b2f7Stbbdev 
154951c0b2f7Stbbdev template<
155051c0b2f7Stbbdev     class BufferItemType,       //
155151c0b2f7Stbbdev     class InputNodeType,
155251c0b2f7Stbbdev     class InputNodeBodyType,
155351c0b2f7Stbbdev     class TestNodeType,
155451c0b2f7Stbbdev     class SinkNodeType,
155551c0b2f7Stbbdev     class SinkNodeBodyType >
run_one_limiter_node_test(bool throwException,bool flog)155651c0b2f7Stbbdev void run_one_limiter_node_test(bool throwException,bool flog) {
155751c0b2f7Stbbdev     tbb::flow::graph g;
155851c0b2f7Stbbdev 
155951c0b2f7Stbbdev     std::atomic<int> input_count;
156051c0b2f7Stbbdev     std::atomic<int> sink_count;
156151c0b2f7Stbbdev     input_count = sink_count = 0;
156251c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
156351c0b2f7Stbbdev     eh_test_observer o;
156451c0b2f7Stbbdev     o.observe(true);
156551c0b2f7Stbbdev #endif
156651c0b2f7Stbbdev     g_Master = std::this_thread::get_id();
156751c0b2f7Stbbdev     InputNodeType input(g, InputNodeBodyType(input_count));
156851c0b2f7Stbbdev     TestNodeType node_to_test(g,g_NumThreads + 1);
156951c0b2f7Stbbdev     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
157051c0b2f7Stbbdev     make_edge(input,node_to_test);
157151c0b2f7Stbbdev     make_edge(node_to_test, sink);
157251c0b2f7Stbbdev     for(int iter = 0; iter < 2; ++iter) {
157351c0b2f7Stbbdev         ResetGlobals(throwException,flog);
157451c0b2f7Stbbdev         if(throwException) {
157551c0b2f7Stbbdev             TRY();
157651c0b2f7Stbbdev                 input.activate();
157751c0b2f7Stbbdev                 g.wait_for_all();
157851c0b2f7Stbbdev             CATCH_AND_ASSERT();
157951c0b2f7Stbbdev         }
158051c0b2f7Stbbdev         else {
158151c0b2f7Stbbdev             TRY();
158251c0b2f7Stbbdev                 input.activate();
158351c0b2f7Stbbdev                 g.wait_for_all();
158451c0b2f7Stbbdev             CATCH_AND_FAIL();
158551c0b2f7Stbbdev         }
158651c0b2f7Stbbdev         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
158751c0b2f7Stbbdev         int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
158851c0b2f7Stbbdev         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
158951c0b2f7Stbbdev         if(throwException) {
159051c0b2f7Stbbdev             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
159151c0b2f7Stbbdev             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
159251c0b2f7Stbbdev             CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
159351c0b2f7Stbbdev             CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
159451c0b2f7Stbbdev         }
159551c0b2f7Stbbdev         else {
159651c0b2f7Stbbdev             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
159751c0b2f7Stbbdev             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
159851c0b2f7Stbbdev             // we stop after limiter's limit, which is g_NumThreads + 1.  The input_node
159951c0b2f7Stbbdev             // is invoked one extra time, filling its buffer, so its limit is g_NumThreads + 2.
160051c0b2f7Stbbdev             CHECK_MESSAGE( (ib_cnt == g_NumThreads + 2), "Missing invocations of input_node");
160151c0b2f7Stbbdev             CHECK_MESSAGE( (nb_cnt == g_NumThreads + 1), "Missing items in absorbers");
160251c0b2f7Stbbdev         }
160351c0b2f7Stbbdev         if(iter == 0) {
160451c0b2f7Stbbdev             remove_edge(node_to_test, sink);
160551c0b2f7Stbbdev             node_to_test.try_put(BufferItemType());
160651c0b2f7Stbbdev             node_to_test.try_put(BufferItemType());
160751c0b2f7Stbbdev             g.wait_for_all();
160851c0b2f7Stbbdev             g.reset();
160951c0b2f7Stbbdev             input_count = sink_count = 0;
161051c0b2f7Stbbdev             BufferItemType tmp;
161151c0b2f7Stbbdev             CHECK_MESSAGE( (!node_to_test.try_get(tmp)), "node not empty");
161251c0b2f7Stbbdev             make_edge(node_to_test, sink);
161351c0b2f7Stbbdev             g.wait_for_all();
161451c0b2f7Stbbdev         }
161551c0b2f7Stbbdev         else {
161651c0b2f7Stbbdev             g.reset();
161751c0b2f7Stbbdev             input_count = sink_count = 0;
161851c0b2f7Stbbdev         }
161951c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
162051c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
162151c0b2f7Stbbdev     }
162251c0b2f7Stbbdev 
162351c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
162451c0b2f7Stbbdev     o.observe(false);
162551c0b2f7Stbbdev #endif
162651c0b2f7Stbbdev }
162751c0b2f7Stbbdev 
162851c0b2f7Stbbdev template<class BufferItemType,
162951c0b2f7Stbbdev          TestNodeTypeEnum InputThrowType,
163051c0b2f7Stbbdev          TestNodeTypeEnum SinkThrowType>
run_limiter_node_test()163151c0b2f7Stbbdev void run_limiter_node_test() {
163251c0b2f7Stbbdev     typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
163351c0b2f7Stbbdev     typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
163451c0b2f7Stbbdev 
163551c0b2f7Stbbdev     typedef tbb::flow::input_node<BufferItemType> InputType;
163651c0b2f7Stbbdev     typedef tbb::flow::limiter_node<BufferItemType>  LmtType;
163751c0b2f7Stbbdev     typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
163851c0b2f7Stbbdev 
163951c0b2f7Stbbdev     for(int i = 0; i < 4; ++i) {
164051c0b2f7Stbbdev         if(i == 2) continue;  // no need to test flog w/o throws
164151c0b2f7Stbbdev         bool throwException = (i & 0x1) != 0;
164251c0b2f7Stbbdev         bool doFlog = (i & 0x2) != 0;
164351c0b2f7Stbbdev         run_one_limiter_node_test<
164451c0b2f7Stbbdev             /* class BufferItemType*/     BufferItemType,
164551c0b2f7Stbbdev             /*class InputNodeType*/       InputType,
164651c0b2f7Stbbdev             /*class InputNodeBodyType*/   InputBodyType,
164751c0b2f7Stbbdev             /*class TestNodeType*/        LmtType,
164851c0b2f7Stbbdev             /*class SinkNodeType*/        SnkType,
164951c0b2f7Stbbdev             /*class SinkNodeBodyType*/    SinkBodyType
165051c0b2f7Stbbdev             >(throwException, doFlog);
165151c0b2f7Stbbdev     }
165251c0b2f7Stbbdev }
165351c0b2f7Stbbdev 
test_limiter_node()165451c0b2f7Stbbdev void test_limiter_node() {
165551c0b2f7Stbbdev     INFO("Testing limiter_node\n");
165651c0b2f7Stbbdev     g_Wakeup_Msg = "limiter_node(is,non): Missed wakeup or machine is overloaded?";
165751c0b2f7Stbbdev     run_limiter_node_test<int,isThrowing,nonThrowing>();
165851c0b2f7Stbbdev     g_Wakeup_Msg = "limiter_node(non,is): Missed wakeup or machine is overloaded?";
165951c0b2f7Stbbdev     run_limiter_node_test<int,nonThrowing,isThrowing>();
166051c0b2f7Stbbdev     g_Wakeup_Msg = "limiter_node(is,is): Missed wakeup or machine is overloaded?";
166151c0b2f7Stbbdev     run_limiter_node_test<int,isThrowing,isThrowing>();
166251c0b2f7Stbbdev     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
166351c0b2f7Stbbdev }
166451c0b2f7Stbbdev 
166551c0b2f7Stbbdev // -------- split_node --------------------
166651c0b2f7Stbbdev 
166751c0b2f7Stbbdev template<
166851c0b2f7Stbbdev     class InputTuple,
166951c0b2f7Stbbdev     class InputType,
167051c0b2f7Stbbdev     class InputBodyType,
167151c0b2f7Stbbdev     class TestSplitType,
167251c0b2f7Stbbdev     class SinkType0,
167351c0b2f7Stbbdev     class SinkBodyType0,
167451c0b2f7Stbbdev     class SinkType1,
167551c0b2f7Stbbdev     class SinkBodyType1>
run_one_split_node_test(bool throwException,bool flog)167651c0b2f7Stbbdev void run_one_split_node_test(bool throwException, bool flog) {
167751c0b2f7Stbbdev 
167851c0b2f7Stbbdev     tbb::flow::graph g;
167951c0b2f7Stbbdev 
168051c0b2f7Stbbdev     std::atomic<int> input_count;
168151c0b2f7Stbbdev     std::atomic<int> sink0_count;
168251c0b2f7Stbbdev     std::atomic<int> sink1_count;
168351c0b2f7Stbbdev     input_count = sink0_count = sink1_count = 0;
168451c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
168551c0b2f7Stbbdev     eh_test_observer o;
168651c0b2f7Stbbdev     o.observe(true);
168751c0b2f7Stbbdev #endif
168851c0b2f7Stbbdev 
168951c0b2f7Stbbdev     g_Master = std::this_thread::get_id();
169051c0b2f7Stbbdev     InputType input(g, InputBodyType(input_count));
169151c0b2f7Stbbdev     TestSplitType node_to_test(g);
169251c0b2f7Stbbdev     SinkType0 sink0(g,tbb::flow::unlimited,SinkBodyType0(sink0_count));
169351c0b2f7Stbbdev     SinkType1 sink1(g,tbb::flow::unlimited,SinkBodyType1(sink1_count));
169451c0b2f7Stbbdev     make_edge(input, node_to_test);
169551c0b2f7Stbbdev     make_edge(tbb::flow::output_port<0>(node_to_test), sink0);
169651c0b2f7Stbbdev     make_edge(tbb::flow::output_port<1>(node_to_test), sink1);
169751c0b2f7Stbbdev 
169851c0b2f7Stbbdev     for(int iter = 0; iter < 2; ++iter) {  // run, reset, run again
169951c0b2f7Stbbdev         ResetGlobals(throwException,flog);
170051c0b2f7Stbbdev         if(throwException) {
170151c0b2f7Stbbdev             TRY();
170251c0b2f7Stbbdev                 input.activate();
170351c0b2f7Stbbdev                 g.wait_for_all();
170451c0b2f7Stbbdev             CATCH_AND_ASSERT();
170551c0b2f7Stbbdev         }
170651c0b2f7Stbbdev         else {
170751c0b2f7Stbbdev             TRY();
170851c0b2f7Stbbdev                 input.activate();
170951c0b2f7Stbbdev                 g.wait_for_all();
171051c0b2f7Stbbdev             CATCH_AND_FAIL();
171151c0b2f7Stbbdev         }
171251c0b2f7Stbbdev         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
171351c0b2f7Stbbdev         int ib_cnt = tbb::flow::copy_body<InputBodyType>(input).count_value();
171451c0b2f7Stbbdev         int nb0_cnt = tbb::flow::copy_body<SinkBodyType0>(sink0).count_value();
171551c0b2f7Stbbdev         int nb1_cnt = tbb::flow::copy_body<SinkBodyType1>(sink1).count_value();
171651c0b2f7Stbbdev         if(throwException) {
171751c0b2f7Stbbdev             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
171851c0b2f7Stbbdev             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
171951c0b2f7Stbbdev             CHECK_MESSAGE( (ib_cnt <= 2*g_NumItems), "Too many items sent by input");
172051c0b2f7Stbbdev             CHECK_MESSAGE( (nb0_cnt + nb1_cnt <= ib_cnt*2), "Too many items received by sink nodes");
172151c0b2f7Stbbdev         }
172251c0b2f7Stbbdev         else {
172351c0b2f7Stbbdev             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
172451c0b2f7Stbbdev             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
172551c0b2f7Stbbdev             CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_nodes");
172651c0b2f7Stbbdev             CHECK_MESSAGE( (nb0_cnt == g_NumItems && nb1_cnt == g_NumItems), "Missing items in absorbers");
172751c0b2f7Stbbdev         }
172851c0b2f7Stbbdev         g.reset();  // resets the body of the input_nodes and the absorb_nodes.
172951c0b2f7Stbbdev         input_count = sink0_count = sink1_count = 0;
173051c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType>(input).count_value()),"Reset input failed");
173151c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType0>(sink0).count_value()),"Reset sink 0 failed");
173251c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType1>(sink1).count_value()),"Reset sink 1 failed");
173351c0b2f7Stbbdev     }
173451c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
173551c0b2f7Stbbdev     o.observe(false);
173651c0b2f7Stbbdev #endif
173751c0b2f7Stbbdev }
173851c0b2f7Stbbdev 
173951c0b2f7Stbbdev template<class InputTuple,
174051c0b2f7Stbbdev              TestNodeTypeEnum InputThrowType,
174151c0b2f7Stbbdev              TestNodeTypeEnum SinkThrowType>
run_split_node_test()174251c0b2f7Stbbdev void run_split_node_test() {
174351c0b2f7Stbbdev     typedef typename std::tuple_element<0,InputTuple>::type ItemType0;
174451c0b2f7Stbbdev     typedef typename std::tuple_element<1,InputTuple>::type ItemType1;
174551c0b2f7Stbbdev     typedef tuple_test_input_body<InputTuple,InputThrowType> InputBodyType;
174651c0b2f7Stbbdev     typedef absorber_body<ItemType0,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType0;
174751c0b2f7Stbbdev     typedef absorber_body<ItemType1,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType1;
174851c0b2f7Stbbdev 
174951c0b2f7Stbbdev     typedef typename tbb::flow::input_node<InputTuple> InputType;
175051c0b2f7Stbbdev     typedef typename tbb::flow::split_node<InputTuple> TestSplitType;
175151c0b2f7Stbbdev     typedef typename tbb::flow::function_node<ItemType0,tbb::flow::continue_msg> SinkType0;
175251c0b2f7Stbbdev     typedef typename tbb::flow::function_node<ItemType1,tbb::flow::continue_msg> SinkType1;
175351c0b2f7Stbbdev 
175451c0b2f7Stbbdev     for(int i = 0; i < 4; ++i) {
175551c0b2f7Stbbdev         if(2 == i) continue;
175651c0b2f7Stbbdev         bool throwException = (i & 0x1) != 0;
175751c0b2f7Stbbdev         bool doFlog = (i & 0x2) != 0;
175851c0b2f7Stbbdev         run_one_split_node_test<
175951c0b2f7Stbbdev             InputTuple,
176051c0b2f7Stbbdev             InputType,
176151c0b2f7Stbbdev             InputBodyType,
176251c0b2f7Stbbdev             TestSplitType,
176351c0b2f7Stbbdev             SinkType0,
176451c0b2f7Stbbdev             SinkBodyType0,
176551c0b2f7Stbbdev             SinkType1,
176651c0b2f7Stbbdev             SinkBodyType1>
176751c0b2f7Stbbdev                 (throwException,doFlog);
176851c0b2f7Stbbdev     }
176951c0b2f7Stbbdev }
177051c0b2f7Stbbdev 
test_split_node()177151c0b2f7Stbbdev void test_split_node() {
177251c0b2f7Stbbdev     INFO("Testing split_node\n");
177351c0b2f7Stbbdev     g_Wakeup_Msg = "split_node(is,non): Missed wakeup or machine is overloaded?";
177451c0b2f7Stbbdev     run_split_node_test<std::tuple<int,int>, isThrowing, nonThrowing>();
177551c0b2f7Stbbdev     g_Wakeup_Msg = "split_node(non,is): Missed wakeup or machine is overloaded?";
177651c0b2f7Stbbdev     run_split_node_test<std::tuple<int,int>, nonThrowing, isThrowing>();
177751c0b2f7Stbbdev     g_Wakeup_Msg = "split_node(is,is): Missed wakeup or machine is overloaded?";
177851c0b2f7Stbbdev     run_split_node_test<std::tuple<int,int>, isThrowing,  isThrowing>();
177951c0b2f7Stbbdev     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
178051c0b2f7Stbbdev }
178151c0b2f7Stbbdev 
178251c0b2f7Stbbdev // --------- indexer_node ----------------------
178351c0b2f7Stbbdev 
178451c0b2f7Stbbdev template < class InputTuple,
178551c0b2f7Stbbdev     class InputType0,
178651c0b2f7Stbbdev     class InputBodyType0,
178751c0b2f7Stbbdev     class InputType1,
178851c0b2f7Stbbdev     class InputBodyType1,
178951c0b2f7Stbbdev     class TestNodeType,
179051c0b2f7Stbbdev     class SinkType,
179151c0b2f7Stbbdev     class SinkBodyType>
run_one_indexer_node_test(bool throwException,bool flog)179251c0b2f7Stbbdev void run_one_indexer_node_test(bool throwException,bool flog) {
179351c0b2f7Stbbdev     typedef typename std::tuple_element<0,InputTuple>::type ItemType0;
179451c0b2f7Stbbdev     typedef typename std::tuple_element<1,InputTuple>::type ItemType1;
179551c0b2f7Stbbdev 
179651c0b2f7Stbbdev     tbb::flow::graph g;
179751c0b2f7Stbbdev 
179851c0b2f7Stbbdev     std::atomic<int> input0_count;
179951c0b2f7Stbbdev     std::atomic<int> input1_count;
180051c0b2f7Stbbdev     std::atomic<int> sink_count;
180151c0b2f7Stbbdev     input0_count = input1_count = sink_count = 0;
180251c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
180351c0b2f7Stbbdev     eh_test_observer o;
180451c0b2f7Stbbdev     o.observe(true);
180551c0b2f7Stbbdev #endif
180651c0b2f7Stbbdev     g_Master = std::this_thread::get_id();
180751c0b2f7Stbbdev     InputType0 input0(g, InputBodyType0(input0_count));
180851c0b2f7Stbbdev     InputType1 input1(g, InputBodyType1(input1_count));
180951c0b2f7Stbbdev     TestNodeType node_to_test(g);
181051c0b2f7Stbbdev     SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
181151c0b2f7Stbbdev     make_edge(input0,tbb::flow::input_port<0>(node_to_test));
181251c0b2f7Stbbdev     make_edge(input1,tbb::flow::input_port<1>(node_to_test));
181351c0b2f7Stbbdev     make_edge(node_to_test, sink);
181451c0b2f7Stbbdev     for(int iter = 0; iter < 2; ++iter) {
181551c0b2f7Stbbdev         ResetGlobals(throwException,flog);
181651c0b2f7Stbbdev         if(throwException) {
181751c0b2f7Stbbdev             TRY();
181851c0b2f7Stbbdev                 input0.activate();
181951c0b2f7Stbbdev                 input1.activate();
182051c0b2f7Stbbdev                 g.wait_for_all();
182151c0b2f7Stbbdev             CATCH_AND_ASSERT();
182251c0b2f7Stbbdev         }
182351c0b2f7Stbbdev         else {
182451c0b2f7Stbbdev             TRY();
182551c0b2f7Stbbdev                 input0.activate();
182651c0b2f7Stbbdev                 input1.activate();
182751c0b2f7Stbbdev                 g.wait_for_all();
182851c0b2f7Stbbdev             CATCH_AND_FAIL();
182951c0b2f7Stbbdev         }
183051c0b2f7Stbbdev         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
183151c0b2f7Stbbdev         int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value();
183251c0b2f7Stbbdev         int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value();
183351c0b2f7Stbbdev         int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
183451c0b2f7Stbbdev         if(throwException) {
183551c0b2f7Stbbdev             CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
183651c0b2f7Stbbdev             CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
183751c0b2f7Stbbdev             CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs");
183851c0b2f7Stbbdev             CHECK_MESSAGE( (nb_cnt <= ib0_cnt + ib1_cnt), "Too many items received by sink nodes");
183951c0b2f7Stbbdev         }
184051c0b2f7Stbbdev         else {
184151c0b2f7Stbbdev             CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
184251c0b2f7Stbbdev             CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
184351c0b2f7Stbbdev             CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0");
184451c0b2f7Stbbdev             CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1");
184551c0b2f7Stbbdev             CHECK_MESSAGE( (nb_cnt == 2*g_NumItems), "Missing items in absorbers");
184651c0b2f7Stbbdev         }
184751c0b2f7Stbbdev         if(iter == 0) {
184851c0b2f7Stbbdev             remove_edge(node_to_test, sink);
184951c0b2f7Stbbdev             tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4));
185051c0b2f7Stbbdev             tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
185151c0b2f7Stbbdev             g.wait_for_all();
185251c0b2f7Stbbdev             g.reset();
185351c0b2f7Stbbdev             input0_count = input1_count = sink_count = 0;
185451c0b2f7Stbbdev             make_edge(node_to_test, sink);
185551c0b2f7Stbbdev             g.wait_for_all();
185651c0b2f7Stbbdev         }
185751c0b2f7Stbbdev         else {
185851c0b2f7Stbbdev             g.wait_for_all();
185951c0b2f7Stbbdev             g.reset();
186051c0b2f7Stbbdev             input0_count = input1_count = sink_count = 0;
186151c0b2f7Stbbdev         }
186251c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed");
186351c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed");
186451c0b2f7Stbbdev         nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
186551c0b2f7Stbbdev         CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed");
186651c0b2f7Stbbdev     }
186751c0b2f7Stbbdev 
186851c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
186951c0b2f7Stbbdev     o.observe(false);
187051c0b2f7Stbbdev #endif
187151c0b2f7Stbbdev }
187251c0b2f7Stbbdev 
187351c0b2f7Stbbdev template<class InputTuple,
187451c0b2f7Stbbdev     TestNodeTypeEnum InputThrowType,
187551c0b2f7Stbbdev     TestNodeTypeEnum SinkThrowType>
run_indexer_node_test()187651c0b2f7Stbbdev void run_indexer_node_test() {
187751c0b2f7Stbbdev     typedef typename std::tuple_element<0,InputTuple>::type ItemType0;
187851c0b2f7Stbbdev     typedef typename std::tuple_element<1,InputTuple>::type ItemType1;
187951c0b2f7Stbbdev     typedef test_input_body<ItemType0,InputThrowType> InputBodyType0;
188051c0b2f7Stbbdev     typedef test_input_body<ItemType1,InputThrowType> InputBodyType1;
188151c0b2f7Stbbdev     typedef typename tbb::flow::indexer_node<ItemType0, ItemType1> TestNodeType;
188251c0b2f7Stbbdev     typedef absorber_body<typename TestNodeType::output_type,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
188351c0b2f7Stbbdev 
188451c0b2f7Stbbdev     typedef typename tbb::flow::input_node<ItemType0> InputType0;
188551c0b2f7Stbbdev     typedef typename tbb::flow::input_node<ItemType1> InputType1;
188651c0b2f7Stbbdev     typedef typename tbb::flow::function_node<typename TestNodeType::output_type,tbb::flow::continue_msg> SinkType;
188751c0b2f7Stbbdev 
188851c0b2f7Stbbdev     for(int i = 0; i < 4; ++i) {
188951c0b2f7Stbbdev         if(2 == i) continue;
189051c0b2f7Stbbdev         bool throwException = (i & 0x1) != 0;
189151c0b2f7Stbbdev         bool doFlog = (i & 0x2) != 0;
189251c0b2f7Stbbdev         run_one_indexer_node_test<
189351c0b2f7Stbbdev              InputTuple,
189451c0b2f7Stbbdev              InputType0,
189551c0b2f7Stbbdev              InputBodyType0,
189651c0b2f7Stbbdev              InputType1,
189751c0b2f7Stbbdev              InputBodyType1,
189851c0b2f7Stbbdev              TestNodeType,
189951c0b2f7Stbbdev              SinkType,
190051c0b2f7Stbbdev              SinkBodyType>(throwException,doFlog);
190151c0b2f7Stbbdev     }
190251c0b2f7Stbbdev }
190351c0b2f7Stbbdev 
test_indexer_node()190451c0b2f7Stbbdev void test_indexer_node() {
190551c0b2f7Stbbdev     INFO("Testing indexer_node\n");
190651c0b2f7Stbbdev     g_Wakeup_Msg = "indexer_node(is,non): Missed wakeup or machine is overloaded?";
190751c0b2f7Stbbdev     run_indexer_node_test<std::tuple<int,int>, isThrowing, nonThrowing>();
190851c0b2f7Stbbdev     g_Wakeup_Msg = "indexer_node(non,is): Missed wakeup or machine is overloaded?";
190951c0b2f7Stbbdev     run_indexer_node_test<std::tuple<int,int>, nonThrowing, isThrowing>();
191051c0b2f7Stbbdev     g_Wakeup_Msg = "indexer_node(is,is): Missed wakeup or machine is overloaded?";
191151c0b2f7Stbbdev     run_indexer_node_test<std::tuple<int,int>, isThrowing,  isThrowing>();
19125e91b2c0SVladislav Shchapov     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
191351c0b2f7Stbbdev }
191451c0b2f7Stbbdev 
191551c0b2f7Stbbdev ///////////////////////////////////////////////
191651c0b2f7Stbbdev // whole-graph exception test
191751c0b2f7Stbbdev 
191851c0b2f7Stbbdev class Foo {
191951c0b2f7Stbbdev private:
192051c0b2f7Stbbdev     // std::vector<int>& m_vec;
192151c0b2f7Stbbdev     std::vector<int>* m_vec;
192251c0b2f7Stbbdev public:
Foo(std::vector<int> & vec)192351c0b2f7Stbbdev     Foo(std::vector<int>& vec) : m_vec(&vec) { }
operator ()(tbb::flow::continue_msg) const192451c0b2f7Stbbdev     void operator() (tbb::flow::continue_msg) const {
192551c0b2f7Stbbdev         ++nExceptions;
192651c0b2f7Stbbdev         (void)m_vec->at(m_vec->size()); // Will throw out_of_range exception
192751c0b2f7Stbbdev         CHECK_MESSAGE( (false), "Exception not thrown by invalid access");
192851c0b2f7Stbbdev     }
192951c0b2f7Stbbdev };
193051c0b2f7Stbbdev 
1931*89b2e0e3SOlga Malysheva // test from user ahelwer: https://community.intel.com/t5/Intel-oneAPI-Threading-Building/Exception-in-flow-graph-results-in-graph-wait-for-all-hanging/td-p/789352
193251c0b2f7Stbbdev // exception thrown in graph node, not caught in wait_for_all()
193351c0b2f7Stbbdev void
test_flow_graph_exception0()193451c0b2f7Stbbdev test_flow_graph_exception0() {
193551c0b2f7Stbbdev     // Initializes body
193651c0b2f7Stbbdev     std::vector<int> vec;
193751c0b2f7Stbbdev     vec.push_back(0);
193851c0b2f7Stbbdev     Foo f(vec);
193951c0b2f7Stbbdev     nExceptions = 0;
194051c0b2f7Stbbdev 
194151c0b2f7Stbbdev     // Construct graph and nodes
194251c0b2f7Stbbdev     tbb::flow::graph g;
194351c0b2f7Stbbdev     tbb::flow::broadcast_node<tbb::flow::continue_msg> start(g);
194451c0b2f7Stbbdev     tbb::flow::continue_node<tbb::flow::continue_msg> fooNode(g, f);
194551c0b2f7Stbbdev 
194651c0b2f7Stbbdev     // Construct edge
194751c0b2f7Stbbdev     tbb::flow::make_edge(start, fooNode);
194851c0b2f7Stbbdev 
194951c0b2f7Stbbdev     // Execute graph
195051c0b2f7Stbbdev     CHECK_MESSAGE( (!g.exception_thrown()), "exception_thrown flag already set");
195151c0b2f7Stbbdev     CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag already set");
195251c0b2f7Stbbdev     try {
195351c0b2f7Stbbdev         start.try_put(tbb::flow::continue_msg());
195451c0b2f7Stbbdev         g.wait_for_all();
195551c0b2f7Stbbdev         CHECK_MESSAGE( (false), "Exception not thrown");
195651c0b2f7Stbbdev     }
195751c0b2f7Stbbdev     catch(std::out_of_range& ex) {
195851c0b2f7Stbbdev         INFO("Exception: " << ex.what() << "(expected)\n");
195951c0b2f7Stbbdev     }
196051c0b2f7Stbbdev     catch(...) {
196151c0b2f7Stbbdev         INFO("Unknown exception caught (expected)\n");
196251c0b2f7Stbbdev     }
196351c0b2f7Stbbdev     CHECK_MESSAGE( (nExceptions > 0), "Exception caught, but no body signaled exception being thrown");
196451c0b2f7Stbbdev     nExceptions = 0;
196551c0b2f7Stbbdev     CHECK_MESSAGE( (g.exception_thrown()), "Exception not intercepted");
196651c0b2f7Stbbdev     // if exception set, cancellation also set.
196751c0b2f7Stbbdev     CHECK_MESSAGE( (g.is_cancelled()), "Exception cancellation not signaled");
196851c0b2f7Stbbdev     // in case we got an exception
196951c0b2f7Stbbdev     try {
197051c0b2f7Stbbdev         g.wait_for_all();  // context still signalled canceled, my_exception still set.
197151c0b2f7Stbbdev     }
197251c0b2f7Stbbdev     catch(...) {
197351c0b2f7Stbbdev         CHECK_MESSAGE( (false), "Second exception thrown but no task executing");
197451c0b2f7Stbbdev     }
197551c0b2f7Stbbdev     CHECK_MESSAGE( (nExceptions == 0), "body signaled exception being thrown, but no body executed");
197651c0b2f7Stbbdev     CHECK_MESSAGE( (!g.exception_thrown()), "exception_thrown flag not reset");
197751c0b2f7Stbbdev     CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag not reset");
197851c0b2f7Stbbdev }
197951c0b2f7Stbbdev 
TestOneThreadNum(int nThread)198051c0b2f7Stbbdev void TestOneThreadNum(int nThread) {
198151c0b2f7Stbbdev     INFO("Testing " << nThread << "%d threads\n");
198251c0b2f7Stbbdev     g_NumItems = ((nThread > NUM_ITEMS) ? nThread *2 : NUM_ITEMS);
198351c0b2f7Stbbdev     g_NumThreads = nThread;
198451c0b2f7Stbbdev     tbb::task_arena arena(nThread);
198551c0b2f7Stbbdev 	arena.execute(
198651c0b2f7Stbbdev         [&]() {
198751c0b2f7Stbbdev             // whole-graph exception catch and rethrow test
198851c0b2f7Stbbdev             test_flow_graph_exception0();
198951c0b2f7Stbbdev             for(int i = 0; i < 4; ++i) {
199051c0b2f7Stbbdev                 g_ExceptionInMaster = (i & 1) != 0;
199151c0b2f7Stbbdev                 g_SolitaryException = (i & 2) != 0;
199251c0b2f7Stbbdev                 INFO("g_ExceptionInMaster == " << (g_ExceptionInMaster ? "T":"F")
199351c0b2f7Stbbdev                      << ", g_SolitaryException == " << (g_SolitaryException ? "T":"F") << "\n");
199451c0b2f7Stbbdev                 test_input_node();
199551c0b2f7Stbbdev                 test_function_node();
199651c0b2f7Stbbdev                 test_continue_node();  // also test broadcast_node
199751c0b2f7Stbbdev                 test_multifunction_node();
199851c0b2f7Stbbdev                 // single- and multi-item buffering nodes
199951c0b2f7Stbbdev                 test_buffer_queue_and_overwrite_node();
200051c0b2f7Stbbdev                 test_sequencer_node();
200151c0b2f7Stbbdev                 test_priority_queue_node();
200251c0b2f7Stbbdev 
200351c0b2f7Stbbdev                 // join_nodes
200451c0b2f7Stbbdev                 test_join_node<tbb::flow::queueing>();
200551c0b2f7Stbbdev                 test_join_node<tbb::flow::reserving>();
200651c0b2f7Stbbdev                 test_join_node<tbb::flow::tag_matching>();
200751c0b2f7Stbbdev 
200851c0b2f7Stbbdev                 test_limiter_node();
200951c0b2f7Stbbdev                 test_split_node();
201051c0b2f7Stbbdev                 // graph for write_once_node will be complicated by the fact the node will
201151c0b2f7Stbbdev                 // not do try_puts after it has been set.  To get parallelism of N we have
201251c0b2f7Stbbdev                 // to attach N successor nodes to the write_once (or play some similar game).
201351c0b2f7Stbbdev                 // test_write_once_node();
201451c0b2f7Stbbdev                 test_indexer_node();
201551c0b2f7Stbbdev             }
201651c0b2f7Stbbdev         }
201751c0b2f7Stbbdev     );
201851c0b2f7Stbbdev }
201951c0b2f7Stbbdev 
202051c0b2f7Stbbdev //! Test exceptions with parallelism
202151c0b2f7Stbbdev //! \brief \ref error_guessing
202251c0b2f7Stbbdev TEST_CASE("Testing several threads"){
202351c0b2f7Stbbdev     // reverse order of tests
202451c0b2f7Stbbdev     for(unsigned int nThread=utils::MaxThread; nThread >= utils::MinThread; --nThread) {
2025552f342bSPavel         tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, nThread);
202651c0b2f7Stbbdev         TestOneThreadNum(nThread);
202751c0b2f7Stbbdev     }
202851c0b2f7Stbbdev }
202951c0b2f7Stbbdev 
203051c0b2f7Stbbdev #endif // TBB_USE_EXCEPTIONS
2031