1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if __INTEL_COMPILER && _MSC_VER 18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19 #endif 20 21 #include "common/config.h" 22 23 #include "tbb/flow_graph.h" 24 25 #include "common/test.h" 26 #include "common/utils.h" 27 #include "common/graph_utils.h" 28 #include "common/spin_barrier.h" 29 30 31 //! \file test_flow_graph.cpp 32 //! \brief Test for [flow_graph.continue_msg flow_graph.graph_node flow_graph.input_port flow_graph.output_port flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification 33 34 const int T = 4; 35 const int W = 4; 36 37 struct decrement_wait : utils::NoAssign { 38 39 tbb::flow::graph * const my_graph; 40 bool * const my_done_flag; 41 42 decrement_wait( tbb::flow::graph &h, bool *done_flag ) : my_graph(&h), my_done_flag(done_flag) {} 43 44 void operator()(int i) const { 45 utils::Sleep(10 * i); 46 47 my_done_flag[i] = true; 48 my_graph->release_wait(); 49 } 50 }; 51 52 static void test_wait_count() { 53 tbb::flow::graph h; 54 for (int i = 0; i < T; ++i ) { 55 bool done_flag[W]; 56 for (int j = 0; j < W; ++j ) { 57 for ( int w = 0; w < W; ++w ) done_flag[w] = false; 58 for ( int w = 0; w < j; ++w ) h.reserve_wait(); 59 60 utils::NativeParallelFor( j, decrement_wait(h, done_flag) ); 61 h.wait_for_all(); 62 for ( int w = 0; w < W; ++w ) { 63 if ( w < j ) CHECK_MESSAGE( done_flag[w] == true, "" ); 64 else CHECK_MESSAGE( done_flag[w] == false, "" ); 65 } 66 } 67 } 68 } 69 70 // Encapsulate object we want to store in vector (because contained type must have 71 // copy constructor and assignment operator 72 class my_int_buffer { 73 tbb::flow::buffer_node<int> *b; 74 tbb::flow::graph& my_graph; 75 public: 76 my_int_buffer(tbb::flow::graph &g) : my_graph(g) { b = new tbb::flow::buffer_node<int>(my_graph); } 77 my_int_buffer(const my_int_buffer& other) : my_graph(other.my_graph) { 78 b = new tbb::flow::buffer_node<int>(my_graph); 79 } 80 ~my_int_buffer() { delete b; } 81 my_int_buffer& operator=(const my_int_buffer& /*other*/) { 82 return *this; 83 } 84 }; 85 86 // test the graph iterator, delete nodes from graph, test again 87 void test_iterator() { 88 tbb::flow::graph g; 89 my_int_buffer a_buffer(g); 90 my_int_buffer b_buffer(g); 91 my_int_buffer c_buffer(g); 92 my_int_buffer *d_buffer = new my_int_buffer(g); 93 my_int_buffer e_buffer(g); 94 std::vector< my_int_buffer > my_buffer_vector(10, c_buffer); 95 96 int count = 0; 97 for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) { 98 count++; 99 } 100 CHECK_MESSAGE( (count==15), "error in iterator count"); 101 102 delete d_buffer; 103 104 count = 0; 105 for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) { 106 count++; 107 } 108 CHECK_MESSAGE( (count==14), "error in iterator count"); 109 110 my_buffer_vector.clear(); 111 112 count = 0; 113 for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) { 114 count++; 115 } 116 CHECK_MESSAGE( (count==4), "error in iterator count"); 117 } 118 119 class AddRemoveBody : utils::NoAssign { 120 tbb::flow::graph& g; 121 int nThreads; 122 utils::SpinBarrier &barrier; 123 public: 124 AddRemoveBody(int nthr, utils::SpinBarrier &barrier_, tbb::flow::graph& _g) : 125 g(_g), nThreads(nthr), barrier(barrier_) 126 {} 127 void operator()(const int /*threadID*/) const { 128 my_int_buffer b(g); 129 { 130 std::vector<my_int_buffer> my_buffer_vector(100, b); 131 barrier.wait(); // wait until all nodes are created 132 // now test that the proper number of nodes were created 133 int count = 0; 134 for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) { 135 count++; 136 } 137 CHECK_MESSAGE( (count==101*nThreads), "error in iterator count"); 138 barrier.wait(); // wait until all threads are done counting 139 } // all nodes but for the initial node on this thread are deleted 140 barrier.wait(); // wait until all threads have deleted all nodes in their vectors 141 // now test that all the nodes were deleted except for the initial node 142 int count = 0; 143 for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) { 144 count++; 145 } 146 CHECK_MESSAGE( (count==nThreads), "error in iterator count"); 147 barrier.wait(); // wait until all threads are done counting 148 } // initial node gets deleted 149 }; 150 151 void test_parallel(int nThreads) { 152 tbb::flow::graph g; 153 utils::SpinBarrier barrier(nThreads); 154 AddRemoveBody body(nThreads, barrier, g); 155 NativeParallelFor(nThreads, body); 156 } 157 158 /* 159 * Functors for graph arena spawn tests 160 */ 161 162 inline void check_arena(tbb::task_arena* midway_arena) { 163 CHECK_MESSAGE(midway_arena->max_concurrency() == 2, ""); 164 CHECK_MESSAGE(tbb::this_task_arena::max_concurrency() == 1, ""); 165 } 166 167 struct run_functor { 168 tbb::task_arena* midway_arena; 169 int return_value; 170 run_functor(tbb::task_arena* a) : midway_arena(a), return_value(1) {} 171 int operator()() { 172 check_arena(midway_arena); 173 return return_value; 174 } 175 }; 176 177 template < typename T > 178 struct function_body { 179 tbb::task_arena* midway_arena; 180 function_body(tbb::task_arena* a) : midway_arena(a) {} 181 tbb::flow::continue_msg operator()(const T& /*arg*/) { 182 check_arena(midway_arena); 183 return tbb::flow::continue_msg(); 184 } 185 }; 186 187 typedef tbb::flow::multifunction_node< int, std::tuple< int > > mf_node; 188 189 struct multifunction_body { 190 tbb::task_arena* midway_arena; 191 multifunction_body(tbb::task_arena* a) : midway_arena(a) {} 192 void operator()(const int& /*arg*/, mf_node::output_ports_type& /*outports*/) { 193 check_arena(midway_arena); 194 } 195 }; 196 197 struct input_body { 198 tbb::task_arena* midway_arena; 199 int counter; 200 input_body(tbb::task_arena* a) : midway_arena(a), counter(0) {} 201 int operator()(tbb::flow_control &fc) { 202 check_arena(midway_arena); 203 if (counter++ >= 1) { 204 fc.stop(); 205 } 206 return int(); 207 } 208 }; 209 210 struct nodes_test_functor : utils::NoAssign { 211 tbb::task_arena* midway_arena; 212 tbb::flow::graph& my_graph; 213 214 nodes_test_functor(tbb::task_arena* a, tbb::flow::graph& g) : midway_arena(a), my_graph(g) {} 215 void operator()() const { 216 217 // Define test nodes 218 // Continue, function, source nodes 219 tbb::flow::continue_node< tbb::flow::continue_msg > c_n(my_graph, function_body<tbb::flow::continue_msg>(midway_arena)); 220 tbb::flow::function_node< int > f_n(my_graph, tbb::flow::unlimited, function_body<int>(midway_arena)); 221 tbb::flow::input_node< int > s_n(my_graph, input_body(midway_arena)); 222 223 // Multifunction node 224 mf_node m_n(my_graph, tbb::flow::unlimited, multifunction_body(midway_arena)); 225 226 // Join node 227 tbb::flow::function_node< std::tuple< int, int > > join_f_n( 228 my_graph, tbb::flow::unlimited, function_body< std::tuple< int, int > >(midway_arena) 229 ); 230 tbb::flow::join_node< std::tuple< int, int > > j_n(my_graph); 231 make_edge(j_n, join_f_n); 232 233 // Split node 234 tbb::flow::function_node< int > split_f_n1 = f_n; 235 tbb::flow::function_node< int > split_f_n2 = f_n; 236 tbb::flow::split_node< std::tuple< int, int > > sp_n(my_graph); 237 make_edge(tbb::flow::output_port<0>(sp_n), split_f_n1); 238 make_edge(tbb::flow::output_port<1>(sp_n), split_f_n2); 239 240 // Overwrite node 241 tbb::flow::function_node< int > ow_f_n = f_n; 242 tbb::flow::overwrite_node< int > ow_n(my_graph); 243 make_edge(ow_n, ow_f_n); 244 245 // Write once node 246 tbb::flow::function_node< int > w_f_n = f_n; 247 tbb::flow::write_once_node< int > w_n(my_graph); 248 make_edge(w_n, w_f_n); 249 250 // Buffer node 251 tbb::flow::function_node< int > buf_f_n = f_n; 252 tbb::flow::buffer_node< int > buf_n(my_graph); 253 make_edge(w_n, buf_f_n); 254 255 // Limiter node 256 tbb::flow::function_node< int > l_f_n = f_n; 257 tbb::flow::limiter_node< int > l_n(my_graph, 1); 258 make_edge(l_n, l_f_n); 259 260 // Execute nodes 261 c_n.try_put( tbb::flow::continue_msg() ); 262 f_n.try_put(1); 263 m_n.try_put(1); 264 s_n.activate(); 265 266 tbb::flow::input_port<0>(j_n).try_put(1); 267 tbb::flow::input_port<1>(j_n).try_put(1); 268 269 std::tuple< int, int > sp_tuple(1, 1); 270 sp_n.try_put(sp_tuple); 271 272 ow_n.try_put(1); 273 w_n.try_put(1); 274 buf_n.try_put(1); 275 l_n.try_put(1); 276 277 my_graph.wait_for_all(); 278 } 279 }; 280 281 void test_graph_arena() { 282 // There is only one thread for execution (external thread). 283 // So, if graph's tasks get spawned in different arena 284 // external thread won't be able to find them in its own arena. 285 // In this case test should hang. 286 tbb::task_arena arena(1); 287 arena.execute( 288 [&]() { 289 tbb::flow::graph g; 290 tbb::task_arena midway_arena; 291 midway_arena.initialize(2); 292 midway_arena.execute(nodes_test_functor(&midway_arena, g)); 293 294 } 295 ); 296 } 297 298 //! Test wait counts 299 //! \brief error_guessing 300 TEST_CASE("Test wait_count"){ 301 for(unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) { 302 tbb::task_arena arena(p); 303 arena.execute( 304 [&]() { 305 test_wait_count(); 306 } 307 ); 308 } 309 } 310 311 //! Test graph iterators 312 //! \brief interface 313 TEST_CASE("Test graph::iterator"){ 314 for(unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) { 315 tbb::task_arena arena(p); 316 arena.execute( 317 [&]() { 318 test_iterator(); 319 } 320 ); 321 } 322 } 323 324 //! Test parallel for body 325 //! \brief \ref error_guessing 326 TEST_CASE("Test parallel"){ 327 for(unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) { 328 tbb::task_arena arena(p); 329 arena.execute( 330 [&]() { 331 test_parallel(p); 332 } 333 ); 334 } 335 } 336 337 //! Test separate arena isn't used 338 //! \brief \ref error_guessing 339 TEST_CASE("Test graph_arena"){ 340 test_graph_arena(); 341 } 342 343 //! Graph iterator 344 //! \brief \ref error_guessing 345 TEST_CASE("graph iterator") { 346 using namespace tbb::flow; 347 348 graph g; 349 350 auto past_end = g.end(); 351 ++past_end; 352 353 continue_node<int> n(g, [](const continue_msg &){return 1;}); 354 355 size_t item_count = 0; 356 357 for(auto it = g.cbegin(); it != g.cend(); it++) 358 ++item_count; 359 CHECK_MESSAGE((item_count == 1), "Should find 1 item"); 360 361 item_count = 0; 362 auto jt(g.begin()); 363 for(; jt != g.end(); jt++) 364 ++item_count; 365 CHECK_MESSAGE((item_count == 1), "Should find 1 item"); 366 367 graph g2; 368 continue_node<int> n2(g, [](const continue_msg &){return 1;}); 369 CHECK_MESSAGE((g.begin() != g2.begin()), "Different graphs should have different iterators"); 370 } 371