151c0b2f7Stbbdev /*
2*89b2e0e3SOlga Malysheva Copyright (c) 2005-2023 Intel Corporation
351c0b2f7Stbbdev
451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev you may not use this file except in compliance with the License.
651c0b2f7Stbbdev You may obtain a copy of the License at
751c0b2f7Stbbdev
851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev
1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev See the License for the specific language governing permissions and
1451c0b2f7Stbbdev limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev
1751c0b2f7Stbbdev //! \file test_eh_flow_graph.cpp
1851c0b2f7Stbbdev //! \brief Test for [flow_graph.copy_body flow_graph.broadcast_node flow_graph.priority_queue_node flow_graph.indexer_node flow_graph.sequencer_node flow_graph.remove_edge flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification
1951c0b2f7Stbbdev
2051c0b2f7Stbbdev #include "common/config.h"
2151c0b2f7Stbbdev
2251c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
2351c0b2f7Stbbdev #include "tbb/task_scheduler_observer.h"
2451c0b2f7Stbbdev #endif
2551c0b2f7Stbbdev #include "tbb/flow_graph.h"
26552f342bSPavel #include "tbb/global_control.h"
2751c0b2f7Stbbdev
2851c0b2f7Stbbdev #include "common/test.h"
2951c0b2f7Stbbdev
3051c0b2f7Stbbdev #if TBB_USE_EXCEPTIONS
3151c0b2f7Stbbdev
3251c0b2f7Stbbdev #include "common/utils.h"
3351c0b2f7Stbbdev #include "common/checktype.h"
3451c0b2f7Stbbdev #include "common/concurrency_tracker.h"
3551c0b2f7Stbbdev
3651c0b2f7Stbbdev #if _MSC_VER
3751c0b2f7Stbbdev #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
3851c0b2f7Stbbdev #endif
3951c0b2f7Stbbdev
4051c0b2f7Stbbdev #if __TBB_MSVC_UNREACHABLE_CODE_IGNORED
4151c0b2f7Stbbdev // Suppress "unreachable code" warning by VC++ 17.0-18.0 (VS 2012 or newer)
4251c0b2f7Stbbdev #pragma warning (disable: 4702)
4351c0b2f7Stbbdev #endif
4451c0b2f7Stbbdev
4551c0b2f7Stbbdev // global task_scheduler_observer is an imperfect tool to find how many threads are really
4651c0b2f7Stbbdev // participating. That was the hope, but it counts the entries into the marketplace,
4751c0b2f7Stbbdev // not the arena.
4851c0b2f7Stbbdev // TODO: Consider using local task scheduler observer
4951c0b2f7Stbbdev // #define USE_TASK_SCHEDULER_OBSERVER 1
5051c0b2f7Stbbdev
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
5451c0b2f7Stbbdev
5551c0b2f7Stbbdev #include "common/exception_handling.h"
5651c0b2f7Stbbdev
5751c0b2f7Stbbdev #include <stdexcept>
5851c0b2f7Stbbdev
// Item-count constant for the tests (its uses are outside this part of the file).
#define NUM_ITEMS 15

// Upper bound for the counters of the input bodies: a body stops the flow as
// soon as its counter exceeds this value.
int g_NumItems = 0;

// Counters cleared by ResetGlobals() before every run.
std::atomic<unsigned> nExceptions{0};
std::atomic<intptr_t> g_TGCCancelled{0};
6451c0b2f7Stbbdev
// Tells a test body whether its operator() is expected to throw.
enum TestNodeTypeEnum { nonThrowing, isThrowing };

// Concurrency categories. They are used as template arguments of the test
// bodies and are also handed to the node under test as its concurrency limit.
static const size_t unlimited_type = 0;
static const size_t serial_type = 1;
static const size_t limited_type = 4;

// Printable name for each TestNodeTypeEnum value.
template<TestNodeTypeEnum T> struct TestNodeTypeName;

template<> struct TestNodeTypeName<isThrowing> {
    static const char *name() { return "isThrowing"; }
};

template<> struct TestNodeTypeName<nonThrowing> {
    static const char *name() { return "nonThrowing"; }
};

// Printable name for each concurrency category.
template<size_t Conc> struct concurrencyName;

template<> struct concurrencyName<unlimited_type> {
    static const char *name() { return "unlimited"; }
};

template<> struct concurrencyName<serial_type> {
    static const char *name() { return "serial"; }
};

template<> struct concurrencyName<limited_type> {
    static const char *name() { return "limited"; }
};
7951c0b2f7Stbbdev
// Class that provides waiting and throwing behavior. Non-throwing variants only wait;
// throwing variants wait and then throw.
// If serial, we can't wait for concurrency to peak on every item; we may be the bottleneck
// and would stop further processing. A serial body therefore starts waiting only after more
// than g_NumThreads + 10 executions (the "10" is somewhat arbitrary, and just makes sure
// there are enough items in the graph to keep it flowing).
// In every variant the wait itself is done while holding a utils::ConcurrencyTracker.
8551c0b2f7Stbbdev
8651c0b2f7Stbbdev template<size_t Conc, TestNodeTypeEnum t = nonThrowing>
8751c0b2f7Stbbdev class WaitThrow;
8851c0b2f7Stbbdev
8951c0b2f7Stbbdev template<>
9051c0b2f7Stbbdev class WaitThrow<serial_type,nonThrowing> {
9151c0b2f7Stbbdev protected:
WaitAndThrow(int cnt,const char *)9251c0b2f7Stbbdev void WaitAndThrow(int cnt, const char * /*name*/) {
9351c0b2f7Stbbdev if(cnt > g_NumThreads + 10) {
9451c0b2f7Stbbdev utils::ConcurrencyTracker ct;
9551c0b2f7Stbbdev WaitUntilConcurrencyPeaks();
9651c0b2f7Stbbdev }
9751c0b2f7Stbbdev }
9851c0b2f7Stbbdev };
9951c0b2f7Stbbdev
10051c0b2f7Stbbdev template<>
10151c0b2f7Stbbdev class WaitThrow<serial_type,isThrowing> {
10251c0b2f7Stbbdev protected:
WaitAndThrow(int cnt,const char *)10351c0b2f7Stbbdev void WaitAndThrow(int cnt, const char * /*name*/) {
10451c0b2f7Stbbdev if(cnt > g_NumThreads + 10) {
10551c0b2f7Stbbdev utils::ConcurrencyTracker ct;
10651c0b2f7Stbbdev WaitUntilConcurrencyPeaks();
10751c0b2f7Stbbdev ThrowTestException(1);
10851c0b2f7Stbbdev }
10951c0b2f7Stbbdev }
11051c0b2f7Stbbdev };
11151c0b2f7Stbbdev
11251c0b2f7Stbbdev // for nodes with limited concurrency, if that concurrency is < g_NumThreads, we need
11351c0b2f7Stbbdev // to make sure enough other nodes wait for concurrency to peak. If we are attached to
11451c0b2f7Stbbdev // N successors, for each item we pass to a successor, we will get N executions of the
11551c0b2f7Stbbdev // "absorbers" (because we broadcast to successors.) for an odd number of threads we
11651c0b2f7Stbbdev // need (g_NumThreads - limited + 1) / 2 items (that will give us one extra execution
11751c0b2f7Stbbdev // of an "absorber", but we can't change that without changing the behavior of the node.)
11851c0b2f7Stbbdev template<>
11951c0b2f7Stbbdev class WaitThrow<limited_type,nonThrowing> {
12051c0b2f7Stbbdev protected:
WaitAndThrow(int cnt,const char *)12151c0b2f7Stbbdev void WaitAndThrow(int cnt, const char * /*name*/) {
12251c0b2f7Stbbdev if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) {
12351c0b2f7Stbbdev return;
12451c0b2f7Stbbdev }
12551c0b2f7Stbbdev utils::ConcurrencyTracker ct;
12651c0b2f7Stbbdev WaitUntilConcurrencyPeaks();
12751c0b2f7Stbbdev }
12851c0b2f7Stbbdev };
12951c0b2f7Stbbdev
13051c0b2f7Stbbdev template<>
13151c0b2f7Stbbdev class WaitThrow<limited_type,isThrowing> {
13251c0b2f7Stbbdev protected:
WaitAndThrow(int cnt,const char *)13351c0b2f7Stbbdev void WaitAndThrow(int cnt, const char * /*name*/) {
13451c0b2f7Stbbdev utils::ConcurrencyTracker ct;
13551c0b2f7Stbbdev if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) {
13651c0b2f7Stbbdev return;
13751c0b2f7Stbbdev }
13851c0b2f7Stbbdev WaitUntilConcurrencyPeaks();
13951c0b2f7Stbbdev ThrowTestException(1);
14051c0b2f7Stbbdev }
14151c0b2f7Stbbdev };
14251c0b2f7Stbbdev
14351c0b2f7Stbbdev template<>
14451c0b2f7Stbbdev class WaitThrow<unlimited_type,nonThrowing> {
14551c0b2f7Stbbdev protected:
WaitAndThrow(int,const char *)14651c0b2f7Stbbdev void WaitAndThrow(int /*cnt*/, const char * /*name*/) {
14751c0b2f7Stbbdev utils::ConcurrencyTracker ct;
14851c0b2f7Stbbdev WaitUntilConcurrencyPeaks();
14951c0b2f7Stbbdev }
15051c0b2f7Stbbdev };
15151c0b2f7Stbbdev
15251c0b2f7Stbbdev template<>
15351c0b2f7Stbbdev class WaitThrow<unlimited_type,isThrowing> {
15451c0b2f7Stbbdev protected:
WaitAndThrow(int,const char *)15551c0b2f7Stbbdev void WaitAndThrow(int /*cnt*/, const char * /*name*/) {
15651c0b2f7Stbbdev utils::ConcurrencyTracker ct;
15751c0b2f7Stbbdev WaitUntilConcurrencyPeaks();
15851c0b2f7Stbbdev ThrowTestException(1);
15951c0b2f7Stbbdev }
16051c0b2f7Stbbdev };
16151c0b2f7Stbbdev
16251c0b2f7Stbbdev void
ResetGlobals(bool throwException=true,bool flog=false)16351c0b2f7Stbbdev ResetGlobals(bool throwException = true, bool flog = false) {
16451c0b2f7Stbbdev nExceptions = 0;
16551c0b2f7Stbbdev g_TGCCancelled = 0;
16651c0b2f7Stbbdev ResetEhGlobals(throwException, flog);
16751c0b2f7Stbbdev }
16851c0b2f7Stbbdev
16951c0b2f7Stbbdev // -------input_node body ------------------
17051c0b2f7Stbbdev template <class OutputType, TestNodeTypeEnum TType>
17151c0b2f7Stbbdev class test_input_body : WaitThrow<serial_type, TType> {
17251c0b2f7Stbbdev using WaitThrow<serial_type, TType>::WaitAndThrow;
17351c0b2f7Stbbdev std::atomic<int> *my_current_val;
17451c0b2f7Stbbdev int my_mult;
17551c0b2f7Stbbdev public:
test_input_body(std::atomic<int> & my_cnt,int multiplier=1)17651c0b2f7Stbbdev test_input_body(std::atomic<int> &my_cnt, int multiplier = 1) : my_current_val(&my_cnt), my_mult(multiplier) {
17751c0b2f7Stbbdev // INFO("- --------- - - - constructed " << (size_t)(my_current_val) << "\n");
17851c0b2f7Stbbdev }
17951c0b2f7Stbbdev
operator ()(tbb::flow_control & fc)18051c0b2f7Stbbdev OutputType operator()(tbb::flow_control& fc) {
18151c0b2f7Stbbdev UPDATE_COUNTS();
18251c0b2f7Stbbdev OutputType ret = OutputType(my_mult * ++(*my_current_val));
18351c0b2f7Stbbdev // TODO revamp: reconsider logging for the tests.
18451c0b2f7Stbbdev
18551c0b2f7Stbbdev // The following line is known to cause double frees. Therefore, commenting out frequent
18651c0b2f7Stbbdev // calls to INFO() macro.
18751c0b2f7Stbbdev
18851c0b2f7Stbbdev // INFO("xx(" << (size_t)(my_current_val) << ") ret == " << (int)ret << "\n");
18951c0b2f7Stbbdev if(*my_current_val > g_NumItems) {
19051c0b2f7Stbbdev // INFO(" ------ End of the line!\n");
19151c0b2f7Stbbdev *my_current_val = g_NumItems;
19251c0b2f7Stbbdev fc.stop();
19351c0b2f7Stbbdev return OutputType();
19451c0b2f7Stbbdev }
19551c0b2f7Stbbdev WaitAndThrow((int)ret,"test_input_body");
19651c0b2f7Stbbdev return ret;
19751c0b2f7Stbbdev }
19851c0b2f7Stbbdev
count_value()19951c0b2f7Stbbdev int count_value() { return (int)*my_current_val; }
20051c0b2f7Stbbdev };
20151c0b2f7Stbbdev
20251c0b2f7Stbbdev template <TestNodeTypeEnum TType>
20351c0b2f7Stbbdev class test_input_body<tbb::flow::continue_msg, TType> : WaitThrow<serial_type, TType> {
20451c0b2f7Stbbdev using WaitThrow<serial_type, TType>::WaitAndThrow;
20551c0b2f7Stbbdev std::atomic<int> *my_current_val;
20651c0b2f7Stbbdev public:
test_input_body(std::atomic<int> & my_cnt)20751c0b2f7Stbbdev test_input_body(std::atomic<int> &my_cnt) : my_current_val(&my_cnt) { }
20851c0b2f7Stbbdev
operator ()(tbb::flow_control & fc)20951c0b2f7Stbbdev tbb::flow::continue_msg operator()( tbb::flow_control & fc) {
21051c0b2f7Stbbdev UPDATE_COUNTS();
21151c0b2f7Stbbdev int outint = ++(*my_current_val);
21251c0b2f7Stbbdev if(*my_current_val > g_NumItems) {
21351c0b2f7Stbbdev *my_current_val = g_NumItems;
21451c0b2f7Stbbdev fc.stop();
21551c0b2f7Stbbdev return tbb::flow::continue_msg();
21651c0b2f7Stbbdev }
21751c0b2f7Stbbdev WaitAndThrow(outint,"test_input_body");
21851c0b2f7Stbbdev return tbb::flow::continue_msg();
21951c0b2f7Stbbdev }
22051c0b2f7Stbbdev
count_value()22151c0b2f7Stbbdev int count_value() { return (int)*my_current_val; }
22251c0b2f7Stbbdev };
22351c0b2f7Stbbdev
22451c0b2f7Stbbdev // -------{function/continue}_node body ------------------
22551c0b2f7Stbbdev template<class InputType, class OutputType, TestNodeTypeEnum T, size_t Conc>
22651c0b2f7Stbbdev class absorber_body : WaitThrow<Conc,T> {
22751c0b2f7Stbbdev using WaitThrow<Conc,T>::WaitAndThrow;
22851c0b2f7Stbbdev std::atomic<int> *my_count;
22951c0b2f7Stbbdev public:
absorber_body(std::atomic<int> & my_cnt)23051c0b2f7Stbbdev absorber_body(std::atomic<int> &my_cnt) : my_count(&my_cnt) { }
operator ()(const InputType &)23151c0b2f7Stbbdev OutputType operator()(const InputType &/*p_in*/) {
23251c0b2f7Stbbdev UPDATE_COUNTS();
23351c0b2f7Stbbdev int out = ++(*my_count);
23451c0b2f7Stbbdev WaitAndThrow(out,"absorber_body");
23551c0b2f7Stbbdev return OutputType();
23651c0b2f7Stbbdev }
count_value()23751c0b2f7Stbbdev int count_value() { return *my_count; }
23851c0b2f7Stbbdev };
23951c0b2f7Stbbdev
24051c0b2f7Stbbdev // -------multifunction_node body ------------------
24151c0b2f7Stbbdev
24251c0b2f7Stbbdev // helper classes
// Helper for multifunction_node_body: puts a default-constructed item to output
// port N-1, then recurses towards port 0.
template<int N,class PortsType>
struct IssueOutput {
    using port_type = typename std::tuple_element<N-1,PortsType>::type;
    using my_type = typename port_type::output_type;

