1 /* 2 Copyright (c) 2020-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if __INTEL_COMPILER && _MSC_VER 18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19 #endif 20 21 #include "conformance_flowgraph.h" 22 23 //! \file conformance_graph.cpp 24 //! \brief Test for [flow_graph.graph] specification 25 26 void test_continue_node_rf_reset_protocol(){ 27 using namespace oneapi::tbb::flow; 28 graph g; 29 30 std::atomic<bool> flag = {false}; 31 continue_node<int> source(g, 2, [&](const continue_msg&){ flag = true; return 1;}); 32 33 source.try_put(continue_msg()); 34 g.wait_for_all(); 35 36 CHECK_MESSAGE((flag == false), "Should be false"); 37 38 g.reset(rf_reset_protocol); 39 40 source.try_put(continue_msg()); 41 g.wait_for_all(); 42 CHECK_MESSAGE((flag == false), "Internal number of predecessors reinitialized"); 43 44 source.try_put(continue_msg()); 45 g.wait_for_all(); 46 CHECK_MESSAGE((flag == true), "Should be true"); 47 } 48 49 void test_input_node_rf_reset_protocol(){ 50 oneapi::tbb::flow::graph g; 51 52 conformance::copy_counting_object<int> fun; 53 54 oneapi::tbb::flow::input_node<int> node(g, fun); 55 oneapi::tbb::flow::limiter_node<int> rejecter(g, 0); 56 57 oneapi::tbb::flow::make_edge(node, rejecter); 58 59 node.activate(); 60 g.wait_for_all(); 61 62 g.reset(oneapi::tbb::flow::rf_reset_protocol); 63 64 int tmp = -1; 65 CHECK_MESSAGE((node.try_get(tmp) == false), "Should be false"); 66 } 67 68 template<typename Node> 69 void test_functional_nodes_rf_reset_protocol(){ 70 oneapi::tbb::flow::graph g; 71 size_t concurrency_limit = 1; 72 oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, concurrency_limit); 73 74 conformance::counting_functor<int> counting_body; 75 Node f(g, oneapi::tbb::flow::serial, counting_body); 76 77 f.try_put(0); 78 f.try_put(0); 79 CHECK_MESSAGE((counting_body.execute_count == 0), "Body should not be executed"); 80 g.reset(oneapi::tbb::flow::rf_reset_protocol); 81 82 g.wait_for_all(); 83 CHECK_MESSAGE((counting_body.execute_count == 1), "Body should be executed"); 84 } 85 86 template<typename Node, typename ...Args> 87 void test_buffering_nodes_rf_reset_protocol(Args... node_body){ 88 oneapi::tbb::flow::graph g; 89 Node testing_node(g, node_body...); 90 91 int tmp = -1; 92 CHECK_MESSAGE((testing_node.try_get(tmp) == false), "try_get should not succeed"); 93 CHECK_MESSAGE((tmp == -1), "Value should not be updated"); 94 95 testing_node.try_put(1); 96 g.wait_for_all(); 97 g.reset(oneapi::tbb::flow::rf_reset_protocol); 98 99 tmp = -1; 100 CHECK_MESSAGE((testing_node.try_get(tmp) == false), "try_get should not succeed"); 101 CHECK_MESSAGE((tmp == -1), "Value should not be updated"); 102 g.wait_for_all(); 103 } 104 105 template<typename Node, typename InputType, typename ...Args> 106 void test_nodes_with_body_rf_reset_bodies(Args... node_args){ 107 oneapi::tbb::flow::graph g; 108 conformance::counting_functor<int> counting_body(5); 109 Node testing_node(g, node_args..., counting_body); 110 111 testing_node.try_put(InputType()); 112 g.wait_for_all(); 113 114 CHECK_MESSAGE((counting_body.execute_count == 1), "Body should be executed"); 115 116 g.reset(oneapi::tbb::flow::rf_reset_bodies); 117 testing_node.try_put(InputType()); 118 g.wait_for_all(); 119 120 CHECK_MESSAGE((counting_body.execute_count == 1), "Body should be replaced with a copy of the body"); 121 } 122 123 void test_limiter_node_rf_reset_protocol(){ 124 oneapi::tbb::flow::graph g; 125 126 constexpr int limit = 5; 127 oneapi::tbb::flow::limiter_node<int> testing_node(g, limit); 128 conformance::test_push_receiver<int> suc_node(g); 129 130 oneapi::tbb::flow::make_edge(testing_node, suc_node); 131 132 for(int i = 0; i < limit * 2; ++i) 133 testing_node.try_put(1); 134 g.wait_for_all(); 135 136 CHECK_MESSAGE((conformance::get_values(suc_node).size() == limit), "Descendant of the node needs be receive limited number of messages"); 137 138 g.reset(oneapi::tbb::flow::rf_reset_protocol); 139 140 for(int i = 0; i < limit * 2; ++i) 141 testing_node.try_put(1); 142 g.wait_for_all(); 143 144 CHECK_MESSAGE((conformance::get_values(suc_node).size() == limit), "Descendant of the node needs be receive limited number of messages"); 145 } 146 147 void test_join_node_rf_reset_protocol(){ 148 oneapi::tbb::flow::graph g; 149 150 oneapi::tbb::flow::join_node<std::tuple<int>, oneapi::tbb::flow::queueing> testing_node(g); 151 152 oneapi::tbb::flow::input_port<0>(testing_node).try_put(1); 153 154 g.wait_for_all(); 155 g.reset(oneapi::tbb::flow::rf_reset_protocol); 156 157 std::tuple<int> tmp(0); 158 CHECK_MESSAGE((!testing_node.try_get(tmp)), "All buffers must be emptied"); 159 } 160 161 //! Graph reset 162 //! \brief \ref requirement 163 TEST_CASE("graph reset with rf_reset_protocol") { 164 using namespace oneapi::tbb::flow; 165 test_continue_node_rf_reset_protocol(); 166 test_input_node_rf_reset_protocol(); 167 test_functional_nodes_rf_reset_protocol<function_node<int, int, queueing>>(); 168 test_functional_nodes_rf_reset_protocol<multifunction_node<int, std::tuple<int>, queueing>>(); 169 test_functional_nodes_rf_reset_protocol<async_node<int, int, queueing>>(); 170 171 test_buffering_nodes_rf_reset_protocol<buffer_node<int>>(); 172 test_buffering_nodes_rf_reset_protocol<queue_node<int>>(); 173 test_buffering_nodes_rf_reset_protocol<overwrite_node<int>>(); 174 test_buffering_nodes_rf_reset_protocol<write_once_node<int>>(); 175 test_buffering_nodes_rf_reset_protocol<priority_queue_node<int>>(); 176 conformance::sequencer_functor<int> sequencer; 177 test_buffering_nodes_rf_reset_protocol<sequencer_node<int>>(sequencer); 178 179 test_limiter_node_rf_reset_protocol(); 180 test_join_node_rf_reset_protocol(); 181 } 182 183 //! Graph reset rf_clear_edges 184 //! \brief \ref requirement 185 TEST_CASE("graph reset with rf_clear_edges") { 186 oneapi::tbb::flow::graph g; 187 using body = conformance::dummy_functor<int>; 188 189 oneapi::tbb::flow::queue_node<int> successor(g); 190 oneapi::tbb::flow::queue_node<std::tuple<int>> successor2(g); 191 oneapi::tbb::flow::queue_node<oneapi::tbb::flow::indexer_node<int>::output_type> successor3(g); 192 193 //node types 194 oneapi::tbb::flow::continue_node<int> ct(g, body()); 195 oneapi::tbb::flow::split_node< std::tuple<int> > s(g); 196 oneapi::tbb::flow::input_node<int> src(g, body()); 197 oneapi::tbb::flow::function_node<int, int> fxn(g, oneapi::tbb::flow::unlimited, body()); 198 oneapi::tbb::flow::multifunction_node<int, std::tuple<int, int> > m_fxn(g, oneapi::tbb::flow::unlimited, body()); 199 oneapi::tbb::flow::broadcast_node<int> bc(g); 200 oneapi::tbb::flow::limiter_node<int> lim(g, 2); 201 oneapi::tbb::flow::indexer_node<int> ind(g); 202 oneapi::tbb::flow::join_node< std::tuple< int >, oneapi::tbb::flow::queueing > j(g); 203 oneapi::tbb::flow::buffer_node<int> bf(g); 204 oneapi::tbb::flow::priority_queue_node<int> pq(g); 205 oneapi::tbb::flow::write_once_node<int> wo(g); 206 oneapi::tbb::flow::overwrite_node<int> ovw(g); 207 oneapi::tbb::flow::sequencer_node<int> seq(g, conformance::sequencer_functor<int>()); 208 209 oneapi::tbb::flow::make_edge(ct, successor); 210 oneapi::tbb::flow::make_edge(s, successor); 211 oneapi::tbb::flow::make_edge(src, successor); 212 oneapi::tbb::flow::make_edge(fxn, successor); 213 oneapi::tbb::flow::make_edge(m_fxn, successor); 214 oneapi::tbb::flow::make_edge(bc, successor); 215 oneapi::tbb::flow::make_edge(lim, successor); 216 oneapi::tbb::flow::make_edge(ind, successor3); 217 oneapi::tbb::flow::make_edge(j, successor2); 218 oneapi::tbb::flow::make_edge(bf, successor); 219 oneapi::tbb::flow::make_edge(pq, successor); 220 oneapi::tbb::flow::make_edge(wo, successor); 221 oneapi::tbb::flow::make_edge(ovw, successor); 222 oneapi::tbb::flow::make_edge(seq, successor); 223 224 g.wait_for_all(); 225 g.reset(oneapi::tbb::flow::rf_clear_edges); 226 227 ct.try_put(oneapi::tbb::flow::continue_msg()); 228 s.try_put(std::tuple<int>{1}); 229 src.activate(); 230 fxn.try_put(1); 231 m_fxn.try_put(1); 232 bc.try_put(1); 233 lim.try_put(1); 234 oneapi::tbb::flow::input_port<0>(ind).try_put(1); 235 oneapi::tbb::flow::input_port<0>(j).try_put(1); 236 bf.try_put(1); 237 pq.try_put(1); 238 wo.try_put(1); 239 ovw.try_put(1); 240 seq.try_put(0); 241 242 g.wait_for_all(); 243 244 CHECK_MESSAGE((conformance::get_values(successor).size() == 0), "Message should not pass when edges doesn't exist"); 245 CHECK_MESSAGE((conformance::get_values(successor2).size() == 0), "Message should not pass when edge doesn't exist"); 246 CHECK_MESSAGE((conformance::get_values(successor3).size() == 0), "Message should not pass when edge doesn't exist"); 247 } 248 249 //! Graph reset rf_reset_bodies 250 //! \brief \ref requirement 251 TEST_CASE("graph reset with rf_reset_bodies") { 252 using namespace oneapi::tbb::flow; 253 test_nodes_with_body_rf_reset_bodies<continue_node<int>, continue_msg>(serial); 254 test_nodes_with_body_rf_reset_bodies<function_node<int, int>, int>(serial); 255 test_nodes_with_body_rf_reset_bodies<multifunction_node<int, std::tuple<int>>, int>(serial); 256 test_nodes_with_body_rf_reset_bodies<async_node<int, int>, int>(serial); 257 258 graph g; 259 conformance::counting_functor<int> counting_body(1); 260 input_node<int> testing_node(g,counting_body); 261 queue_node<int> q_node(g); 262 263 make_edge(testing_node, q_node); 264 265 testing_node.activate(); 266 g.wait_for_all(); 267 268 CHECK_MESSAGE((counting_body.execute_count == 2), "Body should be executed"); 269 270 g.reset(rf_reset_bodies); 271 testing_node.activate(); 272 g.wait_for_all(); 273 274 CHECK_MESSAGE((counting_body.execute_count == 2), "Body should be replaced with a copy of the body"); 275 } 276 277 //! Graph cancel 278 //! \brief \ref requirement 279 TEST_CASE("graph cancel") { 280 oneapi::tbb::flow::graph g; 281 CHECK_MESSAGE(!g.is_cancelled(), "Freshly created graph should not be cancelled." ); 282 283 g.cancel(); 284 CHECK_MESSAGE(!g.is_cancelled(), "Cancelled status should appear only after the wait_for_all() call." ); 285 286 g.wait_for_all(); 287 CHECK_MESSAGE(g.is_cancelled(), "Waiting should allow checking the cancellation status." ); 288 289 g.reset(); 290 CHECK_MESSAGE(!g.is_cancelled(), "Resetting must reset the cancellation status." ); 291 292 std::atomic<bool> cancelled(false); 293 std::atomic<unsigned> executed(0); 294 oneapi::tbb::flow::function_node<int> f(g, oneapi::tbb::flow::serial, [&](int) { 295 ++executed; 296 while( !cancelled.load(std::memory_order_relaxed) ) 297 std::this_thread::sleep_for(std::chrono::milliseconds(1)); 298 }); 299 300 const unsigned N = 10; 301 for( unsigned i = 0; i < N; ++i ) 302 f.try_put(0); 303 304 std::thread thr([&] { 305 while( !executed ) 306 std::this_thread::sleep_for(std::chrono::milliseconds(1)); 307 g.cancel(); 308 cancelled.store(true, std::memory_order_relaxed); 309 }); 310 g.wait_for_all(); 311 thr.join(); 312 CHECK_MESSAGE(g.is_cancelled(), "Wait for all should not change the cancellation status." ); 313 CHECK_MESSAGE(1 == executed, "Buffered messages should be dropped by the cancelled graph." ); 314 } 315