1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 // TODO: Add overlapping put / receive tests 18 19 #include "common/config.h" 20 21 #include "tbb/flow_graph.h" 22 #include "tbb/global_control.h" 23 24 #include "common/test.h" 25 #include "common/utils.h" 26 #include "common/utils_assert.h" 27 #include "common/checktype.h" 28 #include "common/graph_utils.h" 29 #include "common/test_follows_and_precedes_api.h" 30 31 #include <cstdio> 32 33 34 //! \file test_priority_queue_node.cpp 35 //! \brief Test for [flow_graph.priority_queue_node] specification 36 37 38 #define N 10 39 #define C 10 40 41 template< typename T > 42 void spin_try_get( tbb::flow::priority_queue_node<T> &q, T &value ) { 43 while ( q.try_get(value) != true ) ; 44 } 45 46 template< typename T > 47 void check_item( T* next_value, T &value ) { 48 int tid = value / N; 49 int offset = value % N; 50 CHECK_MESSAGE( next_value[tid] == T(offset), "" ); 51 ++next_value[tid]; 52 } 53 54 template< typename T > 55 struct parallel_puts : utils::NoAssign { 56 tbb::flow::priority_queue_node<T> &my_q; 57 parallel_puts( tbb::flow::priority_queue_node<T> &q ) : my_q(q) {} 58 void operator()(int i) const { 59 for (int j = 0; j < N; ++j) { 60 bool msg = my_q.try_put( T(N*i + j) ); 61 CHECK_MESSAGE( msg == true, "" ); 62 } 63 } 64 }; 65 66 template< typename T > 67 struct parallel_gets : utils::NoAssign { 68 tbb::flow::priority_queue_node<T> &my_q; 69 parallel_gets( tbb::flow::priority_queue_node<T> &q) : my_q(q) {} 70 void operator()(int) const { 71 T prev; 72 spin_try_get( my_q, prev ); 73 for (int j = 0; j < N-1; ++j) { 74 T v; 75 spin_try_get( my_q, v ); 76 CHECK_MESSAGE(v < prev, ""); 77 } 78 } 79 }; 80 81 template< typename T > 82 struct parallel_put_get : utils::NoAssign { 83 tbb::flow::priority_queue_node<T> &my_q; 84 parallel_put_get( tbb::flow::priority_queue_node<T> &q ) : my_q(q) {} 85 void operator()(int tid) const { 86 for ( int i = 0; i < N; i+=C ) { 87 int j_end = ( N < i + C ) ? N : i + C; 88 // dump about C values into the Q 89 for ( int j = i; j < j_end; ++j ) { 90 CHECK_MESSAGE( my_q.try_put( T (N*tid + j ) ) == true, "" ); 91 } 92 // receive about C values from the Q 93 for ( int j = i; j < j_end; ++j ) { 94 T v; 95 spin_try_get( my_q, v ); 96 } 97 } 98 } 99 }; 100 101 // 102 // Tests 103 // 104 // Item can be reserved, released, consumed ( single serial receiver ) 105 // 106 template< typename T > 107 int test_reservation(int) { 108 tbb::flow::graph g; 109 110 // Simple tests 111 tbb::flow::priority_queue_node<T> q(g); 112 113 { 114 115 T bogus_value(-1); 116 117 q.try_put(T(1)); 118 q.try_put(T(2)); 119 q.try_put(T(3)); 120 g.wait_for_all(); 121 122 T v=bogus_value, w=bogus_value; 123 CHECK_MESSAGE( q.try_reserve(v) == true, "" ); 124 CHECK_MESSAGE( v == T(3), "" ); 125 CHECK_MESSAGE( q.try_release() == true, "" ); 126 v = bogus_value; 127 g.wait_for_all(); 128 CHECK_MESSAGE( q.try_reserve(v) == true, "" ); 129 CHECK_MESSAGE( v == T(3), "" ); 130 CHECK_MESSAGE( q.try_consume() == true, "" ); 131 v = bogus_value; 132 g.wait_for_all(); 133 134 CHECK_MESSAGE( q.try_get(v) == true, "" ); 135 CHECK_MESSAGE( v == T(2), "" ); 136 v = bogus_value; 137 g.wait_for_all(); 138 139 CHECK_MESSAGE( q.try_reserve(v) == true, "" ); 140 CHECK_MESSAGE( v == T(1), "" ); 141 CHECK_MESSAGE( q.try_reserve(w) == false, "" ); 142 CHECK_MESSAGE( w == bogus_value, "" ); 143 CHECK_MESSAGE( q.try_get(w) == false, "" ); 144 CHECK_MESSAGE( w == bogus_value, "" ); 145 CHECK_MESSAGE( q.try_release() == true, "" ); 146 v = bogus_value; 147 g.wait_for_all(); 148 CHECK_MESSAGE( q.try_reserve(v) == true, "" ); 149 CHECK_MESSAGE( v == T(1), "" ); 150 CHECK_MESSAGE( q.try_consume() == true, "" ); 151 v = bogus_value; 152 g.wait_for_all(); 153 CHECK_MESSAGE( q.try_get(v) == false, "" ); 154 } 155 return 0; 156 } 157 158 // 159 // Tests 160 // 161 // multiple parallel senders, items in FIFO (relatively to sender) order 162 // multiple parallel senders, multiple parallel receivers, items in FIFO order (relative to sender/receiver) and all items received 163 // * overlapped puts / gets 164 // * all puts finished before any getS 165 // 166 template< typename T > 167 int test_parallel(int num_threads) { 168 tbb::flow::graph g; 169 tbb::flow::priority_queue_node<T> q(g); 170 tbb::flow::priority_queue_node<T> q2(g); 171 tbb::flow::priority_queue_node<T> q3(g); 172 T bogus_value(-1); 173 T j = bogus_value; 174 175 NativeParallelFor( num_threads, parallel_puts<T>(q) ); 176 for (int i = num_threads*N -1; i>=0; --i) { 177 spin_try_get( q, j ); 178 CHECK_MESSAGE(j == i, ""); 179 j = bogus_value; 180 } 181 g.wait_for_all(); 182 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 183 CHECK_MESSAGE( j == bogus_value, "" ); 184 185 NativeParallelFor( num_threads, parallel_puts<T>(q) ); 186 g.wait_for_all(); 187 NativeParallelFor( num_threads, parallel_gets<T>(q) ); 188 g.wait_for_all(); 189 j = bogus_value; 190 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 191 CHECK_MESSAGE( j == bogus_value, "" ); 192 193 NativeParallelFor( num_threads, parallel_put_get<T>(q) ); 194 g.wait_for_all(); 195 j = bogus_value; 196 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 197 CHECK_MESSAGE( j == bogus_value, "" ); 198 199 tbb::flow::make_edge( q, q2 ); 200 tbb::flow::make_edge( q2, q3 ); 201 NativeParallelFor( num_threads, parallel_puts<T>(q) ); 202 g.wait_for_all(); 203 NativeParallelFor( num_threads, parallel_gets<T>(q3) ); 204 g.wait_for_all(); 205 j = bogus_value; 206 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 207 CHECK_MESSAGE( j == bogus_value, "" ); 208 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 209 CHECK_MESSAGE( j == bogus_value, "" ); 210 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 211 CHECK_MESSAGE( j == bogus_value, "" ); 212 213 // test copy constructor 214 CHECK_MESSAGE( remove_successor(q, q2) == true, "" ); 215 NativeParallelFor( num_threads, parallel_puts<T>(q) ); 216 tbb::flow::priority_queue_node<T> q_copy(q); 217 g.wait_for_all(); 218 j = bogus_value; 219 CHECK_MESSAGE( q_copy.try_get( j ) == false, "" ); 220 CHECK_MESSAGE( register_successor(q, q_copy) == true, "" ); 221 for (int i = num_threads*N -1; i>=0; --i) { 222 spin_try_get( q_copy, j ); 223 CHECK_MESSAGE(j == i, ""); 224 j = bogus_value; 225 } 226 g.wait_for_all(); 227 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 228 CHECK_MESSAGE( j == bogus_value, "" ); 229 CHECK_MESSAGE( q_copy.try_get( j ) == false, "" ); 230 CHECK_MESSAGE( j == bogus_value, "" ); 231 232 return 0; 233 } 234 235 // 236 // Tests 237 // 238 // Predecessors cannot be registered 239 // Empty Q rejects item requests 240 // Single serial sender, items in FIFO order 241 // Chained Qs ( 2 & 3 ), single sender, items at last Q in FIFO order 242 // 243 244 template< typename T > 245 int test_serial() { 246 tbb::flow::graph g; 247 T bogus_value(-1); 248 249 tbb::flow::priority_queue_node<T> q(g); 250 tbb::flow::priority_queue_node<T> q2(g); 251 T j = bogus_value; 252 253 // 254 // Rejects attempts to add / remove predecessor 255 // Rejects request from empty Q 256 // 257 CHECK_MESSAGE( register_predecessor(q, q2) == false, "" ); 258 CHECK_MESSAGE( remove_predecessor(q, q2) == false, "" ); 259 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 260 CHECK_MESSAGE( j == bogus_value, "" ); 261 262 // 263 // Simple puts and gets 264 // 265 266 for (int i = 0; i < N; ++i) 267 CHECK_MESSAGE( q.try_put( T(i) ), "" ); 268 for (int i = N-1; i >=0; --i) { 269 j = bogus_value; 270 spin_try_get( q, j ); 271 CHECK_MESSAGE( i == j, "" ); 272 } 273 j = bogus_value; 274 g.wait_for_all(); 275 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 276 CHECK_MESSAGE( j == bogus_value, "" ); 277 278 tbb::flow::make_edge( q, q2 ); 279 280 for (int i = 0; i < N; ++i) 281 CHECK_MESSAGE( q.try_put( T(i) ), "" ); 282 g.wait_for_all(); 283 for (int i = N-1; i >= 0; --i) { 284 j = bogus_value; 285 spin_try_get( q2, j ); 286 CHECK_MESSAGE( i == j, "" ); 287 } 288 j = bogus_value; 289 g.wait_for_all(); 290 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 291 g.wait_for_all(); 292 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 293 CHECK_MESSAGE( j == bogus_value, "" ); 294 295 tbb::flow::remove_edge( q, q2 ); 296 CHECK_MESSAGE( q.try_put( 1 ) == true, "" ); 297 g.wait_for_all(); 298 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 299 CHECK_MESSAGE( j == bogus_value, "" ); 300 g.wait_for_all(); 301 CHECK_MESSAGE( q.try_get( j ) == true, "" ); 302 CHECK_MESSAGE( j == 1, "" ); 303 304 tbb::flow::priority_queue_node<T> q3(g); 305 tbb::flow::make_edge( q, q2 ); 306 tbb::flow::make_edge( q2, q3 ); 307 308 for (int i = 0; i < N; ++i) 309 CHECK_MESSAGE( q.try_put( T(i) ), "" ); 310 g.wait_for_all(); 311 for (int i = N-1; i >= 0; --i) { 312 j = bogus_value; 313 spin_try_get( q3, j ); 314 CHECK_MESSAGE( i == j, "" ); 315 } 316 j = bogus_value; 317 g.wait_for_all(); 318 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 319 g.wait_for_all(); 320 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 321 g.wait_for_all(); 322 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 323 CHECK_MESSAGE( j == bogus_value, "" ); 324 325 tbb::flow::remove_edge( q, q2 ); 326 CHECK_MESSAGE( q.try_put( 1 ) == true, "" ); 327 g.wait_for_all(); 328 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 329 CHECK_MESSAGE( j == bogus_value, "" ); 330 g.wait_for_all(); 331 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 332 CHECK_MESSAGE( j == bogus_value, "" ); 333 g.wait_for_all(); 334 CHECK_MESSAGE( q.try_get( j ) == true, "" ); 335 CHECK_MESSAGE( j == 1, "" ); 336 337 return 0; 338 } 339 340 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 341 #include <array> 342 #include <vector> 343 void test_follows_and_precedes_api() { 344 std::array<int, 3> messages_for_follows = { {0, 1, 2} }; 345 std::vector<int> messages_for_precedes = {0, 1, 2}; 346 347 follows_and_precedes_testing::test_follows <int, tbb::flow::priority_queue_node<int>>(messages_for_follows); 348 follows_and_precedes_testing::test_precedes <int, tbb::flow::priority_queue_node<int>>(messages_for_precedes); 349 } 350 #endif 351 352 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 353 void test_deduction_guides() { 354 using namespace tbb::flow; 355 356 graph g; 357 broadcast_node<int> br(g); 358 priority_queue_node<int> pq0(g); 359 360 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 361 using compare_type = std::greater<void>; 362 priority_queue_node pq1(follows(br)); 363 static_assert(std::is_same_v<decltype(pq1), priority_queue_node<int>>); 364 365 priority_queue_node pq2(follows(br), compare_type()); 366 static_assert(std::is_same_v<decltype(pq2), priority_queue_node<int, compare_type>>); 367 368 priority_queue_node pq3(precedes(br)); 369 static_assert(std::is_same_v<decltype(pq3), priority_queue_node<int>>); 370 371 priority_queue_node pq4(precedes(br), compare_type()); 372 static_assert(std::is_same_v<decltype(pq4), priority_queue_node<int, compare_type>>); 373 #endif 374 375 priority_queue_node pq5(pq0); 376 static_assert(std::is_same_v<decltype(pq5), priority_queue_node<int>>); 377 g.wait_for_all(); 378 } 379 #endif 380 381 //! Test serial, parallel behavior and reservation under parallelism 382 //! \brief \ref requirement \ref error_guessing 383 TEST_CASE("Serial, parallel and reservation tests"){ 384 for (int p = 2; p <= 4; ++p) { 385 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, p); 386 tbb::task_arena arena(p); 387 arena.execute( 388 [&]() { 389 test_serial<int>(); 390 test_reservation<int>(p); 391 test_reservation<CheckType<int> >(p); 392 test_parallel<int>(p); 393 } 394 ); 395 } 396 } 397 398 //! Test reset and cancellation 399 //! \brief \ref error_guessing 400 TEST_CASE("Reset tests"){ 401 INFO("Testing resets\n"); 402 test_resets<int,tbb::flow::priority_queue_node<int> >(); 403 test_resets<float,tbb::flow::priority_queue_node<float> >(); 404 } 405 406 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 407 //! Test follows and precedes API 408 //! \brief \ref error_guessing 409 TEST_CASE("Test follows and precedes API"){ 410 test_follows_and_precedes_api(); 411 } 412 #endif 413 414 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 415 //! Test decution guides 416 //! \brief \ref requirement 417 TEST_CASE("Test deduction guides"){ 418 test_deduction_guides(); 419 } 420 #endif 421 422