1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 /* Example program that computes Fibonacci numbers in different ways.
18    Arguments are: [ Number [Threads [Repeats]]]
19    The defaults are Number=500 Threads=1:4 Repeats=1.
20 
21    The point of this program is to check that the library is working properly.
22    Most of the computations are deliberately silly and not expected to
23    show any speedup on multiprocessors.
24 */
25 
26 // enable assertions
27 #ifdef NDEBUG
28 #undef NDEBUG
29 #endif
30 
31 #include <cstdio>
32 #include <cstdlib>
33 #include <cassert>
34 
35 #include <utility>
36 #include <thread>
37 #include <atomic>
38 #include <mutex>
39 
40 #include "oneapi/tbb/tick_count.h"
41 #include "oneapi/tbb/blocked_range.h"
42 #include "oneapi/tbb/concurrent_vector.h"
43 #include "oneapi/tbb/concurrent_queue.h"
44 #include "oneapi/tbb/concurrent_hash_map.h"
45 #include "oneapi/tbb/parallel_for.h"
46 #include "oneapi/tbb/parallel_reduce.h"
47 #include "oneapi/tbb/parallel_scan.h"
48 #include "oneapi/tbb/parallel_pipeline.h"
49 #include "oneapi/tbb/spin_mutex.h"
50 #include "oneapi/tbb/queuing_mutex.h"
51 #include "oneapi/tbb/global_control.h"
52 
53 //! type used for Fibonacci number computations
54 typedef long long value;
55 
56 //! Matrix 2x2 class
57 struct Matrix2x2 {
58     //! Array of values
59     value v[2][2];
60     Matrix2x2() {}
61     Matrix2x2(value v00, value v01, value v10, value v11) {
62         v[0][0] = v00;
63         v[0][1] = v01;
64         v[1][0] = v10;
65         v[1][1] = v11;
66     }
67     Matrix2x2 operator*(const Matrix2x2 &to) const; //< Multiply two Matrices
68 };
69 //! Identity matrix
70 static const Matrix2x2 MatrixIdentity(1, 0, 0, 1);
71 //! Default matrix to multiply
72 static const Matrix2x2 Matrix1110(1, 1, 1, 0);
73 //! Raw arrays matrices multiply
74 void Matrix2x2Multiply(const value a[2][2], const value b[2][2], value c[2][2]);
75 
76 /////////////////////// Serial methods ////////////////////////
77 
78 //! Plain serial sum
79 value SerialFib(int n) {
80     if (n < 2)
81         return n;
82     value a = 0, b = 1, sum;
83     int i;
84     for (i = 2; i <= n; i++) { // n is really index of Fibonacci number
85         sum = a + b;
86         a = b;
87         b = sum;
88     }
89     return sum;
90 }
91 //! Serial n-1 matrices multiplication
92 value SerialMatrixFib(int n) {
93     value c[2][2], a[2][2] = { { 1, 1 }, { 1, 0 } }, b[2][2] = { { 1, 1 }, { 1, 0 } };
94     int i;
95     for (i = 2; i < n; i++) { // Using condition to prevent copying of values
96         if (i & 1)
97             Matrix2x2Multiply(a, c, b);
98         else
99             Matrix2x2Multiply(a, b, c);
100     }
101     return (i & 1) ? c[0][0] : b[0][0]; // get result from upper left cell
102 }
103 //! Recursive summing. Just for complete list of serial algorithms, not used
104 value SerialRecursiveFib(int n) {
105     value result;
106     if (n < 2)
107         result = n;
108     else
109         result = SerialRecursiveFib(n - 1) + SerialRecursiveFib(n - 2);
110     return result;
111 }
112 
113 // GCC 4.8 C++ standard library implements std::this_thread::yield as no-op.
114 #if __TBB_GLIBCXX_THIS_THREAD_YIELD_BROKEN
115 static inline void yield() {
116     sched_yield();
117 }
118 #else
119 using std::this_thread::yield;
120 #endif
121 
122 //! Introducing of queue method in serial
123 value SerialQueueFib(int n) {
124     oneapi::tbb::concurrent_queue<Matrix2x2> Q;
125     for (int i = 1; i < n; i++)
126         Q.push(Matrix1110);
127     Matrix2x2 A, B;
128     while (true) {
129         while (!Q.try_pop(A))
130             yield();
131         if (Q.empty())
132             break;
133         while (!Q.try_pop(B))
134             yield();
135         Q.push(A * B);
136     }
137     return A.v[0][0];
138 }
139 //! Trying to use concurrent_vector
140 value SerialVectorFib(int n) {
141     oneapi::tbb::concurrent_vector<value> A;
142     A.grow_by(2);
143     A[0] = 0;
144     A[1] = 1;
145     for (int i = 2; i <= n; i++) {
146         A.grow_to_at_least(i + 1);
147         A[i] = A[i - 1] + A[i - 2];
148     }
149     return A[n];
150 }
151 
152 ///////////////////// Parallel methods ////////////////////////
153 
154 // *** Serial shared by mutexes *** //
155 
156 //! Shared glabals
157 value SharedA = 0, SharedB = 1;
158 int SharedI = 1, SharedN;
159 
160 //! Template task class which computes Fibonacci numbers with shared globals
161 template <typename M>
162 class SharedSerialFibBody {
163     M &mutex;
164 
165 public:
166     SharedSerialFibBody(M &m) : mutex(m) {}
167     //! main loop
168     void operator()(const oneapi::tbb::blocked_range<int> &range) const {
169         for (;;) {
170             typename M::scoped_lock lock(mutex);
171             if (SharedI >= SharedN)
172                 break;
173             value sum = SharedA + SharedB;
174             SharedA = SharedB;
175             SharedB = sum;
176             ++SharedI;
177         }
178     }
179 };
180 
181 template <>
182 void SharedSerialFibBody<std::mutex>::operator()(
183     const oneapi::tbb::blocked_range<int> &range) const {
184     for (;;) {
185         std::lock_guard<std::mutex> lock(mutex);
186         if (SharedI >= SharedN)
187             break;
188         value sum = SharedA + SharedB;
189         SharedA = SharedB;
190         SharedB = sum;
191         ++SharedI;
192     }
193 }
194 
195 //! Root function
196 template <class M>
197 value SharedSerialFib(int n) {
198     SharedA = 0;
199     SharedB = 1;
200     SharedI = 1;
201     SharedN = n;
202     M mutex;
203     parallel_for(oneapi::tbb::blocked_range<int>(0, 4, 1), SharedSerialFibBody<M>(mutex));
204     return SharedB;
205 }
206 
207 // *** Serial shared by concurrent hash map *** //
208 
209 //! Hash comparer
210 struct IntHashCompare {
211     bool equal(const int j, const int k) const {
212         return j == k;
213     }
214     unsigned long hash(const int k) const {
215         return (unsigned long)k;
216     }
217 };
218 //! NumbersTable type based on concurrent_hash_map
219 typedef oneapi::tbb::concurrent_hash_map<int, value, IntHashCompare> NumbersTable;
220 //! task for serial method using shared concurrent_hash_map
221 class ConcurrentHashSerialFibTask {
222     NumbersTable &Fib;
223     int my_n;
224 
225 public:
226     //! constructor
227     ConcurrentHashSerialFibTask(NumbersTable &cht, int n) : Fib(cht), my_n(n) {}
228     //! executing task
229     void operator()() const {
230         for (int i = 2; i <= my_n; ++i) { // there is no difference in to recycle or to make loop
231             NumbersTable::const_accessor f1, f2; // same as iterators
232             if (!Fib.find(f1, i - 1) || !Fib.find(f2, i - 2)) {
233                 // Something is seriously wrong, because i-1 and i-2 must have been inserted
234                 // earlier by this thread or another thread.
235                 assert(0);
236             }
237             value sum = f1->second + f2->second;
238             NumbersTable::const_accessor fsum;
239             Fib.insert(fsum, std::make_pair(i, sum)); // inserting
240             assert(fsum->second == sum); // check value
241         }
242     }
243 };
244 
245 //! Root function
246 value ConcurrentHashSerialFib(int n) {
247     NumbersTable Fib;
248     bool okay;
249     okay = Fib.insert(std::make_pair(0, 0));
250     assert(okay); // assign initial values
251     okay = Fib.insert(std::make_pair(1, 1));
252     assert(okay);
253 
254     // task_list list;
255     oneapi::tbb::task_group tg;
256     // allocate tasks
257     tg.run(ConcurrentHashSerialFibTask(Fib, n));
258     tg.run(ConcurrentHashSerialFibTask(Fib, n));
259     tg.wait();
260     NumbersTable::const_accessor fresult;
261     okay = Fib.find(fresult, n);
262     assert(okay);
263     return fresult->second;
264 }
265 
266 // *** Queue with parallel_pipeline *** //
267 
268 typedef oneapi::tbb::concurrent_queue<Matrix2x2> queue_t;
269 namespace parallel_pipeline_ns {
270 std::atomic<int> N; //< index of Fibonacci number minus 1
271 queue_t Queue;
272 } // namespace parallel_pipeline_ns
273 
274 //! functor to fills queue
275 struct InputFunc {
276     InputFunc() {}
277     queue_t *operator()(oneapi::tbb::flow_control &fc) const {
278         using namespace parallel_pipeline_ns;
279 
280         int n = --N;
281         if (n <= 0) {
282             fc.stop();
283             return nullptr;
284         }
285         Queue.push(Matrix1110);
286         return &Queue;
287     }
288 };
289 //! functor to process queue
290 struct MultiplyFunc {
291     MultiplyFunc() {}
292     void operator()(queue_t *queue) const {
293         //concurrent_queue<Matrix2x2> &Queue = *static_cast<concurrent_queue<Matrix2x2> *>(p);
294         Matrix2x2 m1, m2;
295         // get two elements
296         while (!queue->try_pop(m1))
297             yield();
298         while (!queue->try_pop(m2))
299             yield();
300         m1 = m1 * m2; // process them
301         queue->push(m1); // and push back
302     }
303 };
304 //! Root function
305 value ParallelPipeFib(int n) {
306     using namespace parallel_pipeline_ns;
307 
308     N = n - 1;
309     Queue.push(Matrix1110);
310 
311     oneapi::tbb::parallel_pipeline(
312         n,
313         oneapi::tbb::make_filter<void, queue_t *>(oneapi::tbb::filter_mode::parallel, InputFunc()) &
314             oneapi::tbb::make_filter<queue_t *, void>(oneapi::tbb::filter_mode::parallel,
315                                                       MultiplyFunc()));
316 
317     assert(Queue.unsafe_size() == 1);
318     Matrix2x2 M;
319     bool result = Queue.try_pop(M); // get last element
320     assert(result);
321     value res = M.v[0][0]; // get value
322     Queue.clear();
323     return res;
324 }
325 
326 // *** parallel_reduce *** //
327 
328 //! Functor for parallel_reduce
329 struct parallel_reduceFibBody {
330     Matrix2x2 sum;
331     int split_flag; //< flag to make one less operation for split bodies
332     //! Constructor fills sum with initial matrix
333     parallel_reduceFibBody() : sum(Matrix1110), split_flag(0) {}
334     //! Splitting constructor
335     parallel_reduceFibBody(parallel_reduceFibBody &other, oneapi::tbb::split)
336             : sum(Matrix1110),
337               split_flag(1 /*note that it is split*/) {}
338     //! Join point
339     void join(parallel_reduceFibBody &s) {
340         sum = sum * s.sum;
341     }
342     //! Process multiplications
343     void operator()(const oneapi::tbb::blocked_range<int> &r) {
344         for (int k = r.begin() + split_flag; k < r.end(); ++k)
345             sum = sum * Matrix1110;
346         split_flag = 0; // reset flag, because this method can be reused for next range
347     }
348 };
349 //! Root function
350 value parallel_reduceFib(int n) {
351     parallel_reduceFibBody b;
352     oneapi::tbb::parallel_reduce(oneapi::tbb::blocked_range<int>(2, n, 3),
353                                  b); // do parallel reduce on range [2, n) for b
354     return b.sum.v[0][0];
355 }
356 
357 // *** parallel_scan *** //
358 
359 //! Functor for parallel_scan
360 struct parallel_scanFibBody {
361     /** Though parallel_scan is usually used to accumulate running sums,
362         it can be used to accumulate running products too. */
363     Matrix2x2 product;
364     /** Pointer to output sequence */
365     value *const output;
366     //! Constructor sets product to identity matrix
367     parallel_scanFibBody(value *output_) : product(MatrixIdentity), output(output_) {}
368     //! Splitting constructor
369     parallel_scanFibBody(parallel_scanFibBody &b, oneapi::tbb::split)
370             : product(MatrixIdentity),
371               output(b.output) {}
372     //! Method for merging summary information from a, which was split off from *this, into *this.
373     void reverse_join(parallel_scanFibBody &a) {
374         // When using non-commutative reduction operation, reverse_join
375         // should put argument "a" on the left side of the operation.
376         // The reversal from the argument order is why the method is
377         // called "reverse_join" instead of "join".
378         product = a.product * product;
379     }
380     //! Method for assigning final result back to original body.
381     void assign(parallel_scanFibBody &b) {
382         product = b.product;
383     }
384     //! Compute matrix running product.
385     /** Tag indicates whether is is the final scan over the range, or
386         just a helper "prescan" that is computing a partial reduction. */
387     template <typename Tag>
388     void operator()(const oneapi::tbb::blocked_range<int> &r, Tag tag) {
389         for (int k = r.begin(); k < r.end(); ++k) {
390             // Code performs an "exclusive" scan, which outputs a value *before* updating the product.
391             // For an "inclusive" scan, output the value after the update.
392             if (tag.is_final_scan())
393                 output[k] = product.v[0][1];
394             product = product * Matrix1110;
395         }
396     }
397 };
398 //! Root function
399 value parallel_scanFib(int n) {
400     value *output = new value[n];
401     parallel_scanFibBody b(output);
402     oneapi::tbb::parallel_scan(oneapi::tbb::blocked_range<int>(0, n, 3), b);
403     // output[0..n-1] now contains the Fibonacci sequence (modulo integer wrap-around).
404     // Check the last two values for correctness.
405     assert(n < 2 || output[n - 2] + output[n - 1] == b.product.v[0][1]);
406     delete[] output;
407     return b.product.v[0][1];
408 }
409 
410 /////////////////////////// Main ////////////////////////////////////////////////////
411 
412 //! A closed range of int.
413 struct IntRange {
414     int low;
415     int high;
416     void set_from_string(const char *s);
417     IntRange(int low_, int high_) : low(low_), high(high_) {}
418 };
419 
420 void IntRange::set_from_string(const char *s) {
421     char *end;
422     high = low = strtol(s, &end, 0);
423     switch (*end) {
424         case ':': high = strtol(end + 1, 0, 0); break;
425         case '\0': break;
426         default: printf("unexpected character = %c\n", *end);
427     }
428 }
429 
430 //! Tick count for start
431 static oneapi::tbb::tick_count t0;
432 
433 //! Verbose output flag
434 static bool Verbose = false;
435 
436 typedef value (*MeasureFunc)(int);
437 //! Measure ticks count in loop [2..n]
438 value Measure(const char *name, MeasureFunc func, int n) {
439     value result;
440     if (Verbose)
441         printf("%s", name);
442     t0 = oneapi::tbb::tick_count::now();
443     for (int number = 2; number <= n; number++)
444         result = func(number);
445     if (Verbose)
446         printf("\t- in %f msec\n", (oneapi::tbb::tick_count::now() - t0).seconds() * 1000);
447     return result;
448 }
449 
450 //! program entry
451 int main(int argc, char *argv[]) {
452     if (argc > 1)
453         Verbose = true;
454     int NumbersCount = argc > 1 ? strtol(argv[1], 0, 0) : 500;
455     IntRange NThread(1, 4); // Number of threads to use.
456     if (argc > 2)
457         NThread.set_from_string(argv[2]);
458     unsigned long ntrial = argc > 3 ? (unsigned long)strtoul(argv[3], 0, 0) : 1;
459     value result, sum;
460 
461     if (Verbose)
462         printf("Fibonacci numbers example. Generating %d numbers..\n", NumbersCount);
463 
464     result = Measure("Serial loop", SerialFib, NumbersCount);
465     sum = Measure("Serial matrix", SerialMatrixFib, NumbersCount);
466     assert(result == sum);
467     sum = Measure("Serial vector", SerialVectorFib, NumbersCount);
468     assert(result == sum);
469     sum = Measure("Serial queue", SerialQueueFib, NumbersCount);
470     assert(result == sum);
471     // now in parallel
472     for (unsigned long i = 0; i < ntrial; ++i) {
473         for (int threads = NThread.low; threads <= NThread.high; threads *= 2) {
474             oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism,
475                                           threads);
476             if (Verbose)
477                 printf("\nThreads number is %d\n", threads);
478 
479             sum = Measure("Shared serial (mutex)\t", SharedSerialFib<std::mutex>, NumbersCount);
480             assert(result == sum);
481             sum = Measure("Shared serial (spin_mutex)",
482                           SharedSerialFib<oneapi::tbb::spin_mutex>,
483                           NumbersCount);
484             assert(result == sum);
485             sum = Measure("Shared serial (queuing_mutex)",
486                           SharedSerialFib<oneapi::tbb::queuing_mutex>,
487                           NumbersCount);
488             assert(result == sum);
489             sum = Measure("Shared serial (Conc.HashTable)", ConcurrentHashSerialFib, NumbersCount);
490             assert(result == sum);
491             sum = Measure("Parallel pipe/queue\t", ParallelPipeFib, NumbersCount);
492             assert(result == sum);
493             sum = Measure("Parallel reduce\t\t", parallel_reduceFib, NumbersCount);
494             assert(result == sum);
495             sum = Measure("Parallel scan\t\t", parallel_scanFib, NumbersCount);
496             assert(result == sum);
497         }
498 
499 #ifdef __GNUC__
500         if (Verbose)
501             printf("Fibonacci number #%d modulo 2^64 is %lld\n\n", NumbersCount, result);
502 #else
503         if (Verbose)
504             printf("Fibonacci number #%d modulo 2^64 is %I64d\n\n", NumbersCount, result);
505 #endif
506     }
507     if (!Verbose)
508         printf("TEST PASSED\n");
509     // flush to prevent bufferization on exit
510     fflush(stdout);
511     return 0;
512 }
513 
514 // Utils
515 
516 void Matrix2x2Multiply(const value a[2][2], const value b[2][2], value c[2][2]) {
517     for (int i = 0; i <= 1; i++)
518         for (int j = 0; j <= 1; j++)
519             c[i][j] = a[i][0] * b[0][j] + a[i][1] * b[1][j];
520 }
521 
522 Matrix2x2 Matrix2x2::operator*(const Matrix2x2 &to) const {
523     Matrix2x2 result;
524     Matrix2x2Multiply(v, to.v, result.v);
525     return result;
526 }
527