151c0b2f7Stbbdev /* 2b15aabb3Stbbdev Copyright (c) 2020-2021 Intel Corporation 351c0b2f7Stbbdev 451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License"); 551c0b2f7Stbbdev you may not use this file except in compliance with the License. 651c0b2f7Stbbdev You may obtain a copy of the License at 751c0b2f7Stbbdev 851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0 951c0b2f7Stbbdev 1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software 1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS, 1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 1351c0b2f7Stbbdev See the License for the specific language governing permissions and 1451c0b2f7Stbbdev limitations under the License. 1551c0b2f7Stbbdev */ 1651c0b2f7Stbbdev 17b15aabb3Stbbdev #if __INTEL_COMPILER && _MSC_VER 18b15aabb3Stbbdev #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19b15aabb3Stbbdev #endif 2051c0b2f7Stbbdev 2151c0b2f7Stbbdev #include "conformance_flowgraph.h" 2251c0b2f7Stbbdev 2351c0b2f7Stbbdev //! \file conformance_join_node.cpp 2451c0b2f7Stbbdev //! \brief Test for [flow_graph.join_node] specification 2551c0b2f7Stbbdev 26*de0109beSIlya Mishin using input_msg = conformance::message</*default_ctor*/true, /*copy_ctor*/true, /*copy_assign*/true>; 27*de0109beSIlya Mishin using my_input_tuple = std::tuple<int, float, input_msg>; 2851c0b2f7Stbbdev 29*de0109beSIlya Mishin std::vector<my_input_tuple> get_values( conformance::test_push_receiver<my_input_tuple>& rr ) { 30*de0109beSIlya Mishin std::vector<my_input_tuple> messages; 31*de0109beSIlya Mishin int val = 0; 32*de0109beSIlya Mishin for(my_input_tuple tmp(0, 0.f, input_msg(0)); rr.try_get(tmp); ++val) { 33*de0109beSIlya Mishin messages.push_back(tmp); 34*de0109beSIlya Mishin } 35*de0109beSIlya Mishin return messages; 3651c0b2f7Stbbdev } 3751c0b2f7Stbbdev 38*de0109beSIlya Mishin #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 39*de0109beSIlya Mishin void test_deduction_guides() { 40*de0109beSIlya Mishin using namespace tbb::flow; 4151c0b2f7Stbbdev 4251c0b2f7Stbbdev graph g; 43*de0109beSIlya Mishin using tuple_type = std::tuple<int, int, int>; 44*de0109beSIlya Mishin broadcast_node<int> b1(g), b2(g), b3(g); 45*de0109beSIlya Mishin broadcast_node<tuple_type> b4(g); 46*de0109beSIlya Mishin join_node<tuple_type> j0(g); 4751c0b2f7Stbbdev 48*de0109beSIlya Mishin #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 49*de0109beSIlya Mishin join_node j1(follows(b1, b2, b3)); 50*de0109beSIlya Mishin static_assert(std::is_same_v<decltype(j1), join_node<tuple_type>>); 51*de0109beSIlya Mishin 52*de0109beSIlya Mishin join_node j2(follows(b1, b2, b3), reserving()); 53*de0109beSIlya Mishin static_assert(std::is_same_v<decltype(j2), join_node<tuple_type, reserving>>); 54*de0109beSIlya Mishin 55*de0109beSIlya Mishin join_node j3(precedes(b4)); 56*de0109beSIlya Mishin static_assert(std::is_same_v<decltype(j3), join_node<tuple_type>>); 57*de0109beSIlya Mishin 58*de0109beSIlya Mishin join_node j4(precedes(b4), reserving()); 59*de0109beSIlya Mishin static_assert(std::is_same_v<decltype(j4), join_node<tuple_type, reserving>>); 60*de0109beSIlya Mishin #endif 61*de0109beSIlya Mishin 62*de0109beSIlya Mishin join_node j5(j0); 63*de0109beSIlya Mishin static_assert(std::is_same_v<decltype(j5), join_node<tuple_type>>); 6451c0b2f7Stbbdev } 6551c0b2f7Stbbdev 66*de0109beSIlya Mishin #endif 6751c0b2f7Stbbdev 68*de0109beSIlya Mishin //! The node that is constructed has a reference to the same graph object as src. 69*de0109beSIlya Mishin //! The list of predecessors, messages in the input ports, and successors are not copied. 7051c0b2f7Stbbdev //! \brief \ref interface 7151c0b2f7Stbbdev TEST_CASE("join_node copy constructor"){ 72*de0109beSIlya Mishin oneapi::tbb::flow::graph g; 73*de0109beSIlya Mishin oneapi::tbb::flow::continue_node<int> node0( g, 74*de0109beSIlya Mishin [](oneapi::tbb::flow::continue_msg) { return 1; } ); 75*de0109beSIlya Mishin 76*de0109beSIlya Mishin oneapi::tbb::flow::join_node<std::tuple<int>> node1(g); 77*de0109beSIlya Mishin conformance::test_push_receiver<std::tuple<int>> node2(g); 78*de0109beSIlya Mishin conformance::test_push_receiver<std::tuple<int>> node3(g); 79*de0109beSIlya Mishin 80*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(node0, oneapi::tbb::flow::input_port<0>(node1)); 81*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(node1, node2); 82*de0109beSIlya Mishin oneapi::tbb::flow::join_node<std::tuple<int>> node_copy(node1); 83*de0109beSIlya Mishin 84*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(node_copy, node3); 85*de0109beSIlya Mishin 86*de0109beSIlya Mishin oneapi::tbb::flow::input_port<0>(node_copy).try_put(1); 87*de0109beSIlya Mishin g.wait_for_all(); 88*de0109beSIlya Mishin 89*de0109beSIlya Mishin auto values = conformance::get_values(node3); 90*de0109beSIlya Mishin CHECK_MESSAGE((conformance::get_values(node2).size() == 0 && values.size() == 1), "Copied node doesn`t copy successor"); 91*de0109beSIlya Mishin 92*de0109beSIlya Mishin node0.try_put(oneapi::tbb::flow::continue_msg()); 93*de0109beSIlya Mishin g.wait_for_all(); 94*de0109beSIlya Mishin 95*de0109beSIlya Mishin CHECK_MESSAGE((conformance::get_values(node2).size() == 1 && conformance::get_values(node3).size() == 0), "Copied node doesn`t copy predecessor"); 96*de0109beSIlya Mishin 97*de0109beSIlya Mishin oneapi::tbb::flow::remove_edge(node1, node2); 98*de0109beSIlya Mishin oneapi::tbb::flow::input_port<0>(node1).try_put(1); 99*de0109beSIlya Mishin g.wait_for_all(); 100*de0109beSIlya Mishin oneapi::tbb::flow::join_node<std::tuple<int>> node_copy2(node1); 101*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(node_copy2, node3); 102*de0109beSIlya Mishin oneapi::tbb::flow::input_port<0>(node_copy2).try_put(2); 103*de0109beSIlya Mishin g.wait_for_all(); 104*de0109beSIlya Mishin CHECK_MESSAGE((std::get<0>(conformance::get_values(node3)[0]) == 2), "Copied node doesn`t copy messages in the input ports"); 10551c0b2f7Stbbdev } 10651c0b2f7Stbbdev 10751c0b2f7Stbbdev //! Test inheritance relations 10851c0b2f7Stbbdev //! \brief \ref interface 10951c0b2f7Stbbdev TEST_CASE("join_node inheritance"){ 110*de0109beSIlya Mishin CHECK_MESSAGE((std::is_base_of<oneapi::tbb::flow::graph_node, 111*de0109beSIlya Mishin oneapi::tbb::flow::join_node<my_input_tuple>>::value), 112*de0109beSIlya Mishin "join_node should be derived from graph_node"); 113*de0109beSIlya Mishin CHECK_MESSAGE((std::is_base_of<oneapi::tbb::flow::sender<my_input_tuple>, 114*de0109beSIlya Mishin oneapi::tbb::flow::join_node<my_input_tuple>>::value), 115*de0109beSIlya Mishin "join_node should be derived from sender<input_tuple>"); 11651c0b2f7Stbbdev } 11751c0b2f7Stbbdev 118*de0109beSIlya Mishin //! Test join_node<queueing> behavior and broadcast property 11951c0b2f7Stbbdev //! \brief \ref requirement 120*de0109beSIlya Mishin TEST_CASE("join_node queueing policy and broadcast property") { 121*de0109beSIlya Mishin oneapi::tbb::flow::graph g; 122*de0109beSIlya Mishin oneapi::tbb::flow::function_node<int, int> 123*de0109beSIlya Mishin f1( g, oneapi::tbb::flow::unlimited, [](const int &i) { return i; } ); 124*de0109beSIlya Mishin oneapi::tbb::flow::function_node<float, float> 125*de0109beSIlya Mishin f2( g, oneapi::tbb::flow::unlimited, [](const float &f) { return f; } ); 126*de0109beSIlya Mishin oneapi::tbb::flow::continue_node<input_msg> c1( g, 127*de0109beSIlya Mishin [](oneapi::tbb::flow::continue_msg) { return input_msg(1); } ); 12851c0b2f7Stbbdev 129*de0109beSIlya Mishin oneapi::tbb::flow::join_node<my_input_tuple, oneapi::tbb::flow::queueing> testing_node(g); 13051c0b2f7Stbbdev 131*de0109beSIlya Mishin conformance::test_push_receiver<my_input_tuple> q_node(g); 132*de0109beSIlya Mishin 133*de0109beSIlya Mishin std::atomic<int> number{1}; 134*de0109beSIlya Mishin oneapi::tbb::flow::function_node<my_input_tuple, my_input_tuple> 135*de0109beSIlya Mishin f3( g, oneapi::tbb::flow::unlimited, 136*de0109beSIlya Mishin [&]( const my_input_tuple &t ) { 137*de0109beSIlya Mishin CHECK_MESSAGE((std::get<0>(t) == number), "Messages must be in first-in first-out order" ); 138*de0109beSIlya Mishin CHECK_MESSAGE((std::get<1>(t) == static_cast<float>(number) + 0.5f), "Messages must be in first-in first-out order" ); 139*de0109beSIlya Mishin CHECK_MESSAGE((std::get<2>(t) == 1), "Messages must be in first-in first-out order" ); 140*de0109beSIlya Mishin ++number; 141*de0109beSIlya Mishin return t; 14251c0b2f7Stbbdev } ); 14351c0b2f7Stbbdev 144*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(f1, oneapi::tbb::flow::input_port<0>(testing_node)); 145*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(f2, oneapi::tbb::flow::input_port<1>(testing_node)); 146*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(c1, oneapi::tbb::flow::input_port<2>(testing_node)); 147*de0109beSIlya Mishin make_edge(testing_node, f3); 148*de0109beSIlya Mishin make_edge(f3, q_node); 14951c0b2f7Stbbdev 150*de0109beSIlya Mishin f1.try_put(1); 151*de0109beSIlya Mishin g.wait_for_all(); 152*de0109beSIlya Mishin CHECK_MESSAGE((get_values(q_node).size() == 0), 153*de0109beSIlya Mishin "join_node must broadcast when there is at least one message at each input port"); 154*de0109beSIlya Mishin f1.try_put(2); 155*de0109beSIlya Mishin f2.try_put(1.5f); 156*de0109beSIlya Mishin g.wait_for_all(); 157*de0109beSIlya Mishin CHECK_MESSAGE((get_values(q_node).size() == 0), 158*de0109beSIlya Mishin "join_node must broadcast when there is at least one message at each input port"); 15951c0b2f7Stbbdev f1.try_put(3); 160*de0109beSIlya Mishin f2.try_put(2.5f); 161*de0109beSIlya Mishin c1.try_put(oneapi::tbb::flow::continue_msg()); 16251c0b2f7Stbbdev g.wait_for_all(); 163*de0109beSIlya Mishin CHECK_MESSAGE((get_values(q_node).size() == 1), 164*de0109beSIlya Mishin "join_node must broadcast when there is at least one message at each input port"); 165*de0109beSIlya Mishin f2.try_put(3.5f); 166*de0109beSIlya Mishin c1.try_put(oneapi::tbb::flow::continue_msg()); 167*de0109beSIlya Mishin g.wait_for_all(); 168*de0109beSIlya Mishin CHECK_MESSAGE((get_values(q_node).size() == 1), 169*de0109beSIlya Mishin "If at least one successor accepts the tuple, the head of each input port’s queue is removed"); 170*de0109beSIlya Mishin c1.try_put(oneapi::tbb::flow::continue_msg()); 171*de0109beSIlya Mishin g.wait_for_all(); 172*de0109beSIlya Mishin CHECK_MESSAGE((get_values(q_node).size() == 1), 173*de0109beSIlya Mishin "If at least one successor accepts the tuple, the head of each input port’s queue is removed"); 174*de0109beSIlya Mishin c1.try_put(oneapi::tbb::flow::continue_msg()); 175*de0109beSIlya Mishin g.wait_for_all(); 176*de0109beSIlya Mishin CHECK_MESSAGE((get_values(q_node).size() == 0), 177*de0109beSIlya Mishin "join_node must broadcast when there is at least one message at each input port"); 178*de0109beSIlya Mishin 179*de0109beSIlya Mishin oneapi::tbb::flow::remove_edge(testing_node, f3); 180*de0109beSIlya Mishin 181*de0109beSIlya Mishin f1.try_put(1); 182*de0109beSIlya Mishin f2.try_put(1); 183*de0109beSIlya Mishin c1.try_put(oneapi::tbb::flow::continue_msg()); 184*de0109beSIlya Mishin g.wait_for_all(); 185*de0109beSIlya Mishin 186*de0109beSIlya Mishin my_input_tuple tmp(0, 0.f, input_msg(0)); 187*de0109beSIlya Mishin CHECK_MESSAGE((testing_node.try_get(tmp)), "If no one successor accepts the tuple the messages\ 188*de0109beSIlya Mishin must remain in their respective input port queues"); 189*de0109beSIlya Mishin CHECK_MESSAGE((tmp == my_input_tuple(1, 1.f, input_msg(1))), "If no one successor accepts the tuple\ 190*de0109beSIlya Mishin the messages must remain in their respective input port queues"); 19151c0b2f7Stbbdev } 19251c0b2f7Stbbdev 193*de0109beSIlya Mishin //! Test join_node<reserving> behavior 19451c0b2f7Stbbdev //! \brief \ref requirement 195*de0109beSIlya Mishin TEST_CASE("join_node reserving policy") { 196*de0109beSIlya Mishin conformance::test_with_reserving_join_node_class<oneapi::tbb::flow::write_once_node<int>>(); 19751c0b2f7Stbbdev } 19851c0b2f7Stbbdev 199*de0109beSIlya Mishin template<typename KeyType> 200*de0109beSIlya Mishin struct MyHash{ 201*de0109beSIlya Mishin std::size_t hash(const KeyType &k) const { 202*de0109beSIlya Mishin return k * 2000 + 3; 203*de0109beSIlya Mishin } 204*de0109beSIlya Mishin 205*de0109beSIlya Mishin bool equal(const KeyType &k1, const KeyType &k2) const{ 206*de0109beSIlya Mishin return hash(k1) == hash(k2); 207*de0109beSIlya Mishin } 208*de0109beSIlya Mishin }; 209*de0109beSIlya Mishin 210*de0109beSIlya Mishin //! Test join_node<key_matching> behavior 21151c0b2f7Stbbdev //! \brief \ref requirement 212*de0109beSIlya Mishin TEST_CASE("join_node key_matching policy"){ 213*de0109beSIlya Mishin oneapi::tbb::flow::graph g; 214*de0109beSIlya Mishin auto body1 = [](const oneapi::tbb::flow::continue_msg &) -> int { return 1; }; 215*de0109beSIlya Mishin auto body2 = [](const float &val) -> int { return static_cast<int>(val); }; 21651c0b2f7Stbbdev 217*de0109beSIlya Mishin oneapi::tbb::flow::join_node<std::tuple<oneapi::tbb::flow::continue_msg, float>, 218*de0109beSIlya Mishin oneapi::tbb::flow::key_matching<int, MyHash<int>>> testing_node(g, body1, body2); 21951c0b2f7Stbbdev 220*de0109beSIlya Mishin oneapi::tbb::flow::input_port<0>(testing_node).try_put(oneapi::tbb::flow::continue_msg()); 221*de0109beSIlya Mishin oneapi::tbb::flow::input_port<1>(testing_node).try_put(1.3f); 22251c0b2f7Stbbdev 22351c0b2f7Stbbdev g.wait_for_all(); 22451c0b2f7Stbbdev 225*de0109beSIlya Mishin std::tuple<oneapi::tbb::flow::continue_msg, float> tmp; 226*de0109beSIlya Mishin CHECK_MESSAGE((testing_node.try_get(tmp)), "Mapped keys should match.\ 227*de0109beSIlya Mishin If no successor accepts the tuple, it is must been saved and will be forwarded on a subsequent try_get"); 228*de0109beSIlya Mishin CHECK_MESSAGE((!testing_node.try_get(tmp)), "Message should not exist after item is consumed"); 229*de0109beSIlya Mishin } 230*de0109beSIlya Mishin 231*de0109beSIlya Mishin //! Test join_node<tag_matching> behavior 232*de0109beSIlya Mishin //! \brief \ref requirement 233*de0109beSIlya Mishin TEST_CASE("join_node tag_matching policy"){ 234*de0109beSIlya Mishin oneapi::tbb::flow::graph g; 235*de0109beSIlya Mishin auto body1 = [](const oneapi::tbb::flow::continue_msg &) -> oneapi::tbb::flow::tag_value { return 1; }; 236*de0109beSIlya Mishin auto body2 = [](const float &val) -> oneapi::tbb::flow::tag_value { return static_cast<oneapi::tbb::flow::tag_value>(val); }; 237*de0109beSIlya Mishin 238*de0109beSIlya Mishin oneapi::tbb::flow::join_node<std::tuple<oneapi::tbb::flow::continue_msg, float>, 239*de0109beSIlya Mishin oneapi::tbb::flow::tag_matching> testing_node(g, body1, body2); 240*de0109beSIlya Mishin 241*de0109beSIlya Mishin oneapi::tbb::flow::input_port<0>(testing_node).try_put(oneapi::tbb::flow::continue_msg()); 242*de0109beSIlya Mishin oneapi::tbb::flow::input_port<1>(testing_node).try_put(1.3f); 243*de0109beSIlya Mishin 244*de0109beSIlya Mishin g.wait_for_all(); 245*de0109beSIlya Mishin 246*de0109beSIlya Mishin std::tuple<oneapi::tbb::flow::continue_msg, float> tmp; 247*de0109beSIlya Mishin CHECK_MESSAGE((testing_node.try_get(tmp) == true), "Mapped keys should match"); 248*de0109beSIlya Mishin } 249*de0109beSIlya Mishin 250*de0109beSIlya Mishin #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 251*de0109beSIlya Mishin //! Test deduction guides 252*de0109beSIlya Mishin //! \brief \ref requirement 253*de0109beSIlya Mishin TEST_CASE("Deduction guides test"){ 254*de0109beSIlya Mishin test_deduction_guides(); 255*de0109beSIlya Mishin } 256*de0109beSIlya Mishin #endif 257*de0109beSIlya Mishin 258*de0109beSIlya Mishin //! Test join_node input_ports() returns a tuple of input ports. 259*de0109beSIlya Mishin //! \brief \ref interface \ref requirement 260*de0109beSIlya Mishin TEST_CASE("join_node output_ports") { 261*de0109beSIlya Mishin oneapi::tbb::flow::graph g; 262*de0109beSIlya Mishin oneapi::tbb::flow::join_node<std::tuple<int>> node(g); 263*de0109beSIlya Mishin 264*de0109beSIlya Mishin CHECK_MESSAGE((std::is_same<oneapi::tbb::flow::join_node<std::tuple<int>>::input_ports_type&, 265*de0109beSIlya Mishin decltype(node.input_ports())>::value), "join_node input_ports should returns a tuple of input ports"); 26651c0b2f7Stbbdev } 267