1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 // TODO: Add overlapping put / receive tests 18 19 #include "common/config.h" 20 21 // TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these 22 // parts in all of tests might make testing of the product, which is different from what is actually 23 // released. 24 #define __TBB_EXTRA_DEBUG 1 25 #include "tbb/flow_graph.h" 26 27 #include "common/test.h" 28 #include "common/utils.h" 29 #include "common/utils_assert.h" 30 #include "common/checktype.h" 31 #include "common/graph_utils.h" 32 #include "common/test_follows_and_precedes_api.h" 33 34 #include <cstdio> 35 36 37 //! \file test_priority_queue_node.cpp 38 //! \brief Test for [flow_graph.priority_queue_node] specification 39 40 41 #define N 10 42 #define C 10 43 44 template< typename T > 45 void spin_try_get( tbb::flow::priority_queue_node<T> &q, T &value ) { 46 while ( q.try_get(value) != true ) ; 47 } 48 49 template< typename T > 50 void check_item( T* next_value, T &value ) { 51 int tid = value / N; 52 int offset = value % N; 53 CHECK_MESSAGE( next_value[tid] == T(offset), "" ); 54 ++next_value[tid]; 55 } 56 57 template< typename T > 58 struct parallel_puts : utils::NoAssign { 59 tbb::flow::priority_queue_node<T> &my_q; 60 parallel_puts( tbb::flow::priority_queue_node<T> &q ) : my_q(q) {} 61 void operator()(int i) const { 62 for (int j = 0; j < N; ++j) { 63 bool msg = my_q.try_put( T(N*i + j) ); 64 CHECK_MESSAGE( msg == true, "" ); 65 } 66 } 67 }; 68 69 template< typename T > 70 struct parallel_gets : utils::NoAssign { 71 tbb::flow::priority_queue_node<T> &my_q; 72 parallel_gets( tbb::flow::priority_queue_node<T> &q) : my_q(q) {} 73 void operator()(int) const { 74 T prev; 75 spin_try_get( my_q, prev ); 76 for (int j = 0; j < N-1; ++j) { 77 T v; 78 spin_try_get( my_q, v ); 79 CHECK_MESSAGE(v < prev, ""); 80 } 81 } 82 }; 83 84 template< typename T > 85 struct parallel_put_get : utils::NoAssign { 86 tbb::flow::priority_queue_node<T> &my_q; 87 parallel_put_get( tbb::flow::priority_queue_node<T> &q ) : my_q(q) {} 88 void operator()(int tid) const { 89 for ( int i = 0; i < N; i+=C ) { 90 int j_end = ( N < i + C ) ? N : i + C; 91 // dump about C values into the Q 92 for ( int j = i; j < j_end; ++j ) { 93 CHECK_MESSAGE( my_q.try_put( T (N*tid + j ) ) == true, "" ); 94 } 95 // receive about C values from the Q 96 for ( int j = i; j < j_end; ++j ) { 97 T v; 98 spin_try_get( my_q, v ); 99 } 100 } 101 } 102 }; 103 104 // 105 // Tests 106 // 107 // Item can be reserved, released, consumed ( single serial receiver ) 108 // 109 template< typename T > 110 int test_reservation(int) { 111 tbb::flow::graph g; 112 113 // Simple tests 114 tbb::flow::priority_queue_node<T> q(g); 115 116 { 117 118 T bogus_value(-1); 119 120 q.try_put(T(1)); 121 q.try_put(T(2)); 122 q.try_put(T(3)); 123 g.wait_for_all(); 124 125 T v=bogus_value, w=bogus_value; 126 CHECK_MESSAGE( q.try_reserve(v) == true, "" ); 127 CHECK_MESSAGE( v == T(3), "" ); 128 CHECK_MESSAGE( q.try_release() == true, "" ); 129 v = bogus_value; 130 g.wait_for_all(); 131 CHECK_MESSAGE( q.try_reserve(v) == true, "" ); 132 CHECK_MESSAGE( v == T(3), "" ); 133 CHECK_MESSAGE( q.try_consume() == true, "" ); 134 v = bogus_value; 135 g.wait_for_all(); 136 137 CHECK_MESSAGE( q.try_get(v) == true, "" ); 138 CHECK_MESSAGE( v == T(2), "" ); 139 v = bogus_value; 140 g.wait_for_all(); 141 142 CHECK_MESSAGE( q.try_reserve(v) == true, "" ); 143 CHECK_MESSAGE( v == T(1), "" ); 144 CHECK_MESSAGE( q.try_reserve(w) == false, "" ); 145 CHECK_MESSAGE( w == bogus_value, "" ); 146 CHECK_MESSAGE( q.try_get(w) == false, "" ); 147 CHECK_MESSAGE( w == bogus_value, "" ); 148 CHECK_MESSAGE( q.try_release() == true, "" ); 149 v = bogus_value; 150 g.wait_for_all(); 151 CHECK_MESSAGE( q.try_reserve(v) == true, "" ); 152 CHECK_MESSAGE( v == T(1), "" ); 153 CHECK_MESSAGE( q.try_consume() == true, "" ); 154 v = bogus_value; 155 g.wait_for_all(); 156 CHECK_MESSAGE( q.try_get(v) == false, "" ); 157 } 158 return 0; 159 } 160 161 // 162 // Tests 163 // 164 // multiple parallel senders, items in FIFO (relatively to sender) order 165 // multiple parallel senders, multiple parallel receivers, items in FIFO order (relative to sender/receiver) and all items received 166 // * overlapped puts / gets 167 // * all puts finished before any getS 168 // 169 template< typename T > 170 int test_parallel(int num_threads) { 171 tbb::flow::graph g; 172 tbb::flow::priority_queue_node<T> q(g); 173 tbb::flow::priority_queue_node<T> q2(g); 174 tbb::flow::priority_queue_node<T> q3(g); 175 T bogus_value(-1); 176 T j = bogus_value; 177 178 NativeParallelFor( num_threads, parallel_puts<T>(q) ); 179 for (int i = num_threads*N -1; i>=0; --i) { 180 spin_try_get( q, j ); 181 CHECK_MESSAGE(j == i, ""); 182 j = bogus_value; 183 } 184 g.wait_for_all(); 185 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 186 CHECK_MESSAGE( j == bogus_value, "" ); 187 188 NativeParallelFor( num_threads, parallel_puts<T>(q) ); 189 g.wait_for_all(); 190 NativeParallelFor( num_threads, parallel_gets<T>(q) ); 191 g.wait_for_all(); 192 j = bogus_value; 193 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 194 CHECK_MESSAGE( j == bogus_value, "" ); 195 196 NativeParallelFor( num_threads, parallel_put_get<T>(q) ); 197 g.wait_for_all(); 198 j = bogus_value; 199 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 200 CHECK_MESSAGE( j == bogus_value, "" ); 201 202 tbb::flow::make_edge( q, q2 ); 203 tbb::flow::make_edge( q2, q3 ); 204 NativeParallelFor( num_threads, parallel_puts<T>(q) ); 205 g.wait_for_all(); 206 NativeParallelFor( num_threads, parallel_gets<T>(q3) ); 207 g.wait_for_all(); 208 j = bogus_value; 209 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 210 CHECK_MESSAGE( j == bogus_value, "" ); 211 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 212 CHECK_MESSAGE( j == bogus_value, "" ); 213 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 214 CHECK_MESSAGE( j == bogus_value, "" ); 215 216 // test copy constructor 217 CHECK_MESSAGE( remove_successor(q, q2) == true, "" ); 218 NativeParallelFor( num_threads, parallel_puts<T>(q) ); 219 tbb::flow::priority_queue_node<T> q_copy(q); 220 g.wait_for_all(); 221 j = bogus_value; 222 CHECK_MESSAGE( q_copy.try_get( j ) == false, "" ); 223 CHECK_MESSAGE( register_successor(q, q_copy) == true, "" ); 224 for (int i = num_threads*N -1; i>=0; --i) { 225 spin_try_get( q_copy, j ); 226 CHECK_MESSAGE(j == i, ""); 227 j = bogus_value; 228 } 229 g.wait_for_all(); 230 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 231 CHECK_MESSAGE( j == bogus_value, "" ); 232 CHECK_MESSAGE( q_copy.try_get( j ) == false, "" ); 233 CHECK_MESSAGE( j == bogus_value, "" ); 234 235 return 0; 236 } 237 238 // 239 // Tests 240 // 241 // Predecessors cannot be registered 242 // Empty Q rejects item requests 243 // Single serial sender, items in FIFO order 244 // Chained Qs ( 2 & 3 ), single sender, items at last Q in FIFO order 245 // 246 247 template< typename T > 248 int test_serial() { 249 tbb::flow::graph g; 250 T bogus_value(-1); 251 252 tbb::flow::priority_queue_node<T> q(g); 253 tbb::flow::priority_queue_node<T> q2(g); 254 T j = bogus_value; 255 256 // 257 // Rejects attempts to add / remove predecessor 258 // Rejects request from empty Q 259 // 260 CHECK_MESSAGE( register_predecessor(q, q2) == false, "" ); 261 CHECK_MESSAGE( remove_predecessor(q, q2) == false, "" ); 262 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 263 CHECK_MESSAGE( j == bogus_value, "" ); 264 265 // 266 // Simple puts and gets 267 // 268 269 for (int i = 0; i < N; ++i) 270 CHECK_MESSAGE( q.try_put( T(i) ), "" ); 271 for (int i = N-1; i >=0; --i) { 272 j = bogus_value; 273 spin_try_get( q, j ); 274 CHECK_MESSAGE( i == j, "" ); 275 } 276 j = bogus_value; 277 g.wait_for_all(); 278 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 279 CHECK_MESSAGE( j == bogus_value, "" ); 280 281 tbb::flow::make_edge( q, q2 ); 282 283 for (int i = 0; i < N; ++i) 284 CHECK_MESSAGE( q.try_put( T(i) ), "" ); 285 g.wait_for_all(); 286 for (int i = N-1; i >= 0; --i) { 287 j = bogus_value; 288 spin_try_get( q2, j ); 289 CHECK_MESSAGE( i == j, "" ); 290 } 291 j = bogus_value; 292 g.wait_for_all(); 293 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 294 g.wait_for_all(); 295 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 296 CHECK_MESSAGE( j == bogus_value, "" ); 297 298 tbb::flow::remove_edge( q, q2 ); 299 CHECK_MESSAGE( q.try_put( 1 ) == true, "" ); 300 g.wait_for_all(); 301 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 302 CHECK_MESSAGE( j == bogus_value, "" ); 303 g.wait_for_all(); 304 CHECK_MESSAGE( q.try_get( j ) == true, "" ); 305 CHECK_MESSAGE( j == 1, "" ); 306 307 tbb::flow::priority_queue_node<T> q3(g); 308 tbb::flow::make_edge( q, q2 ); 309 tbb::flow::make_edge( q2, q3 ); 310 311 for (int i = 0; i < N; ++i) 312 CHECK_MESSAGE( q.try_put( T(i) ), "" ); 313 g.wait_for_all(); 314 for (int i = N-1; i >= 0; --i) { 315 j = bogus_value; 316 spin_try_get( q3, j ); 317 CHECK_MESSAGE( i == j, "" ); 318 } 319 j = bogus_value; 320 g.wait_for_all(); 321 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 322 g.wait_for_all(); 323 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 324 g.wait_for_all(); 325 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 326 CHECK_MESSAGE( j == bogus_value, "" ); 327 328 tbb::flow::remove_edge( q, q2 ); 329 CHECK_MESSAGE( q.try_put( 1 ) == true, "" ); 330 g.wait_for_all(); 331 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 332 CHECK_MESSAGE( j == bogus_value, "" ); 333 g.wait_for_all(); 334 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 335 CHECK_MESSAGE( j == bogus_value, "" ); 336 g.wait_for_all(); 337 CHECK_MESSAGE( q.try_get( j ) == true, "" ); 338 CHECK_MESSAGE( j == 1, "" ); 339 340 return 0; 341 } 342 343 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 344 #include <array> 345 #include <vector> 346 void test_follows_and_precedes_api() { 347 std::array<int, 3> messages_for_follows = { {0, 1, 2} }; 348 std::vector<int> messages_for_precedes = {0, 1, 2}; 349 350 follows_and_precedes_testing::test_follows <int, tbb::flow::priority_queue_node<int>>(messages_for_follows); 351 follows_and_precedes_testing::test_precedes <int, tbb::flow::priority_queue_node<int>>(messages_for_precedes); 352 } 353 #endif 354 355 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 356 void test_deduction_guides() { 357 using namespace tbb::flow; 358 359 graph g; 360 broadcast_node<int> br(g); 361 priority_queue_node<int> pq0(g); 362 363 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 364 using compare_type = std::greater<void>; 365 priority_queue_node pq1(follows(br)); 366 static_assert(std::is_same_v<decltype(pq1), priority_queue_node<int>>); 367 368 priority_queue_node pq2(follows(br), compare_type()); 369 static_assert(std::is_same_v<decltype(pq2), priority_queue_node<int, compare_type>>); 370 371 priority_queue_node pq3(precedes(br)); 372 static_assert(std::is_same_v<decltype(pq3), priority_queue_node<int>>); 373 374 priority_queue_node pq4(precedes(br), compare_type()); 375 static_assert(std::is_same_v<decltype(pq4), priority_queue_node<int, compare_type>>); 376 #endif 377 378 priority_queue_node pq5(pq0); 379 static_assert(std::is_same_v<decltype(pq5), priority_queue_node<int>>); 380 g.wait_for_all(); 381 } 382 #endif 383 384 //! Test serial, parallel behavior and reservation under parallelism 385 //! \brief \ref requirement \ref error_guessing 386 TEST_CASE("Serial, parallel and reservation tests"){ 387 for (int p = 2; p <= 4; ++p) { 388 tbb::task_arena arena(p); 389 arena.execute( 390 [&]() { 391 test_serial<int>(); 392 test_reservation<int>(p); 393 test_reservation<CheckType<int> >(p); 394 test_parallel<int>(p); 395 } 396 ); 397 } 398 } 399 400 //! Test reset and cancellation 401 //! \brief \ref error_guessing 402 TEST_CASE("Reset tests"){ 403 INFO("Testing resets\n"); 404 test_resets<int,tbb::flow::priority_queue_node<int> >(); 405 test_resets<float,tbb::flow::priority_queue_node<float> >(); 406 } 407 408 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 409 //! Test follows and precedes API 410 //! \brief \ref error_guessing 411 TEST_CASE("Test follows and precedes API"){ 412 test_follows_and_precedes_api(); 413 } 414 #endif 415 416 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 417 //! Test decution guides 418 //! \brief \ref requirement 419 TEST_CASE("Test deduction guides"){ 420 test_deduction_guides(); 421 } 422 #endif 423 424