151c0b2f7Stbbdev /*
2*274f68e5SIlya Isaev     Copyright (c) 2020-2022 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
17b15aabb3Stbbdev #if __INTEL_COMPILER && _MSC_VER
18b15aabb3Stbbdev #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19b15aabb3Stbbdev #endif
2051c0b2f7Stbbdev 
2151c0b2f7Stbbdev #include "conformance_flowgraph.h"
2251c0b2f7Stbbdev 
2351c0b2f7Stbbdev //! \file conformance_join_node.cpp
2451c0b2f7Stbbdev //! \brief Test for [flow_graph.join_node] specification
2551c0b2f7Stbbdev 
26de0109beSIlya Mishin using input_msg = conformance::message</*default_ctor*/true, /*copy_ctor*/true, /*copy_assign*/true>;
27de0109beSIlya Mishin using my_input_tuple = std::tuple<int, float, input_msg>;
2851c0b2f7Stbbdev 
29de0109beSIlya Mishin std::vector<my_input_tuple> get_values( conformance::test_push_receiver<my_input_tuple>& rr ) {
30de0109beSIlya Mishin     std::vector<my_input_tuple> messages;
31*274f68e5SIlya Isaev     my_input_tuple tmp(0, 0.f, input_msg(0));
32*274f68e5SIlya Isaev     while(rr.try_get(tmp)) {
33de0109beSIlya Mishin         messages.push_back(tmp);
34de0109beSIlya Mishin     }
35de0109beSIlya Mishin     return messages;
3651c0b2f7Stbbdev }
3751c0b2f7Stbbdev 
38de0109beSIlya Mishin #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
39de0109beSIlya Mishin void test_deduction_guides() {
40de0109beSIlya Mishin     using namespace tbb::flow;
4151c0b2f7Stbbdev 
4251c0b2f7Stbbdev     graph g;
43de0109beSIlya Mishin     using tuple_type = std::tuple<int, int, int>;
44de0109beSIlya Mishin     broadcast_node<int> b1(g), b2(g), b3(g);
45de0109beSIlya Mishin     broadcast_node<tuple_type> b4(g);
46de0109beSIlya Mishin     join_node<tuple_type> j0(g);
4751c0b2f7Stbbdev 
48de0109beSIlya Mishin #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
49de0109beSIlya Mishin     join_node j1(follows(b1, b2, b3));
50de0109beSIlya Mishin     static_assert(std::is_same_v<decltype(j1), join_node<tuple_type>>);
51de0109beSIlya Mishin 
52de0109beSIlya Mishin     join_node j2(follows(b1, b2, b3), reserving());
53de0109beSIlya Mishin     static_assert(std::is_same_v<decltype(j2), join_node<tuple_type, reserving>>);
54de0109beSIlya Mishin 
55de0109beSIlya Mishin     join_node j3(precedes(b4));
56de0109beSIlya Mishin     static_assert(std::is_same_v<decltype(j3), join_node<tuple_type>>);
57de0109beSIlya Mishin 
58de0109beSIlya Mishin     join_node j4(precedes(b4), reserving());
59de0109beSIlya Mishin     static_assert(std::is_same_v<decltype(j4), join_node<tuple_type, reserving>>);
60de0109beSIlya Mishin #endif
61de0109beSIlya Mishin 
62de0109beSIlya Mishin     join_node j5(j0);
63de0109beSIlya Mishin     static_assert(std::is_same_v<decltype(j5), join_node<tuple_type>>);
6451c0b2f7Stbbdev }
6551c0b2f7Stbbdev 
66de0109beSIlya Mishin #endif
6751c0b2f7Stbbdev 
68de0109beSIlya Mishin //! The node that is constructed has a reference to the same graph object as src.
69de0109beSIlya Mishin //! The list of predecessors, messages in the input ports, and successors are not copied.
7051c0b2f7Stbbdev //! \brief \ref interface
7151c0b2f7Stbbdev TEST_CASE("join_node copy constructor"){
72de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
73de0109beSIlya Mishin     oneapi::tbb::flow::continue_node<int> node0( g,
74de0109beSIlya Mishin                                 [](oneapi::tbb::flow::continue_msg) { return 1; } );
75de0109beSIlya Mishin 
76de0109beSIlya Mishin     oneapi::tbb::flow::join_node<std::tuple<int>> node1(g);
77de0109beSIlya Mishin     conformance::test_push_receiver<std::tuple<int>> node2(g);
78de0109beSIlya Mishin     conformance::test_push_receiver<std::tuple<int>> node3(g);
79de0109beSIlya Mishin 
80de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(node0, oneapi::tbb::flow::input_port<0>(node1));
81de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(node1, node2);
82de0109beSIlya Mishin     oneapi::tbb::flow::join_node<std::tuple<int>> node_copy(node1);
83de0109beSIlya Mishin 
84de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(node_copy, node3);
85de0109beSIlya Mishin 
86de0109beSIlya Mishin     oneapi::tbb::flow::input_port<0>(node_copy).try_put(1);
87de0109beSIlya Mishin     g.wait_for_all();
88de0109beSIlya Mishin 
89de0109beSIlya Mishin     auto values = conformance::get_values(node3);
90de0109beSIlya Mishin     CHECK_MESSAGE((conformance::get_values(node2).size() == 0 && values.size() == 1), "Copied node doesn`t copy successor");
91de0109beSIlya Mishin 
92de0109beSIlya Mishin     node0.try_put(oneapi::tbb::flow::continue_msg());
93de0109beSIlya Mishin     g.wait_for_all();
94de0109beSIlya Mishin 
95de0109beSIlya Mishin     CHECK_MESSAGE((conformance::get_values(node2).size() == 1 && conformance::get_values(node3).size() == 0), "Copied node doesn`t copy predecessor");
96de0109beSIlya Mishin 
97de0109beSIlya Mishin     oneapi::tbb::flow::remove_edge(node1, node2);
98de0109beSIlya Mishin     oneapi::tbb::flow::input_port<0>(node1).try_put(1);
99de0109beSIlya Mishin     g.wait_for_all();
100de0109beSIlya Mishin     oneapi::tbb::flow::join_node<std::tuple<int>> node_copy2(node1);
101de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(node_copy2, node3);
102de0109beSIlya Mishin     oneapi::tbb::flow::input_port<0>(node_copy2).try_put(2);
103de0109beSIlya Mishin     g.wait_for_all();
104de0109beSIlya Mishin     CHECK_MESSAGE((std::get<0>(conformance::get_values(node3)[0]) == 2), "Copied node doesn`t copy messages in the input ports");
10551c0b2f7Stbbdev }
10651c0b2f7Stbbdev 
10751c0b2f7Stbbdev //! Test inheritance relations
10851c0b2f7Stbbdev //! \brief \ref interface
10951c0b2f7Stbbdev TEST_CASE("join_node inheritance"){
110de0109beSIlya Mishin     CHECK_MESSAGE((std::is_base_of<oneapi::tbb::flow::graph_node,
111de0109beSIlya Mishin                    oneapi::tbb::flow::join_node<my_input_tuple>>::value),
112de0109beSIlya Mishin                    "join_node should be derived from graph_node");
113de0109beSIlya Mishin     CHECK_MESSAGE((std::is_base_of<oneapi::tbb::flow::sender<my_input_tuple>,
114de0109beSIlya Mishin                    oneapi::tbb::flow::join_node<my_input_tuple>>::value),
115de0109beSIlya Mishin                    "join_node should be derived from sender<input_tuple>");
11651c0b2f7Stbbdev }
11751c0b2f7Stbbdev 
118de0109beSIlya Mishin //! Test join_node<queueing> behavior and broadcast property
11951c0b2f7Stbbdev //! \brief \ref requirement
120de0109beSIlya Mishin TEST_CASE("join_node queueing policy and broadcast property") {
121de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
122de0109beSIlya Mishin     oneapi::tbb::flow::function_node<int, int>
123de0109beSIlya Mishin         f1( g, oneapi::tbb::flow::unlimited, [](const int &i) { return i; } );
124de0109beSIlya Mishin     oneapi::tbb::flow::function_node<float, float>
125de0109beSIlya Mishin         f2( g, oneapi::tbb::flow::unlimited, [](const float &f) { return f; } );
126de0109beSIlya Mishin     oneapi::tbb::flow::continue_node<input_msg> c1( g,
127de0109beSIlya Mishin                             [](oneapi::tbb::flow::continue_msg) { return input_msg(1); } );
12851c0b2f7Stbbdev 
129de0109beSIlya Mishin     oneapi::tbb::flow::join_node<my_input_tuple, oneapi::tbb::flow::queueing> testing_node(g);
13051c0b2f7Stbbdev 
131de0109beSIlya Mishin     conformance::test_push_receiver<my_input_tuple> q_node(g);
132de0109beSIlya Mishin 
133de0109beSIlya Mishin     std::atomic<int> number{1};
134de0109beSIlya Mishin     oneapi::tbb::flow::function_node<my_input_tuple, my_input_tuple>
135de0109beSIlya Mishin         f3( g, oneapi::tbb::flow::unlimited,
136de0109beSIlya Mishin             [&]( const my_input_tuple &t ) {
137de0109beSIlya Mishin                 CHECK_MESSAGE((std::get<0>(t) == number), "Messages must be in first-in first-out order" );
138de0109beSIlya Mishin                 CHECK_MESSAGE((std::get<1>(t) == static_cast<float>(number) + 0.5f), "Messages must be in first-in first-out order" );
139de0109beSIlya Mishin                 CHECK_MESSAGE((std::get<2>(t) == 1), "Messages must be in first-in first-out order" );
140de0109beSIlya Mishin                 ++number;
141de0109beSIlya Mishin                 return t;
14251c0b2f7Stbbdev             } );
14351c0b2f7Stbbdev 
144de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(f1, oneapi::tbb::flow::input_port<0>(testing_node));
145de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(f2, oneapi::tbb::flow::input_port<1>(testing_node));
146de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(c1, oneapi::tbb::flow::input_port<2>(testing_node));
147de0109beSIlya Mishin     make_edge(testing_node, f3);
148de0109beSIlya Mishin     make_edge(f3, q_node);
14951c0b2f7Stbbdev 
150de0109beSIlya Mishin     f1.try_put(1);
151de0109beSIlya Mishin     g.wait_for_all();
152de0109beSIlya Mishin     CHECK_MESSAGE((get_values(q_node).size() == 0),
153de0109beSIlya Mishin         "join_node must broadcast when there is at least one message at each input port");
154de0109beSIlya Mishin     f1.try_put(2);
155de0109beSIlya Mishin     f2.try_put(1.5f);
156de0109beSIlya Mishin     g.wait_for_all();
157de0109beSIlya Mishin     CHECK_MESSAGE((get_values(q_node).size() == 0),
158de0109beSIlya Mishin         "join_node must broadcast when there is at least one message at each input port");
15951c0b2f7Stbbdev     f1.try_put(3);
160de0109beSIlya Mishin     f2.try_put(2.5f);
161de0109beSIlya Mishin     c1.try_put(oneapi::tbb::flow::continue_msg());
16251c0b2f7Stbbdev     g.wait_for_all();
163de0109beSIlya Mishin     CHECK_MESSAGE((get_values(q_node).size() == 1),
164de0109beSIlya Mishin         "join_node must broadcast when there is at least one message at each input port");
165de0109beSIlya Mishin     f2.try_put(3.5f);
166de0109beSIlya Mishin     c1.try_put(oneapi::tbb::flow::continue_msg());
167de0109beSIlya Mishin     g.wait_for_all();
168de0109beSIlya Mishin     CHECK_MESSAGE((get_values(q_node).size() == 1),
169de0109beSIlya Mishin         "If at least one successor accepts the tuple, the head of each input port’s queue is removed");
170de0109beSIlya Mishin     c1.try_put(oneapi::tbb::flow::continue_msg());
171de0109beSIlya Mishin     g.wait_for_all();
172de0109beSIlya Mishin     CHECK_MESSAGE((get_values(q_node).size() == 1),
173de0109beSIlya Mishin         "If at least one successor accepts the tuple, the head of each input port’s queue is removed");
174de0109beSIlya Mishin     c1.try_put(oneapi::tbb::flow::continue_msg());
175de0109beSIlya Mishin     g.wait_for_all();
176de0109beSIlya Mishin     CHECK_MESSAGE((get_values(q_node).size() == 0),
177de0109beSIlya Mishin         "join_node must broadcast when there is at least one message at each input port");
178de0109beSIlya Mishin 
179de0109beSIlya Mishin     oneapi::tbb::flow::remove_edge(testing_node, f3);
180de0109beSIlya Mishin 
181de0109beSIlya Mishin     f1.try_put(1);
182de0109beSIlya Mishin     f2.try_put(1);
183de0109beSIlya Mishin     c1.try_put(oneapi::tbb::flow::continue_msg());
184de0109beSIlya Mishin     g.wait_for_all();
185de0109beSIlya Mishin 
186de0109beSIlya Mishin     my_input_tuple tmp(0, 0.f, input_msg(0));
187de0109beSIlya Mishin     CHECK_MESSAGE((testing_node.try_get(tmp)), "If no one successor accepts the tuple the messages\
188de0109beSIlya Mishin         must remain in their respective input port queues");
189de0109beSIlya Mishin     CHECK_MESSAGE((tmp == my_input_tuple(1, 1.f, input_msg(1))), "If no one successor accepts the tuple\
190de0109beSIlya Mishin         the messages must remain in their respective input port queues");
19151c0b2f7Stbbdev }
19251c0b2f7Stbbdev 
193de0109beSIlya Mishin //! Test join_node<reserving> behavior
19451c0b2f7Stbbdev //! \brief \ref requirement
195de0109beSIlya Mishin TEST_CASE("join_node reserving policy") {
196de0109beSIlya Mishin     conformance::test_with_reserving_join_node_class<oneapi::tbb::flow::write_once_node<int>>();
19751c0b2f7Stbbdev }
19851c0b2f7Stbbdev 
//! User-defined hash/compare object; passed as the second argument of
//! key_matching in the key_matching policy test.
template<typename KeyType>
struct MyHash{
    //! Two keys are considered equal when their hash values coincide.
    bool equal(const KeyType &k1, const KeyType &k2) const{
        const std::size_t first_hash = hash(k1);
        const std::size_t second_hash = hash(k2);
        return first_hash == second_hash;
    }

