151c0b2f7Stbbdev /* 2*274f68e5SIlya Isaev Copyright (c) 2020-2022 Intel Corporation 351c0b2f7Stbbdev 451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License"); 551c0b2f7Stbbdev you may not use this file except in compliance with the License. 651c0b2f7Stbbdev You may obtain a copy of the License at 751c0b2f7Stbbdev 851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0 951c0b2f7Stbbdev 1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software 1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS, 1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 1351c0b2f7Stbbdev See the License for the specific language governing permissions and 1451c0b2f7Stbbdev limitations under the License. 1551c0b2f7Stbbdev */ 1651c0b2f7Stbbdev 17b15aabb3Stbbdev #if __INTEL_COMPILER && _MSC_VER 18b15aabb3Stbbdev #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19b15aabb3Stbbdev #endif 2051c0b2f7Stbbdev 2151c0b2f7Stbbdev #include "conformance_flowgraph.h" 2251c0b2f7Stbbdev 2351c0b2f7Stbbdev //! \file conformance_join_node.cpp 2451c0b2f7Stbbdev //! \brief Test for [flow_graph.join_node] specification 2551c0b2f7Stbbdev 26de0109beSIlya Mishin using input_msg = conformance::message</*default_ctor*/true, /*copy_ctor*/true, /*copy_assign*/true>; 27de0109beSIlya Mishin using my_input_tuple = std::tuple<int, float, input_msg>; 2851c0b2f7Stbbdev 29de0109beSIlya Mishin std::vector<my_input_tuple> get_values( conformance::test_push_receiver<my_input_tuple>& rr ) { 30de0109beSIlya Mishin std::vector<my_input_tuple> messages; 31*274f68e5SIlya Isaev my_input_tuple tmp(0, 0.f, input_msg(0)); 32*274f68e5SIlya Isaev while(rr.try_get(tmp)) { 33de0109beSIlya Mishin messages.push_back(tmp); 34de0109beSIlya Mishin } 35de0109beSIlya Mishin return messages; 3651c0b2f7Stbbdev } 3751c0b2f7Stbbdev 38de0109beSIlya Mishin #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 39de0109beSIlya Mishin void test_deduction_guides() { 40de0109beSIlya Mishin using namespace tbb::flow; 4151c0b2f7Stbbdev 4251c0b2f7Stbbdev graph g; 43de0109beSIlya Mishin using tuple_type = std::tuple<int, int, int>; 44de0109beSIlya Mishin broadcast_node<int> b1(g), b2(g), b3(g); 45de0109beSIlya Mishin broadcast_node<tuple_type> b4(g); 46de0109beSIlya Mishin join_node<tuple_type> j0(g); 4751c0b2f7Stbbdev 48de0109beSIlya Mishin #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 49de0109beSIlya Mishin join_node j1(follows(b1, b2, b3)); 50de0109beSIlya Mishin static_assert(std::is_same_v<decltype(j1), join_node<tuple_type>>); 51de0109beSIlya Mishin 52de0109beSIlya Mishin join_node j2(follows(b1, b2, b3), reserving()); 53de0109beSIlya Mishin static_assert(std::is_same_v<decltype(j2), join_node<tuple_type, reserving>>); 54de0109beSIlya Mishin 55de0109beSIlya Mishin join_node j3(precedes(b4)); 56de0109beSIlya Mishin static_assert(std::is_same_v<decltype(j3), join_node<tuple_type>>); 57de0109beSIlya Mishin 58de0109beSIlya Mishin join_node j4(precedes(b4), reserving()); 59de0109beSIlya Mishin static_assert(std::is_same_v<decltype(j4), join_node<tuple_type, reserving>>); 60de0109beSIlya Mishin #endif 61de0109beSIlya Mishin 62de0109beSIlya Mishin join_node j5(j0); 63de0109beSIlya Mishin static_assert(std::is_same_v<decltype(j5), join_node<tuple_type>>); 6451c0b2f7Stbbdev } 6551c0b2f7Stbbdev 66de0109beSIlya Mishin #endif 6751c0b2f7Stbbdev 68de0109beSIlya Mishin //! The node that is constructed has a reference to the same graph object as src. 69de0109beSIlya Mishin //! The list of predecessors, messages in the input ports, and successors are not copied. 7051c0b2f7Stbbdev //! \brief \ref interface 7151c0b2f7Stbbdev TEST_CASE("join_node copy constructor"){ 72de0109beSIlya Mishin oneapi::tbb::flow::graph g; 73de0109beSIlya Mishin oneapi::tbb::flow::continue_node<int> node0( g, 74de0109beSIlya Mishin [](oneapi::tbb::flow::continue_msg) { return 1; } ); 75de0109beSIlya Mishin 76de0109beSIlya Mishin oneapi::tbb::flow::join_node<std::tuple<int>> node1(g); 77de0109beSIlya Mishin conformance::test_push_receiver<std::tuple<int>> node2(g); 78de0109beSIlya Mishin conformance::test_push_receiver<std::tuple<int>> node3(g); 79de0109beSIlya Mishin 80de0109beSIlya Mishin oneapi::tbb::flow::make_edge(node0, oneapi::tbb::flow::input_port<0>(node1)); 81de0109beSIlya Mishin oneapi::tbb::flow::make_edge(node1, node2); 82de0109beSIlya Mishin oneapi::tbb::flow::join_node<std::tuple<int>> node_copy(node1); 83de0109beSIlya Mishin 84de0109beSIlya Mishin oneapi::tbb::flow::make_edge(node_copy, node3); 85de0109beSIlya Mishin 86de0109beSIlya Mishin oneapi::tbb::flow::input_port<0>(node_copy).try_put(1); 87de0109beSIlya Mishin g.wait_for_all(); 88de0109beSIlya Mishin 89de0109beSIlya Mishin auto values = conformance::get_values(node3); 90de0109beSIlya Mishin CHECK_MESSAGE((conformance::get_values(node2).size() == 0 && values.size() == 1), "Copied node doesn`t copy successor"); 91de0109beSIlya Mishin 92de0109beSIlya Mishin node0.try_put(oneapi::tbb::flow::continue_msg()); 93de0109beSIlya Mishin g.wait_for_all(); 94de0109beSIlya Mishin 95de0109beSIlya Mishin CHECK_MESSAGE((conformance::get_values(node2).size() == 1 && conformance::get_values(node3).size() == 0), "Copied node doesn`t copy predecessor"); 96de0109beSIlya Mishin 97de0109beSIlya Mishin oneapi::tbb::flow::remove_edge(node1, node2); 98de0109beSIlya Mishin oneapi::tbb::flow::input_port<0>(node1).try_put(1); 99de0109beSIlya Mishin g.wait_for_all(); 100de0109beSIlya Mishin oneapi::tbb::flow::join_node<std::tuple<int>> node_copy2(node1); 101de0109beSIlya Mishin oneapi::tbb::flow::make_edge(node_copy2, node3); 102de0109beSIlya Mishin oneapi::tbb::flow::input_port<0>(node_copy2).try_put(2); 103de0109beSIlya Mishin g.wait_for_all(); 104de0109beSIlya Mishin CHECK_MESSAGE((std::get<0>(conformance::get_values(node3)[0]) == 2), "Copied node doesn`t copy messages in the input ports"); 10551c0b2f7Stbbdev } 10651c0b2f7Stbbdev 10751c0b2f7Stbbdev //! Test inheritance relations 10851c0b2f7Stbbdev //! \brief \ref interface 10951c0b2f7Stbbdev TEST_CASE("join_node inheritance"){ 110de0109beSIlya Mishin CHECK_MESSAGE((std::is_base_of<oneapi::tbb::flow::graph_node, 111de0109beSIlya Mishin oneapi::tbb::flow::join_node<my_input_tuple>>::value), 112de0109beSIlya Mishin "join_node should be derived from graph_node"); 113de0109beSIlya Mishin CHECK_MESSAGE((std::is_base_of<oneapi::tbb::flow::sender<my_input_tuple>, 114de0109beSIlya Mishin oneapi::tbb::flow::join_node<my_input_tuple>>::value), 115de0109beSIlya Mishin "join_node should be derived from sender<input_tuple>"); 11651c0b2f7Stbbdev } 11751c0b2f7Stbbdev 118de0109beSIlya Mishin //! Test join_node<queueing> behavior and broadcast property 11951c0b2f7Stbbdev //! \brief \ref requirement 120de0109beSIlya Mishin TEST_CASE("join_node queueing policy and broadcast property") { 121de0109beSIlya Mishin oneapi::tbb::flow::graph g; 122de0109beSIlya Mishin oneapi::tbb::flow::function_node<int, int> 123de0109beSIlya Mishin f1( g, oneapi::tbb::flow::unlimited, [](const int &i) { return i; } ); 124de0109beSIlya Mishin oneapi::tbb::flow::function_node<float, float> 125de0109beSIlya Mishin f2( g, oneapi::tbb::flow::unlimited, [](const float &f) { return f; } ); 126de0109beSIlya Mishin oneapi::tbb::flow::continue_node<input_msg> c1( g, 127de0109beSIlya Mishin [](oneapi::tbb::flow::continue_msg) { return input_msg(1); } ); 12851c0b2f7Stbbdev 129de0109beSIlya Mishin oneapi::tbb::flow::join_node<my_input_tuple, oneapi::tbb::flow::queueing> testing_node(g); 13051c0b2f7Stbbdev 131de0109beSIlya Mishin conformance::test_push_receiver<my_input_tuple> q_node(g); 132de0109beSIlya Mishin 133de0109beSIlya Mishin std::atomic<int> number{1}; 134de0109beSIlya Mishin oneapi::tbb::flow::function_node<my_input_tuple, my_input_tuple> 135de0109beSIlya Mishin f3( g, oneapi::tbb::flow::unlimited, 136de0109beSIlya Mishin [&]( const my_input_tuple &t ) { 137de0109beSIlya Mishin CHECK_MESSAGE((std::get<0>(t) == number), "Messages must be in first-in first-out order" ); 138de0109beSIlya Mishin CHECK_MESSAGE((std::get<1>(t) == static_cast<float>(number) + 0.5f), "Messages must be in first-in first-out order" ); 139de0109beSIlya Mishin CHECK_MESSAGE((std::get<2>(t) == 1), "Messages must be in first-in first-out order" ); 140de0109beSIlya Mishin ++number; 141de0109beSIlya Mishin return t; 14251c0b2f7Stbbdev } ); 14351c0b2f7Stbbdev 144de0109beSIlya Mishin oneapi::tbb::flow::make_edge(f1, oneapi::tbb::flow::input_port<0>(testing_node)); 145de0109beSIlya Mishin oneapi::tbb::flow::make_edge(f2, oneapi::tbb::flow::input_port<1>(testing_node)); 146de0109beSIlya Mishin oneapi::tbb::flow::make_edge(c1, oneapi::tbb::flow::input_port<2>(testing_node)); 147de0109beSIlya Mishin make_edge(testing_node, f3); 148de0109beSIlya Mishin make_edge(f3, q_node); 14951c0b2f7Stbbdev 150de0109beSIlya Mishin f1.try_put(1); 151de0109beSIlya Mishin g.wait_for_all(); 152de0109beSIlya Mishin CHECK_MESSAGE((get_values(q_node).size() == 0), 153de0109beSIlya Mishin "join_node must broadcast when there is at least one message at each input port"); 154de0109beSIlya Mishin f1.try_put(2); 155de0109beSIlya Mishin f2.try_put(1.5f); 156de0109beSIlya Mishin g.wait_for_all(); 157de0109beSIlya Mishin CHECK_MESSAGE((get_values(q_node).size() == 0), 158de0109beSIlya Mishin "join_node must broadcast when there is at least one message at each input port"); 15951c0b2f7Stbbdev f1.try_put(3); 160de0109beSIlya Mishin f2.try_put(2.5f); 161de0109beSIlya Mishin c1.try_put(oneapi::tbb::flow::continue_msg()); 16251c0b2f7Stbbdev g.wait_for_all(); 163de0109beSIlya Mishin CHECK_MESSAGE((get_values(q_node).size() == 1), 164de0109beSIlya Mishin "join_node must broadcast when there is at least one message at each input port"); 165de0109beSIlya Mishin f2.try_put(3.5f); 166de0109beSIlya Mishin c1.try_put(oneapi::tbb::flow::continue_msg()); 167de0109beSIlya Mishin g.wait_for_all(); 168de0109beSIlya Mishin CHECK_MESSAGE((get_values(q_node).size() == 1), 169de0109beSIlya Mishin "If at least one successor accepts the tuple, the head of each input port’s queue is removed"); 170de0109beSIlya Mishin c1.try_put(oneapi::tbb::flow::continue_msg()); 171de0109beSIlya Mishin g.wait_for_all(); 172de0109beSIlya Mishin CHECK_MESSAGE((get_values(q_node).size() == 1), 173de0109beSIlya Mishin "If at least one successor accepts the tuple, the head of each input port’s queue is removed"); 174de0109beSIlya Mishin c1.try_put(oneapi::tbb::flow::continue_msg()); 175de0109beSIlya Mishin g.wait_for_all(); 176de0109beSIlya Mishin CHECK_MESSAGE((get_values(q_node).size() == 0), 177de0109beSIlya Mishin "join_node must broadcast when there is at least one message at each input port"); 178de0109beSIlya Mishin 179de0109beSIlya Mishin oneapi::tbb::flow::remove_edge(testing_node, f3); 180de0109beSIlya Mishin 181de0109beSIlya Mishin f1.try_put(1); 182de0109beSIlya Mishin f2.try_put(1); 183de0109beSIlya Mishin c1.try_put(oneapi::tbb::flow::continue_msg()); 184de0109beSIlya Mishin g.wait_for_all(); 185de0109beSIlya Mishin 186de0109beSIlya Mishin my_input_tuple tmp(0, 0.f, input_msg(0)); 187de0109beSIlya Mishin CHECK_MESSAGE((testing_node.try_get(tmp)), "If no one successor accepts the tuple the messages\ 188de0109beSIlya Mishin must remain in their respective input port queues"); 189de0109beSIlya Mishin CHECK_MESSAGE((tmp == my_input_tuple(1, 1.f, input_msg(1))), "If no one successor accepts the tuple\ 190de0109beSIlya Mishin the messages must remain in their respective input port queues"); 19151c0b2f7Stbbdev } 19251c0b2f7Stbbdev 193de0109beSIlya Mishin //! Test join_node<reserving> behavior 19451c0b2f7Stbbdev //! \brief \ref requirement 195de0109beSIlya Mishin TEST_CASE("join_node reserving policy") { 196de0109beSIlya Mishin conformance::test_with_reserving_join_node_class<oneapi::tbb::flow::write_once_node<int>>(); 19751c0b2f7Stbbdev } 19851c0b2f7Stbbdev 199de0109beSIlya Mishin template<typename KeyType> 200de0109beSIlya Mishin struct MyHash{ 201de0109beSIlya Mishin std::size_t hash(const KeyType &k) const { 202de0109beSIlya Mishin return k * 2000 + 3; 203de0109beSIlya Mishin } 204de0109beSIlya Mishin 205de0109beSIlya Mishin bool equal(const KeyType &k1, const KeyType &k2) const{ 206de0109beSIlya Mishin return hash(k1) == hash(k2); 207de0109beSIlya Mishin } 208de0109beSIlya Mishin }; 209de0109beSIlya Mishin 210de0109beSIlya Mishin //! Test join_node<key_matching> behavior 21151c0b2f7Stbbdev //! \brief \ref requirement 212de0109beSIlya Mishin TEST_CASE("join_node key_matching policy"){ 213de0109beSIlya Mishin oneapi::tbb::flow::graph g; 214de0109beSIlya Mishin auto body1 = [](const oneapi::tbb::flow::continue_msg &) -> int { return 1; }; 215de0109beSIlya Mishin auto body2 = [](const float &val) -> int { return static_cast<int>(val); }; 21651c0b2f7Stbbdev 217de0109beSIlya Mishin oneapi::tbb::flow::join_node<std::tuple<oneapi::tbb::flow::continue_msg, float>, 218de0109beSIlya Mishin oneapi::tbb::flow::key_matching<int, MyHash<int>>> testing_node(g, body1, body2); 21951c0b2f7Stbbdev 220de0109beSIlya Mishin oneapi::tbb::flow::input_port<0>(testing_node).try_put(oneapi::tbb::flow::continue_msg()); 221de0109beSIlya Mishin oneapi::tbb::flow::input_port<1>(testing_node).try_put(1.3f); 22251c0b2f7Stbbdev 22351c0b2f7Stbbdev g.wait_for_all(); 22451c0b2f7Stbbdev 225de0109beSIlya Mishin std::tuple<oneapi::tbb::flow::continue_msg, float> tmp; 226de0109beSIlya Mishin CHECK_MESSAGE((testing_node.try_get(tmp)), "Mapped keys should match.\ 227de0109beSIlya Mishin If no successor accepts the tuple, it is must been saved and will be forwarded on a subsequent try_get"); 228de0109beSIlya Mishin CHECK_MESSAGE((!testing_node.try_get(tmp)), "Message should not exist after item is consumed"); 229de0109beSIlya Mishin } 230de0109beSIlya Mishin 231de0109beSIlya Mishin //! Test join_node<tag_matching> behavior 232de0109beSIlya Mishin //! \brief \ref requirement 233de0109beSIlya Mishin TEST_CASE("join_node tag_matching policy"){ 234de0109beSIlya Mishin oneapi::tbb::flow::graph g; 235de0109beSIlya Mishin auto body1 = [](const oneapi::tbb::flow::continue_msg &) -> oneapi::tbb::flow::tag_value { return 1; }; 236de0109beSIlya Mishin auto body2 = [](const float &val) -> oneapi::tbb::flow::tag_value { return static_cast<oneapi::tbb::flow::tag_value>(val); }; 237de0109beSIlya Mishin 238de0109beSIlya Mishin oneapi::tbb::flow::join_node<std::tuple<oneapi::tbb::flow::continue_msg, float>, 239de0109beSIlya Mishin oneapi::tbb::flow::tag_matching> testing_node(g, body1, body2); 240de0109beSIlya Mishin 241de0109beSIlya Mishin oneapi::tbb::flow::input_port<0>(testing_node).try_put(oneapi::tbb::flow::continue_msg()); 242de0109beSIlya Mishin oneapi::tbb::flow::input_port<1>(testing_node).try_put(1.3f); 243de0109beSIlya Mishin 244de0109beSIlya Mishin g.wait_for_all(); 245de0109beSIlya Mishin 246de0109beSIlya Mishin std::tuple<oneapi::tbb::flow::continue_msg, float> tmp; 247de0109beSIlya Mishin CHECK_MESSAGE((testing_node.try_get(tmp) == true), "Mapped keys should match"); 248de0109beSIlya Mishin } 249de0109beSIlya Mishin 250de0109beSIlya Mishin #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 251de0109beSIlya Mishin //! Test deduction guides 252de0109beSIlya Mishin //! \brief \ref requirement 253de0109beSIlya Mishin TEST_CASE("Deduction guides test"){ 254de0109beSIlya Mishin test_deduction_guides(); 255de0109beSIlya Mishin } 256de0109beSIlya Mishin #endif 257de0109beSIlya Mishin 258de0109beSIlya Mishin //! Test join_node input_ports() returns a tuple of input ports. 259de0109beSIlya Mishin //! \brief \ref interface \ref requirement 260de0109beSIlya Mishin TEST_CASE("join_node output_ports") { 261de0109beSIlya Mishin oneapi::tbb::flow::graph g; 262de0109beSIlya Mishin oneapi::tbb::flow::join_node<std::tuple<int>> node(g); 263de0109beSIlya Mishin 264de0109beSIlya Mishin CHECK_MESSAGE((std::is_same<oneapi::tbb::flow::join_node<std::tuple<int>>::input_ports_type&, 265de0109beSIlya Mishin decltype(node.input_ports())>::value), "join_node input_ports should returns a tuple of input ports"); 26651c0b2f7Stbbdev } 267