1 /* 2 Copyright (c) 2005-2020 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 /* Example program that computes Fibonacci numbers in different ways. 18 Arguments are: [ Number [Threads [Repeats]]] 19 The defaults are Number=500 Threads=1:4 Repeats=1. 20 21 The point of this program is to check that the library is working properly. 22 Most of the computations are deliberately silly and not expected to 23 show any speedup on multiprocessors. 24 */ 25 26 // enable assertions 27 #ifdef NDEBUG 28 #undef NDEBUG 29 #endif 30 31 #include <cstdio> 32 #include <cstdlib> 33 #include <cassert> 34 35 #include <utility> 36 #include <thread> 37 #include <atomic> 38 #include <mutex> 39 40 #include "oneapi/tbb/tick_count.h" 41 #include "oneapi/tbb/blocked_range.h" 42 #include "oneapi/tbb/concurrent_vector.h" 43 #include "oneapi/tbb/concurrent_queue.h" 44 #include "oneapi/tbb/concurrent_hash_map.h" 45 #include "oneapi/tbb/parallel_for.h" 46 #include "oneapi/tbb/parallel_reduce.h" 47 #include "oneapi/tbb/parallel_scan.h" 48 #include "oneapi/tbb/parallel_pipeline.h" 49 #include "oneapi/tbb/spin_mutex.h" 50 #include "oneapi/tbb/queuing_mutex.h" 51 #include "oneapi/tbb/global_control.h" 52 53 //! type used for Fibonacci number computations 54 typedef long long value; 55 56 //! Matrix 2x2 class 57 struct Matrix2x2 { 58 //! Array of values 59 value v[2][2]; 60 Matrix2x2() {} 61 Matrix2x2(value v00, value v01, value v10, value v11) { 62 v[0][0] = v00; 63 v[0][1] = v01; 64 v[1][0] = v10; 65 v[1][1] = v11; 66 } 67 Matrix2x2 operator*(const Matrix2x2 &to) const; //< Multiply two Matrices 68 }; 69 //! Identity matrix 70 static const Matrix2x2 MatrixIdentity(1, 0, 0, 1); 71 //! Default matrix to multiply 72 static const Matrix2x2 Matrix1110(1, 1, 1, 0); 73 //! Raw arrays matrices multiply 74 void Matrix2x2Multiply(const value a[2][2], const value b[2][2], value c[2][2]); 75 76 /////////////////////// Serial methods //////////////////////// 77 78 //! Plain serial sum 79 value SerialFib(int n) { 80 if (n < 2) 81 return n; 82 value a = 0, b = 1, sum; 83 int i; 84 for (i = 2; i <= n; i++) { // n is really index of Fibonacci number 85 sum = a + b; 86 a = b; 87 b = sum; 88 } 89 return sum; 90 } 91 //! Serial n-1 matrices multiplication 92 value SerialMatrixFib(int n) { 93 value c[2][2], a[2][2] = { { 1, 1 }, { 1, 0 } }, b[2][2] = { { 1, 1 }, { 1, 0 } }; 94 int i; 95 for (i = 2; i < n; i++) { // Using condition to prevent copying of values 96 if (i & 1) 97 Matrix2x2Multiply(a, c, b); 98 else 99 Matrix2x2Multiply(a, b, c); 100 } 101 return (i & 1) ? c[0][0] : b[0][0]; // get result from upper left cell 102 } 103 //! Recursive summing. Just for complete list of serial algorithms, not used 104 value SerialRecursiveFib(int n) { 105 value result; 106 if (n < 2) 107 result = n; 108 else 109 result = SerialRecursiveFib(n - 1) + SerialRecursiveFib(n - 2); 110 return result; 111 } 112 //! Introducing of queue method in serial 113 value SerialQueueFib(int n) { 114 oneapi::tbb::concurrent_queue<Matrix2x2> Q; 115 for (int i = 1; i < n; i++) 116 Q.push(Matrix1110); 117 Matrix2x2 A, B; 118 while (true) { 119 while (!Q.try_pop(A)) 120 std::this_thread::yield(); 121 if (Q.empty()) 122 break; 123 while (!Q.try_pop(B)) 124 std::this_thread::yield(); 125 Q.push(A * B); 126 } 127 return A.v[0][0]; 128 } 129 //! Trying to use concurrent_vector 130 value SerialVectorFib(int n) { 131 oneapi::tbb::concurrent_vector<value> A; 132 A.grow_by(2); 133 A[0] = 0; 134 A[1] = 1; 135 for (int i = 2; i <= n; i++) { 136 A.grow_to_at_least(i + 1); 137 A[i] = A[i - 1] + A[i - 2]; 138 } 139 return A[n]; 140 } 141 142 ///////////////////// Parallel methods //////////////////////// 143 144 // *** Serial shared by mutexes *** // 145 146 //! Shared glabals 147 value SharedA = 0, SharedB = 1; 148 int SharedI = 1, SharedN; 149 150 //! Template task class which computes Fibonacci numbers with shared globals 151 template <typename M> 152 class SharedSerialFibBody { 153 M &mutex; 154 155 public: 156 SharedSerialFibBody(M &m) : mutex(m) {} 157 //! main loop 158 void operator()(const oneapi::tbb::blocked_range<int> &range) const { 159 for (;;) { 160 typename M::scoped_lock lock(mutex); 161 if (SharedI >= SharedN) 162 break; 163 value sum = SharedA + SharedB; 164 SharedA = SharedB; 165 SharedB = sum; 166 ++SharedI; 167 } 168 } 169 }; 170 171 template <> 172 void SharedSerialFibBody<std::mutex>::operator()( 173 const oneapi::tbb::blocked_range<int> &range) const { 174 for (;;) { 175 std::lock_guard<std::mutex> lock(mutex); 176 if (SharedI >= SharedN) 177 break; 178 value sum = SharedA + SharedB; 179 SharedA = SharedB; 180 SharedB = sum; 181 ++SharedI; 182 } 183 } 184 185 //! Root function 186 template <class M> 187 value SharedSerialFib(int n) { 188 SharedA = 0; 189 SharedB = 1; 190 SharedI = 1; 191 SharedN = n; 192 M mutex; 193 parallel_for(oneapi::tbb::blocked_range<int>(0, 4, 1), SharedSerialFibBody<M>(mutex)); 194 return SharedB; 195 } 196 197 // *** Serial shared by concurrent hash map *** // 198 199 //! Hash comparer 200 struct IntHashCompare { 201 bool equal(const int j, const int k) const { 202 return j == k; 203 } 204 unsigned long hash(const int k) const { 205 return (unsigned long)k; 206 } 207 }; 208 //! NumbersTable type based on concurrent_hash_map 209 typedef oneapi::tbb::concurrent_hash_map<int, value, IntHashCompare> NumbersTable; 210 //! task for serial method using shared concurrent_hash_map 211 class ConcurrentHashSerialFibTask { 212 NumbersTable &Fib; 213 int my_n; 214 215 public: 216 //! constructor 217 ConcurrentHashSerialFibTask(NumbersTable &cht, int n) : Fib(cht), my_n(n) {} 218 //! executing task 219 void operator()() const { 220 for (int i = 2; i <= my_n; ++i) { // there is no difference in to recycle or to make loop 221 NumbersTable::const_accessor f1, f2; // same as iterators 222 if (!Fib.find(f1, i - 1) || !Fib.find(f2, i - 2)) { 223 // Something is seriously wrong, because i-1 and i-2 must have been inserted 224 // earlier by this thread or another thread. 225 assert(0); 226 } 227 value sum = f1->second + f2->second; 228 NumbersTable::const_accessor fsum; 229 Fib.insert(fsum, std::make_pair(i, sum)); // inserting 230 assert(fsum->second == sum); // check value 231 } 232 } 233 }; 234 235 //! Root function 236 value ConcurrentHashSerialFib(int n) { 237 NumbersTable Fib; 238 bool okay; 239 okay = Fib.insert(std::make_pair(0, 0)); 240 assert(okay); // assign initial values 241 okay = Fib.insert(std::make_pair(1, 1)); 242 assert(okay); 243 244 // task_list list; 245 oneapi::tbb::task_group tg; 246 // allocate tasks 247 tg.run(ConcurrentHashSerialFibTask(Fib, n)); 248 tg.run(ConcurrentHashSerialFibTask(Fib, n)); 249 tg.wait(); 250 NumbersTable::const_accessor fresult; 251 okay = Fib.find(fresult, n); 252 assert(okay); 253 return fresult->second; 254 } 255 256 // *** Queue with parallel_pipeline *** // 257 258 typedef oneapi::tbb::concurrent_queue<Matrix2x2> queue_t; 259 namespace parallel_pipeline_ns { 260 std::atomic<int> N; //< index of Fibonacci number minus 1 261 queue_t Queue; 262 } // namespace parallel_pipeline_ns 263 264 //! functor to fills queue 265 struct InputFunc { 266 InputFunc() {} 267 queue_t *operator()(oneapi::tbb::flow_control &fc) const { 268 using namespace parallel_pipeline_ns; 269 270 int n = --N; 271 if (n <= 0) { 272 fc.stop(); 273 return nullptr; 274 } 275 Queue.push(Matrix1110); 276 return &Queue; 277 } 278 }; 279 //! functor to process queue 280 struct MultiplyFunc { 281 MultiplyFunc() {} 282 void operator()(queue_t *queue) const { 283 //concurrent_queue<Matrix2x2> &Queue = *static_cast<concurrent_queue<Matrix2x2> *>(p); 284 Matrix2x2 m1, m2; 285 // get two elements 286 while (!queue->try_pop(m1)) 287 std::this_thread::yield(); 288 while (!queue->try_pop(m2)) 289 std::this_thread::yield(); 290 m1 = m1 * m2; // process them 291 queue->push(m1); // and push back 292 } 293 }; 294 //! Root function 295 value ParallelPipeFib(int n) { 296 using namespace parallel_pipeline_ns; 297 298 N = n - 1; 299 Queue.push(Matrix1110); 300 301 oneapi::tbb::parallel_pipeline( 302 n, 303 oneapi::tbb::make_filter<void, queue_t *>(oneapi::tbb::filter_mode::parallel, InputFunc()) & 304 oneapi::tbb::make_filter<queue_t *, void>(oneapi::tbb::filter_mode::parallel, 305 MultiplyFunc())); 306 307 assert(Queue.unsafe_size() == 1); 308 Matrix2x2 M; 309 bool result = Queue.try_pop(M); // get last element 310 assert(result); 311 value res = M.v[0][0]; // get value 312 Queue.clear(); 313 return res; 314 } 315 316 // *** parallel_reduce *** // 317 318 //! Functor for parallel_reduce 319 struct parallel_reduceFibBody { 320 Matrix2x2 sum; 321 int split_flag; //< flag to make one less operation for split bodies 322 //! Constructor fills sum with initial matrix 323 parallel_reduceFibBody() : sum(Matrix1110), split_flag(0) {} 324 //! Splitting constructor 325 parallel_reduceFibBody(parallel_reduceFibBody &other, oneapi::tbb::split) 326 : sum(Matrix1110), 327 split_flag(1 /*note that it is split*/) {} 328 //! Join point 329 void join(parallel_reduceFibBody &s) { 330 sum = sum * s.sum; 331 } 332 //! Process multiplications 333 void operator()(const oneapi::tbb::blocked_range<int> &r) { 334 for (int k = r.begin() + split_flag; k < r.end(); ++k) 335 sum = sum * Matrix1110; 336 split_flag = 0; // reset flag, because this method can be reused for next range 337 } 338 }; 339 //! Root function 340 value parallel_reduceFib(int n) { 341 parallel_reduceFibBody b; 342 oneapi::tbb::parallel_reduce(oneapi::tbb::blocked_range<int>(2, n, 3), 343 b); // do parallel reduce on range [2, n) for b 344 return b.sum.v[0][0]; 345 } 346 347 // *** parallel_scan *** // 348 349 //! Functor for parallel_scan 350 struct parallel_scanFibBody { 351 /** Though parallel_scan is usually used to accumulate running sums, 352 it can be used to accumulate running products too. */ 353 Matrix2x2 product; 354 /** Pointer to output sequence */ 355 value *const output; 356 //! Constructor sets product to identity matrix 357 parallel_scanFibBody(value *output_) : product(MatrixIdentity), output(output_) {} 358 //! Splitting constructor 359 parallel_scanFibBody(parallel_scanFibBody &b, oneapi::tbb::split) 360 : product(MatrixIdentity), 361 output(b.output) {} 362 //! Method for merging summary information from a, which was split off from *this, into *this. 363 void reverse_join(parallel_scanFibBody &a) { 364 // When using non-commutative reduction operation, reverse_join 365 // should put argument "a" on the left side of the operation. 366 // The reversal from the argument order is why the method is 367 // called "reverse_join" instead of "join". 368 product = a.product * product; 369 } 370 //! Method for assigning final result back to original body. 371 void assign(parallel_scanFibBody &b) { 372 product = b.product; 373 } 374 //! Compute matrix running product. 375 /** Tag indicates whether is is the final scan over the range, or 376 just a helper "prescan" that is computing a partial reduction. */ 377 template <typename Tag> 378 void operator()(const oneapi::tbb::blocked_range<int> &r, Tag tag) { 379 for (int k = r.begin(); k < r.end(); ++k) { 380 // Code performs an "exclusive" scan, which outputs a value *before* updating the product. 381 // For an "inclusive" scan, output the value after the update. 382 if (tag.is_final_scan()) 383 output[k] = product.v[0][1]; 384 product = product * Matrix1110; 385 } 386 } 387 }; 388 //! Root function 389 value parallel_scanFib(int n) { 390 value *output = new value[n]; 391 parallel_scanFibBody b(output); 392 oneapi::tbb::parallel_scan(oneapi::tbb::blocked_range<int>(0, n, 3), b); 393 // output[0..n-1] now contains the Fibonacci sequence (modulo integer wrap-around). 394 // Check the last two values for correctness. 395 assert(n < 2 || output[n - 2] + output[n - 1] == b.product.v[0][1]); 396 delete[] output; 397 return b.product.v[0][1]; 398 } 399 400 /////////////////////////// Main //////////////////////////////////////////////////// 401 402 //! A closed range of int. 403 struct IntRange { 404 int low; 405 int high; 406 void set_from_string(const char *s); 407 IntRange(int low_, int high_) : low(low_), high(high_) {} 408 }; 409 410 void IntRange::set_from_string(const char *s) { 411 char *end; 412 high = low = strtol(s, &end, 0); 413 switch (*end) { 414 case ':': high = strtol(end + 1, 0, 0); break; 415 case '\0': break; 416 default: printf("unexpected character = %c\n", *end); 417 } 418 } 419 420 //! Tick count for start 421 static oneapi::tbb::tick_count t0; 422 423 //! Verbose output flag 424 static bool Verbose = false; 425 426 typedef value (*MeasureFunc)(int); 427 //! Measure ticks count in loop [2..n] 428 value Measure(const char *name, MeasureFunc func, int n) { 429 value result; 430 if (Verbose) 431 printf("%s", name); 432 t0 = oneapi::tbb::tick_count::now(); 433 for (int number = 2; number <= n; number++) 434 result = func(number); 435 if (Verbose) 436 printf("\t- in %f msec\n", (oneapi::tbb::tick_count::now() - t0).seconds() * 1000); 437 return result; 438 } 439 440 //! program entry 441 int main(int argc, char *argv[]) { 442 if (argc > 1) 443 Verbose = true; 444 int NumbersCount = argc > 1 ? strtol(argv[1], 0, 0) : 500; 445 IntRange NThread(1, 4); // Number of threads to use. 446 if (argc > 2) 447 NThread.set_from_string(argv[2]); 448 unsigned long ntrial = argc > 3 ? (unsigned long)strtoul(argv[3], 0, 0) : 1; 449 value result, sum; 450 451 if (Verbose) 452 printf("Fibonacci numbers example. Generating %d numbers..\n", NumbersCount); 453 454 result = Measure("Serial loop", SerialFib, NumbersCount); 455 sum = Measure("Serial matrix", SerialMatrixFib, NumbersCount); 456 assert(result == sum); 457 sum = Measure("Serial vector", SerialVectorFib, NumbersCount); 458 assert(result == sum); 459 sum = Measure("Serial queue", SerialQueueFib, NumbersCount); 460 assert(result == sum); 461 // now in parallel 462 for (unsigned long i = 0; i < ntrial; ++i) { 463 for (int threads = NThread.low; threads <= NThread.high; threads *= 2) { 464 oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism, 465 threads); 466 if (Verbose) 467 printf("\nThreads number is %d\n", threads); 468 469 sum = Measure("Shared serial (mutex)\t", SharedSerialFib<std::mutex>, NumbersCount); 470 assert(result == sum); 471 sum = Measure("Shared serial (spin_mutex)", 472 SharedSerialFib<oneapi::tbb::spin_mutex>, 473 NumbersCount); 474 assert(result == sum); 475 sum = Measure("Shared serial (queuing_mutex)", 476 SharedSerialFib<oneapi::tbb::queuing_mutex>, 477 NumbersCount); 478 assert(result == sum); 479 sum = Measure("Shared serial (Conc.HashTable)", ConcurrentHashSerialFib, NumbersCount); 480 assert(result == sum); 481 sum = Measure("Parallel pipe/queue\t", ParallelPipeFib, NumbersCount); 482 assert(result == sum); 483 sum = Measure("Parallel reduce\t\t", parallel_reduceFib, NumbersCount); 484 assert(result == sum); 485 sum = Measure("Parallel scan\t\t", parallel_scanFib, NumbersCount); 486 assert(result == sum); 487 } 488 489 #ifdef __GNUC__ 490 if (Verbose) 491 printf("Fibonacci number #%d modulo 2^64 is %lld\n\n", NumbersCount, result); 492 #else 493 if (Verbose) 494 printf("Fibonacci number #%d modulo 2^64 is %I64d\n\n", NumbersCount, result); 495 #endif 496 } 497 if (!Verbose) 498 printf("TEST PASSED\n"); 499 return 0; 500 } 501 502 // Utils 503 504 void Matrix2x2Multiply(const value a[2][2], const value b[2][2], value c[2][2]) { 505 for (int i = 0; i <= 1; i++) 506 for (int j = 0; j <= 1; j++) 507 c[i][j] = a[i][0] * b[0][j] + a[i][1] * b[1][j]; 508 } 509 510 Matrix2x2 Matrix2x2::operator*(const Matrix2x2 &to) const { 511 Matrix2x2 result; 512 Matrix2x2Multiply(v, to.v, result.v); 513 return result; 514 } 515