    static void issue_tuple_element( PortsType &my_ports) {
        CHECK_MESSAGE( (std::get<N-1>(my_ports).try_put(my_type())), "Error putting to successor");
        using next_step = IssueOutput<N-1,PortsType>;
        next_step::issue_tuple_element(my_ports);
    }
};
25251c0b2f7Stbbdev
25351c0b2f7Stbbdev template<class PortsType>
25451c0b2f7Stbbdev struct IssueOutput<1,PortsType> {
25551c0b2f7Stbbdev typedef typename std::tuple_element<0,PortsType>::type::output_type my_type;
25651c0b2f7Stbbdev
issue_tuple_elementIssueOutput25751c0b2f7Stbbdev static void issue_tuple_element( PortsType &my_ports) {
25851c0b2f7Stbbdev CHECK_MESSAGE( (std::get<0>(my_ports).try_put(my_type())), "Error putting to successor");
25951c0b2f7Stbbdev }
26051c0b2f7Stbbdev };
26151c0b2f7Stbbdev
26251c0b2f7Stbbdev template<class InputType, class OutputTupleType, TestNodeTypeEnum T, size_t Conc>
26351c0b2f7Stbbdev class multifunction_node_body : WaitThrow<Conc,T> {
26451c0b2f7Stbbdev using WaitThrow<Conc,T>::WaitAndThrow;
26551c0b2f7Stbbdev static const int N = std::tuple_size<OutputTupleType>::value;
26651c0b2f7Stbbdev typedef typename tbb::flow::multifunction_node<InputType,OutputTupleType> NodeType;
26751c0b2f7Stbbdev typedef typename NodeType::output_ports_type PortsType;
26851c0b2f7Stbbdev std::atomic<int> *my_count;
26951c0b2f7Stbbdev public:
multifunction_node_body(std::atomic<int> & my_cnt)27051c0b2f7Stbbdev multifunction_node_body(std::atomic<int> &my_cnt) : my_count(&my_cnt) { }
operator ()(const InputType &,PortsType & my_ports)27151c0b2f7Stbbdev void operator()(const InputType& /*in*/, PortsType &my_ports) {
27251c0b2f7Stbbdev UPDATE_COUNTS();
27351c0b2f7Stbbdev int out = ++(*my_count);
27451c0b2f7Stbbdev WaitAndThrow(out,"multifunction_node_body");
27551c0b2f7Stbbdev // issue an item to each output port.
27651c0b2f7Stbbdev IssueOutput<N,PortsType>::issue_tuple_element(my_ports);
27751c0b2f7Stbbdev }
27851c0b2f7Stbbdev
count_value()27951c0b2f7Stbbdev int count_value() { return *my_count; }
28051c0b2f7Stbbdev };
28151c0b2f7Stbbdev
28251c0b2f7Stbbdev // --------- body to sort items in sequencer_node
// --------- body to sort items in sequencer_node
// Maps an item to its zero-based sequence index: item value minus one. Items
// that convert to false (zero) are reported through CHECK_MESSAGE.
template<class BufferItemType>
struct sequencer_body {
    size_t operator()(const BufferItemType &s) {
        CHECK_MESSAGE( (s), "sequencer item out of range (== 0)");
        const size_t one_based = size_t(s);
        return one_based - 1;
    }
};
29051c0b2f7Stbbdev
29151c0b2f7Stbbdev // --------- type for < comparison in priority_queue_node.
// --------- type for < comparison in priority_queue_node.
// Orders items by their int value modulo 3; items with equal remainders compare
// as equivalent. ItemType must be convertible to int.
template<class ItemType>
struct less_body {
    // const-qualified so the comparator can also be invoked through a const
    // object or a const reference.
    bool operator()(const ItemType &lhs, const ItemType &rhs) const {
        const int lhs_rank = int(lhs) % 3;
        const int rhs_rank = int(rhs) % 3;
        return lhs_rank < rhs_rank;
    }
};
29851c0b2f7Stbbdev
29951c0b2f7Stbbdev // --------- tag methods for tag_matching join_node
30051c0b2f7Stbbdev template<typename TT>
30151c0b2f7Stbbdev class tag_func {
30251c0b2f7Stbbdev TT my_mult;
30351c0b2f7Stbbdev public:
tag_func(TT multiplier)30451c0b2f7Stbbdev tag_func(TT multiplier) : my_mult(multiplier) { }
30551c0b2f7Stbbdev // operator() will return [0 .. Count)
operator ()(TT v)30651c0b2f7Stbbdev tbb::flow::tag_value operator()( TT v) {
30751c0b2f7Stbbdev tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult);
30851c0b2f7Stbbdev return t;
30951c0b2f7Stbbdev }
31051c0b2f7Stbbdev };
31151c0b2f7Stbbdev
31251c0b2f7Stbbdev // --------- Input body for split_node test.
31351c0b2f7Stbbdev template <class OutputTuple, TestNodeTypeEnum TType>
31451c0b2f7Stbbdev class tuple_test_input_body : WaitThrow<serial_type, TType> {
31551c0b2f7Stbbdev typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
31651c0b2f7Stbbdev typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
31751c0b2f7Stbbdev using WaitThrow<serial_type, TType>::WaitAndThrow;
31851c0b2f7Stbbdev std::atomic<int> *my_current_val;
31951c0b2f7Stbbdev public:
tuple_test_input_body(std::atomic<int> & my_cnt)32051c0b2f7Stbbdev tuple_test_input_body(std::atomic<int> &my_cnt) : my_current_val(&my_cnt) { }
32151c0b2f7Stbbdev
operator ()(tbb::flow_control & fc)32251c0b2f7Stbbdev OutputTuple operator()(tbb::flow_control& fc) {
32351c0b2f7Stbbdev UPDATE_COUNTS();
32451c0b2f7Stbbdev int ival = ++(*my_current_val);
32551c0b2f7Stbbdev if(*my_current_val > g_NumItems) {
32651c0b2f7Stbbdev *my_current_val = g_NumItems; // jam the final value; we assert on it later.
32751c0b2f7Stbbdev fc.stop();
32851c0b2f7Stbbdev return OutputTuple();
32951c0b2f7Stbbdev }
33051c0b2f7Stbbdev WaitAndThrow(ival,"tuple_test_input_body");
33151c0b2f7Stbbdev return OutputTuple(ItemType0(ival),ItemType1(ival));
33251c0b2f7Stbbdev }
33351c0b2f7Stbbdev
count_value()33451c0b2f7Stbbdev int count_value() { return (int)*my_current_val; }
33551c0b2f7Stbbdev };
33651c0b2f7Stbbdev
33751c0b2f7Stbbdev // ------- end of node bodies
33851c0b2f7Stbbdev
33951c0b2f7Stbbdev // input_node is only-serial. input_node can throw, or the function_node can throw.
34051c0b2f7Stbbdev // graph being tested is
34151c0b2f7Stbbdev //
34251c0b2f7Stbbdev // input_node+---+parallel function_node
34351c0b2f7Stbbdev //
34451c0b2f7Stbbdev // After each run the graph is reset(), to test the reset functionality.
34551c0b2f7Stbbdev //
34651c0b2f7Stbbdev
34751c0b2f7Stbbdev
34851c0b2f7Stbbdev template<class ItemType, TestNodeTypeEnum inpThrowType, TestNodeTypeEnum absorbThrowType>
run_one_input_node_test(bool throwException,bool flog)34951c0b2f7Stbbdev void run_one_input_node_test(bool throwException, bool flog) {
35051c0b2f7Stbbdev typedef test_input_body<ItemType,inpThrowType> src_body_type;
35151c0b2f7Stbbdev typedef absorber_body<ItemType, tbb::flow::continue_msg, absorbThrowType, unlimited_type> parallel_absorb_body_type;
35251c0b2f7Stbbdev std::atomic<int> input_body_count;
35351c0b2f7Stbbdev std::atomic<int> absorber_body_count;
35451c0b2f7Stbbdev input_body_count = 0;
35551c0b2f7Stbbdev absorber_body_count = 0;
35651c0b2f7Stbbdev
35751c0b2f7Stbbdev tbb::flow::graph g;
35851c0b2f7Stbbdev
35951c0b2f7Stbbdev g_Master = std::this_thread::get_id();
36051c0b2f7Stbbdev
36151c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
36251c0b2f7Stbbdev eh_test_observer o;
36351c0b2f7Stbbdev o.observe(true);
36451c0b2f7Stbbdev #endif
36551c0b2f7Stbbdev
36651c0b2f7Stbbdev tbb::flow::input_node<ItemType> sn(g, src_body_type(input_body_count));
36751c0b2f7Stbbdev parallel_absorb_body_type ab2(absorber_body_count);
36851c0b2f7Stbbdev tbb::flow::function_node<ItemType> parallel_fn(g,tbb::flow::unlimited,ab2);
36951c0b2f7Stbbdev make_edge(sn, parallel_fn);
37051c0b2f7Stbbdev for(int runcnt = 0; runcnt < 2; ++runcnt) {
37151c0b2f7Stbbdev ResetGlobals(throwException,flog);
37251c0b2f7Stbbdev if(throwException) {
37351c0b2f7Stbbdev TRY();
37451c0b2f7Stbbdev sn.activate();
37551c0b2f7Stbbdev g.wait_for_all();
37651c0b2f7Stbbdev CATCH_AND_ASSERT();
37751c0b2f7Stbbdev }
37851c0b2f7Stbbdev else {
37951c0b2f7Stbbdev TRY();
38051c0b2f7Stbbdev sn.activate();
38151c0b2f7Stbbdev g.wait_for_all();
38251c0b2f7Stbbdev CATCH_AND_FAIL();
38351c0b2f7Stbbdev }
38451c0b2f7Stbbdev
38551c0b2f7Stbbdev bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
38651c0b2f7Stbbdev int src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value();
38751c0b2f7Stbbdev int sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value();
38851c0b2f7Stbbdev if(throwException) {
38951c0b2f7Stbbdev CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception flag in flow::graph not set");
39051c0b2f7Stbbdev CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "canceled flag not set");
39151c0b2f7Stbbdev CHECK_MESSAGE( (src_cnt <= g_NumItems), "Too many input_node items emitted");
39251c0b2f7Stbbdev CHECK_MESSAGE( (sink_cnt <= src_cnt), "Too many input_node items received");
39351c0b2f7Stbbdev }
39451c0b2f7Stbbdev else {
39551c0b2f7Stbbdev CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
39651c0b2f7Stbbdev CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
39751c0b2f7Stbbdev CHECK_MESSAGE( (src_cnt == g_NumItems), "Incorrect # input_node items emitted");
39851c0b2f7Stbbdev CHECK_MESSAGE( (sink_cnt == src_cnt), "Incorrect # input_node items received");
39951c0b2f7Stbbdev }
40051c0b2f7Stbbdev g.reset(); // resets the body of the input_node and the absorb_nodes.
40151c0b2f7Stbbdev input_body_count = 0;
40251c0b2f7Stbbdev absorber_body_count = 0;
40351c0b2f7Stbbdev CHECK_MESSAGE( (!g.exception_thrown()), "Reset didn't clear exception_thrown()");
40451c0b2f7Stbbdev CHECK_MESSAGE( (!g.is_cancelled()), "Reset didn't clear is_cancelled()");
40551c0b2f7Stbbdev src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value();
40651c0b2f7Stbbdev sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value();
40751c0b2f7Stbbdev CHECK_MESSAGE( (src_cnt == 0), "input_node count not reset");
40851c0b2f7Stbbdev CHECK_MESSAGE( (sink_cnt == 0), "sink_node count not reset");
40951c0b2f7Stbbdev }
41051c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
41151c0b2f7Stbbdev o.observe(false);
41251c0b2f7Stbbdev #endif
41351c0b2f7Stbbdev } // run_one_input_node_test
41451c0b2f7Stbbdev
41551c0b2f7Stbbdev
41651c0b2f7Stbbdev template<class ItemType, TestNodeTypeEnum inpThrowType, TestNodeTypeEnum absorbThrowType>
run_input_node_test()41751c0b2f7Stbbdev void run_input_node_test() {
41851c0b2f7Stbbdev run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(false,false);
41951c0b2f7Stbbdev run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(true,false);
42051c0b2f7Stbbdev run_one_input_node_test<ItemType,inpThrowType,absorbThrowType>(true,true);
42151c0b2f7Stbbdev } // run_input_node_test
42251c0b2f7Stbbdev
test_input_node()42351c0b2f7Stbbdev void test_input_node() {
42451c0b2f7Stbbdev INFO("Testing input_node\n");
42551c0b2f7Stbbdev CheckType<int>::check_type_counter = 0;
42651c0b2f7Stbbdev g_Wakeup_Msg = "input_node(1): Missed wakeup or machine is overloaded?";
42751c0b2f7Stbbdev run_input_node_test<CheckType<int>, isThrowing, nonThrowing>();
42851c0b2f7Stbbdev CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
42951c0b2f7Stbbdev g_Wakeup_Msg = "input_node(2): Missed wakeup or machine is overloaded?";
43051c0b2f7Stbbdev run_input_node_test<int, isThrowing, nonThrowing>();
43151c0b2f7Stbbdev g_Wakeup_Msg = "input_node(3): Missed wakeup or machine is overloaded?";
43251c0b2f7Stbbdev run_input_node_test<int, nonThrowing, isThrowing>();
43351c0b2f7Stbbdev g_Wakeup_Msg = "input_node(4): Missed wakeup or machine is overloaded?";
43451c0b2f7Stbbdev run_input_node_test<int, isThrowing, isThrowing>();
43551c0b2f7Stbbdev g_Wakeup_Msg = "input_node(5): Missed wakeup or machine is overloaded?";
43651c0b2f7Stbbdev run_input_node_test<CheckType<int>, isThrowing, isThrowing>();
43751c0b2f7Stbbdev g_Wakeup_Msg = g_Orig_Wakeup_Msg;
43851c0b2f7Stbbdev CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
43951c0b2f7Stbbdev }
44051c0b2f7Stbbdev
44151c0b2f7Stbbdev // -------- utilities & types to test function_node and multifunction_node.
44251c0b2f7Stbbdev
44351c0b2f7Stbbdev // need to tell the template which node type I am using so it attaches successors correctly.
44451c0b2f7Stbbdev enum NodeFetchType { func_node_type, multifunc_node_type };
44551c0b2f7Stbbdev
44651c0b2f7Stbbdev template<class NodeType, class ItemType, int indx, NodeFetchType NFT>
44751c0b2f7Stbbdev struct AttachPoint;
44851c0b2f7Stbbdev
44951c0b2f7Stbbdev template<class NodeType, class ItemType, int indx>
45051c0b2f7Stbbdev struct AttachPoint<NodeType,ItemType,indx,multifunc_node_type> {
GetSenderAttachPoint45151c0b2f7Stbbdev static tbb::flow::sender<ItemType> &GetSender(NodeType &n) {
45251c0b2f7Stbbdev return tbb::flow::output_port<indx>(n);
45351c0b2f7Stbbdev }
45451c0b2f7Stbbdev };
45551c0b2f7Stbbdev
45651c0b2f7Stbbdev template<class NodeType, class ItemType, int indx>
45751c0b2f7Stbbdev struct AttachPoint<NodeType,ItemType,indx,func_node_type> {
GetSenderAttachPoint45851c0b2f7Stbbdev static tbb::flow::sender<ItemType> &GetSender(NodeType &n) {
45951c0b2f7Stbbdev return n;
46051c0b2f7Stbbdev }
46151c0b2f7Stbbdev };
46251c0b2f7Stbbdev
46351c0b2f7Stbbdev
46451c0b2f7Stbbdev // common template for running function_node, multifunction_node. continue_node
46551c0b2f7Stbbdev // has different firing requirements, so it needs a different graph topology.
46651c0b2f7Stbbdev template<
46751c0b2f7Stbbdev class InputNodeType,
46851c0b2f7Stbbdev class InputNodeBodyType0,
46951c0b2f7Stbbdev class InputNodeBodyType1,
47051c0b2f7Stbbdev NodeFetchType NFT,
47151c0b2f7Stbbdev class TestNodeType,
47251c0b2f7Stbbdev class TestNodeBodyType,
47351c0b2f7Stbbdev class TypeToSink0, // what kind of item are we sending to sink0
47451c0b2f7Stbbdev class TypeToSink1, // what kind of item are we sending to sink1
47551c0b2f7Stbbdev class SinkNodeType0, // will be same for function;
47651c0b2f7Stbbdev class SinkNodeType1, // may differ for multifunction_node
47751c0b2f7Stbbdev class SinkNodeBodyType0,
47851c0b2f7Stbbdev class SinkNodeBodyType1,
47951c0b2f7Stbbdev size_t Conc
48051c0b2f7Stbbdev >
48151c0b2f7Stbbdev void
run_one_functype_node_test(bool throwException,bool flog,const char *)48251c0b2f7Stbbdev run_one_functype_node_test(bool throwException, bool flog, const char * /*name*/) {
48351c0b2f7Stbbdev
48451c0b2f7Stbbdev std::stringstream ss;
48551c0b2f7Stbbdev char *saved_msg = const_cast<char *>(g_Wakeup_Msg);
48651c0b2f7Stbbdev tbb::flow::graph g;
48751c0b2f7Stbbdev
48851c0b2f7Stbbdev std::atomic<int> input0_count;
48951c0b2f7Stbbdev std::atomic<int> input1_count;
49051c0b2f7Stbbdev std::atomic<int> sink0_count;
49151c0b2f7Stbbdev std::atomic<int> sink1_count;
49251c0b2f7Stbbdev std::atomic<int> test_count;
49351c0b2f7Stbbdev input0_count = input1_count = sink0_count = sink1_count = test_count = 0;
49451c0b2f7Stbbdev
49551c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
49651c0b2f7Stbbdev eh_test_observer o;
49751c0b2f7Stbbdev o.observe(true);
49851c0b2f7Stbbdev #endif
49951c0b2f7Stbbdev
50051c0b2f7Stbbdev g_Master = std::this_thread::get_id();
50151c0b2f7Stbbdev InputNodeType input0(g, InputNodeBodyType0(input0_count));
50251c0b2f7Stbbdev InputNodeType input1(g, InputNodeBodyType1(input1_count));
50351c0b2f7Stbbdev TestNodeType node_to_test(g, Conc, TestNodeBodyType(test_count));
50451c0b2f7Stbbdev SinkNodeType0 sink0(g,tbb::flow::unlimited,SinkNodeBodyType0(sink0_count));
50551c0b2f7Stbbdev SinkNodeType1 sink1(g,tbb::flow::unlimited,SinkNodeBodyType1(sink1_count));
50651c0b2f7Stbbdev make_edge(input0, node_to_test);
50751c0b2f7Stbbdev make_edge(input1, node_to_test);
50851c0b2f7Stbbdev make_edge(AttachPoint<TestNodeType, TypeToSink0, 0, NFT>::GetSender(node_to_test), sink0);
50951c0b2f7Stbbdev make_edge(AttachPoint<TestNodeType, TypeToSink1, 1, NFT>::GetSender(node_to_test), sink1);
51051c0b2f7Stbbdev
51151c0b2f7Stbbdev for(int iter = 0; iter < 2; ++iter) { // run, reset, run again
51251c0b2f7Stbbdev ss.clear();
51351c0b2f7Stbbdev ss << saved_msg << " iter=" << iter << ", threads=" << g_NumThreads << ", throw=" << (throwException ? "T" : "F") << ", flow=" << (flog ? "T" : "F");
51451c0b2f7Stbbdev g_Wakeup_Msg = ss.str().c_str();
51551c0b2f7Stbbdev ResetGlobals(throwException,flog);
51651c0b2f7Stbbdev if(throwException) {
51751c0b2f7Stbbdev TRY();
51851c0b2f7Stbbdev input0.activate();
51951c0b2f7Stbbdev input1.activate();
52051c0b2f7Stbbdev g.wait_for_all();
52151c0b2f7Stbbdev CATCH_AND_ASSERT();
52251c0b2f7Stbbdev }
52351c0b2f7Stbbdev else {
52451c0b2f7Stbbdev TRY();
52551c0b2f7Stbbdev input0.activate();
52651c0b2f7Stbbdev input1.activate();
52751c0b2f7Stbbdev g.wait_for_all();
52851c0b2f7Stbbdev CATCH_AND_FAIL();
52951c0b2f7Stbbdev }
53051c0b2f7Stbbdev bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
53151c0b2f7Stbbdev int ib0_cnt = tbb::flow::copy_body<InputNodeBodyType0>(input0).count_value();
53251c0b2f7Stbbdev int ib1_cnt = tbb::flow::copy_body<InputNodeBodyType1>(input1).count_value();
53351c0b2f7Stbbdev int t_cnt = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value();
53451c0b2f7Stbbdev int nb0_cnt = tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value();
53551c0b2f7Stbbdev int nb1_cnt = tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value();
53651c0b2f7Stbbdev if(throwException) {
53751c0b2f7Stbbdev CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
53851c0b2f7Stbbdev CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
53951c0b2f7Stbbdev CHECK_MESSAGE( (ib0_cnt + ib1_cnt <= 2*g_NumItems), "Too many items sent by inputs");
54051c0b2f7Stbbdev CHECK_MESSAGE( (ib0_cnt + ib1_cnt >= t_cnt), "Too many items received by test node");
54151c0b2f7Stbbdev CHECK_MESSAGE( (nb0_cnt + nb1_cnt <= t_cnt*2), "Too many items received by sink nodes");
54251c0b2f7Stbbdev }
54351c0b2f7Stbbdev else {
54451c0b2f7Stbbdev CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
54551c0b2f7Stbbdev CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
54651c0b2f7Stbbdev CHECK_MESSAGE( (ib0_cnt + ib1_cnt == 2*g_NumItems), "Missing invocations of input_nodes");
54751c0b2f7Stbbdev CHECK_MESSAGE( (t_cnt == 2*g_NumItems), "Not all items reached test node");
54851c0b2f7Stbbdev CHECK_MESSAGE( (nb0_cnt == 2*g_NumItems && nb1_cnt == 2*g_NumItems), "Missing items in absorbers");
54951c0b2f7Stbbdev }
55051c0b2f7Stbbdev g.reset(); // resets the body of the input_nodes, test_node and the absorb_nodes.
55151c0b2f7Stbbdev input0_count = input1_count = sink0_count = sink1_count = test_count = 0;
55251c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType0>(input0).count_value()),"Reset input 0 failed");
55351c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType1>(input1).count_value()),"Reset input 1 failed");
55451c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value()),"Reset test_node failed");
55551c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value()),"Reset sink 0 failed");
55651c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value()),"Reset sink 1 failed");
55751c0b2f7Stbbdev
55851c0b2f7Stbbdev g_Wakeup_Msg = saved_msg;
55951c0b2f7Stbbdev }
56051c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
56151c0b2f7Stbbdev o.observe(false);
56251c0b2f7Stbbdev #endif
56351c0b2f7Stbbdev }
56451c0b2f7Stbbdev
56551c0b2f7Stbbdev // Test function_node
56651c0b2f7Stbbdev //
56751c0b2f7Stbbdev // graph being tested is
56851c0b2f7Stbbdev //
56951c0b2f7Stbbdev // input_node -\ /- parallel function_node
57051c0b2f7Stbbdev // \ /
57151c0b2f7Stbbdev // +function_node+
57251c0b2f7Stbbdev // / \ x
57351c0b2f7Stbbdev // input_node -/ \- parallel function_node
57451c0b2f7Stbbdev //
57551c0b2f7Stbbdev // After each run the graph is reset(), to test the reset functionality.
57651c0b2f7Stbbdev //
57751c0b2f7Stbbdev template<
57851c0b2f7Stbbdev TestNodeTypeEnum IType1, // does input node 1 throw?
57951c0b2f7Stbbdev TestNodeTypeEnum IType2, // does input node 2 throw?
58051c0b2f7Stbbdev class Item12, // type of item passed between inputs and test node
58151c0b2f7Stbbdev TestNodeTypeEnum FType, // does function node throw?
58251c0b2f7Stbbdev class Item23, // type passed from function_node to sink nodes
58351c0b2f7Stbbdev TestNodeTypeEnum NType1, // does sink node 1 throw?
58451c0b2f7Stbbdev TestNodeTypeEnum NType2, // does sink node 1 throw?
58551c0b2f7Stbbdev class NodePolicy, // rejecting,queueing
58651c0b2f7Stbbdev size_t Conc // is node concurrent? {serial | limited | unlimited}
58751c0b2f7Stbbdev >
run_function_node_test()58851c0b2f7Stbbdev void run_function_node_test() {
58951c0b2f7Stbbdev
59051c0b2f7Stbbdev typedef test_input_body<Item12,IType1> IBodyType1;
59151c0b2f7Stbbdev typedef test_input_body<Item12,IType2> IBodyType2;
59251c0b2f7Stbbdev typedef absorber_body<Item12, Item23, FType, Conc> TestBodyType;
59351c0b2f7Stbbdev typedef absorber_body<Item23,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1;
59451c0b2f7Stbbdev typedef absorber_body<Item23,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2;
59551c0b2f7Stbbdev
59651c0b2f7Stbbdev typedef tbb::flow::input_node<Item12> InputType;
59751c0b2f7Stbbdev typedef tbb::flow::function_node<Item12, Item23, NodePolicy> TestType;
59851c0b2f7Stbbdev typedef tbb::flow::function_node<Item23,tbb::flow::continue_msg> SnkType;
59951c0b2f7Stbbdev
60051c0b2f7Stbbdev for(int i = 0; i < 4; ++i ) {
60151c0b2f7Stbbdev if(i != 2) { // doesn't make sense to flog a non-throwing test
60251c0b2f7Stbbdev bool doThrow = (i & 0x1) != 0;
60351c0b2f7Stbbdev bool doFlog = (i & 0x2) != 0;
60451c0b2f7Stbbdev run_one_functype_node_test<
60551c0b2f7Stbbdev /*InputNodeType*/ InputType,
60651c0b2f7Stbbdev /*InputNodeBodyType0*/ IBodyType1,
60751c0b2f7Stbbdev /*InputNodeBodyType1*/ IBodyType2,
60851c0b2f7Stbbdev /* NFT */ func_node_type,
60951c0b2f7Stbbdev /*TestNodeType*/ TestType,
61051c0b2f7Stbbdev /*TestNodeBodyType*/ TestBodyType,
61151c0b2f7Stbbdev /*TypeToSink0 */ Item23,
61251c0b2f7Stbbdev /*TypeToSink1 */ Item23,
61351c0b2f7Stbbdev /*SinkNodeType0*/ SnkType,
61451c0b2f7Stbbdev /*SinkNodeType1*/ SnkType,
61551c0b2f7Stbbdev /*SinkNodeBodyType1*/ SinkBodyType1,
61651c0b2f7Stbbdev /*SinkNodeBodyType2*/ SinkBodyType2,
61751c0b2f7Stbbdev /*Conc*/ Conc>
61851c0b2f7Stbbdev (doThrow,doFlog,"function_node");
61951c0b2f7Stbbdev }
62051c0b2f7Stbbdev }
62151c0b2f7Stbbdev } // run_function_node_test
62251c0b2f7Stbbdev
test_function_node()62351c0b2f7Stbbdev void test_function_node() {
62451c0b2f7Stbbdev INFO("Testing function_node\n");
62551c0b2f7Stbbdev // serial rejecting
62651c0b2f7Stbbdev g_Wakeup_Msg = "function_node(1a): Missed wakeup or machine is overloaded?";
62751c0b2f7Stbbdev run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
62851c0b2f7Stbbdev g_Wakeup_Msg = "function_node(1b): Missed wakeup or machine is overloaded?";
62951c0b2f7Stbbdev run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
63051c0b2f7Stbbdev g_Wakeup_Msg = "function_node(1c): Missed wakeup or machine is overloaded?";
63151c0b2f7Stbbdev run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
63251c0b2f7Stbbdev
63351c0b2f7Stbbdev // serial queueing
63451c0b2f7Stbbdev g_Wakeup_Msg = "function_node(2): Missed wakeup or machine is overloaded?";
63551c0b2f7Stbbdev run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
63651c0b2f7Stbbdev run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
63751c0b2f7Stbbdev run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
63851c0b2f7Stbbdev CheckType<int>::check_type_counter = 0;
63951c0b2f7Stbbdev run_function_node_test<nonThrowing, nonThrowing, CheckType<int>, nonThrowing, CheckType<int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
64051c0b2f7Stbbdev CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
64151c0b2f7Stbbdev
64251c0b2f7Stbbdev // unlimited parallel rejecting
64351c0b2f7Stbbdev g_Wakeup_Msg = "function_node(3): Missed wakeup or machine is overloaded?";
64451c0b2f7Stbbdev run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
64551c0b2f7Stbbdev run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
64651c0b2f7Stbbdev run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
64751c0b2f7Stbbdev
64851c0b2f7Stbbdev // limited parallel rejecting
64951c0b2f7Stbbdev g_Wakeup_Msg = "function_node(4): Missed wakeup or machine is overloaded?";
65051c0b2f7Stbbdev run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>();
65151c0b2f7Stbbdev run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>();
65251c0b2f7Stbbdev run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>();
65351c0b2f7Stbbdev
65451c0b2f7Stbbdev // limited parallel queueing
65551c0b2f7Stbbdev g_Wakeup_Msg = "function_node(5): Missed wakeup or machine is overloaded?";
65651c0b2f7Stbbdev run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
65751c0b2f7Stbbdev run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
65851c0b2f7Stbbdev run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>();
65951c0b2f7Stbbdev
66051c0b2f7Stbbdev // everyone throwing
66151c0b2f7Stbbdev g_Wakeup_Msg = "function_node(6): Missed wakeup or machine is overloaded?";
66251c0b2f7Stbbdev run_function_node_test<isThrowing, isThrowing, int, isThrowing, int, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
66351c0b2f7Stbbdev g_Wakeup_Msg = g_Orig_Wakeup_Msg;
66451c0b2f7Stbbdev }
66551c0b2f7Stbbdev
66651c0b2f7Stbbdev // ----------------------------------- multifunction_node ----------------------------------
66751c0b2f7Stbbdev // Test multifunction_node.
66851c0b2f7Stbbdev //
66951c0b2f7Stbbdev // graph being tested is
67051c0b2f7Stbbdev //
67151c0b2f7Stbbdev // input_node -\ /- parallel function_node
67251c0b2f7Stbbdev // \ /
67351c0b2f7Stbbdev // +multifunction_node+
67451c0b2f7Stbbdev // / \ x
67551c0b2f7Stbbdev // input_node -/ \- parallel function_node
67651c0b2f7Stbbdev //
67751c0b2f7Stbbdev // After each run the graph is reset(), to test the reset functionality. The
67851c0b2f7Stbbdev // multifunction_node will put an item to each successor for every item
67951c0b2f7Stbbdev // received.
68051c0b2f7Stbbdev //
68151c0b2f7Stbbdev template<
68251c0b2f7Stbbdev TestNodeTypeEnum IType0, // does input node 1 throw?
68351c0b2f7Stbbdev TestNodeTypeEnum IType1, // does input node 2 thorw?
68451c0b2f7Stbbdev class Item12, // type of item passed between inputs and test node
68551c0b2f7Stbbdev TestNodeTypeEnum FType, // does multifunction node throw?
68651c0b2f7Stbbdev class ItemTuple, // tuple of types passed from multifunction_node to sink nodes
68751c0b2f7Stbbdev TestNodeTypeEnum NType1, // does sink node 1 throw?
68851c0b2f7Stbbdev TestNodeTypeEnum NType2, // does sink node 2 throw?
68951c0b2f7Stbbdev class NodePolicy, // rejecting,queueing
69051c0b2f7Stbbdev size_t Conc // is node concurrent? {serial | limited | unlimited}
69151c0b2f7Stbbdev >
run_multifunction_node_test()69251c0b2f7Stbbdev void run_multifunction_node_test() {
69351c0b2f7Stbbdev
69451c0b2f7Stbbdev typedef typename std::tuple_element<0,ItemTuple>::type Item23Type0;
69551c0b2f7Stbbdev typedef typename std::tuple_element<1,ItemTuple>::type Item23Type1;
69651c0b2f7Stbbdev typedef test_input_body<Item12,IType0> IBodyType1;
69751c0b2f7Stbbdev typedef test_input_body<Item12,IType1> IBodyType2;
69851c0b2f7Stbbdev typedef multifunction_node_body<Item12, ItemTuple, FType, Conc> TestBodyType;
69951c0b2f7Stbbdev typedef absorber_body<Item23Type0,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1;
70051c0b2f7Stbbdev typedef absorber_body<Item23Type1,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2;
70151c0b2f7Stbbdev
70251c0b2f7Stbbdev typedef tbb::flow::input_node<Item12> InputType;
70351c0b2f7Stbbdev typedef tbb::flow::multifunction_node<Item12, ItemTuple, NodePolicy> TestType;
70451c0b2f7Stbbdev typedef tbb::flow::function_node<Item23Type0,tbb::flow::continue_msg> SnkType0;
70551c0b2f7Stbbdev typedef tbb::flow::function_node<Item23Type1,tbb::flow::continue_msg> SnkType1;
70651c0b2f7Stbbdev
70751c0b2f7Stbbdev for(int i = 0; i < 4; ++i ) {
70851c0b2f7Stbbdev if(i != 2) { // doesn't make sense to flog a non-throwing test
70951c0b2f7Stbbdev bool doThrow = (i & 0x1) != 0;
71051c0b2f7Stbbdev bool doFlog = (i & 0x2) != 0;
71151c0b2f7Stbbdev run_one_functype_node_test<
71251c0b2f7Stbbdev /*InputNodeType*/ InputType,
71351c0b2f7Stbbdev /*InputNodeBodyType0*/ IBodyType1,
71451c0b2f7Stbbdev /*InputNodeBodyType1*/ IBodyType2,
71551c0b2f7Stbbdev /*NFT*/ multifunc_node_type,
71651c0b2f7Stbbdev /*TestNodeType*/ TestType,
71751c0b2f7Stbbdev /*TestNodeBodyType*/ TestBodyType,
71851c0b2f7Stbbdev /*TypeToSink0*/ Item23Type0,
71951c0b2f7Stbbdev /*TypeToSink1*/ Item23Type1,
72051c0b2f7Stbbdev /*SinkNodeType0*/ SnkType0,
72151c0b2f7Stbbdev /*SinkNodeType1*/ SnkType1,
72251c0b2f7Stbbdev /*SinkNodeBodyType0*/ SinkBodyType1,
72351c0b2f7Stbbdev /*SinkNodeBodyType1*/ SinkBodyType2,
72451c0b2f7Stbbdev /*Conc*/ Conc>
72551c0b2f7Stbbdev (doThrow,doFlog,"multifunction_node");
72651c0b2f7Stbbdev }
72751c0b2f7Stbbdev }
72851c0b2f7Stbbdev } // run_multifunction_node_test
72951c0b2f7Stbbdev
test_multifunction_node()73051c0b2f7Stbbdev void test_multifunction_node() {
73151c0b2f7Stbbdev INFO("Testing multifunction_node\n");
73251c0b2f7Stbbdev g_Wakeup_Msg = "multifunction_node(input throws,rejecting,serial): Missed wakeup or machine is overloaded?";
73351c0b2f7Stbbdev // serial rejecting
73451c0b2f7Stbbdev run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,float>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
73551c0b2f7Stbbdev g_Wakeup_Msg = "multifunction_node(test throws,rejecting,serial): Missed wakeup or machine is overloaded?";
73651c0b2f7Stbbdev run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
73751c0b2f7Stbbdev g_Wakeup_Msg = "multifunction_node(sink throws,rejecting,serial): Missed wakeup or machine is overloaded?";
73851c0b2f7Stbbdev run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
73951c0b2f7Stbbdev
74051c0b2f7Stbbdev g_Wakeup_Msg = "multifunction_node(2): Missed wakeup or machine is overloaded?";
74151c0b2f7Stbbdev // serial queueing
74251c0b2f7Stbbdev run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
74351c0b2f7Stbbdev run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
74451c0b2f7Stbbdev run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
74551c0b2f7Stbbdev CheckType<int>::check_type_counter = 0;
74651c0b2f7Stbbdev run_multifunction_node_test<nonThrowing, nonThrowing, CheckType<int>, nonThrowing, std::tuple<CheckType<int>, CheckType<int> >, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
74751c0b2f7Stbbdev CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Some items leaked in test");
74851c0b2f7Stbbdev
74951c0b2f7Stbbdev g_Wakeup_Msg = "multifunction_node(3): Missed wakeup or machine is overloaded?";
75051c0b2f7Stbbdev // unlimited parallel rejecting
75151c0b2f7Stbbdev run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
75251c0b2f7Stbbdev run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
75351c0b2f7Stbbdev run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
75451c0b2f7Stbbdev
75551c0b2f7Stbbdev g_Wakeup_Msg = "multifunction_node(4): Missed wakeup or machine is overloaded?";
75651c0b2f7Stbbdev // limited parallel rejecting
75751c0b2f7Stbbdev run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>();
75851c0b2f7Stbbdev run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>();
75951c0b2f7Stbbdev run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>();
76051c0b2f7Stbbdev
76151c0b2f7Stbbdev g_Wakeup_Msg = "multifunction_node(5): Missed wakeup or machine is overloaded?";
76251c0b2f7Stbbdev // limited parallel queueing
76351c0b2f7Stbbdev run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
76451c0b2f7Stbbdev run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, std::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
76551c0b2f7Stbbdev run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, std::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>();
76651c0b2f7Stbbdev
76751c0b2f7Stbbdev g_Wakeup_Msg = "multifunction_node(6): Missed wakeup or machine is overloaded?";
76851c0b2f7Stbbdev // everyone throwing
76951c0b2f7Stbbdev run_multifunction_node_test<isThrowing, isThrowing, int, isThrowing, std::tuple<int,int>, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
77051c0b2f7Stbbdev g_Wakeup_Msg = g_Orig_Wakeup_Msg;
77151c0b2f7Stbbdev }
77251c0b2f7Stbbdev
77351c0b2f7Stbbdev //
77451c0b2f7Stbbdev // Continue node has T predecessors. when it receives messages (continue_msg) on T predecessors
77551c0b2f7Stbbdev // it executes the body of the node, and forwards a continue_msg to its successors.
77651c0b2f7Stbbdev // However many predecessors the continue_node has, that's how many continue_msgs it receives
77751c0b2f7Stbbdev // on input before forwarding a message.
77851c0b2f7Stbbdev //
77951c0b2f7Stbbdev // The graph will look like
78051c0b2f7Stbbdev //
78151c0b2f7Stbbdev // +broadcast_node+
78251c0b2f7Stbbdev // / \ ___
78351c0b2f7Stbbdev // input_node+------>+broadcast_node+ +continue_node+--->+absorber
78451c0b2f7Stbbdev // \ /
78551c0b2f7Stbbdev // +broadcast_node+
78651c0b2f7Stbbdev //
78751c0b2f7Stbbdev // The continue_node has unlimited parallelism, no input buffering, and broadcasts to successors.
78851c0b2f7Stbbdev // The absorber is parallel, so each item emitted by the input will result in one thread
78951c0b2f7Stbbdev // spinning. So for N threads we pass N-1 continue_messages, then spin wait and then throw if
79051c0b2f7Stbbdev // we are allowed to.
79151c0b2f7Stbbdev
79251c0b2f7Stbbdev template < class InputNodeType, class InputNodeBodyType, class TTestNodeType, class TestNodeBodyType,
79351c0b2f7Stbbdev class SinkNodeType, class SinkNodeBodyType>
run_one_continue_node_test(bool throwException,bool flog)79451c0b2f7Stbbdev void run_one_continue_node_test (bool throwException, bool flog) {
79551c0b2f7Stbbdev tbb::flow::graph g;
79651c0b2f7Stbbdev
79751c0b2f7Stbbdev std::atomic<int> input_count;
79851c0b2f7Stbbdev std::atomic<int> test_count;
79951c0b2f7Stbbdev std::atomic<int> sink_count;
80051c0b2f7Stbbdev input_count = test_count = sink_count = 0;
80151c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
80251c0b2f7Stbbdev eh_test_observer o;
80351c0b2f7Stbbdev o.observe(true);
80451c0b2f7Stbbdev #endif
80551c0b2f7Stbbdev g_Master = std::this_thread::get_id();
80651c0b2f7Stbbdev InputNodeType input(g, InputNodeBodyType(input_count));
80751c0b2f7Stbbdev TTestNodeType node_to_test(g, TestNodeBodyType(test_count));
80851c0b2f7Stbbdev SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
80951c0b2f7Stbbdev tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g), b2(g), b3(g);
81051c0b2f7Stbbdev make_edge(input, b1);
81151c0b2f7Stbbdev make_edge(b1,b2);
81251c0b2f7Stbbdev make_edge(b1,b3);
81351c0b2f7Stbbdev make_edge(b2,node_to_test);
81451c0b2f7Stbbdev make_edge(b3,node_to_test);
81551c0b2f7Stbbdev make_edge(node_to_test, sink);
81651c0b2f7Stbbdev for(int iter = 0; iter < 2; ++iter) {
81751c0b2f7Stbbdev ResetGlobals(throwException,flog);
81851c0b2f7Stbbdev if(throwException) {
81951c0b2f7Stbbdev TRY();
82051c0b2f7Stbbdev input.activate();
82151c0b2f7Stbbdev g.wait_for_all();
82251c0b2f7Stbbdev CATCH_AND_ASSERT();
82351c0b2f7Stbbdev }
82451c0b2f7Stbbdev else {
82551c0b2f7Stbbdev TRY();
82651c0b2f7Stbbdev input.activate();
82751c0b2f7Stbbdev g.wait_for_all();
82851c0b2f7Stbbdev CATCH_AND_FAIL();
82951c0b2f7Stbbdev }
83051c0b2f7Stbbdev bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
83151c0b2f7Stbbdev int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
83251c0b2f7Stbbdev int t_cnt = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value();
83351c0b2f7Stbbdev int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
83451c0b2f7Stbbdev if(throwException) {
83551c0b2f7Stbbdev CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
83651c0b2f7Stbbdev CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
83751c0b2f7Stbbdev CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
83851c0b2f7Stbbdev CHECK_MESSAGE( (ib_cnt >= t_cnt), "Too many items received by test node");
83951c0b2f7Stbbdev CHECK_MESSAGE( (nb_cnt <= t_cnt), "Too many items received by sink nodes");
84051c0b2f7Stbbdev }
84151c0b2f7Stbbdev else {
84251c0b2f7Stbbdev CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
84351c0b2f7Stbbdev CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
84451c0b2f7Stbbdev CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
84551c0b2f7Stbbdev CHECK_MESSAGE( (t_cnt == g_NumItems), "Not all items reached test node");
84651c0b2f7Stbbdev CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
84751c0b2f7Stbbdev }
84851c0b2f7Stbbdev g.reset(); // resets the body of the input_nodes, test_node and the absorb_nodes.
84951c0b2f7Stbbdev input_count = test_count = sink_count = 0;
85051c0b2f7Stbbdev CHECK_MESSAGE( (0 == (int)test_count), "Atomic wasn't reset properly");
85151c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
85251c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value()),"Reset test_node failed");
85351c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
85451c0b2f7Stbbdev }
85551c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
85651c0b2f7Stbbdev o.observe(false);
85751c0b2f7Stbbdev #endif
85851c0b2f7Stbbdev }
85951c0b2f7Stbbdev
86051c0b2f7Stbbdev template<
86151c0b2f7Stbbdev class ItemType,
86251c0b2f7Stbbdev TestNodeTypeEnum IType, // does input node throw?
86351c0b2f7Stbbdev TestNodeTypeEnum CType, // does continue_node throw?
86451c0b2f7Stbbdev TestNodeTypeEnum AType> // does absorber throw
run_continue_node_test()86551c0b2f7Stbbdev void run_continue_node_test() {
86651c0b2f7Stbbdev typedef test_input_body<tbb::flow::continue_msg,IType> IBodyType;
86751c0b2f7Stbbdev typedef absorber_body<tbb::flow::continue_msg,ItemType,CType,unlimited_type> ContBodyType;
86851c0b2f7Stbbdev typedef absorber_body<ItemType,tbb::flow::continue_msg, AType, unlimited_type> SinkBodyType;
86951c0b2f7Stbbdev
87051c0b2f7Stbbdev typedef tbb::flow::input_node<tbb::flow::continue_msg> InputType;
87151c0b2f7Stbbdev typedef tbb::flow::continue_node<ItemType> TestType;
87251c0b2f7Stbbdev typedef tbb::flow::function_node<ItemType,tbb::flow::continue_msg> SnkType;
87351c0b2f7Stbbdev
87451c0b2f7Stbbdev for(int i = 0; i < 4; ++i ) {
87551c0b2f7Stbbdev if(i == 2) continue; // don't run (false,true); it doesn't make sense.
87651c0b2f7Stbbdev bool doThrow = (i & 0x1) != 0;
87751c0b2f7Stbbdev bool doFlog = (i & 0x2) != 0;
87851c0b2f7Stbbdev run_one_continue_node_test<
87951c0b2f7Stbbdev /*InputNodeType*/ InputType,
88051c0b2f7Stbbdev /*InputNodeBodyType*/ IBodyType,
88151c0b2f7Stbbdev /*TestNodeType*/ TestType,
88251c0b2f7Stbbdev /*TestNodeBodyType*/ ContBodyType,
88351c0b2f7Stbbdev /*SinkNodeType*/ SnkType,
88451c0b2f7Stbbdev /*SinkNodeBodyType*/ SinkBodyType>
88551c0b2f7Stbbdev (doThrow,doFlog);
88651c0b2f7Stbbdev }
88751c0b2f7Stbbdev }
88851c0b2f7Stbbdev
88951c0b2f7Stbbdev //
test_continue_node()89051c0b2f7Stbbdev void test_continue_node() {
89151c0b2f7Stbbdev INFO("Testing continue_node\n");
89251c0b2f7Stbbdev g_Wakeup_Msg = "buffer_node(non,is,non): Missed wakeup or machine is overloaded?";
89351c0b2f7Stbbdev run_continue_node_test<int,nonThrowing,isThrowing,nonThrowing>();
89451c0b2f7Stbbdev g_Wakeup_Msg = "buffer_node(non,non,is): Missed wakeup or machine is overloaded?";
89551c0b2f7Stbbdev run_continue_node_test<int,nonThrowing,nonThrowing,isThrowing>();
89651c0b2f7Stbbdev g_Wakeup_Msg = "buffer_node(is,non,non): Missed wakeup or machine is overloaded?";
89751c0b2f7Stbbdev run_continue_node_test<int,isThrowing,nonThrowing,nonThrowing>();
89851c0b2f7Stbbdev g_Wakeup_Msg = "buffer_node(is,is,is): Missed wakeup or machine is overloaded?";
89951c0b2f7Stbbdev run_continue_node_test<int,isThrowing,isThrowing,isThrowing>();
90051c0b2f7Stbbdev CheckType<double>::check_type_counter = 0;
90151c0b2f7Stbbdev run_continue_node_test<CheckType<double>,isThrowing,isThrowing,isThrowing>();
90251c0b2f7Stbbdev CHECK_MESSAGE( (!CheckType<double>::check_type_counter), "Dropped objects in continue_node test");
90351c0b2f7Stbbdev g_Wakeup_Msg = g_Orig_Wakeup_Msg;
90451c0b2f7Stbbdev }
90551c0b2f7Stbbdev
90651c0b2f7Stbbdev // ---------- buffer_node queue_node overwrite_node --------------
90751c0b2f7Stbbdev
90851c0b2f7Stbbdev template<
90951c0b2f7Stbbdev class BufferItemType, //
91051c0b2f7Stbbdev class InputNodeType,
91151c0b2f7Stbbdev class InputNodeBodyType,
91251c0b2f7Stbbdev class TestNodeType,
91351c0b2f7Stbbdev class SinkNodeType,
91451c0b2f7Stbbdev class SinkNodeBodyType >
run_one_buffer_node_test(bool throwException,bool flog)91551c0b2f7Stbbdev void run_one_buffer_node_test(bool throwException,bool flog) {
91651c0b2f7Stbbdev tbb::flow::graph g;
91751c0b2f7Stbbdev
91851c0b2f7Stbbdev std::atomic<int> input_count;
91951c0b2f7Stbbdev std::atomic<int> sink_count;
92051c0b2f7Stbbdev input_count = sink_count = 0;
92151c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
92251c0b2f7Stbbdev eh_test_observer o;
92351c0b2f7Stbbdev o.observe(true);
92451c0b2f7Stbbdev #endif
92551c0b2f7Stbbdev g_Master = std::this_thread::get_id();
92651c0b2f7Stbbdev InputNodeType input(g, InputNodeBodyType(input_count));
92751c0b2f7Stbbdev TestNodeType node_to_test(g);
92851c0b2f7Stbbdev SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
92951c0b2f7Stbbdev make_edge(input,node_to_test);
93051c0b2f7Stbbdev make_edge(node_to_test, sink);
93151c0b2f7Stbbdev for(int iter = 0; iter < 2; ++iter) {
93251c0b2f7Stbbdev ResetGlobals(throwException,flog);
93351c0b2f7Stbbdev if(throwException) {
93451c0b2f7Stbbdev TRY();
93551c0b2f7Stbbdev input.activate();
93651c0b2f7Stbbdev g.wait_for_all();
93751c0b2f7Stbbdev CATCH_AND_ASSERT();
93851c0b2f7Stbbdev }
93951c0b2f7Stbbdev else {
94051c0b2f7Stbbdev TRY();
94151c0b2f7Stbbdev input.activate();
94251c0b2f7Stbbdev g.wait_for_all();
94351c0b2f7Stbbdev CATCH_AND_FAIL();
94451c0b2f7Stbbdev }
94551c0b2f7Stbbdev bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
94651c0b2f7Stbbdev int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
94751c0b2f7Stbbdev int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
94851c0b2f7Stbbdev if(throwException) {
94951c0b2f7Stbbdev CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
95051c0b2f7Stbbdev CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
95151c0b2f7Stbbdev CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
95251c0b2f7Stbbdev CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
95351c0b2f7Stbbdev }
95451c0b2f7Stbbdev else {
95551c0b2f7Stbbdev CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
95651c0b2f7Stbbdev CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
95751c0b2f7Stbbdev CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
95851c0b2f7Stbbdev CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
95951c0b2f7Stbbdev }
96051c0b2f7Stbbdev if(iter == 0) {
96151c0b2f7Stbbdev remove_edge(node_to_test, sink);
96251c0b2f7Stbbdev node_to_test.try_put(BufferItemType());
96351c0b2f7Stbbdev g.wait_for_all();
96451c0b2f7Stbbdev g.reset();
96551c0b2f7Stbbdev input_count = sink_count = 0;
96651c0b2f7Stbbdev BufferItemType tmp;
96751c0b2f7Stbbdev CHECK_MESSAGE( (!node_to_test.try_get(tmp)), "node not empty");
96851c0b2f7Stbbdev make_edge(node_to_test, sink);
96951c0b2f7Stbbdev g.wait_for_all();
97051c0b2f7Stbbdev }
97151c0b2f7Stbbdev else {
97251c0b2f7Stbbdev g.reset();
97351c0b2f7Stbbdev input_count = sink_count = 0;
97451c0b2f7Stbbdev }
97551c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
97651c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
97751c0b2f7Stbbdev }
97851c0b2f7Stbbdev
97951c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
98051c0b2f7Stbbdev o.observe(false);
98151c0b2f7Stbbdev #endif
98251c0b2f7Stbbdev }
98351c0b2f7Stbbdev template<class BufferItemType,
98451c0b2f7Stbbdev TestNodeTypeEnum InputThrowType,
98551c0b2f7Stbbdev TestNodeTypeEnum SinkThrowType>
run_buffer_queue_and_overwrite_node_test()98651c0b2f7Stbbdev void run_buffer_queue_and_overwrite_node_test() {
98751c0b2f7Stbbdev typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
98851c0b2f7Stbbdev typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
98951c0b2f7Stbbdev
99051c0b2f7Stbbdev typedef tbb::flow::input_node<BufferItemType> InputType;
99151c0b2f7Stbbdev typedef tbb::flow::buffer_node<BufferItemType> BufType;
99251c0b2f7Stbbdev typedef tbb::flow::queue_node<BufferItemType> QueType;
99351c0b2f7Stbbdev typedef tbb::flow::overwrite_node<BufferItemType> OvrType;
99451c0b2f7Stbbdev typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
99551c0b2f7Stbbdev
99651c0b2f7Stbbdev for(int i = 0; i < 4; ++i) {
99751c0b2f7Stbbdev if(i == 2) continue; // no need to test flog w/o throws
99851c0b2f7Stbbdev bool throwException = (i & 0x1) != 0;
99951c0b2f7Stbbdev bool doFlog = (i & 0x2) != 0;
100051c0b2f7Stbbdev run_one_buffer_node_test<
100151c0b2f7Stbbdev /* class BufferItemType*/ BufferItemType,
100251c0b2f7Stbbdev /*class InputNodeType*/ InputType,
100351c0b2f7Stbbdev /*class InputNodeBodyType*/ InputBodyType,
100451c0b2f7Stbbdev /*class TestNodeType*/ BufType,
100551c0b2f7Stbbdev /*class SinkNodeType*/ SnkType,
100651c0b2f7Stbbdev /*class SinkNodeBodyType*/ SinkBodyType
100751c0b2f7Stbbdev >(throwException, doFlog);
100851c0b2f7Stbbdev run_one_buffer_node_test<
100951c0b2f7Stbbdev /* class BufferItemType*/ BufferItemType,
101051c0b2f7Stbbdev /*class InputNodeType*/ InputType,
101151c0b2f7Stbbdev /*class InputNodeBodyType*/ InputBodyType,
101251c0b2f7Stbbdev /*class TestNodeType*/ QueType,
101351c0b2f7Stbbdev /*class SinkNodeType*/ SnkType,
101451c0b2f7Stbbdev /*class SinkNodeBodyType*/ SinkBodyType
101551c0b2f7Stbbdev >(throwException, doFlog);
101651c0b2f7Stbbdev run_one_buffer_node_test<
101751c0b2f7Stbbdev /* class BufferItemType*/ BufferItemType,
101851c0b2f7Stbbdev /*class InputNodeType*/ InputType,
101951c0b2f7Stbbdev /*class InputNodeBodyType*/ InputBodyType,
102051c0b2f7Stbbdev /*class TestNodeType*/ OvrType,
102151c0b2f7Stbbdev /*class SinkNodeType*/ SnkType,
102251c0b2f7Stbbdev /*class SinkNodeBodyType*/ SinkBodyType
102351c0b2f7Stbbdev >(throwException, doFlog);
102451c0b2f7Stbbdev }
102551c0b2f7Stbbdev }
102651c0b2f7Stbbdev
test_buffer_queue_and_overwrite_node()102751c0b2f7Stbbdev void test_buffer_queue_and_overwrite_node() {
102851c0b2f7Stbbdev INFO("Testing buffer_node, queue_node and overwrite_node\n");
102951c0b2f7Stbbdev g_Wakeup_Msg = "buffer, queue, overwrite(is,non): Missed wakeup or machine is overloaded?";
103051c0b2f7Stbbdev run_buffer_queue_and_overwrite_node_test<int,isThrowing,nonThrowing>();
103151c0b2f7Stbbdev g_Wakeup_Msg = "buffer, queue, overwrite(non,is): Missed wakeup or machine is overloaded?";
103251c0b2f7Stbbdev run_buffer_queue_and_overwrite_node_test<int,nonThrowing,isThrowing>();
103351c0b2f7Stbbdev g_Wakeup_Msg = "buffer, queue, overwrite(is,is): Missed wakeup or machine is overloaded?";
103451c0b2f7Stbbdev run_buffer_queue_and_overwrite_node_test<int,isThrowing,isThrowing>();
103551c0b2f7Stbbdev g_Wakeup_Msg = g_Orig_Wakeup_Msg;
103651c0b2f7Stbbdev }
103751c0b2f7Stbbdev
103851c0b2f7Stbbdev // ---------- sequencer_node -------------------------
103951c0b2f7Stbbdev
104051c0b2f7Stbbdev
104151c0b2f7Stbbdev template<
104251c0b2f7Stbbdev class BufferItemType, //
104351c0b2f7Stbbdev class InputNodeType,
104451c0b2f7Stbbdev class InputNodeBodyType,
104551c0b2f7Stbbdev class TestNodeType,
104651c0b2f7Stbbdev class SeqBodyType,
104751c0b2f7Stbbdev class SinkNodeType,
104851c0b2f7Stbbdev class SinkNodeBodyType >
run_one_sequencer_node_test(bool throwException,bool flog)104951c0b2f7Stbbdev void run_one_sequencer_node_test(bool throwException,bool flog) {
105051c0b2f7Stbbdev tbb::flow::graph g;
105151c0b2f7Stbbdev
105251c0b2f7Stbbdev std::atomic<int> input_count;
105351c0b2f7Stbbdev std::atomic<int> sink_count;
105451c0b2f7Stbbdev input_count = sink_count = 0;
105551c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
105651c0b2f7Stbbdev eh_test_observer o;
105751c0b2f7Stbbdev o.observe(true);
105851c0b2f7Stbbdev #endif
105951c0b2f7Stbbdev g_Master = std::this_thread::get_id();
106051c0b2f7Stbbdev InputNodeType input(g, InputNodeBodyType(input_count));
106151c0b2f7Stbbdev TestNodeType node_to_test(g,SeqBodyType());
106251c0b2f7Stbbdev SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
106351c0b2f7Stbbdev make_edge(input,node_to_test);
106451c0b2f7Stbbdev make_edge(node_to_test, sink);
106551c0b2f7Stbbdev for(int iter = 0; iter < 2; ++iter) {
106651c0b2f7Stbbdev ResetGlobals(throwException,flog);
106751c0b2f7Stbbdev if(throwException) {
106851c0b2f7Stbbdev TRY();
106951c0b2f7Stbbdev input.activate();
107051c0b2f7Stbbdev g.wait_for_all();
107151c0b2f7Stbbdev CATCH_AND_ASSERT();
107251c0b2f7Stbbdev }
107351c0b2f7Stbbdev else {
107451c0b2f7Stbbdev TRY();
107551c0b2f7Stbbdev input.activate();
107651c0b2f7Stbbdev g.wait_for_all();
107751c0b2f7Stbbdev CATCH_AND_FAIL();
107851c0b2f7Stbbdev }
107951c0b2f7Stbbdev bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
108051c0b2f7Stbbdev int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
108151c0b2f7Stbbdev int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
108251c0b2f7Stbbdev if(throwException) {
108351c0b2f7Stbbdev CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
108451c0b2f7Stbbdev CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
108551c0b2f7Stbbdev CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
108651c0b2f7Stbbdev CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
108751c0b2f7Stbbdev }
108851c0b2f7Stbbdev else {
108951c0b2f7Stbbdev CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
109051c0b2f7Stbbdev CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
109151c0b2f7Stbbdev CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
109251c0b2f7Stbbdev CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
109351c0b2f7Stbbdev }
109451c0b2f7Stbbdev if(iter == 0) {
109551c0b2f7Stbbdev remove_edge(node_to_test, sink);
109651c0b2f7Stbbdev node_to_test.try_put(BufferItemType(g_NumItems + 1));
109751c0b2f7Stbbdev node_to_test.try_put(BufferItemType(1));
109851c0b2f7Stbbdev g.wait_for_all();
109951c0b2f7Stbbdev g.reset();
110051c0b2f7Stbbdev input_count = sink_count = 0;
110151c0b2f7Stbbdev make_edge(node_to_test, sink);
110251c0b2f7Stbbdev g.wait_for_all();
110351c0b2f7Stbbdev }
110451c0b2f7Stbbdev else {
110551c0b2f7Stbbdev g.reset();
110651c0b2f7Stbbdev input_count = sink_count = 0;
110751c0b2f7Stbbdev }
110851c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
110951c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
111051c0b2f7Stbbdev }
111151c0b2f7Stbbdev
111251c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
111351c0b2f7Stbbdev o.observe(false);
111451c0b2f7Stbbdev #endif
111551c0b2f7Stbbdev }
111651c0b2f7Stbbdev
111751c0b2f7Stbbdev template<class BufferItemType,
111851c0b2f7Stbbdev TestNodeTypeEnum InputThrowType,
111951c0b2f7Stbbdev TestNodeTypeEnum SinkThrowType>
run_sequencer_node_test()112051c0b2f7Stbbdev void run_sequencer_node_test() {
112151c0b2f7Stbbdev typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
112251c0b2f7Stbbdev typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
112351c0b2f7Stbbdev typedef sequencer_body<BufferItemType> SeqBodyType;
112451c0b2f7Stbbdev
112551c0b2f7Stbbdev typedef tbb::flow::input_node<BufferItemType> InputType;
112651c0b2f7Stbbdev typedef tbb::flow::sequencer_node<BufferItemType> SeqType;
112751c0b2f7Stbbdev typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
112851c0b2f7Stbbdev
112951c0b2f7Stbbdev for(int i = 0; i < 4; ++i) {
113051c0b2f7Stbbdev if(i == 2) continue; // no need to test flog w/o throws
113151c0b2f7Stbbdev bool throwException = (i & 0x1) != 0;
113251c0b2f7Stbbdev bool doFlog = (i & 0x2) != 0;
113351c0b2f7Stbbdev run_one_sequencer_node_test<
113451c0b2f7Stbbdev /* class BufferItemType*/ BufferItemType,
113551c0b2f7Stbbdev /*class InputNodeType*/ InputType,
113651c0b2f7Stbbdev /*class InputNodeBodyType*/ InputBodyType,
113751c0b2f7Stbbdev /*class TestNodeType*/ SeqType,
113851c0b2f7Stbbdev /*class SeqBodyType*/ SeqBodyType,
113951c0b2f7Stbbdev /*class SinkNodeType*/ SnkType,
114051c0b2f7Stbbdev /*class SinkNodeBodyType*/ SinkBodyType
114151c0b2f7Stbbdev >(throwException, doFlog);
114251c0b2f7Stbbdev }
114351c0b2f7Stbbdev }
114451c0b2f7Stbbdev
114551c0b2f7Stbbdev
114651c0b2f7Stbbdev
test_sequencer_node()114751c0b2f7Stbbdev void test_sequencer_node() {
114851c0b2f7Stbbdev INFO("Testing sequencer_node\n");
114951c0b2f7Stbbdev g_Wakeup_Msg = "sequencer_node(is,non): Missed wakeup or machine is overloaded?";
115051c0b2f7Stbbdev run_sequencer_node_test<int, isThrowing,nonThrowing>();
115151c0b2f7Stbbdev CheckType<int>::check_type_counter = 0;
115251c0b2f7Stbbdev g_Wakeup_Msg = "sequencer_node(non,is): Missed wakeup or machine is overloaded?";
115351c0b2f7Stbbdev run_sequencer_node_test<CheckType<int>, nonThrowing,isThrowing>();
115451c0b2f7Stbbdev CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped objects in sequencer_node test");
115551c0b2f7Stbbdev g_Wakeup_Msg = "sequencer_node(is,is): Missed wakeup or machine is overloaded?";
115651c0b2f7Stbbdev run_sequencer_node_test<int, isThrowing,isThrowing>();
115751c0b2f7Stbbdev g_Wakeup_Msg = g_Orig_Wakeup_Msg;
115851c0b2f7Stbbdev }
115951c0b2f7Stbbdev
// ------------ priority_queue_node ------------------
116151c0b2f7Stbbdev
116251c0b2f7Stbbdev template<
116351c0b2f7Stbbdev class BufferItemType,
116451c0b2f7Stbbdev class InputNodeType,
116551c0b2f7Stbbdev class InputNodeBodyType,
116651c0b2f7Stbbdev class TestNodeType,
116751c0b2f7Stbbdev class SinkNodeType,
116851c0b2f7Stbbdev class SinkNodeBodyType >
run_one_priority_queue_node_test(bool throwException,bool flog)116951c0b2f7Stbbdev void run_one_priority_queue_node_test(bool throwException,bool flog) {
117051c0b2f7Stbbdev tbb::flow::graph g;
117151c0b2f7Stbbdev
117251c0b2f7Stbbdev std::atomic<int> input_count;
117351c0b2f7Stbbdev std::atomic<int> sink_count;
117451c0b2f7Stbbdev input_count = sink_count = 0;
117551c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
117651c0b2f7Stbbdev eh_test_observer o;
117751c0b2f7Stbbdev o.observe(true);
117851c0b2f7Stbbdev #endif
117951c0b2f7Stbbdev g_Master = std::this_thread::get_id();
118051c0b2f7Stbbdev InputNodeType input(g, InputNodeBodyType(input_count));
118151c0b2f7Stbbdev
118251c0b2f7Stbbdev TestNodeType node_to_test(g);
118351c0b2f7Stbbdev
118451c0b2f7Stbbdev SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
118551c0b2f7Stbbdev
118651c0b2f7Stbbdev make_edge(input,node_to_test);
118751c0b2f7Stbbdev make_edge(node_to_test, sink);
118851c0b2f7Stbbdev for(int iter = 0; iter < 2; ++iter) {
118951c0b2f7Stbbdev ResetGlobals(throwException,flog);
119051c0b2f7Stbbdev if(throwException) {
119151c0b2f7Stbbdev TRY();
119251c0b2f7Stbbdev input.activate();
119351c0b2f7Stbbdev g.wait_for_all();
119451c0b2f7Stbbdev CATCH_AND_ASSERT();
119551c0b2f7Stbbdev }
119651c0b2f7Stbbdev else {
119751c0b2f7Stbbdev TRY();
119851c0b2f7Stbbdev input.activate();
119951c0b2f7Stbbdev g.wait_for_all();
120051c0b2f7Stbbdev CATCH_AND_FAIL();
120151c0b2f7Stbbdev }
120251c0b2f7Stbbdev bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
120351c0b2f7Stbbdev int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
120451c0b2f7Stbbdev int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
120551c0b2f7Stbbdev if(throwException) {
120651c0b2f7Stbbdev CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
120751c0b2f7Stbbdev CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
120851c0b2f7Stbbdev CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
120951c0b2f7Stbbdev CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
121051c0b2f7Stbbdev }
121151c0b2f7Stbbdev else {
121251c0b2f7Stbbdev CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
121351c0b2f7Stbbdev CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
121451c0b2f7Stbbdev CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_node");
121551c0b2f7Stbbdev CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
121651c0b2f7Stbbdev }
121751c0b2f7Stbbdev if(iter == 0) {
121851c0b2f7Stbbdev remove_edge(node_to_test, sink);
121951c0b2f7Stbbdev node_to_test.try_put(BufferItemType(g_NumItems + 1));
122051c0b2f7Stbbdev node_to_test.try_put(BufferItemType(g_NumItems + 2));
122151c0b2f7Stbbdev node_to_test.try_put(BufferItemType());
122251c0b2f7Stbbdev g.wait_for_all();
122351c0b2f7Stbbdev g.reset();
122451c0b2f7Stbbdev input_count = sink_count = 0;
122551c0b2f7Stbbdev make_edge(node_to_test, sink);
122651c0b2f7Stbbdev g.wait_for_all();
122751c0b2f7Stbbdev }
122851c0b2f7Stbbdev else {
122951c0b2f7Stbbdev g.reset();
123051c0b2f7Stbbdev input_count = sink_count = 0;
123151c0b2f7Stbbdev }
123251c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
123351c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
123451c0b2f7Stbbdev }
123551c0b2f7Stbbdev
123651c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
123751c0b2f7Stbbdev o.observe(false);
123851c0b2f7Stbbdev #endif
123951c0b2f7Stbbdev }
124051c0b2f7Stbbdev
124151c0b2f7Stbbdev template<class BufferItemType,
124251c0b2f7Stbbdev TestNodeTypeEnum InputThrowType,
124351c0b2f7Stbbdev TestNodeTypeEnum SinkThrowType>
run_priority_queue_node_test()124451c0b2f7Stbbdev void run_priority_queue_node_test() {
124551c0b2f7Stbbdev typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
124651c0b2f7Stbbdev typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
124751c0b2f7Stbbdev typedef less_body<BufferItemType> LessBodyType;
124851c0b2f7Stbbdev
124951c0b2f7Stbbdev typedef tbb::flow::input_node<BufferItemType> InputType;
125051c0b2f7Stbbdev typedef tbb::flow::priority_queue_node<BufferItemType,LessBodyType> PrqType;
125151c0b2f7Stbbdev typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
125251c0b2f7Stbbdev
125351c0b2f7Stbbdev for(int i = 0; i < 4; ++i) {
125451c0b2f7Stbbdev if(i == 2) continue; // no need to test flog w/o throws
125551c0b2f7Stbbdev bool throwException = (i & 0x1) != 0;
125651c0b2f7Stbbdev bool doFlog = (i & 0x2) != 0;
125751c0b2f7Stbbdev run_one_priority_queue_node_test<
125851c0b2f7Stbbdev /* class BufferItemType*/ BufferItemType,
125951c0b2f7Stbbdev /*class InputNodeType*/ InputType,
126051c0b2f7Stbbdev /*class InputNodeBodyType*/ InputBodyType,
126151c0b2f7Stbbdev /*class TestNodeType*/ PrqType,
126251c0b2f7Stbbdev /*class SinkNodeType*/ SnkType,
126351c0b2f7Stbbdev /*class SinkNodeBodyType*/ SinkBodyType
126451c0b2f7Stbbdev >(throwException, doFlog);
126551c0b2f7Stbbdev }
126651c0b2f7Stbbdev }
126751c0b2f7Stbbdev
test_priority_queue_node()126851c0b2f7Stbbdev void test_priority_queue_node() {
126951c0b2f7Stbbdev INFO("Testing priority_queue_node\n");
127051c0b2f7Stbbdev g_Wakeup_Msg = "priority_queue_node(is,non): Missed wakeup or machine is overloaded?";
127151c0b2f7Stbbdev run_priority_queue_node_test<int, isThrowing,nonThrowing>();
127251c0b2f7Stbbdev CheckType<int>::check_type_counter = 0;
127351c0b2f7Stbbdev g_Wakeup_Msg = "priority_queue_node(non,is): Missed wakeup or machine is overloaded?";
127451c0b2f7Stbbdev run_priority_queue_node_test<CheckType<int>, nonThrowing,isThrowing>();
127551c0b2f7Stbbdev CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped objects in priority_queue_node test");
127651c0b2f7Stbbdev g_Wakeup_Msg = "priority_queue_node(is,is): Missed wakeup or machine is overloaded?";
127751c0b2f7Stbbdev run_priority_queue_node_test<int, isThrowing,isThrowing>();
127851c0b2f7Stbbdev g_Wakeup_Msg = g_Orig_Wakeup_Msg;
127951c0b2f7Stbbdev }
128051c0b2f7Stbbdev
// ------------------- join_node ----------------
//! Maps a join policy type to a printable name.
/** This primary template is the fallback: any policy type without a dedicated
    specialization reports "unknown". */
