1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if __INTEL_COMPILER && _MSC_VER 18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19 #endif 20 21 #include "common/config.h" 22 23 // TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these 24 // parts in all of tests might make testing of the product, which is different from what is actually 25 // released. 26 #define __TBB_EXTRA_DEBUG 1 27 #include "tbb/flow_graph.h" 28 29 #include "common/test.h" 30 #include "common/utils.h" 31 #include "common/graph_utils.h" 32 #include "common/spin_barrier.h" 33 34 35 //! \file test_flow_graph.cpp 36 //! \brief Test for [flow_graph.continue_msg flow_graph.graph_node flow_graph.input_port flow_graph.output_port flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification 37 38 const int T = 4; 39 const int W = 4; 40 41 struct decrement_wait : utils::NoAssign { 42 43 tbb::flow::graph * const my_graph; 44 bool * const my_done_flag; 45 46 decrement_wait( tbb::flow::graph &h, bool *done_flag ) : my_graph(&h), my_done_flag(done_flag) {} 47 48 void operator()(int i) const { 49 utils::Sleep(10 * i); 50 51 my_done_flag[i] = true; 52 my_graph->release_wait(); 53 } 54 }; 55 56 static void test_wait_count() { 57 tbb::flow::graph h; 58 for (int i = 0; i < T; ++i ) { 59 bool done_flag[W]; 60 for (int j = 0; j < W; ++j ) { 61 for ( int w = 0; w < W; ++w ) done_flag[w] = false; 62 for ( int w = 0; w < j; ++w ) h.reserve_wait(); 63 64 utils::NativeParallelFor( j, decrement_wait(h, done_flag) ); 65 h.wait_for_all(); 66 for ( int w = 0; w < W; ++w ) { 67 if ( w < j ) CHECK_MESSAGE( done_flag[w] == true, "" ); 68 else CHECK_MESSAGE( done_flag[w] == false, "" ); 69 } 70 } 71 } 72 } 73 74 // Encapsulate object we want to store in vector (because contained type must have 75 // copy constructor and assignment operator 76 class my_int_buffer { 77 tbb::flow::buffer_node<int> *b; 78 tbb::flow::graph& my_graph; 79 public: 80 my_int_buffer(tbb::flow::graph &g) : my_graph(g) { b = new tbb::flow::buffer_node<int>(my_graph); } 81 my_int_buffer(const my_int_buffer& other) : my_graph(other.my_graph) { 82 b = new tbb::flow::buffer_node<int>(my_graph); 83 } 84 ~my_int_buffer() { delete b; } 85 my_int_buffer& operator=(const my_int_buffer& /*other*/) { 86 return *this; 87 } 88 }; 89 90 // test the graph iterator, delete nodes from graph, test again 91 void test_iterator() { 92 tbb::flow::graph g; 93 my_int_buffer a_buffer(g); 94 my_int_buffer b_buffer(g); 95 my_int_buffer c_buffer(g); 96 my_int_buffer *d_buffer = new my_int_buffer(g); 97 my_int_buffer e_buffer(g); 98 std::vector< my_int_buffer > my_buffer_vector(10, c_buffer); 99 100 int count = 0; 101 for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) { 102 count++; 103 } 104 CHECK_MESSAGE( (count==15), "error in iterator count"); 105 106 delete d_buffer; 107 108 count = 0; 109 for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) { 110 count++; 111 } 112 CHECK_MESSAGE( (count==14), "error in iterator count"); 113 114 my_buffer_vector.clear(); 115 116 count = 0; 117 for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) { 118 count++; 119 } 120 CHECK_MESSAGE( (count==4), "error in iterator count"); 121 } 122 123 class AddRemoveBody : utils::NoAssign { 124 tbb::flow::graph& g; 125 int nThreads; 126 utils::SpinBarrier &barrier; 127 public: 128 AddRemoveBody(int nthr, utils::SpinBarrier &barrier_, tbb::flow::graph& _g) : 129 g(_g), nThreads(nthr), barrier(barrier_) 130 {} 131 void operator()(const int /*threadID*/) const { 132 my_int_buffer b(g); 133 { 134 std::vector<my_int_buffer> my_buffer_vector(100, b); 135 barrier.wait(); // wait until all nodes are created 136 // now test that the proper number of nodes were created 137 int count = 0; 138 for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) { 139 count++; 140 } 141 CHECK_MESSAGE( (count==101*nThreads), "error in iterator count"); 142 barrier.wait(); // wait until all threads are done counting 143 } // all nodes but for the initial node on this thread are deleted 144 barrier.wait(); // wait until all threads have deleted all nodes in their vectors 145 // now test that all the nodes were deleted except for the initial node 146 int count = 0; 147 for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) { 148 count++; 149 } 150 CHECK_MESSAGE( (count==nThreads), "error in iterator count"); 151 barrier.wait(); // wait until all threads are done counting 152 } // initial node gets deleted 153 }; 154 155 void test_parallel(int nThreads) { 156 tbb::flow::graph g; 157 utils::SpinBarrier barrier(nThreads); 158 AddRemoveBody body(nThreads, barrier, g); 159 NativeParallelFor(nThreads, body); 160 } 161 162 /* 163 * Functors for graph arena spawn tests 164 */ 165 166 inline void check_arena(tbb::task_arena* midway_arena) { 167 CHECK_MESSAGE(midway_arena->max_concurrency() == 2, ""); 168 CHECK_MESSAGE(tbb::this_task_arena::max_concurrency() == 1, ""); 169 } 170 171 struct run_functor { 172 tbb::task_arena* midway_arena; 173 int return_value; 174 run_functor(tbb::task_arena* a) : midway_arena(a), return_value(1) {} 175 int operator()() { 176 check_arena(midway_arena); 177 return return_value; 178 } 179 }; 180 181 template < typename T > 182 struct function_body { 183 tbb::task_arena* midway_arena; 184 function_body(tbb::task_arena* a) : midway_arena(a) {} 185 tbb::flow::continue_msg operator()(const T& /*arg*/) { 186 check_arena(midway_arena); 187 return tbb::flow::continue_msg(); 188 } 189 }; 190 191 typedef tbb::flow::multifunction_node< int, std::tuple< int > > mf_node; 192 193 struct multifunction_body { 194 tbb::task_arena* midway_arena; 195 multifunction_body(tbb::task_arena* a) : midway_arena(a) {} 196 void operator()(const int& /*arg*/, mf_node::output_ports_type& /*outports*/) { 197 check_arena(midway_arena); 198 } 199 }; 200 201 struct input_body { 202 tbb::task_arena* midway_arena; 203 int counter; 204 input_body(tbb::task_arena* a) : midway_arena(a), counter(0) {} 205 int operator()(tbb::flow_control &fc) { 206 check_arena(midway_arena); 207 if (counter++ >= 1) { 208 fc.stop(); 209 } 210 return int(); 211 } 212 }; 213 214 struct nodes_test_functor : utils::NoAssign { 215 tbb::task_arena* midway_arena; 216 tbb::flow::graph& my_graph; 217 218 nodes_test_functor(tbb::task_arena* a, tbb::flow::graph& g) : midway_arena(a), my_graph(g) {} 219 void operator()() const { 220 221 // Define test nodes 222 // Continue, function, source nodes 223 tbb::flow::continue_node< tbb::flow::continue_msg > c_n(my_graph, function_body<tbb::flow::continue_msg>(midway_arena)); 224 tbb::flow::function_node< int > f_n(my_graph, tbb::flow::unlimited, function_body<int>(midway_arena)); 225 tbb::flow::input_node< int > s_n(my_graph, input_body(midway_arena)); 226 227 // Multifunction node 228 mf_node m_n(my_graph, tbb::flow::unlimited, multifunction_body(midway_arena)); 229 230 // Join node 231 tbb::flow::function_node< std::tuple< int, int > > join_f_n( 232 my_graph, tbb::flow::unlimited, function_body< std::tuple< int, int > >(midway_arena) 233 ); 234 tbb::flow::join_node< std::tuple< int, int > > j_n(my_graph); 235 make_edge(j_n, join_f_n); 236 237 // Split node 238 tbb::flow::function_node< int > split_f_n1 = f_n; 239 tbb::flow::function_node< int > split_f_n2 = f_n; 240 tbb::flow::split_node< std::tuple< int, int > > sp_n(my_graph); 241 make_edge(tbb::flow::output_port<0>(sp_n), split_f_n1); 242 make_edge(tbb::flow::output_port<1>(sp_n), split_f_n2); 243 244 // Overwrite node 245 tbb::flow::function_node< int > ow_f_n = f_n; 246 tbb::flow::overwrite_node< int > ow_n(my_graph); 247 make_edge(ow_n, ow_f_n); 248 249 // Write once node 250 tbb::flow::function_node< int > w_f_n = f_n; 251 tbb::flow::write_once_node< int > w_n(my_graph); 252 make_edge(w_n, w_f_n); 253 254 // Buffer node 255 tbb::flow::function_node< int > buf_f_n = f_n; 256 tbb::flow::buffer_node< int > buf_n(my_graph); 257 make_edge(w_n, buf_f_n); 258 259 // Limiter node 260 tbb::flow::function_node< int > l_f_n = f_n; 261 tbb::flow::limiter_node< int > l_n(my_graph, 1); 262 make_edge(l_n, l_f_n); 263 264 // Execute nodes 265 c_n.try_put( tbb::flow::continue_msg() ); 266 f_n.try_put(1); 267 m_n.try_put(1); 268 s_n.activate(); 269 270 tbb::flow::input_port<0>(j_n).try_put(1); 271 tbb::flow::input_port<1>(j_n).try_put(1); 272 273 std::tuple< int, int > sp_tuple(1, 1); 274 sp_n.try_put(sp_tuple); 275 276 ow_n.try_put(1); 277 w_n.try_put(1); 278 buf_n.try_put(1); 279 l_n.try_put(1); 280 281 my_graph.wait_for_all(); 282 } 283 }; 284 285 void test_graph_arena() { 286 // There is only one thread for execution (external thread). 287 // So, if graph's tasks get spawned in different arena 288 // external thread won't be able to find them in its own arena. 289 // In this case test should hang. 290 tbb::task_arena arena(1); 291 arena.execute( 292 [&]() { 293 tbb::flow::graph g; 294 tbb::task_arena midway_arena; 295 midway_arena.initialize(2); 296 midway_arena.execute(nodes_test_functor(&midway_arena, g)); 297 298 } 299 ); 300 } 301 302 //! Test wait counts 303 //! \brief error_guessing 304 TEST_CASE("Test wait_count"){ 305 for(unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) { 306 tbb::task_arena arena(p); 307 arena.execute( 308 [&]() { 309 test_wait_count(); 310 } 311 ); 312 } 313 } 314 315 //! Test graph iterators 316 //! \brief interface 317 TEST_CASE("Test graph::iterator"){ 318 for(unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) { 319 tbb::task_arena arena(p); 320 arena.execute( 321 [&]() { 322 test_iterator(); 323 } 324 ); 325 } 326 } 327 328 //! Test parallel for body 329 //! \brief \ref error_guessing 330 TEST_CASE("Test parallel"){ 331 for(unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) { 332 tbb::task_arena arena(p); 333 arena.execute( 334 [&]() { 335 test_parallel(p); 336 } 337 ); 338 } 339 } 340 341 //! Test separate arena isn't used 342 //! \brief \ref error_guessing 343 TEST_CASE("Test graph_arena"){ 344 test_graph_arena(); 345 } 346 347 //! Graph iterator 348 //! \brief \ref error_guessing 349 TEST_CASE("graph iterator") { 350 using namespace tbb::flow; 351 352 graph g; 353 354 auto past_end = g.end(); 355 ++past_end; 356 357 continue_node<int> n(g, [](const continue_msg &){return 1;}); 358 359 size_t item_count = 0; 360 361 for(auto it = g.cbegin(); it != g.cend(); it++) 362 ++item_count; 363 CHECK_MESSAGE((item_count == 1), "Should find 1 item"); 364 365 item_count = 0; 366 auto jt(g.begin()); 367 for(; jt != g.end(); jt++) 368 ++item_count; 369 CHECK_MESSAGE((item_count == 1), "Should find 1 item"); 370 371 graph g2; 372 continue_node<int> n2(g, [](const continue_msg &){return 1;}); 373 CHECK_MESSAGE((g.begin() != g2.begin()), "Different graphs should have different iterators"); 374 } 375