151c0b2f7Stbbdev /*
2b15aabb3Stbbdev     Copyright (c) 2020-2021 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
17b15aabb3Stbbdev #if __INTEL_COMPILER && _MSC_VER
18b15aabb3Stbbdev #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19b15aabb3Stbbdev #endif
2051c0b2f7Stbbdev 
2151c0b2f7Stbbdev #include "conformance_flowgraph.h"
2251c0b2f7Stbbdev 
2351c0b2f7Stbbdev //! \file conformance_join_node.cpp
2451c0b2f7Stbbdev //! \brief Test for [flow_graph.join_node] specification
2551c0b2f7Stbbdev 
26*de0109beSIlya Mishin using input_msg = conformance::message</*default_ctor*/true, /*copy_ctor*/true, /*copy_assign*/true>;
27*de0109beSIlya Mishin using my_input_tuple = std::tuple<int, float, input_msg>;
2851c0b2f7Stbbdev 
29*de0109beSIlya Mishin std::vector<my_input_tuple> get_values( conformance::test_push_receiver<my_input_tuple>& rr ) {
30*de0109beSIlya Mishin     std::vector<my_input_tuple> messages;
31*de0109beSIlya Mishin     int val = 0;
32*de0109beSIlya Mishin     for(my_input_tuple tmp(0, 0.f, input_msg(0)); rr.try_get(tmp); ++val) {
33*de0109beSIlya Mishin         messages.push_back(tmp);
34*de0109beSIlya Mishin     }
35*de0109beSIlya Mishin     return messages;
3651c0b2f7Stbbdev }
3751c0b2f7Stbbdev 
38*de0109beSIlya Mishin #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
39*de0109beSIlya Mishin void test_deduction_guides() {
40*de0109beSIlya Mishin     using namespace tbb::flow;
4151c0b2f7Stbbdev 
4251c0b2f7Stbbdev     graph g;
43*de0109beSIlya Mishin     using tuple_type = std::tuple<int, int, int>;
44*de0109beSIlya Mishin     broadcast_node<int> b1(g), b2(g), b3(g);
45*de0109beSIlya Mishin     broadcast_node<tuple_type> b4(g);
46*de0109beSIlya Mishin     join_node<tuple_type> j0(g);
4751c0b2f7Stbbdev 
48*de0109beSIlya Mishin #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
49*de0109beSIlya Mishin     join_node j1(follows(b1, b2, b3));
50*de0109beSIlya Mishin     static_assert(std::is_same_v<decltype(j1), join_node<tuple_type>>);
51*de0109beSIlya Mishin 
52*de0109beSIlya Mishin     join_node j2(follows(b1, b2, b3), reserving());
53*de0109beSIlya Mishin     static_assert(std::is_same_v<decltype(j2), join_node<tuple_type, reserving>>);
54*de0109beSIlya Mishin 
55*de0109beSIlya Mishin     join_node j3(precedes(b4));
56*de0109beSIlya Mishin     static_assert(std::is_same_v<decltype(j3), join_node<tuple_type>>);
57*de0109beSIlya Mishin 
58*de0109beSIlya Mishin     join_node j4(precedes(b4), reserving());
59*de0109beSIlya Mishin     static_assert(std::is_same_v<decltype(j4), join_node<tuple_type, reserving>>);
60*de0109beSIlya Mishin #endif
61*de0109beSIlya Mishin 
62*de0109beSIlya Mishin     join_node j5(j0);
63*de0109beSIlya Mishin     static_assert(std::is_same_v<decltype(j5), join_node<tuple_type>>);
6451c0b2f7Stbbdev }
6551c0b2f7Stbbdev 
66*de0109beSIlya Mishin #endif
6751c0b2f7Stbbdev 
68*de0109beSIlya Mishin //! The node that is constructed has a reference to the same graph object as src.
69*de0109beSIlya Mishin //! The list of predecessors, messages in the input ports, and successors are not copied.
7051c0b2f7Stbbdev //! \brief \ref interface
7151c0b2f7Stbbdev TEST_CASE("join_node copy constructor"){
72*de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
73*de0109beSIlya Mishin     oneapi::tbb::flow::continue_node<int> node0( g,
74*de0109beSIlya Mishin                                 [](oneapi::tbb::flow::continue_msg) { return 1; } );
75*de0109beSIlya Mishin 
76*de0109beSIlya Mishin     oneapi::tbb::flow::join_node<std::tuple<int>> node1(g);
77*de0109beSIlya Mishin     conformance::test_push_receiver<std::tuple<int>> node2(g);
78*de0109beSIlya Mishin     conformance::test_push_receiver<std::tuple<int>> node3(g);
79*de0109beSIlya Mishin 
80*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(node0, oneapi::tbb::flow::input_port<0>(node1));
81*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(node1, node2);
82*de0109beSIlya Mishin     oneapi::tbb::flow::join_node<std::tuple<int>> node_copy(node1);
83*de0109beSIlya Mishin 
84*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(node_copy, node3);
85*de0109beSIlya Mishin 
86*de0109beSIlya Mishin     oneapi::tbb::flow::input_port<0>(node_copy).try_put(1);
87*de0109beSIlya Mishin     g.wait_for_all();
88*de0109beSIlya Mishin 
89*de0109beSIlya Mishin     auto values = conformance::get_values(node3);
90*de0109beSIlya Mishin     CHECK_MESSAGE((conformance::get_values(node2).size() == 0 && values.size() == 1), "Copied node doesn`t copy successor");
91*de0109beSIlya Mishin 
92*de0109beSIlya Mishin     node0.try_put(oneapi::tbb::flow::continue_msg());
93*de0109beSIlya Mishin     g.wait_for_all();
94*de0109beSIlya Mishin 
95*de0109beSIlya Mishin     CHECK_MESSAGE((conformance::get_values(node2).size() == 1 && conformance::get_values(node3).size() == 0), "Copied node doesn`t copy predecessor");
96*de0109beSIlya Mishin 
97*de0109beSIlya Mishin     oneapi::tbb::flow::remove_edge(node1, node2);
98*de0109beSIlya Mishin     oneapi::tbb::flow::input_port<0>(node1).try_put(1);
99*de0109beSIlya Mishin     g.wait_for_all();
100*de0109beSIlya Mishin     oneapi::tbb::flow::join_node<std::tuple<int>> node_copy2(node1);
101*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(node_copy2, node3);
102*de0109beSIlya Mishin     oneapi::tbb::flow::input_port<0>(node_copy2).try_put(2);
103*de0109beSIlya Mishin     g.wait_for_all();
104*de0109beSIlya Mishin     CHECK_MESSAGE((std::get<0>(conformance::get_values(node3)[0]) == 2), "Copied node doesn`t copy messages in the input ports");
10551c0b2f7Stbbdev }
10651c0b2f7Stbbdev 
10751c0b2f7Stbbdev //! Test inheritance relations
10851c0b2f7Stbbdev //! \brief \ref interface
10951c0b2f7Stbbdev TEST_CASE("join_node inheritance"){
110*de0109beSIlya Mishin     CHECK_MESSAGE((std::is_base_of<oneapi::tbb::flow::graph_node,
111*de0109beSIlya Mishin                    oneapi::tbb::flow::join_node<my_input_tuple>>::value),
112*de0109beSIlya Mishin                    "join_node should be derived from graph_node");
113*de0109beSIlya Mishin     CHECK_MESSAGE((std::is_base_of<oneapi::tbb::flow::sender<my_input_tuple>,
114*de0109beSIlya Mishin                    oneapi::tbb::flow::join_node<my_input_tuple>>::value),
115*de0109beSIlya Mishin                    "join_node should be derived from sender<input_tuple>");
11651c0b2f7Stbbdev }
11751c0b2f7Stbbdev 
118*de0109beSIlya Mishin //! Test join_node<queueing> behavior and broadcast property
11951c0b2f7Stbbdev //! \brief \ref requirement
120*de0109beSIlya Mishin TEST_CASE("join_node queueing policy and broadcast property") {
121*de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
122*de0109beSIlya Mishin     oneapi::tbb::flow::function_node<int, int>
123*de0109beSIlya Mishin         f1( g, oneapi::tbb::flow::unlimited, [](const int &i) { return i; } );
124*de0109beSIlya Mishin     oneapi::tbb::flow::function_node<float, float>
125*de0109beSIlya Mishin         f2( g, oneapi::tbb::flow::unlimited, [](const float &f) { return f; } );
126*de0109beSIlya Mishin     oneapi::tbb::flow::continue_node<input_msg> c1( g,
127*de0109beSIlya Mishin                             [](oneapi::tbb::flow::continue_msg) { return input_msg(1); } );
12851c0b2f7Stbbdev 
129*de0109beSIlya Mishin     oneapi::tbb::flow::join_node<my_input_tuple, oneapi::tbb::flow::queueing> testing_node(g);
13051c0b2f7Stbbdev 
131*de0109beSIlya Mishin     conformance::test_push_receiver<my_input_tuple> q_node(g);
132*de0109beSIlya Mishin 
133*de0109beSIlya Mishin     std::atomic<int> number{1};
134*de0109beSIlya Mishin     oneapi::tbb::flow::function_node<my_input_tuple, my_input_tuple>
135*de0109beSIlya Mishin         f3( g, oneapi::tbb::flow::unlimited,
136*de0109beSIlya Mishin             [&]( const my_input_tuple &t ) {
137*de0109beSIlya Mishin                 CHECK_MESSAGE((std::get<0>(t) == number), "Messages must be in first-in first-out order" );
138*de0109beSIlya Mishin                 CHECK_MESSAGE((std::get<1>(t) == static_cast<float>(number) + 0.5f), "Messages must be in first-in first-out order" );
139*de0109beSIlya Mishin                 CHECK_MESSAGE((std::get<2>(t) == 1), "Messages must be in first-in first-out order" );
140*de0109beSIlya Mishin                 ++number;
141*de0109beSIlya Mishin                 return t;
14251c0b2f7Stbbdev             } );
14351c0b2f7Stbbdev 
144*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(f1, oneapi::tbb::flow::input_port<0>(testing_node));
145*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(f2, oneapi::tbb::flow::input_port<1>(testing_node));
146*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(c1, oneapi::tbb::flow::input_port<2>(testing_node));
147*de0109beSIlya Mishin     make_edge(testing_node, f3);
148*de0109beSIlya Mishin     make_edge(f3, q_node);
14951c0b2f7Stbbdev 
150*de0109beSIlya Mishin     f1.try_put(1);
151*de0109beSIlya Mishin     g.wait_for_all();
152*de0109beSIlya Mishin     CHECK_MESSAGE((get_values(q_node).size() == 0),
153*de0109beSIlya Mishin         "join_node must broadcast when there is at least one message at each input port");
154*de0109beSIlya Mishin     f1.try_put(2);
155*de0109beSIlya Mishin     f2.try_put(1.5f);
156*de0109beSIlya Mishin     g.wait_for_all();
157*de0109beSIlya Mishin     CHECK_MESSAGE((get_values(q_node).size() == 0),
158*de0109beSIlya Mishin         "join_node must broadcast when there is at least one message at each input port");
15951c0b2f7Stbbdev     f1.try_put(3);
160*de0109beSIlya Mishin     f2.try_put(2.5f);
161*de0109beSIlya Mishin     c1.try_put(oneapi::tbb::flow::continue_msg());
16251c0b2f7Stbbdev     g.wait_for_all();
163*de0109beSIlya Mishin     CHECK_MESSAGE((get_values(q_node).size() == 1),
164*de0109beSIlya Mishin         "join_node must broadcast when there is at least one message at each input port");
165*de0109beSIlya Mishin     f2.try_put(3.5f);
166*de0109beSIlya Mishin     c1.try_put(oneapi::tbb::flow::continue_msg());
167*de0109beSIlya Mishin     g.wait_for_all();
168*de0109beSIlya Mishin     CHECK_MESSAGE((get_values(q_node).size() == 1),
169*de0109beSIlya Mishin         "If at least one successor accepts the tuple, the head of each input port’s queue is removed");
170*de0109beSIlya Mishin     c1.try_put(oneapi::tbb::flow::continue_msg());
171*de0109beSIlya Mishin     g.wait_for_all();
172*de0109beSIlya Mishin     CHECK_MESSAGE((get_values(q_node).size() == 1),
173*de0109beSIlya Mishin         "If at least one successor accepts the tuple, the head of each input port’s queue is removed");
174*de0109beSIlya Mishin     c1.try_put(oneapi::tbb::flow::continue_msg());
175*de0109beSIlya Mishin     g.wait_for_all();
176*de0109beSIlya Mishin     CHECK_MESSAGE((get_values(q_node).size() == 0),
177*de0109beSIlya Mishin         "join_node must broadcast when there is at least one message at each input port");
178*de0109beSIlya Mishin 
179*de0109beSIlya Mishin     oneapi::tbb::flow::remove_edge(testing_node, f3);
180*de0109beSIlya Mishin 
181*de0109beSIlya Mishin     f1.try_put(1);
182*de0109beSIlya Mishin     f2.try_put(1);
183*de0109beSIlya Mishin     c1.try_put(oneapi::tbb::flow::continue_msg());
184*de0109beSIlya Mishin     g.wait_for_all();
185*de0109beSIlya Mishin 
186*de0109beSIlya Mishin     my_input_tuple tmp(0, 0.f, input_msg(0));
187*de0109beSIlya Mishin     CHECK_MESSAGE((testing_node.try_get(tmp)), "If no one successor accepts the tuple the messages\
188*de0109beSIlya Mishin         must remain in their respective input port queues");
189*de0109beSIlya Mishin     CHECK_MESSAGE((tmp == my_input_tuple(1, 1.f, input_msg(1))), "If no one successor accepts the tuple\
190*de0109beSIlya Mishin         the messages must remain in their respective input port queues");
19151c0b2f7Stbbdev }
19251c0b2f7Stbbdev 
193*de0109beSIlya Mishin //! Test join_node<reserving> behavior
19451c0b2f7Stbbdev //! \brief \ref requirement
195*de0109beSIlya Mishin TEST_CASE("join_node reserving policy") {
196*de0109beSIlya Mishin     conformance::test_with_reserving_join_node_class<oneapi::tbb::flow::write_once_node<int>>();
19751c0b2f7Stbbdev }
19851c0b2f7Stbbdev 
199*de0109beSIlya Mishin template<typename KeyType>
200*de0109beSIlya Mishin struct MyHash{
201*de0109beSIlya Mishin     std::size_t hash(const KeyType &k) const {
202*de0109beSIlya Mishin         return k * 2000 + 3;
203*de0109beSIlya Mishin     }
204*de0109beSIlya Mishin 
205*de0109beSIlya Mishin     bool equal(const KeyType &k1, const KeyType &k2) const{
206*de0109beSIlya Mishin         return hash(k1) == hash(k2);
207*de0109beSIlya Mishin     }
208*de0109beSIlya Mishin };
209*de0109beSIlya Mishin 
210*de0109beSIlya Mishin //! Test join_node<key_matching> behavior
21151c0b2f7Stbbdev //! \brief \ref requirement
212*de0109beSIlya Mishin TEST_CASE("join_node key_matching policy"){
213*de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
214*de0109beSIlya Mishin     auto body1 = [](const oneapi::tbb::flow::continue_msg &) -> int { return 1; };
215*de0109beSIlya Mishin     auto body2 = [](const float &val) -> int { return static_cast<int>(val); };
21651c0b2f7Stbbdev 
217*de0109beSIlya Mishin     oneapi::tbb::flow::join_node<std::tuple<oneapi::tbb::flow::continue_msg, float>,
218*de0109beSIlya Mishin         oneapi::tbb::flow::key_matching<int, MyHash<int>>> testing_node(g, body1, body2);
21951c0b2f7Stbbdev 
220*de0109beSIlya Mishin     oneapi::tbb::flow::input_port<0>(testing_node).try_put(oneapi::tbb::flow::continue_msg());
221*de0109beSIlya Mishin     oneapi::tbb::flow::input_port<1>(testing_node).try_put(1.3f);
22251c0b2f7Stbbdev 
22351c0b2f7Stbbdev     g.wait_for_all();
22451c0b2f7Stbbdev 
225*de0109beSIlya Mishin     std::tuple<oneapi::tbb::flow::continue_msg, float> tmp;
226*de0109beSIlya Mishin     CHECK_MESSAGE((testing_node.try_get(tmp)), "Mapped keys should match.\
227*de0109beSIlya Mishin         If no successor accepts the tuple, it is must been saved and will be forwarded on a subsequent try_get");
228*de0109beSIlya Mishin     CHECK_MESSAGE((!testing_node.try_get(tmp)), "Message should not exist after item is consumed");
229*de0109beSIlya Mishin }
230*de0109beSIlya Mishin 
231*de0109beSIlya Mishin //! Test join_node<tag_matching> behavior
232*de0109beSIlya Mishin //! \brief \ref requirement
233*de0109beSIlya Mishin TEST_CASE("join_node tag_matching policy"){
234*de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
235*de0109beSIlya Mishin     auto body1 = [](const oneapi::tbb::flow::continue_msg &) -> oneapi::tbb::flow::tag_value { return 1; };
236*de0109beSIlya Mishin     auto body2 = [](const float &val) -> oneapi::tbb::flow::tag_value { return static_cast<oneapi::tbb::flow::tag_value>(val); };
237*de0109beSIlya Mishin 
238*de0109beSIlya Mishin     oneapi::tbb::flow::join_node<std::tuple<oneapi::tbb::flow::continue_msg, float>,
239*de0109beSIlya Mishin         oneapi::tbb::flow::tag_matching> testing_node(g, body1, body2);
240*de0109beSIlya Mishin 
241*de0109beSIlya Mishin     oneapi::tbb::flow::input_port<0>(testing_node).try_put(oneapi::tbb::flow::continue_msg());
242*de0109beSIlya Mishin     oneapi::tbb::flow::input_port<1>(testing_node).try_put(1.3f);
243*de0109beSIlya Mishin 
244*de0109beSIlya Mishin     g.wait_for_all();
245*de0109beSIlya Mishin 
246*de0109beSIlya Mishin     std::tuple<oneapi::tbb::flow::continue_msg, float> tmp;
247*de0109beSIlya Mishin     CHECK_MESSAGE((testing_node.try_get(tmp) == true), "Mapped keys should match");
248*de0109beSIlya Mishin }
249*de0109beSIlya Mishin 
250*de0109beSIlya Mishin #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
251*de0109beSIlya Mishin //! Test deduction guides
252*de0109beSIlya Mishin //! \brief \ref requirement
253*de0109beSIlya Mishin TEST_CASE("Deduction guides test"){
254*de0109beSIlya Mishin     test_deduction_guides();
255*de0109beSIlya Mishin }
256*de0109beSIlya Mishin #endif
257*de0109beSIlya Mishin 
258*de0109beSIlya Mishin //! Test join_node input_ports() returns a tuple of input ports.
259*de0109beSIlya Mishin //! \brief \ref interface \ref requirement
260*de0109beSIlya Mishin TEST_CASE("join_node output_ports") {
261*de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
262*de0109beSIlya Mishin     oneapi::tbb::flow::join_node<std::tuple<int>> node(g);
263*de0109beSIlya Mishin 
264*de0109beSIlya Mishin     CHECK_MESSAGE((std::is_same<oneapi::tbb::flow::join_node<std::tuple<int>>::input_ports_type&,
265*de0109beSIlya Mishin         decltype(node.input_ports())>::value), "join_node input_ports should returns a tuple of input ports");
26651c0b2f7Stbbdev }
267