    //! Hash value of a key: the key scaled by 2000 and offset by 3.
    std::size_t hash(const KeyType &k) const {
        const auto scaled = k * 2000;
        return scaled + 3;
    }
};
209de0109beSIlya Mishin 
210de0109beSIlya Mishin //! Test join_node<key_matching> behavior
21151c0b2f7Stbbdev //! \brief \ref requirement
212de0109beSIlya Mishin TEST_CASE("join_node key_matching policy"){
213de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
214de0109beSIlya Mishin     auto body1 = [](const oneapi::tbb::flow::continue_msg &) -> int { return 1; };
215de0109beSIlya Mishin     auto body2 = [](const float &val) -> int { return static_cast<int>(val); };
21651c0b2f7Stbbdev 
217de0109beSIlya Mishin     oneapi::tbb::flow::join_node<std::tuple<oneapi::tbb::flow::continue_msg, float>,
218de0109beSIlya Mishin         oneapi::tbb::flow::key_matching<int, MyHash<int>>> testing_node(g, body1, body2);
21951c0b2f7Stbbdev 
220de0109beSIlya Mishin     oneapi::tbb::flow::input_port<0>(testing_node).try_put(oneapi::tbb::flow::continue_msg());
221de0109beSIlya Mishin     oneapi::tbb::flow::input_port<1>(testing_node).try_put(1.3f);
22251c0b2f7Stbbdev 
22351c0b2f7Stbbdev     g.wait_for_all();
22451c0b2f7Stbbdev 
225de0109beSIlya Mishin     std::tuple<oneapi::tbb::flow::continue_msg, float> tmp;
226de0109beSIlya Mishin     CHECK_MESSAGE((testing_node.try_get(tmp)), "Mapped keys should match.\
227de0109beSIlya Mishin         If no successor accepts the tuple, it is must been saved and will be forwarded on a subsequent try_get");
228de0109beSIlya Mishin     CHECK_MESSAGE((!testing_node.try_get(tmp)), "Message should not exist after item is consumed");
229de0109beSIlya Mishin }
230de0109beSIlya Mishin 
231de0109beSIlya Mishin //! Test join_node<tag_matching> behavior
232de0109beSIlya Mishin //! \brief \ref requirement
233de0109beSIlya Mishin TEST_CASE("join_node tag_matching policy"){
234de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
235de0109beSIlya Mishin     auto body1 = [](const oneapi::tbb::flow::continue_msg &) -> oneapi::tbb::flow::tag_value { return 1; };
236de0109beSIlya Mishin     auto body2 = [](const float &val) -> oneapi::tbb::flow::tag_value { return static_cast<oneapi::tbb::flow::tag_value>(val); };
237de0109beSIlya Mishin 
238de0109beSIlya Mishin     oneapi::tbb::flow::join_node<std::tuple<oneapi::tbb::flow::continue_msg, float>,
239de0109beSIlya Mishin         oneapi::tbb::flow::tag_matching> testing_node(g, body1, body2);
240de0109beSIlya Mishin 
241de0109beSIlya Mishin     oneapi::tbb::flow::input_port<0>(testing_node).try_put(oneapi::tbb::flow::continue_msg());
242de0109beSIlya Mishin     oneapi::tbb::flow::input_port<1>(testing_node).try_put(1.3f);
243de0109beSIlya Mishin 
244de0109beSIlya Mishin     g.wait_for_all();
245de0109beSIlya Mishin 
246de0109beSIlya Mishin     std::tuple<oneapi::tbb::flow::continue_msg, float> tmp;
247de0109beSIlya Mishin     CHECK_MESSAGE((testing_node.try_get(tmp) == true), "Mapped keys should match");
248de0109beSIlya Mishin }
249de0109beSIlya Mishin 
250de0109beSIlya Mishin #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
251de0109beSIlya Mishin //! Test deduction guides
252de0109beSIlya Mishin //! \brief \ref requirement
TEST_CASE("Deduction guides test"){
    // The checks inside test_deduction_guides() are static_asserts, so a wrong
    // deduction fails the build; this call just runs the helper.
    test_deduction_guides();
}
256de0109beSIlya Mishin #endif
257de0109beSIlya Mishin 
258de0109beSIlya Mishin //! Test join_node input_ports() returns a tuple of input ports.
259de0109beSIlya Mishin //! \brief \ref interface \ref requirement
260de0109beSIlya Mishin TEST_CASE("join_node output_ports") {
261de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
262de0109beSIlya Mishin     oneapi::tbb::flow::join_node<std::tuple<int>> node(g);
263de0109beSIlya Mishin 
264de0109beSIlya Mishin     CHECK_MESSAGE((std::is_same<oneapi::tbb::flow::join_node<std::tuple<int>>::input_ports_type&,
265de0109beSIlya Mishin         decltype(node.input_ports())>::value), "join_node input_ports should returns a tuple of input ports");
26651c0b2f7Stbbdev }
267