1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 // TODO: Add overlapping put / receive tests 18 19 #include "common/config.h" 20 21 #include "tbb/flow_graph.h" 22 23 #include "common/test.h" 24 #include "common/utils.h" 25 #include "common/utils_assert.h" 26 #include "common/checktype.h" 27 #include "common/graph_utils.h" 28 #include "common/test_follows_and_precedes_api.h" 29 30 #include <cstdio> 31 32 33 //! \file test_priority_queue_node.cpp 34 //! \brief Test for [flow_graph.priority_queue_node] specification 35 36 37 #define N 10 38 #define C 10 39 40 template< typename T > 41 void spin_try_get( tbb::flow::priority_queue_node<T> &q, T &value ) { 42 while ( q.try_get(value) != true ) ; 43 } 44 45 template< typename T > 46 void check_item( T* next_value, T &value ) { 47 int tid = value / N; 48 int offset = value % N; 49 CHECK_MESSAGE( next_value[tid] == T(offset), "" ); 50 ++next_value[tid]; 51 } 52 53 template< typename T > 54 struct parallel_puts : utils::NoAssign { 55 tbb::flow::priority_queue_node<T> &my_q; 56 parallel_puts( tbb::flow::priority_queue_node<T> &q ) : my_q(q) {} 57 void operator()(int i) const { 58 for (int j = 0; j < N; ++j) { 59 bool msg = my_q.try_put( T(N*i + j) ); 60 CHECK_MESSAGE( msg == true, "" ); 61 } 62 } 63 }; 64 65 template< typename T > 66 struct parallel_gets : utils::NoAssign { 67 tbb::flow::priority_queue_node<T> &my_q; 68 parallel_gets( tbb::flow::priority_queue_node<T> &q) : my_q(q) {} 69 void operator()(int) const { 70 T prev; 71 spin_try_get( my_q, prev ); 72 for (int j = 0; j < N-1; ++j) { 73 T v; 74 spin_try_get( my_q, v ); 75 CHECK_MESSAGE(v < prev, ""); 76 } 77 } 78 }; 79 80 template< typename T > 81 struct parallel_put_get : utils::NoAssign { 82 tbb::flow::priority_queue_node<T> &my_q; 83 parallel_put_get( tbb::flow::priority_queue_node<T> &q ) : my_q(q) {} 84 void operator()(int tid) const { 85 for ( int i = 0; i < N; i+=C ) { 86 int j_end = ( N < i + C ) ? N : i + C; 87 // dump about C values into the Q 88 for ( int j = i; j < j_end; ++j ) { 89 CHECK_MESSAGE( my_q.try_put( T (N*tid + j ) ) == true, "" ); 90 } 91 // receive about C values from the Q 92 for ( int j = i; j < j_end; ++j ) { 93 T v; 94 spin_try_get( my_q, v ); 95 } 96 } 97 } 98 }; 99 100 // 101 // Tests 102 // 103 // Item can be reserved, released, consumed ( single serial receiver ) 104 // 105 template< typename T > 106 int test_reservation(int) { 107 tbb::flow::graph g; 108 109 // Simple tests 110 tbb::flow::priority_queue_node<T> q(g); 111 112 { 113 114 T bogus_value(-1); 115 116 q.try_put(T(1)); 117 q.try_put(T(2)); 118 q.try_put(T(3)); 119 g.wait_for_all(); 120 121 T v=bogus_value, w=bogus_value; 122 CHECK_MESSAGE( q.try_reserve(v) == true, "" ); 123 CHECK_MESSAGE( v == T(3), "" ); 124 CHECK_MESSAGE( q.try_release() == true, "" ); 125 v = bogus_value; 126 g.wait_for_all(); 127 CHECK_MESSAGE( q.try_reserve(v) == true, "" ); 128 CHECK_MESSAGE( v == T(3), "" ); 129 CHECK_MESSAGE( q.try_consume() == true, "" ); 130 v = bogus_value; 131 g.wait_for_all(); 132 133 CHECK_MESSAGE( q.try_get(v) == true, "" ); 134 CHECK_MESSAGE( v == T(2), "" ); 135 v = bogus_value; 136 g.wait_for_all(); 137 138 CHECK_MESSAGE( q.try_reserve(v) == true, "" ); 139 CHECK_MESSAGE( v == T(1), "" ); 140 CHECK_MESSAGE( q.try_reserve(w) == false, "" ); 141 CHECK_MESSAGE( w == bogus_value, "" ); 142 CHECK_MESSAGE( q.try_get(w) == false, "" ); 143 CHECK_MESSAGE( w == bogus_value, "" ); 144 CHECK_MESSAGE( q.try_release() == true, "" ); 145 v = bogus_value; 146 g.wait_for_all(); 147 CHECK_MESSAGE( q.try_reserve(v) == true, "" ); 148 CHECK_MESSAGE( v == T(1), "" ); 149 CHECK_MESSAGE( q.try_consume() == true, "" ); 150 v = bogus_value; 151 g.wait_for_all(); 152 CHECK_MESSAGE( q.try_get(v) == false, "" ); 153 } 154 return 0; 155 } 156 157 // 158 // Tests 159 // 160 // multiple parallel senders, items in FIFO (relatively to sender) order 161 // multiple parallel senders, multiple parallel receivers, items in FIFO order (relative to sender/receiver) and all items received 162 // * overlapped puts / gets 163 // * all puts finished before any getS 164 // 165 template< typename T > 166 int test_parallel(int num_threads) { 167 tbb::flow::graph g; 168 tbb::flow::priority_queue_node<T> q(g); 169 tbb::flow::priority_queue_node<T> q2(g); 170 tbb::flow::priority_queue_node<T> q3(g); 171 T bogus_value(-1); 172 T j = bogus_value; 173 174 NativeParallelFor( num_threads, parallel_puts<T>(q) ); 175 for (int i = num_threads*N -1; i>=0; --i) { 176 spin_try_get( q, j ); 177 CHECK_MESSAGE(j == i, ""); 178 j = bogus_value; 179 } 180 g.wait_for_all(); 181 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 182 CHECK_MESSAGE( j == bogus_value, "" ); 183 184 NativeParallelFor( num_threads, parallel_puts<T>(q) ); 185 g.wait_for_all(); 186 NativeParallelFor( num_threads, parallel_gets<T>(q) ); 187 g.wait_for_all(); 188 j = bogus_value; 189 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 190 CHECK_MESSAGE( j == bogus_value, "" ); 191 192 NativeParallelFor( num_threads, parallel_put_get<T>(q) ); 193 g.wait_for_all(); 194 j = bogus_value; 195 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 196 CHECK_MESSAGE( j == bogus_value, "" ); 197 198 tbb::flow::make_edge( q, q2 ); 199 tbb::flow::make_edge( q2, q3 ); 200 NativeParallelFor( num_threads, parallel_puts<T>(q) ); 201 g.wait_for_all(); 202 NativeParallelFor( num_threads, parallel_gets<T>(q3) ); 203 g.wait_for_all(); 204 j = bogus_value; 205 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 206 CHECK_MESSAGE( j == bogus_value, "" ); 207 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 208 CHECK_MESSAGE( j == bogus_value, "" ); 209 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 210 CHECK_MESSAGE( j == bogus_value, "" ); 211 212 // test copy constructor 213 CHECK_MESSAGE( remove_successor(q, q2) == true, "" ); 214 NativeParallelFor( num_threads, parallel_puts<T>(q) ); 215 tbb::flow::priority_queue_node<T> q_copy(q); 216 g.wait_for_all(); 217 j = bogus_value; 218 CHECK_MESSAGE( q_copy.try_get( j ) == false, "" ); 219 CHECK_MESSAGE( register_successor(q, q_copy) == true, "" ); 220 for (int i = num_threads*N -1; i>=0; --i) { 221 spin_try_get( q_copy, j ); 222 CHECK_MESSAGE(j == i, ""); 223 j = bogus_value; 224 } 225 g.wait_for_all(); 226 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 227 CHECK_MESSAGE( j == bogus_value, "" ); 228 CHECK_MESSAGE( q_copy.try_get( j ) == false, "" ); 229 CHECK_MESSAGE( j == bogus_value, "" ); 230 231 return 0; 232 } 233 234 // 235 // Tests 236 // 237 // Predecessors cannot be registered 238 // Empty Q rejects item requests 239 // Single serial sender, items in FIFO order 240 // Chained Qs ( 2 & 3 ), single sender, items at last Q in FIFO order 241 // 242 243 template< typename T > 244 int test_serial() { 245 tbb::flow::graph g; 246 T bogus_value(-1); 247 248 tbb::flow::priority_queue_node<T> q(g); 249 tbb::flow::priority_queue_node<T> q2(g); 250 T j = bogus_value; 251 252 // 253 // Rejects attempts to add / remove predecessor 254 // Rejects request from empty Q 255 // 256 CHECK_MESSAGE( register_predecessor(q, q2) == false, "" ); 257 CHECK_MESSAGE( remove_predecessor(q, q2) == false, "" ); 258 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 259 CHECK_MESSAGE( j == bogus_value, "" ); 260 261 // 262 // Simple puts and gets 263 // 264 265 for (int i = 0; i < N; ++i) 266 CHECK_MESSAGE( q.try_put( T(i) ), "" ); 267 for (int i = N-1; i >=0; --i) { 268 j = bogus_value; 269 spin_try_get( q, j ); 270 CHECK_MESSAGE( i == j, "" ); 271 } 272 j = bogus_value; 273 g.wait_for_all(); 274 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 275 CHECK_MESSAGE( j == bogus_value, "" ); 276 277 tbb::flow::make_edge( q, q2 ); 278 279 for (int i = 0; i < N; ++i) 280 CHECK_MESSAGE( q.try_put( T(i) ), "" ); 281 g.wait_for_all(); 282 for (int i = N-1; i >= 0; --i) { 283 j = bogus_value; 284 spin_try_get( q2, j ); 285 CHECK_MESSAGE( i == j, "" ); 286 } 287 j = bogus_value; 288 g.wait_for_all(); 289 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 290 g.wait_for_all(); 291 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 292 CHECK_MESSAGE( j == bogus_value, "" ); 293 294 tbb::flow::remove_edge( q, q2 ); 295 CHECK_MESSAGE( q.try_put( 1 ) == true, "" ); 296 g.wait_for_all(); 297 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 298 CHECK_MESSAGE( j == bogus_value, "" ); 299 g.wait_for_all(); 300 CHECK_MESSAGE( q.try_get( j ) == true, "" ); 301 CHECK_MESSAGE( j == 1, "" ); 302 303 tbb::flow::priority_queue_node<T> q3(g); 304 tbb::flow::make_edge( q, q2 ); 305 tbb::flow::make_edge( q2, q3 ); 306 307 for (int i = 0; i < N; ++i) 308 CHECK_MESSAGE( q.try_put( T(i) ), "" ); 309 g.wait_for_all(); 310 for (int i = N-1; i >= 0; --i) { 311 j = bogus_value; 312 spin_try_get( q3, j ); 313 CHECK_MESSAGE( i == j, "" ); 314 } 315 j = bogus_value; 316 g.wait_for_all(); 317 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 318 g.wait_for_all(); 319 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 320 g.wait_for_all(); 321 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 322 CHECK_MESSAGE( j == bogus_value, "" ); 323 324 tbb::flow::remove_edge( q, q2 ); 325 CHECK_MESSAGE( q.try_put( 1 ) == true, "" ); 326 g.wait_for_all(); 327 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 328 CHECK_MESSAGE( j == bogus_value, "" ); 329 g.wait_for_all(); 330 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 331 CHECK_MESSAGE( j == bogus_value, "" ); 332 g.wait_for_all(); 333 CHECK_MESSAGE( q.try_get( j ) == true, "" ); 334 CHECK_MESSAGE( j == 1, "" ); 335 336 return 0; 337 } 338 339 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 340 #include <array> 341 #include <vector> 342 void test_follows_and_precedes_api() { 343 std::array<int, 3> messages_for_follows = { {0, 1, 2} }; 344 std::vector<int> messages_for_precedes = {0, 1, 2}; 345 346 follows_and_precedes_testing::test_follows <int, tbb::flow::priority_queue_node<int>>(messages_for_follows); 347 follows_and_precedes_testing::test_precedes <int, tbb::flow::priority_queue_node<int>>(messages_for_precedes); 348 } 349 #endif 350 351 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 352 void test_deduction_guides() { 353 using namespace tbb::flow; 354 355 graph g; 356 broadcast_node<int> br(g); 357 priority_queue_node<int> pq0(g); 358 359 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 360 using compare_type = std::greater<void>; 361 priority_queue_node pq1(follows(br)); 362 static_assert(std::is_same_v<decltype(pq1), priority_queue_node<int>>); 363 364 priority_queue_node pq2(follows(br), compare_type()); 365 static_assert(std::is_same_v<decltype(pq2), priority_queue_node<int, compare_type>>); 366 367 priority_queue_node pq3(precedes(br)); 368 static_assert(std::is_same_v<decltype(pq3), priority_queue_node<int>>); 369 370 priority_queue_node pq4(precedes(br), compare_type()); 371 static_assert(std::is_same_v<decltype(pq4), priority_queue_node<int, compare_type>>); 372 #endif 373 374 priority_queue_node pq5(pq0); 375 static_assert(std::is_same_v<decltype(pq5), priority_queue_node<int>>); 376 g.wait_for_all(); 377 } 378 #endif 379 380 //! Test serial, parallel behavior and reservation under parallelism 381 //! \brief \ref requirement \ref error_guessing 382 TEST_CASE("Serial, parallel and reservation tests"){ 383 for (int p = 2; p <= 4; ++p) { 384 tbb::task_arena arena(p); 385 arena.execute( 386 [&]() { 387 test_serial<int>(); 388 test_reservation<int>(p); 389 test_reservation<CheckType<int> >(p); 390 test_parallel<int>(p); 391 } 392 ); 393 } 394 } 395 396 //! Test reset and cancellation 397 //! \brief \ref error_guessing 398 TEST_CASE("Reset tests"){ 399 INFO("Testing resets\n"); 400 test_resets<int,tbb::flow::priority_queue_node<int> >(); 401 test_resets<float,tbb::flow::priority_queue_node<float> >(); 402 } 403 404 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 405 //! Test follows and precedes API 406 //! \brief \ref error_guessing 407 TEST_CASE("Test follows and precedes API"){ 408 test_follows_and_precedes_api(); 409 } 410 #endif 411 412 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 413 //! Test decution guides 414 //! \brief \ref requirement 415 TEST_CASE("Test deduction guides"){ 416 test_deduction_guides(); 417 } 418 #endif 419 420