1 /* 2 Copyright (c) 2020-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if __INTEL_COMPILER && _MSC_VER 18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19 #endif 20 21 #include "common/test.h" 22 23 #include "common/utils.h" 24 #include "common/graph_utils.h" 25 26 #include "oneapi/tbb/flow_graph.h" 27 #include "oneapi/tbb/task_arena.h" 28 #include "oneapi/tbb/global_control.h" 29 30 #include "conformance_flowgraph.h" 31 32 //! \file conformance_graph.cpp 33 //! \brief Test for [flow_graph.graph] specification 34 35 using namespace oneapi::tbb::flow; 36 using namespace std; 37 38 //! Graph reset 39 //! \brief \ref requirement 40 TEST_CASE("graph reset") { 41 graph g; 42 size_t concurrency_limit = 1; 43 oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, concurrency_limit); 44 45 // Functional nodes 46 // TODO: Check input_node, multifunction_node, async_node similarly 47 48 // continue_node 49 bool flag = false; 50 continue_node<int> source(g, 2, [&](const continue_msg&){ flag = true; return 1;}); 51 52 source.try_put(continue_msg()); 53 g.wait_for_all(); 54 55 CHECK_MESSAGE( (flag == false), "Should still be false"); 56 57 g.reset(rf_reset_protocol); 58 59 source.try_put(continue_msg()); 60 g.wait_for_all(); 61 CHECK_MESSAGE( (flag == false), "Should still be false"); 62 63 source.try_put(continue_msg()); 64 g.wait_for_all(); 65 CHECK_MESSAGE( (flag == true), "Should be true"); 66 67 // functional_node 68 int flag_fun = 0; 69 function_node<int, int, queueing> f(g, serial, [&](const int& v){ flag_fun++; return v;}); 70 71 f.try_put(0); 72 f.try_put(0); 73 74 CHECK_MESSAGE( (flag_fun == 0), "Should not be updated"); 75 g.reset(rf_reset_protocol); 76 77 g.wait_for_all(); 78 CHECK_MESSAGE( (flag_fun == 1), "Should be updated"); 79 80 // Buffering nodes 81 // TODO: Check overwrite_node write_once_node priority_queue_node sequencer_node similarly 82 83 // buffer_node 84 buffer_node<int> buff(g); 85 86 int tmp = -1; 87 CHECK_MESSAGE( (buff.try_get(tmp) == false), "try_get should not succeed"); 88 CHECK_MESSAGE( (tmp == -1), "Value should not be updated"); 89 90 buff.try_put(1); 91 92 g.reset(); 93 94 tmp = -1; 95 CHECK_MESSAGE( (buff.try_get(tmp) == false), "try_get should not succeed"); 96 CHECK_MESSAGE( (tmp == -1), "Value should not be updated"); 97 g.wait_for_all(); 98 99 // queue_node 100 queue_node<int> q(g); 101 102 tmp = -1; 103 CHECK_MESSAGE( (q.try_get(tmp) == false), "try_get should not succeed"); 104 CHECK_MESSAGE( (tmp == -1), "Value should not be updated"); 105 106 q.try_put(1); 107 108 g.reset(); 109 110 tmp = -1; 111 CHECK_MESSAGE( (q.try_get(tmp) == false), "try_get should not succeed"); 112 CHECK_MESSAGE( (tmp == -1), "Value should not be updated"); 113 g.wait_for_all(); 114 115 // Check rf_clear_edges 116 continue_node<int> src(g, [&](const continue_msg&){ return 1;}); 117 queue_node<int> dest(g); 118 make_edge(src, dest); 119 120 src.try_put(continue_msg()); 121 g.wait_for_all(); 122 123 tmp = -1; 124 CHECK_MESSAGE( (dest.try_get(tmp)== true), "Message should pass when edge exists"); 125 CHECK_MESSAGE( (tmp == 1 ), "Message should pass when edge exists"); 126 CHECK_MESSAGE( (dest.try_get(tmp)== false), "Message should not pass after item is consumed"); 127 128 g.reset(rf_clear_edges); 129 130 tmp = -1; 131 src.try_put(continue_msg()); 132 g.wait_for_all(); 133 134 CHECK_MESSAGE( (dest.try_get(tmp)== false), "Message should not pass when edge doesn't exist"); 135 CHECK_MESSAGE( (tmp == -1), "Value should not be altered"); 136 137 // TODO: Add check that default invocaiton is the same as with rf_reset_protocol 138 // TODO: See if specification for broadcast_node and other service nodes is sufficient for reset checks 139 } 140 141 //! Graph cancel 142 //! \brief \ref requirement 143 TEST_CASE("graph cancel") { 144 graph g; 145 CHECK_MESSAGE( !g.is_cancelled(), "Freshly created graph should not be cancelled." ); 146 147 g.cancel(); 148 CHECK_MESSAGE( !g.is_cancelled(), "Cancelled status should appear only after the wait_for_all() call." ); 149 150 g.wait_for_all(); 151 CHECK_MESSAGE( g.is_cancelled(), "Waiting should allow checking the cancellation status." ); 152 153 g.reset(); 154 CHECK_MESSAGE( !g.is_cancelled(), "Resetting must reset the cancellation status." ); 155 156 std::atomic<bool> cancelled(false); 157 std::atomic<unsigned> executed(0); 158 function_node<int> f(g, serial, [&](int) { 159 ++executed; 160 while( !cancelled.load(std::memory_order_relaxed) ) 161 std::this_thread::sleep_for(std::chrono::milliseconds(1)); 162 }); 163 164 const unsigned N = 10; 165 for( unsigned i = 0; i < N; ++i ) 166 f.try_put(0); 167 168 std::thread thr([&] { 169 while( !executed ) 170 std::this_thread::sleep_for(std::chrono::milliseconds(1)); 171 g.cancel(); 172 cancelled.store(true, std::memory_order_relaxed); 173 }); 174 g.wait_for_all(); 175 thr.join(); 176 CHECK_MESSAGE( g.is_cancelled(), "Wait for all should not change the cancellation status." ); 177 CHECK_MESSAGE( 1 == executed, "Buffered messages should be dropped by the cancelled graph." ); 178 179 } 180