template<class Policy>
struct graph_policy_name {
    static const char* name() {
        return "unknown";
    }
};
128551c0b2f7Stbbdev template<> struct graph_policy_name<tbb::flow::queueing> {
namegraph_policy_name128651c0b2f7Stbbdev static const char* name() {return "queueing"; }
128751c0b2f7Stbbdev };
128851c0b2f7Stbbdev template<> struct graph_policy_name<tbb::flow::reserving> {
namegraph_policy_name128951c0b2f7Stbbdev static const char* name() {return "reserving"; }
129051c0b2f7Stbbdev };
129151c0b2f7Stbbdev template<> struct graph_policy_name<tbb::flow::tag_matching> {
namegraph_policy_name129251c0b2f7Stbbdev static const char* name() {return "tag_matching"; }
129351c0b2f7Stbbdev };
129451c0b2f7Stbbdev
129551c0b2f7Stbbdev
129651c0b2f7Stbbdev template<
129751c0b2f7Stbbdev class JP,
129851c0b2f7Stbbdev class OutputTuple,
129951c0b2f7Stbbdev class InputType0,
130051c0b2f7Stbbdev class InputBodyType0,
130151c0b2f7Stbbdev class InputType1,
130251c0b2f7Stbbdev class InputBodyType1,
130351c0b2f7Stbbdev class TestJoinType,
130451c0b2f7Stbbdev class SinkType,
130551c0b2f7Stbbdev class SinkBodyType
130651c0b2f7Stbbdev >
130751c0b2f7Stbbdev struct run_one_join_node_test {
run_one_join_node_testrun_one_join_node_test130851c0b2f7Stbbdev run_one_join_node_test() {}
execute_testrun_one_join_node_test130951c0b2f7Stbbdev static void execute_test(bool throwException,bool flog) {
131051c0b2f7Stbbdev typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
131151c0b2f7Stbbdev typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
131251c0b2f7Stbbdev
131351c0b2f7Stbbdev tbb::flow::graph g;
131451c0b2f7Stbbdev std::atomic<int>input0_count;
131551c0b2f7Stbbdev std::atomic<int>input1_count;
131651c0b2f7Stbbdev std::atomic<int>sink_count;
131751c0b2f7Stbbdev input0_count = input1_count = sink_count = 0;
131851c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
131951c0b2f7Stbbdev eh_test_observer o;
132051c0b2f7Stbbdev o.observe(true);
132151c0b2f7Stbbdev #endif
132251c0b2f7Stbbdev g_Master = std::this_thread::get_id();
132351c0b2f7Stbbdev InputType0 input0(g, InputBodyType0(input0_count));
132451c0b2f7Stbbdev InputType1 input1(g, InputBodyType1(input1_count));
132551c0b2f7Stbbdev TestJoinType node_to_test(g);
132651c0b2f7Stbbdev SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
132751c0b2f7Stbbdev make_edge(input0,tbb::flow::input_port<0>(node_to_test));
132851c0b2f7Stbbdev make_edge(input1,tbb::flow::input_port<1>(node_to_test));
132951c0b2f7Stbbdev make_edge(node_to_test, sink);
133051c0b2f7Stbbdev for(int iter = 0; iter < 2; ++iter) {
133151c0b2f7Stbbdev ResetGlobals(throwException,flog);
133251c0b2f7Stbbdev if(throwException) {
133351c0b2f7Stbbdev TRY();
133451c0b2f7Stbbdev input0.activate();
133551c0b2f7Stbbdev input1.activate();
133651c0b2f7Stbbdev g.wait_for_all();
133751c0b2f7Stbbdev CATCH_AND_ASSERT();
133851c0b2f7Stbbdev }
133951c0b2f7Stbbdev else {
134051c0b2f7Stbbdev TRY();
134151c0b2f7Stbbdev input0.activate();
134251c0b2f7Stbbdev input1.activate();
134351c0b2f7Stbbdev g.wait_for_all();
134451c0b2f7Stbbdev CATCH_AND_FAIL();
134551c0b2f7Stbbdev }
134651c0b2f7Stbbdev bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
134751c0b2f7Stbbdev int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value();
134851c0b2f7Stbbdev int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value();
134951c0b2f7Stbbdev int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
135051c0b2f7Stbbdev if(throwException) {
135151c0b2f7Stbbdev CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
135251c0b2f7Stbbdev CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
135351c0b2f7Stbbdev CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs");
135451c0b2f7Stbbdev CHECK_MESSAGE( (nb_cnt <= ((ib0_cnt < ib1_cnt) ? ib0_cnt : ib1_cnt)), "Too many items received by sink nodes");
135551c0b2f7Stbbdev }
135651c0b2f7Stbbdev else {
135751c0b2f7Stbbdev CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
135851c0b2f7Stbbdev CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
135951c0b2f7Stbbdev if(ib0_cnt != g_NumItems) {
136051c0b2f7Stbbdev // INFO("throwException == %s\n" << (throwException ? "true" : "false"));
136151c0b2f7Stbbdev // INFO("iter == " << iter << "\n");
136251c0b2f7Stbbdev // INFO("ib0_cnt == " << ib0_cnt << "\n");
136351c0b2f7Stbbdev // INFO("g_NumItems == " << g_NumItems << "\n");
136451c0b2f7Stbbdev }
136551c0b2f7Stbbdev CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0"); // this one
136651c0b2f7Stbbdev CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1");
136751c0b2f7Stbbdev CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
136851c0b2f7Stbbdev }
136951c0b2f7Stbbdev if(iter == 0) {
137051c0b2f7Stbbdev remove_edge(node_to_test, sink);
137151c0b2f7Stbbdev tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 1));
137251c0b2f7Stbbdev tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
137351c0b2f7Stbbdev g.wait_for_all();
137451c0b2f7Stbbdev g.reset();
137551c0b2f7Stbbdev input0_count = input1_count = sink_count = 0;
137651c0b2f7Stbbdev make_edge(node_to_test, sink);
137751c0b2f7Stbbdev g.wait_for_all();
137851c0b2f7Stbbdev }
137951c0b2f7Stbbdev else {
138051c0b2f7Stbbdev g.wait_for_all();
138151c0b2f7Stbbdev g.reset();
138251c0b2f7Stbbdev input0_count = input1_count = sink_count = 0;
138351c0b2f7Stbbdev }
138451c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed");
138551c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed");
138651c0b2f7Stbbdev nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
138751c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed");
138851c0b2f7Stbbdev }
138951c0b2f7Stbbdev
139051c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
139151c0b2f7Stbbdev o.observe(false);
139251c0b2f7Stbbdev #endif
139351c0b2f7Stbbdev }
139451c0b2f7Stbbdev }; // run_one_join_node_test
139551c0b2f7Stbbdev
139651c0b2f7Stbbdev template<
139751c0b2f7Stbbdev class OutputTuple,
139851c0b2f7Stbbdev class InputType0,
139951c0b2f7Stbbdev class InputBodyType0,
140051c0b2f7Stbbdev class InputType1,
140151c0b2f7Stbbdev class InputBodyType1,
140251c0b2f7Stbbdev class TestJoinType,
140351c0b2f7Stbbdev class SinkType,
140451c0b2f7Stbbdev class SinkBodyType
140551c0b2f7Stbbdev >
140651c0b2f7Stbbdev struct run_one_join_node_test<
140751c0b2f7Stbbdev tbb::flow::tag_matching,
140851c0b2f7Stbbdev OutputTuple,
140951c0b2f7Stbbdev InputType0,
141051c0b2f7Stbbdev InputBodyType0,
141151c0b2f7Stbbdev InputType1,
141251c0b2f7Stbbdev InputBodyType1,
141351c0b2f7Stbbdev TestJoinType,
141451c0b2f7Stbbdev SinkType,
141551c0b2f7Stbbdev SinkBodyType
141651c0b2f7Stbbdev > {
run_one_join_node_testrun_one_join_node_test141751c0b2f7Stbbdev run_one_join_node_test() {}
execute_testrun_one_join_node_test141851c0b2f7Stbbdev static void execute_test(bool throwException,bool flog) {
141951c0b2f7Stbbdev typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
142051c0b2f7Stbbdev typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
142151c0b2f7Stbbdev
142251c0b2f7Stbbdev tbb::flow::graph g;
142351c0b2f7Stbbdev
142451c0b2f7Stbbdev std::atomic<int>input0_count;
142551c0b2f7Stbbdev std::atomic<int>input1_count;
142651c0b2f7Stbbdev std::atomic<int>sink_count;
142751c0b2f7Stbbdev input0_count = input1_count = sink_count = 0;
142851c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
142951c0b2f7Stbbdev eh_test_observer o;
143051c0b2f7Stbbdev o.observe(true);
143151c0b2f7Stbbdev #endif
143251c0b2f7Stbbdev g_Master = std::this_thread::get_id();
143351c0b2f7Stbbdev InputType0 input0(g, InputBodyType0(input0_count, 2));
143451c0b2f7Stbbdev InputType1 input1(g, InputBodyType1(input1_count, 3));
143551c0b2f7Stbbdev TestJoinType node_to_test(g, tag_func<ItemType0>(ItemType0(2)), tag_func<ItemType1>(ItemType1(3)));
143651c0b2f7Stbbdev SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
143751c0b2f7Stbbdev make_edge(input0,tbb::flow::input_port<0>(node_to_test));
143851c0b2f7Stbbdev make_edge(input1,tbb::flow::input_port<1>(node_to_test));
143951c0b2f7Stbbdev make_edge(node_to_test, sink);
144051c0b2f7Stbbdev for(int iter = 0; iter < 2; ++iter) {
144151c0b2f7Stbbdev ResetGlobals(throwException,flog);
144251c0b2f7Stbbdev if(throwException) {
144351c0b2f7Stbbdev TRY();
144451c0b2f7Stbbdev input0.activate();
144551c0b2f7Stbbdev input1.activate();
144651c0b2f7Stbbdev g.wait_for_all();
144751c0b2f7Stbbdev CATCH_AND_ASSERT();
144851c0b2f7Stbbdev }
144951c0b2f7Stbbdev else {
145051c0b2f7Stbbdev TRY();
145151c0b2f7Stbbdev input0.activate();
145251c0b2f7Stbbdev input1.activate();
145351c0b2f7Stbbdev g.wait_for_all();
145451c0b2f7Stbbdev CATCH_AND_FAIL();
145551c0b2f7Stbbdev }
145651c0b2f7Stbbdev bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
145751c0b2f7Stbbdev int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value();
145851c0b2f7Stbbdev int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value();
145951c0b2f7Stbbdev int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
146051c0b2f7Stbbdev if(throwException) {
146151c0b2f7Stbbdev CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
146251c0b2f7Stbbdev CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
146351c0b2f7Stbbdev CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs");
146451c0b2f7Stbbdev CHECK_MESSAGE( (nb_cnt <= ((ib0_cnt < ib1_cnt) ? ib0_cnt : ib1_cnt)), "Too many items received by sink nodes");
146551c0b2f7Stbbdev }
146651c0b2f7Stbbdev else {
146751c0b2f7Stbbdev CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
146851c0b2f7Stbbdev CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
146951c0b2f7Stbbdev CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0");
147051c0b2f7Stbbdev CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1");
147151c0b2f7Stbbdev CHECK_MESSAGE( (nb_cnt == g_NumItems), "Missing items in absorbers");
147251c0b2f7Stbbdev }
147351c0b2f7Stbbdev if(iter == 0) {
147451c0b2f7Stbbdev remove_edge(node_to_test, sink);
147551c0b2f7Stbbdev tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4));
147651c0b2f7Stbbdev tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
147751c0b2f7Stbbdev g.wait_for_all(); // have to wait for the graph to stop again....
147851c0b2f7Stbbdev g.reset(); // resets the body of the input_nodes, test_node and the absorb_nodes.
147951c0b2f7Stbbdev input0_count = input1_count = sink_count = 0;
148051c0b2f7Stbbdev make_edge(node_to_test, sink);
148151c0b2f7Stbbdev g.wait_for_all(); // have to wait for the graph to stop again....
148251c0b2f7Stbbdev }
148351c0b2f7Stbbdev else {
148451c0b2f7Stbbdev g.wait_for_all();
148551c0b2f7Stbbdev g.reset();
148651c0b2f7Stbbdev input0_count = input1_count = sink_count = 0;
148751c0b2f7Stbbdev }
148851c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed");
148951c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed");
149051c0b2f7Stbbdev nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
149151c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed");
149251c0b2f7Stbbdev }
149351c0b2f7Stbbdev
149451c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
149551c0b2f7Stbbdev o.observe(false);
149651c0b2f7Stbbdev #endif
149751c0b2f7Stbbdev }
149851c0b2f7Stbbdev }; // run_one_join_node_test<tag_matching>
149951c0b2f7Stbbdev
150051c0b2f7Stbbdev template<class JP, class OutputTuple,
150151c0b2f7Stbbdev TestNodeTypeEnum InputThrowType,
150251c0b2f7Stbbdev TestNodeTypeEnum SinkThrowType>
run_join_node_test()150351c0b2f7Stbbdev void run_join_node_test() {
150451c0b2f7Stbbdev typedef typename std::tuple_element<0,OutputTuple>::type ItemType0;
150551c0b2f7Stbbdev typedef typename std::tuple_element<1,OutputTuple>::type ItemType1;
150651c0b2f7Stbbdev typedef test_input_body<ItemType0,InputThrowType> InputBodyType0;
150751c0b2f7Stbbdev typedef test_input_body<ItemType1,InputThrowType> InputBodyType1;
150851c0b2f7Stbbdev typedef absorber_body<OutputTuple,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
150951c0b2f7Stbbdev
151051c0b2f7Stbbdev typedef typename tbb::flow::input_node<ItemType0> InputType0;
151151c0b2f7Stbbdev typedef typename tbb::flow::input_node<ItemType1> InputType1;
151251c0b2f7Stbbdev typedef typename tbb::flow::join_node<OutputTuple,JP> TestJoinType;
151351c0b2f7Stbbdev typedef typename tbb::flow::function_node<OutputTuple,tbb::flow::continue_msg> SinkType;
151451c0b2f7Stbbdev
151551c0b2f7Stbbdev for(int i = 0; i < 4; ++i) {
151651c0b2f7Stbbdev if(2 == i) continue;
151751c0b2f7Stbbdev bool throwException = (i & 0x1) != 0;
151851c0b2f7Stbbdev bool doFlog = (i & 0x2) != 0;
151951c0b2f7Stbbdev run_one_join_node_test<
152051c0b2f7Stbbdev JP,
152151c0b2f7Stbbdev OutputTuple,
152251c0b2f7Stbbdev InputType0,
152351c0b2f7Stbbdev InputBodyType0,
152451c0b2f7Stbbdev InputType1,
152551c0b2f7Stbbdev InputBodyType1,
152651c0b2f7Stbbdev TestJoinType,
152751c0b2f7Stbbdev SinkType,
152851c0b2f7Stbbdev SinkBodyType>::execute_test(throwException,doFlog);
152951c0b2f7Stbbdev }
153051c0b2f7Stbbdev }
153151c0b2f7Stbbdev
153251c0b2f7Stbbdev template<class JP>
test_join_node()153351c0b2f7Stbbdev void test_join_node() {
153451c0b2f7Stbbdev INFO("Testing join_node<" << graph_policy_name<JP>::name() << ">\n");
153551c0b2f7Stbbdev // only doing two-input joins
153651c0b2f7Stbbdev g_Wakeup_Msg = "join(is,non): Missed wakeup or machine is overloaded?";
153751c0b2f7Stbbdev run_join_node_test<JP, std::tuple<int,int>, isThrowing, nonThrowing>();
153851c0b2f7Stbbdev CheckType<int>::check_type_counter = 0;
153951c0b2f7Stbbdev g_Wakeup_Msg = "join(non,is): Missed wakeup or machine is overloaded?";
154051c0b2f7Stbbdev run_join_node_test<JP, std::tuple<CheckType<int>,int>, nonThrowing, isThrowing>();
154151c0b2f7Stbbdev CHECK_MESSAGE( (!CheckType<int>::check_type_counter), "Dropped items in test");
154251c0b2f7Stbbdev g_Wakeup_Msg = "join(is,is): Missed wakeup or machine is overloaded?";
154351c0b2f7Stbbdev run_join_node_test<JP, std::tuple<int,int>, isThrowing, isThrowing>();
154451c0b2f7Stbbdev g_Wakeup_Msg = g_Orig_Wakeup_Msg;
154551c0b2f7Stbbdev }
154651c0b2f7Stbbdev
// ------------------- limiter_node -------------
154851c0b2f7Stbbdev
154951c0b2f7Stbbdev template<
155051c0b2f7Stbbdev class BufferItemType, //
155151c0b2f7Stbbdev class InputNodeType,
155251c0b2f7Stbbdev class InputNodeBodyType,
155351c0b2f7Stbbdev class TestNodeType,
155451c0b2f7Stbbdev class SinkNodeType,
155551c0b2f7Stbbdev class SinkNodeBodyType >
run_one_limiter_node_test(bool throwException,bool flog)155651c0b2f7Stbbdev void run_one_limiter_node_test(bool throwException,bool flog) {
155751c0b2f7Stbbdev tbb::flow::graph g;
155851c0b2f7Stbbdev
155951c0b2f7Stbbdev std::atomic<int> input_count;
156051c0b2f7Stbbdev std::atomic<int> sink_count;
156151c0b2f7Stbbdev input_count = sink_count = 0;
156251c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
156351c0b2f7Stbbdev eh_test_observer o;
156451c0b2f7Stbbdev o.observe(true);
156551c0b2f7Stbbdev #endif
156651c0b2f7Stbbdev g_Master = std::this_thread::get_id();
156751c0b2f7Stbbdev InputNodeType input(g, InputNodeBodyType(input_count));
156851c0b2f7Stbbdev TestNodeType node_to_test(g,g_NumThreads + 1);
156951c0b2f7Stbbdev SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
157051c0b2f7Stbbdev make_edge(input,node_to_test);
157151c0b2f7Stbbdev make_edge(node_to_test, sink);
157251c0b2f7Stbbdev for(int iter = 0; iter < 2; ++iter) {
157351c0b2f7Stbbdev ResetGlobals(throwException,flog);
157451c0b2f7Stbbdev if(throwException) {
157551c0b2f7Stbbdev TRY();
157651c0b2f7Stbbdev input.activate();
157751c0b2f7Stbbdev g.wait_for_all();
157851c0b2f7Stbbdev CATCH_AND_ASSERT();
157951c0b2f7Stbbdev }
158051c0b2f7Stbbdev else {
158151c0b2f7Stbbdev TRY();
158251c0b2f7Stbbdev input.activate();
158351c0b2f7Stbbdev g.wait_for_all();
158451c0b2f7Stbbdev CATCH_AND_FAIL();
158551c0b2f7Stbbdev }
158651c0b2f7Stbbdev bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
158751c0b2f7Stbbdev int ib_cnt = tbb::flow::copy_body<InputNodeBodyType>(input).count_value();
158851c0b2f7Stbbdev int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
158951c0b2f7Stbbdev if(throwException) {
159051c0b2f7Stbbdev CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
159151c0b2f7Stbbdev CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
159251c0b2f7Stbbdev CHECK_MESSAGE( (ib_cnt <= g_NumItems), "Too many items sent by inputs");
159351c0b2f7Stbbdev CHECK_MESSAGE( (nb_cnt <= ib_cnt), "Too many items received by sink nodes");
159451c0b2f7Stbbdev }
159551c0b2f7Stbbdev else {
159651c0b2f7Stbbdev CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
159751c0b2f7Stbbdev CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
159851c0b2f7Stbbdev // we stop after limiter's limit, which is g_NumThreads + 1. The input_node
159951c0b2f7Stbbdev // is invoked one extra time, filling its buffer, so its limit is g_NumThreads + 2.
160051c0b2f7Stbbdev CHECK_MESSAGE( (ib_cnt == g_NumThreads + 2), "Missing invocations of input_node");
160151c0b2f7Stbbdev CHECK_MESSAGE( (nb_cnt == g_NumThreads + 1), "Missing items in absorbers");
160251c0b2f7Stbbdev }
160351c0b2f7Stbbdev if(iter == 0) {
160451c0b2f7Stbbdev remove_edge(node_to_test, sink);
160551c0b2f7Stbbdev node_to_test.try_put(BufferItemType());
160651c0b2f7Stbbdev node_to_test.try_put(BufferItemType());
160751c0b2f7Stbbdev g.wait_for_all();
160851c0b2f7Stbbdev g.reset();
160951c0b2f7Stbbdev input_count = sink_count = 0;
161051c0b2f7Stbbdev BufferItemType tmp;
161151c0b2f7Stbbdev CHECK_MESSAGE( (!node_to_test.try_get(tmp)), "node not empty");
161251c0b2f7Stbbdev make_edge(node_to_test, sink);
161351c0b2f7Stbbdev g.wait_for_all();
161451c0b2f7Stbbdev }
161551c0b2f7Stbbdev else {
161651c0b2f7Stbbdev g.reset();
161751c0b2f7Stbbdev input_count = sink_count = 0;
161851c0b2f7Stbbdev }
161951c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputNodeBodyType>(input).count_value()),"Reset input failed");
162051c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value()),"Reset sink failed");
162151c0b2f7Stbbdev }
162251c0b2f7Stbbdev
162351c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
162451c0b2f7Stbbdev o.observe(false);
162551c0b2f7Stbbdev #endif
162651c0b2f7Stbbdev }
162751c0b2f7Stbbdev
162851c0b2f7Stbbdev template<class BufferItemType,
162951c0b2f7Stbbdev TestNodeTypeEnum InputThrowType,
163051c0b2f7Stbbdev TestNodeTypeEnum SinkThrowType>
run_limiter_node_test()163151c0b2f7Stbbdev void run_limiter_node_test() {
163251c0b2f7Stbbdev typedef test_input_body<BufferItemType,InputThrowType> InputBodyType;
163351c0b2f7Stbbdev typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
163451c0b2f7Stbbdev
163551c0b2f7Stbbdev typedef tbb::flow::input_node<BufferItemType> InputType;
163651c0b2f7Stbbdev typedef tbb::flow::limiter_node<BufferItemType> LmtType;
163751c0b2f7Stbbdev typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
163851c0b2f7Stbbdev
163951c0b2f7Stbbdev for(int i = 0; i < 4; ++i) {
164051c0b2f7Stbbdev if(i == 2) continue; // no need to test flog w/o throws
164151c0b2f7Stbbdev bool throwException = (i & 0x1) != 0;
164251c0b2f7Stbbdev bool doFlog = (i & 0x2) != 0;
164351c0b2f7Stbbdev run_one_limiter_node_test<
164451c0b2f7Stbbdev /* class BufferItemType*/ BufferItemType,
164551c0b2f7Stbbdev /*class InputNodeType*/ InputType,
164651c0b2f7Stbbdev /*class InputNodeBodyType*/ InputBodyType,
164751c0b2f7Stbbdev /*class TestNodeType*/ LmtType,
164851c0b2f7Stbbdev /*class SinkNodeType*/ SnkType,
164951c0b2f7Stbbdev /*class SinkNodeBodyType*/ SinkBodyType
165051c0b2f7Stbbdev >(throwException, doFlog);
165151c0b2f7Stbbdev }
165251c0b2f7Stbbdev }
165351c0b2f7Stbbdev
test_limiter_node()165451c0b2f7Stbbdev void test_limiter_node() {
165551c0b2f7Stbbdev INFO("Testing limiter_node\n");
165651c0b2f7Stbbdev g_Wakeup_Msg = "limiter_node(is,non): Missed wakeup or machine is overloaded?";
165751c0b2f7Stbbdev run_limiter_node_test<int,isThrowing,nonThrowing>();
165851c0b2f7Stbbdev g_Wakeup_Msg = "limiter_node(non,is): Missed wakeup or machine is overloaded?";
165951c0b2f7Stbbdev run_limiter_node_test<int,nonThrowing,isThrowing>();
166051c0b2f7Stbbdev g_Wakeup_Msg = "limiter_node(is,is): Missed wakeup or machine is overloaded?";
166151c0b2f7Stbbdev run_limiter_node_test<int,isThrowing,isThrowing>();
166251c0b2f7Stbbdev g_Wakeup_Msg = g_Orig_Wakeup_Msg;
166351c0b2f7Stbbdev }
166451c0b2f7Stbbdev
166551c0b2f7Stbbdev // -------- split_node --------------------
166651c0b2f7Stbbdev
166751c0b2f7Stbbdev template<
166851c0b2f7Stbbdev class InputTuple,
166951c0b2f7Stbbdev class InputType,
167051c0b2f7Stbbdev class InputBodyType,
167151c0b2f7Stbbdev class TestSplitType,
167251c0b2f7Stbbdev class SinkType0,
167351c0b2f7Stbbdev class SinkBodyType0,
167451c0b2f7Stbbdev class SinkType1,
167551c0b2f7Stbbdev class SinkBodyType1>
run_one_split_node_test(bool throwException,bool flog)167651c0b2f7Stbbdev void run_one_split_node_test(bool throwException, bool flog) {
167751c0b2f7Stbbdev
167851c0b2f7Stbbdev tbb::flow::graph g;
167951c0b2f7Stbbdev
168051c0b2f7Stbbdev std::atomic<int> input_count;
168151c0b2f7Stbbdev std::atomic<int> sink0_count;
168251c0b2f7Stbbdev std::atomic<int> sink1_count;
168351c0b2f7Stbbdev input_count = sink0_count = sink1_count = 0;
168451c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
168551c0b2f7Stbbdev eh_test_observer o;
168651c0b2f7Stbbdev o.observe(true);
168751c0b2f7Stbbdev #endif
168851c0b2f7Stbbdev
168951c0b2f7Stbbdev g_Master = std::this_thread::get_id();
169051c0b2f7Stbbdev InputType input(g, InputBodyType(input_count));
169151c0b2f7Stbbdev TestSplitType node_to_test(g);
169251c0b2f7Stbbdev SinkType0 sink0(g,tbb::flow::unlimited,SinkBodyType0(sink0_count));
169351c0b2f7Stbbdev SinkType1 sink1(g,tbb::flow::unlimited,SinkBodyType1(sink1_count));
169451c0b2f7Stbbdev make_edge(input, node_to_test);
169551c0b2f7Stbbdev make_edge(tbb::flow::output_port<0>(node_to_test), sink0);
169651c0b2f7Stbbdev make_edge(tbb::flow::output_port<1>(node_to_test), sink1);
169751c0b2f7Stbbdev
169851c0b2f7Stbbdev for(int iter = 0; iter < 2; ++iter) { // run, reset, run again
169951c0b2f7Stbbdev ResetGlobals(throwException,flog);
170051c0b2f7Stbbdev if(throwException) {
170151c0b2f7Stbbdev TRY();
170251c0b2f7Stbbdev input.activate();
170351c0b2f7Stbbdev g.wait_for_all();
170451c0b2f7Stbbdev CATCH_AND_ASSERT();
170551c0b2f7Stbbdev }
170651c0b2f7Stbbdev else {
170751c0b2f7Stbbdev TRY();
170851c0b2f7Stbbdev input.activate();
170951c0b2f7Stbbdev g.wait_for_all();
171051c0b2f7Stbbdev CATCH_AND_FAIL();
171151c0b2f7Stbbdev }
171251c0b2f7Stbbdev bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
171351c0b2f7Stbbdev int ib_cnt = tbb::flow::copy_body<InputBodyType>(input).count_value();
171451c0b2f7Stbbdev int nb0_cnt = tbb::flow::copy_body<SinkBodyType0>(sink0).count_value();
171551c0b2f7Stbbdev int nb1_cnt = tbb::flow::copy_body<SinkBodyType1>(sink1).count_value();
171651c0b2f7Stbbdev if(throwException) {
171751c0b2f7Stbbdev CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
171851c0b2f7Stbbdev CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
171951c0b2f7Stbbdev CHECK_MESSAGE( (ib_cnt <= 2*g_NumItems), "Too many items sent by input");
172051c0b2f7Stbbdev CHECK_MESSAGE( (nb0_cnt + nb1_cnt <= ib_cnt*2), "Too many items received by sink nodes");
172151c0b2f7Stbbdev }
172251c0b2f7Stbbdev else {
172351c0b2f7Stbbdev CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
172451c0b2f7Stbbdev CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
172551c0b2f7Stbbdev CHECK_MESSAGE( (ib_cnt == g_NumItems), "Missing invocations of input_nodes");
172651c0b2f7Stbbdev CHECK_MESSAGE( (nb0_cnt == g_NumItems && nb1_cnt == g_NumItems), "Missing items in absorbers");
172751c0b2f7Stbbdev }
172851c0b2f7Stbbdev g.reset(); // resets the body of the input_nodes and the absorb_nodes.
172951c0b2f7Stbbdev input_count = sink0_count = sink1_count = 0;
173051c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType>(input).count_value()),"Reset input failed");
173151c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType0>(sink0).count_value()),"Reset sink 0 failed");
173251c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType1>(sink1).count_value()),"Reset sink 1 failed");
173351c0b2f7Stbbdev }
173451c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
173551c0b2f7Stbbdev o.observe(false);
173651c0b2f7Stbbdev #endif
173751c0b2f7Stbbdev }
173851c0b2f7Stbbdev
173951c0b2f7Stbbdev template<class InputTuple,
174051c0b2f7Stbbdev TestNodeTypeEnum InputThrowType,
174151c0b2f7Stbbdev TestNodeTypeEnum SinkThrowType>
run_split_node_test()174251c0b2f7Stbbdev void run_split_node_test() {
174351c0b2f7Stbbdev typedef typename std::tuple_element<0,InputTuple>::type ItemType0;
174451c0b2f7Stbbdev typedef typename std::tuple_element<1,InputTuple>::type ItemType1;
174551c0b2f7Stbbdev typedef tuple_test_input_body<InputTuple,InputThrowType> InputBodyType;
174651c0b2f7Stbbdev typedef absorber_body<ItemType0,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType0;
174751c0b2f7Stbbdev typedef absorber_body<ItemType1,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType1;
174851c0b2f7Stbbdev
174951c0b2f7Stbbdev typedef typename tbb::flow::input_node<InputTuple> InputType;
175051c0b2f7Stbbdev typedef typename tbb::flow::split_node<InputTuple> TestSplitType;
175151c0b2f7Stbbdev typedef typename tbb::flow::function_node<ItemType0,tbb::flow::continue_msg> SinkType0;
175251c0b2f7Stbbdev typedef typename tbb::flow::function_node<ItemType1,tbb::flow::continue_msg> SinkType1;
175351c0b2f7Stbbdev
175451c0b2f7Stbbdev for(int i = 0; i < 4; ++i) {
175551c0b2f7Stbbdev if(2 == i) continue;
175651c0b2f7Stbbdev bool throwException = (i & 0x1) != 0;
175751c0b2f7Stbbdev bool doFlog = (i & 0x2) != 0;
175851c0b2f7Stbbdev run_one_split_node_test<
175951c0b2f7Stbbdev InputTuple,
176051c0b2f7Stbbdev InputType,
176151c0b2f7Stbbdev InputBodyType,
176251c0b2f7Stbbdev TestSplitType,
176351c0b2f7Stbbdev SinkType0,
176451c0b2f7Stbbdev SinkBodyType0,
176551c0b2f7Stbbdev SinkType1,
176651c0b2f7Stbbdev SinkBodyType1>
176751c0b2f7Stbbdev (throwException,doFlog);
176851c0b2f7Stbbdev }
176951c0b2f7Stbbdev }
177051c0b2f7Stbbdev
test_split_node()177151c0b2f7Stbbdev void test_split_node() {
177251c0b2f7Stbbdev INFO("Testing split_node\n");
177351c0b2f7Stbbdev g_Wakeup_Msg = "split_node(is,non): Missed wakeup or machine is overloaded?";
177451c0b2f7Stbbdev run_split_node_test<std::tuple<int,int>, isThrowing, nonThrowing>();
177551c0b2f7Stbbdev g_Wakeup_Msg = "split_node(non,is): Missed wakeup or machine is overloaded?";
177651c0b2f7Stbbdev run_split_node_test<std::tuple<int,int>, nonThrowing, isThrowing>();
177751c0b2f7Stbbdev g_Wakeup_Msg = "split_node(is,is): Missed wakeup or machine is overloaded?";
177851c0b2f7Stbbdev run_split_node_test<std::tuple<int,int>, isThrowing, isThrowing>();
177951c0b2f7Stbbdev g_Wakeup_Msg = g_Orig_Wakeup_Msg;
178051c0b2f7Stbbdev }
178151c0b2f7Stbbdev
178251c0b2f7Stbbdev // --------- indexer_node ----------------------
178351c0b2f7Stbbdev
178451c0b2f7Stbbdev template < class InputTuple,
178551c0b2f7Stbbdev class InputType0,
178651c0b2f7Stbbdev class InputBodyType0,
178751c0b2f7Stbbdev class InputType1,
178851c0b2f7Stbbdev class InputBodyType1,
178951c0b2f7Stbbdev class TestNodeType,
179051c0b2f7Stbbdev class SinkType,
179151c0b2f7Stbbdev class SinkBodyType>
run_one_indexer_node_test(bool throwException,bool flog)179251c0b2f7Stbbdev void run_one_indexer_node_test(bool throwException,bool flog) {
179351c0b2f7Stbbdev typedef typename std::tuple_element<0,InputTuple>::type ItemType0;
179451c0b2f7Stbbdev typedef typename std::tuple_element<1,InputTuple>::type ItemType1;
179551c0b2f7Stbbdev
179651c0b2f7Stbbdev tbb::flow::graph g;
179751c0b2f7Stbbdev
179851c0b2f7Stbbdev std::atomic<int> input0_count;
179951c0b2f7Stbbdev std::atomic<int> input1_count;
180051c0b2f7Stbbdev std::atomic<int> sink_count;
180151c0b2f7Stbbdev input0_count = input1_count = sink_count = 0;
180251c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
180351c0b2f7Stbbdev eh_test_observer o;
180451c0b2f7Stbbdev o.observe(true);
180551c0b2f7Stbbdev #endif
180651c0b2f7Stbbdev g_Master = std::this_thread::get_id();
180751c0b2f7Stbbdev InputType0 input0(g, InputBodyType0(input0_count));
180851c0b2f7Stbbdev InputType1 input1(g, InputBodyType1(input1_count));
180951c0b2f7Stbbdev TestNodeType node_to_test(g);
181051c0b2f7Stbbdev SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
181151c0b2f7Stbbdev make_edge(input0,tbb::flow::input_port<0>(node_to_test));
181251c0b2f7Stbbdev make_edge(input1,tbb::flow::input_port<1>(node_to_test));
181351c0b2f7Stbbdev make_edge(node_to_test, sink);
181451c0b2f7Stbbdev for(int iter = 0; iter < 2; ++iter) {
181551c0b2f7Stbbdev ResetGlobals(throwException,flog);
181651c0b2f7Stbbdev if(throwException) {
181751c0b2f7Stbbdev TRY();
181851c0b2f7Stbbdev input0.activate();
181951c0b2f7Stbbdev input1.activate();
182051c0b2f7Stbbdev g.wait_for_all();
182151c0b2f7Stbbdev CATCH_AND_ASSERT();
182251c0b2f7Stbbdev }
182351c0b2f7Stbbdev else {
182451c0b2f7Stbbdev TRY();
182551c0b2f7Stbbdev input0.activate();
182651c0b2f7Stbbdev input1.activate();
182751c0b2f7Stbbdev g.wait_for_all();
182851c0b2f7Stbbdev CATCH_AND_FAIL();
182951c0b2f7Stbbdev }
183051c0b2f7Stbbdev bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
183151c0b2f7Stbbdev int ib0_cnt = tbb::flow::copy_body<InputBodyType0>(input0).count_value();
183251c0b2f7Stbbdev int ib1_cnt = tbb::flow::copy_body<InputBodyType1>(input1).count_value();
183351c0b2f7Stbbdev int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
183451c0b2f7Stbbdev if(throwException) {
183551c0b2f7Stbbdev CHECK_MESSAGE( (g.exception_thrown() || okayNoExceptionsCaught), "Exception not caught by graph");
183651c0b2f7Stbbdev CHECK_MESSAGE( (g.is_cancelled() || okayNoExceptionsCaught), "Cancellation not signalled in graph");
183751c0b2f7Stbbdev CHECK_MESSAGE( (ib0_cnt <= g_NumItems && ib1_cnt <= g_NumItems), "Too many items sent by inputs");
183851c0b2f7Stbbdev CHECK_MESSAGE( (nb_cnt <= ib0_cnt + ib1_cnt), "Too many items received by sink nodes");
183951c0b2f7Stbbdev }
184051c0b2f7Stbbdev else {
184151c0b2f7Stbbdev CHECK_MESSAGE( (!g.exception_thrown()), "Exception flag in flow::graph set but no throw occurred");
184251c0b2f7Stbbdev CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag set but no throw occurred");
184351c0b2f7Stbbdev CHECK_MESSAGE( (ib0_cnt == g_NumItems), "Missing invocations of input_node0");
184451c0b2f7Stbbdev CHECK_MESSAGE( (ib1_cnt == g_NumItems), "Missing invocations of input_node1");
184551c0b2f7Stbbdev CHECK_MESSAGE( (nb_cnt == 2*g_NumItems), "Missing items in absorbers");
184651c0b2f7Stbbdev }
184751c0b2f7Stbbdev if(iter == 0) {
184851c0b2f7Stbbdev remove_edge(node_to_test, sink);
184951c0b2f7Stbbdev tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4));
185051c0b2f7Stbbdev tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
185151c0b2f7Stbbdev g.wait_for_all();
185251c0b2f7Stbbdev g.reset();
185351c0b2f7Stbbdev input0_count = input1_count = sink_count = 0;
185451c0b2f7Stbbdev make_edge(node_to_test, sink);
185551c0b2f7Stbbdev g.wait_for_all();
185651c0b2f7Stbbdev }
185751c0b2f7Stbbdev else {
185851c0b2f7Stbbdev g.wait_for_all();
185951c0b2f7Stbbdev g.reset();
186051c0b2f7Stbbdev input0_count = input1_count = sink_count = 0;
186151c0b2f7Stbbdev }
186251c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType0>(input0).count_value()),"Reset input failed");
186351c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<InputBodyType1>(input1).count_value()),"Reset input failed");
186451c0b2f7Stbbdev nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
186551c0b2f7Stbbdev CHECK_MESSAGE( (0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value()),"Reset sink failed");
186651c0b2f7Stbbdev }
186751c0b2f7Stbbdev
186851c0b2f7Stbbdev #if USE_TASK_SCHEDULER_OBSERVER
186951c0b2f7Stbbdev o.observe(false);
187051c0b2f7Stbbdev #endif
187151c0b2f7Stbbdev }
187251c0b2f7Stbbdev
187351c0b2f7Stbbdev template<class InputTuple,
187451c0b2f7Stbbdev TestNodeTypeEnum InputThrowType,
187551c0b2f7Stbbdev TestNodeTypeEnum SinkThrowType>
run_indexer_node_test()187651c0b2f7Stbbdev void run_indexer_node_test() {
187751c0b2f7Stbbdev typedef typename std::tuple_element<0,InputTuple>::type ItemType0;
187851c0b2f7Stbbdev typedef typename std::tuple_element<1,InputTuple>::type ItemType1;
187951c0b2f7Stbbdev typedef test_input_body<ItemType0,InputThrowType> InputBodyType0;
188051c0b2f7Stbbdev typedef test_input_body<ItemType1,InputThrowType> InputBodyType1;
188151c0b2f7Stbbdev typedef typename tbb::flow::indexer_node<ItemType0, ItemType1> TestNodeType;
188251c0b2f7Stbbdev typedef absorber_body<typename TestNodeType::output_type,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
188351c0b2f7Stbbdev
188451c0b2f7Stbbdev typedef typename tbb::flow::input_node<ItemType0> InputType0;
188551c0b2f7Stbbdev typedef typename tbb::flow::input_node<ItemType1> InputType1;
188651c0b2f7Stbbdev typedef typename tbb::flow::function_node<typename TestNodeType::output_type,tbb::flow::continue_msg> SinkType;
188751c0b2f7Stbbdev
188851c0b2f7Stbbdev for(int i = 0; i < 4; ++i) {
188951c0b2f7Stbbdev if(2 == i) continue;
189051c0b2f7Stbbdev bool throwException = (i & 0x1) != 0;
189151c0b2f7Stbbdev bool doFlog = (i & 0x2) != 0;
189251c0b2f7Stbbdev run_one_indexer_node_test<
189351c0b2f7Stbbdev InputTuple,
189451c0b2f7Stbbdev InputType0,
189551c0b2f7Stbbdev InputBodyType0,
189651c0b2f7Stbbdev InputType1,
189751c0b2f7Stbbdev InputBodyType1,
189851c0b2f7Stbbdev TestNodeType,
189951c0b2f7Stbbdev SinkType,
190051c0b2f7Stbbdev SinkBodyType>(throwException,doFlog);
190151c0b2f7Stbbdev }
190251c0b2f7Stbbdev }
190351c0b2f7Stbbdev
test_indexer_node()190451c0b2f7Stbbdev void test_indexer_node() {
190551c0b2f7Stbbdev INFO("Testing indexer_node\n");
190651c0b2f7Stbbdev g_Wakeup_Msg = "indexer_node(is,non): Missed wakeup or machine is overloaded?";
190751c0b2f7Stbbdev run_indexer_node_test<std::tuple<int,int>, isThrowing, nonThrowing>();
190851c0b2f7Stbbdev g_Wakeup_Msg = "indexer_node(non,is): Missed wakeup or machine is overloaded?";
190951c0b2f7Stbbdev run_indexer_node_test<std::tuple<int,int>, nonThrowing, isThrowing>();
191051c0b2f7Stbbdev g_Wakeup_Msg = "indexer_node(is,is): Missed wakeup or machine is overloaded?";
191151c0b2f7Stbbdev run_indexer_node_test<std::tuple<int,int>, isThrowing, isThrowing>();
19125e91b2c0SVladislav Shchapov g_Wakeup_Msg = g_Orig_Wakeup_Msg;
191351c0b2f7Stbbdev }
191451c0b2f7Stbbdev
191551c0b2f7Stbbdev ///////////////////////////////////////////////
191651c0b2f7Stbbdev // whole-graph exception test
191751c0b2f7Stbbdev
191851c0b2f7Stbbdev class Foo {
191951c0b2f7Stbbdev private:
192051c0b2f7Stbbdev // std::vector<int>& m_vec;
192151c0b2f7Stbbdev std::vector<int>* m_vec;
192251c0b2f7Stbbdev public:
Foo(std::vector<int> & vec)192351c0b2f7Stbbdev Foo(std::vector<int>& vec) : m_vec(&vec) { }
operator ()(tbb::flow::continue_msg) const192451c0b2f7Stbbdev void operator() (tbb::flow::continue_msg) const {
192551c0b2f7Stbbdev ++nExceptions;
192651c0b2f7Stbbdev (void)m_vec->at(m_vec->size()); // Will throw out_of_range exception
192751c0b2f7Stbbdev CHECK_MESSAGE( (false), "Exception not thrown by invalid access");
192851c0b2f7Stbbdev }
192951c0b2f7Stbbdev };
193051c0b2f7Stbbdev
1931*89b2e0e3SOlga Malysheva // test from user ahelwer: https://community.intel.com/t5/Intel-oneAPI-Threading-Building/Exception-in-flow-graph-results-in-graph-wait-for-all-hanging/td-p/789352
193251c0b2f7Stbbdev // exception thrown in graph node, not caught in wait_for_all()
193351c0b2f7Stbbdev void
test_flow_graph_exception0()193451c0b2f7Stbbdev test_flow_graph_exception0() {
193551c0b2f7Stbbdev // Initializes body
193651c0b2f7Stbbdev std::vector<int> vec;
193751c0b2f7Stbbdev vec.push_back(0);
193851c0b2f7Stbbdev Foo f(vec);
193951c0b2f7Stbbdev nExceptions = 0;
194051c0b2f7Stbbdev
194151c0b2f7Stbbdev // Construct graph and nodes
194251c0b2f7Stbbdev tbb::flow::graph g;
194351c0b2f7Stbbdev tbb::flow::broadcast_node<tbb::flow::continue_msg> start(g);
194451c0b2f7Stbbdev tbb::flow::continue_node<tbb::flow::continue_msg> fooNode(g, f);
194551c0b2f7Stbbdev
194651c0b2f7Stbbdev // Construct edge
194751c0b2f7Stbbdev tbb::flow::make_edge(start, fooNode);
194851c0b2f7Stbbdev
194951c0b2f7Stbbdev // Execute graph
195051c0b2f7Stbbdev CHECK_MESSAGE( (!g.exception_thrown()), "exception_thrown flag already set");
195151c0b2f7Stbbdev CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag already set");
195251c0b2f7Stbbdev try {
195351c0b2f7Stbbdev start.try_put(tbb::flow::continue_msg());
195451c0b2f7Stbbdev g.wait_for_all();
195551c0b2f7Stbbdev CHECK_MESSAGE( (false), "Exception not thrown");
195651c0b2f7Stbbdev }
195751c0b2f7Stbbdev catch(std::out_of_range& ex) {
195851c0b2f7Stbbdev INFO("Exception: " << ex.what() << "(expected)\n");
195951c0b2f7Stbbdev }
196051c0b2f7Stbbdev catch(...) {
196151c0b2f7Stbbdev INFO("Unknown exception caught (expected)\n");
196251c0b2f7Stbbdev }
196351c0b2f7Stbbdev CHECK_MESSAGE( (nExceptions > 0), "Exception caught, but no body signaled exception being thrown");
196451c0b2f7Stbbdev nExceptions = 0;
196551c0b2f7Stbbdev CHECK_MESSAGE( (g.exception_thrown()), "Exception not intercepted");
196651c0b2f7Stbbdev // if exception set, cancellation also set.
196751c0b2f7Stbbdev CHECK_MESSAGE( (g.is_cancelled()), "Exception cancellation not signaled");
196851c0b2f7Stbbdev // in case we got an exception
196951c0b2f7Stbbdev try {
197051c0b2f7Stbbdev g.wait_for_all(); // context still signalled canceled, my_exception still set.
197151c0b2f7Stbbdev }
197251c0b2f7Stbbdev catch(...) {
197351c0b2f7Stbbdev CHECK_MESSAGE( (false), "Second exception thrown but no task executing");
197451c0b2f7Stbbdev }
197551c0b2f7Stbbdev CHECK_MESSAGE( (nExceptions == 0), "body signaled exception being thrown, but no body executed");
197651c0b2f7Stbbdev CHECK_MESSAGE( (!g.exception_thrown()), "exception_thrown flag not reset");
197751c0b2f7Stbbdev CHECK_MESSAGE( (!g.is_cancelled()), "canceled flag not reset");
197851c0b2f7Stbbdev }
197951c0b2f7Stbbdev
TestOneThreadNum(int nThread)198051c0b2f7Stbbdev void TestOneThreadNum(int nThread) {
198151c0b2f7Stbbdev INFO("Testing " << nThread << "%d threads\n");
198251c0b2f7Stbbdev g_NumItems = ((nThread > NUM_ITEMS) ? nThread *2 : NUM_ITEMS);
198351c0b2f7Stbbdev g_NumThreads = nThread;
198451c0b2f7Stbbdev tbb::task_arena arena(nThread);
198551c0b2f7Stbbdev arena.execute(
198651c0b2f7Stbbdev [&]() {
198751c0b2f7Stbbdev // whole-graph exception catch and rethrow test
198851c0b2f7Stbbdev test_flow_graph_exception0();
198951c0b2f7Stbbdev for(int i = 0; i < 4; ++i) {
199051c0b2f7Stbbdev g_ExceptionInMaster = (i & 1) != 0;
199151c0b2f7Stbbdev g_SolitaryException = (i & 2) != 0;
199251c0b2f7Stbbdev INFO("g_ExceptionInMaster == " << (g_ExceptionInMaster ? "T":"F")
199351c0b2f7Stbbdev << ", g_SolitaryException == " << (g_SolitaryException ? "T":"F") << "\n");
199451c0b2f7Stbbdev test_input_node();
199551c0b2f7Stbbdev test_function_node();
199651c0b2f7Stbbdev test_continue_node(); // also test broadcast_node
199751c0b2f7Stbbdev test_multifunction_node();
199851c0b2f7Stbbdev // single- and multi-item buffering nodes
199951c0b2f7Stbbdev test_buffer_queue_and_overwrite_node();
200051c0b2f7Stbbdev test_sequencer_node();
200151c0b2f7Stbbdev test_priority_queue_node();
200251c0b2f7Stbbdev
200351c0b2f7Stbbdev // join_nodes
200451c0b2f7Stbbdev test_join_node<tbb::flow::queueing>();
200551c0b2f7Stbbdev test_join_node<tbb::flow::reserving>();
200651c0b2f7Stbbdev test_join_node<tbb::flow::tag_matching>();
200751c0b2f7Stbbdev
200851c0b2f7Stbbdev test_limiter_node();
200951c0b2f7Stbbdev test_split_node();
201051c0b2f7Stbbdev // graph for write_once_node will be complicated by the fact the node will
201151c0b2f7Stbbdev // not do try_puts after it has been set. To get parallelism of N we have
201251c0b2f7Stbbdev // to attach N successor nodes to the write_once (or play some similar game).
201351c0b2f7Stbbdev // test_write_once_node();
201451c0b2f7Stbbdev test_indexer_node();
201551c0b2f7Stbbdev }
201651c0b2f7Stbbdev }
201751c0b2f7Stbbdev );
201851c0b2f7Stbbdev }
201951c0b2f7Stbbdev
202051c0b2f7Stbbdev //! Test exceptions with parallelism
202151c0b2f7Stbbdev //! \brief \ref error_guessing
202251c0b2f7Stbbdev TEST_CASE("Testing several threads"){
202351c0b2f7Stbbdev // reverse order of tests
202451c0b2f7Stbbdev for(unsigned int nThread=utils::MaxThread; nThread >= utils::MinThread; --nThread) {
2025552f342bSPavel tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, nThread);
202651c0b2f7Stbbdev TestOneThreadNum(nThread);
202751c0b2f7Stbbdev }
202851c0b2f7Stbbdev }
202951c0b2f7Stbbdev
203051c0b2f7Stbbdev #endif // TBB_USE_EXCEPTIONS
2031