1 /* 2 Copyright (c) 2020 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 18 #include "common/test.h" 19 20 #include "common/utils.h" 21 #include "common/graph_utils.h" 22 23 #include "oneapi/tbb/flow_graph.h" 24 #include "oneapi/tbb/task_arena.h" 25 #include "oneapi/tbb/global_control.h" 26 27 #include "conformance_flowgraph.h" 28 29 //! \file conformance_graph.cpp 30 //! \brief Test for [flow_graph.graph] specification 31 32 using namespace oneapi::tbb::flow; 33 using namespace std; 34 35 //! Graph reset 36 //! \brief \ref requirement 37 TEST_CASE("graph reset") { 38 graph g; 39 size_t concurrency_limit = 1; 40 oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, concurrency_limit); 41 42 // Functional nodes 43 // TODO: Check input_node, multifunction_node, async_node similarly 44 45 // continue_node 46 bool flag = false; 47 continue_node<int> source(g, 2, [&](const continue_msg&){ flag = true; return 1;}); 48 49 source.try_put(continue_msg()); 50 g.wait_for_all(); 51 52 CHECK_MESSAGE( (flag == false), "Should still be false"); 53 54 g.reset(rf_reset_protocol); 55 56 source.try_put(continue_msg()); 57 g.wait_for_all(); 58 CHECK_MESSAGE( (flag == false), "Should still be false"); 59 60 source.try_put(continue_msg()); 61 g.wait_for_all(); 62 CHECK_MESSAGE( (flag == true), "Should be true"); 63 64 // functional_node 65 int flag_fun = 0; 66 function_node<int, int, queueing> f(g, serial, [&](const int& v){ flag_fun++; return v;}); 67 68 f.try_put(0); 69 f.try_put(0); 70 71 CHECK_MESSAGE( (flag_fun == 0), "Should not be updated"); 72 g.reset(rf_reset_protocol); 73 74 g.wait_for_all(); 75 CHECK_MESSAGE( (flag_fun == 1), "Should be updated"); 76 77 // Buffering nodes 78 // TODO: Check overwrite_node write_once_node priority_queue_node sequencer_node similarly 79 80 // buffer_node 81 buffer_node<int> buff(g); 82 83 int tmp = -1; 84 CHECK_MESSAGE( (buff.try_get(tmp) == false), "try_get should not succeed"); 85 CHECK_MESSAGE( (tmp == -1), "Value should not be updated"); 86 87 buff.try_put(1); 88 89 g.reset(); 90 91 tmp = -1; 92 CHECK_MESSAGE( (buff.try_get(tmp) == false), "try_get should not succeed"); 93 CHECK_MESSAGE( (tmp == -1), "Value should not be updated"); 94 g.wait_for_all(); 95 96 // queue_node 97 queue_node<int> q(g); 98 99 tmp = -1; 100 CHECK_MESSAGE( (q.try_get(tmp) == false), "try_get should not succeed"); 101 CHECK_MESSAGE( (tmp == -1), "Value should not be updated"); 102 103 q.try_put(1); 104 105 g.reset(); 106 107 tmp = -1; 108 CHECK_MESSAGE( (q.try_get(tmp) == false), "try_get should not succeed"); 109 CHECK_MESSAGE( (tmp == -1), "Value should not be updated"); 110 g.wait_for_all(); 111 112 // Check rf_clear_edges 113 continue_node<int> src(g, [&](const continue_msg&){ return 1;}); 114 queue_node<int> dest(g); 115 make_edge(src, dest); 116 117 src.try_put(continue_msg()); 118 g.wait_for_all(); 119 120 tmp = -1; 121 CHECK_MESSAGE( (dest.try_get(tmp)== true), "Message should pass when edge exists"); 122 CHECK_MESSAGE( (tmp == 1 ), "Message should pass when edge exists"); 123 CHECK_MESSAGE( (dest.try_get(tmp)== false), "Message should not pass after item is consumed"); 124 125 g.reset(rf_clear_edges); 126 127 tmp = -1; 128 src.try_put(continue_msg()); 129 g.wait_for_all(); 130 131 CHECK_MESSAGE( (dest.try_get(tmp)== false), "Message should not pass when edge doesn't exist"); 132 CHECK_MESSAGE( (tmp == -1), "Value should not be altered"); 133 134 // TODO: Add check that default invocaiton is the same as with rf_reset_protocol 135 // TODO: See if specification for broadcast_node and other service nodes is sufficient for reset checks 136 } 137 138 //! Graph cancel 139 //! \brief \ref requirement 140 TEST_CASE("graph cancel") { 141 graph g; 142 CHECK_MESSAGE( !g.is_cancelled(), "Freshly created graph should not be cancelled." ); 143 144 g.cancel(); 145 CHECK_MESSAGE( !g.is_cancelled(), "Cancelled status should appear only after the wait_for_all() call." ); 146 147 g.wait_for_all(); 148 CHECK_MESSAGE( g.is_cancelled(), "Waiting should allow checking the cancellation status." ); 149 150 g.reset(); 151 CHECK_MESSAGE( !g.is_cancelled(), "Resetting must reset the cancellation status." ); 152 153 std::atomic<bool> cancelled(false); 154 std::atomic<unsigned> executed(0); 155 function_node<int> f(g, serial, [&](int) { 156 ++executed; 157 while( !cancelled.load(std::memory_order_relaxed) ) 158 std::this_thread::sleep_for(std::chrono::milliseconds(1)); 159 }); 160 161 const unsigned N = 10; 162 for( unsigned i = 0; i < N; ++i ) 163 f.try_put(0); 164 165 std::thread thr([&] { 166 while( !executed ) 167 std::this_thread::sleep_for(std::chrono::milliseconds(1)); 168 g.cancel(); 169 cancelled.store(true, std::memory_order_relaxed); 170 }); 171 g.wait_for_all(); 172 thr.join(); 173 CHECK_MESSAGE( g.is_cancelled(), "Wait for all should not change the cancellation status." ); 174 CHECK_MESSAGE( 1 == executed, "Buffered messages should be dropped by the cancelled graph." ); 175 176 } 177