1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 /* Example program that computes Fibonacci numbers in different ways.
18    Arguments are: [ Number [Threads [Repeats]]]
19    The defaults are Number=500 Threads=1:4 Repeats=1.
20 
21    The point of this program is to check that the library is working properly.
22    Most of the computations are deliberately silly and not expected to
23    show any speedup on multiprocessors.
24 */
25 
26 // enable assertions
27 #ifdef NDEBUG
28 #undef NDEBUG
29 #endif
30 
31 #include <cstdio>
32 #include <cstdlib>
33 #include <cassert>
34 
35 #include <utility>
36 #include <thread>
37 #include <atomic>
38 #include <mutex>
39 
40 #include "oneapi/tbb/tick_count.h"
41 #include "oneapi/tbb/blocked_range.h"
42 #include "oneapi/tbb/concurrent_vector.h"
43 #include "oneapi/tbb/concurrent_queue.h"
44 #include "oneapi/tbb/concurrent_hash_map.h"
45 #include "oneapi/tbb/parallel_for.h"
46 #include "oneapi/tbb/parallel_reduce.h"
47 #include "oneapi/tbb/parallel_scan.h"
48 #include "oneapi/tbb/parallel_pipeline.h"
49 #include "oneapi/tbb/spin_mutex.h"
50 #include "oneapi/tbb/queuing_mutex.h"
51 #include "oneapi/tbb/global_control.h"
52 
53 //! type used for Fibonacci number computations
54 typedef long long value;
55 
56 //! Matrix 2x2 class
57 struct Matrix2x2 {
58     //! Array of values
59     value v[2][2];
60     Matrix2x2() {}
61     Matrix2x2(value v00, value v01, value v10, value v11) {
62         v[0][0] = v00;
63         v[0][1] = v01;
64         v[1][0] = v10;
65         v[1][1] = v11;
66     }
67     Matrix2x2 operator*(const Matrix2x2 &to) const; //< Multiply two Matrices
68 };
69 //! Identity matrix
70 static const Matrix2x2 MatrixIdentity(1, 0, 0, 1);
71 //! Default matrix to multiply
72 static const Matrix2x2 Matrix1110(1, 1, 1, 0);
73 //! Raw arrays matrices multiply
74 void Matrix2x2Multiply(const value a[2][2], const value b[2][2], value c[2][2]);
75 
76 /////////////////////// Serial methods ////////////////////////
77 
78 //! Plain serial sum
79 value SerialFib(int n) {
80     if (n < 2)
81         return n;
82     value a = 0, b = 1, sum;
83     int i;
84     for (i = 2; i <= n; i++) { // n is really index of Fibonacci number
85         sum = a + b;
86         a = b;
87         b = sum;
88     }
89     return sum;
90 }
91 //! Serial n-1 matrices multiplication
92 value SerialMatrixFib(int n) {
93     value c[2][2], a[2][2] = { { 1, 1 }, { 1, 0 } }, b[2][2] = { { 1, 1 }, { 1, 0 } };
94     int i;
95     for (i = 2; i < n; i++) { // Using condition to prevent copying of values
96         if (i & 1)
97             Matrix2x2Multiply(a, c, b);
98         else
99             Matrix2x2Multiply(a, b, c);
100     }
101     return (i & 1) ? c[0][0] : b[0][0]; // get result from upper left cell
102 }
103 //! Recursive summing. Just for complete list of serial algorithms, not used
104 value SerialRecursiveFib(int n) {
105     value result;
106     if (n < 2)
107         result = n;
108     else
109         result = SerialRecursiveFib(n - 1) + SerialRecursiveFib(n - 2);
110     return result;
111 }
112 //! Introducing of queue method in serial
113 value SerialQueueFib(int n) {
114     oneapi::tbb::concurrent_queue<Matrix2x2> Q;
115     for (int i = 1; i < n; i++)
116         Q.push(Matrix1110);
117     Matrix2x2 A, B;
118     while (true) {
119         while (!Q.try_pop(A))
120             std::this_thread::yield();
121         if (Q.empty())
122             break;
123         while (!Q.try_pop(B))
124             std::this_thread::yield();
125         Q.push(A * B);
126     }
127     return A.v[0][0];
128 }
129 //! Trying to use concurrent_vector
130 value SerialVectorFib(int n) {
131     oneapi::tbb::concurrent_vector<value> A;
132     A.grow_by(2);
133     A[0] = 0;
134     A[1] = 1;
135     for (int i = 2; i <= n; i++) {
136         A.grow_to_at_least(i + 1);
137         A[i] = A[i - 1] + A[i - 2];
138     }
139     return A[n];
140 }
141 
142 ///////////////////// Parallel methods ////////////////////////
143 
144 // *** Serial shared by mutexes *** //
145 
146 //! Shared glabals
147 value SharedA = 0, SharedB = 1;
148 int SharedI = 1, SharedN;
149 
150 //! Template task class which computes Fibonacci numbers with shared globals
151 template <typename M>
152 class SharedSerialFibBody {
153     M &mutex;
154 
155 public:
156     SharedSerialFibBody(M &m) : mutex(m) {}
157     //! main loop
158     void operator()(const oneapi::tbb::blocked_range<int> &range) const {
159         for (;;) {
160             typename M::scoped_lock lock(mutex);
161             if (SharedI >= SharedN)
162                 break;
163             value sum = SharedA + SharedB;
164             SharedA = SharedB;
165             SharedB = sum;
166             ++SharedI;
167         }
168     }
169 };
170 
171 template <>
172 void SharedSerialFibBody<std::mutex>::operator()(
173     const oneapi::tbb::blocked_range<int> &range) const {
174     for (;;) {
175         std::lock_guard<std::mutex> lock(mutex);
176         if (SharedI >= SharedN)
177             break;
178         value sum = SharedA + SharedB;
179         SharedA = SharedB;
180         SharedB = sum;
181         ++SharedI;
182     }
183 }
184 
185 //! Root function
186 template <class M>
187 value SharedSerialFib(int n) {
188     SharedA = 0;
189     SharedB = 1;
190     SharedI = 1;
191     SharedN = n;
192     M mutex;
193     parallel_for(oneapi::tbb::blocked_range<int>(0, 4, 1), SharedSerialFibBody<M>(mutex));
194     return SharedB;
195 }
196 
197 // *** Serial shared by concurrent hash map *** //
198 
199 //! Hash comparer
200 struct IntHashCompare {
201     bool equal(const int j, const int k) const {
202         return j == k;
203     }
204     unsigned long hash(const int k) const {
205         return (unsigned long)k;
206     }
207 };
208 //! NumbersTable type based on concurrent_hash_map
209 typedef oneapi::tbb::concurrent_hash_map<int, value, IntHashCompare> NumbersTable;
210 //! task for serial method using shared concurrent_hash_map
211 class ConcurrentHashSerialFibTask {
212     NumbersTable &Fib;
213     int my_n;
214 
215 public:
216     //! constructor
217     ConcurrentHashSerialFibTask(NumbersTable &cht, int n) : Fib(cht), my_n(n) {}
218     //! executing task
219     void operator()() const {
220         for (int i = 2; i <= my_n; ++i) { // there is no difference in to recycle or to make loop
221             NumbersTable::const_accessor f1, f2; // same as iterators
222             if (!Fib.find(f1, i - 1) || !Fib.find(f2, i - 2)) {
223                 // Something is seriously wrong, because i-1 and i-2 must have been inserted
224                 // earlier by this thread or another thread.
225                 assert(0);
226             }
227             value sum = f1->second + f2->second;
228             NumbersTable::const_accessor fsum;
229             Fib.insert(fsum, std::make_pair(i, sum)); // inserting
230             assert(fsum->second == sum); // check value
231         }
232     }
233 };
234 
235 //! Root function
236 value ConcurrentHashSerialFib(int n) {
237     NumbersTable Fib;
238     bool okay;
239     okay = Fib.insert(std::make_pair(0, 0));
240     assert(okay); // assign initial values
241     okay = Fib.insert(std::make_pair(1, 1));
242     assert(okay);
243 
244     // task_list list;
245     oneapi::tbb::task_group tg;
246     // allocate tasks
247     tg.run(ConcurrentHashSerialFibTask(Fib, n));
248     tg.run(ConcurrentHashSerialFibTask(Fib, n));
249     tg.wait();
250     NumbersTable::const_accessor fresult;
251     okay = Fib.find(fresult, n);
252     assert(okay);
253     return fresult->second;
254 }
255 
256 // *** Queue with parallel_pipeline *** //
257 
258 typedef oneapi::tbb::concurrent_queue<Matrix2x2> queue_t;
259 namespace parallel_pipeline_ns {
260 std::atomic<int> N; //< index of Fibonacci number minus 1
261 queue_t Queue;
262 } // namespace parallel_pipeline_ns
263 
264 //! functor to fills queue
265 struct InputFunc {
266     InputFunc() {}
267     queue_t *operator()(oneapi::tbb::flow_control &fc) const {
268         using namespace parallel_pipeline_ns;
269 
270         int n = --N;
271         if (n <= 0) {
272             fc.stop();
273             return nullptr;
274         }
275         Queue.push(Matrix1110);
276         return &Queue;
277     }
278 };
279 //! functor to process queue
280 struct MultiplyFunc {
281     MultiplyFunc() {}
282     void operator()(queue_t *queue) const {
283         //concurrent_queue<Matrix2x2> &Queue = *static_cast<concurrent_queue<Matrix2x2> *>(p);
284         Matrix2x2 m1, m2;
285         // get two elements
286         while (!queue->try_pop(m1))
287             std::this_thread::yield();
288         while (!queue->try_pop(m2))
289             std::this_thread::yield();
290         m1 = m1 * m2; // process them
291         queue->push(m1); // and push back
292     }
293 };
294 //! Root function
295 value ParallelPipeFib(int n) {
296     using namespace parallel_pipeline_ns;
297 
298     N = n - 1;
299     Queue.push(Matrix1110);
300 
301     oneapi::tbb::parallel_pipeline(
302         n,
303         oneapi::tbb::make_filter<void, queue_t *>(oneapi::tbb::filter_mode::parallel, InputFunc()) &
304             oneapi::tbb::make_filter<queue_t *, void>(oneapi::tbb::filter_mode::parallel,
305                                                       MultiplyFunc()));
306 
307     assert(Queue.unsafe_size() == 1);
308     Matrix2x2 M;
309     bool result = Queue.try_pop(M); // get last element
310     assert(result);
311     value res = M.v[0][0]; // get value
312     Queue.clear();
313     return res;
314 }
315 
316 // *** parallel_reduce *** //
317 
318 //! Functor for parallel_reduce
319 struct parallel_reduceFibBody {
320     Matrix2x2 sum;
321     int split_flag; //< flag to make one less operation for split bodies
322     //! Constructor fills sum with initial matrix
323     parallel_reduceFibBody() : sum(Matrix1110), split_flag(0) {}
324     //! Splitting constructor
325     parallel_reduceFibBody(parallel_reduceFibBody &other, oneapi::tbb::split)
326             : sum(Matrix1110),
327               split_flag(1 /*note that it is split*/) {}
328     //! Join point
329     void join(parallel_reduceFibBody &s) {
330         sum = sum * s.sum;
331     }
332     //! Process multiplications
333     void operator()(const oneapi::tbb::blocked_range<int> &r) {
334         for (int k = r.begin() + split_flag; k < r.end(); ++k)
335             sum = sum * Matrix1110;
336         split_flag = 0; // reset flag, because this method can be reused for next range
337     }
338 };
339 //! Root function
340 value parallel_reduceFib(int n) {
341     parallel_reduceFibBody b;
342     oneapi::tbb::parallel_reduce(oneapi::tbb::blocked_range<int>(2, n, 3),
343                                  b); // do parallel reduce on range [2, n) for b
344     return b.sum.v[0][0];
345 }
346 
347 // *** parallel_scan *** //
348 
349 //! Functor for parallel_scan
350 struct parallel_scanFibBody {
351     /** Though parallel_scan is usually used to accumulate running sums,
352         it can be used to accumulate running products too. */
353     Matrix2x2 product;
354     /** Pointer to output sequence */
355     value *const output;
356     //! Constructor sets product to identity matrix
357     parallel_scanFibBody(value *output_) : product(MatrixIdentity), output(output_) {}
358     //! Splitting constructor
359     parallel_scanFibBody(parallel_scanFibBody &b, oneapi::tbb::split)
360             : product(MatrixIdentity),
361               output(b.output) {}
362     //! Method for merging summary information from a, which was split off from *this, into *this.
363     void reverse_join(parallel_scanFibBody &a) {
364         // When using non-commutative reduction operation, reverse_join
365         // should put argument "a" on the left side of the operation.
366         // The reversal from the argument order is why the method is
367         // called "reverse_join" instead of "join".
368         product = a.product * product;
369     }
370     //! Method for assigning final result back to original body.
371     void assign(parallel_scanFibBody &b) {
372         product = b.product;
373     }
374     //! Compute matrix running product.
375     /** Tag indicates whether is is the final scan over the range, or
376         just a helper "prescan" that is computing a partial reduction. */
377     template <typename Tag>
378     void operator()(const oneapi::tbb::blocked_range<int> &r, Tag tag) {
379         for (int k = r.begin(); k < r.end(); ++k) {
380             // Code performs an "exclusive" scan, which outputs a value *before* updating the product.
381             // For an "inclusive" scan, output the value after the update.
382             if (tag.is_final_scan())
383                 output[k] = product.v[0][1];
384             product = product * Matrix1110;
385         }
386     }
387 };
388 //! Root function
389 value parallel_scanFib(int n) {
390     value *output = new value[n];
391     parallel_scanFibBody b(output);
392     oneapi::tbb::parallel_scan(oneapi::tbb::blocked_range<int>(0, n, 3), b);
393     // output[0..n-1] now contains the Fibonacci sequence (modulo integer wrap-around).
394     // Check the last two values for correctness.
395     assert(n < 2 || output[n - 2] + output[n - 1] == b.product.v[0][1]);
396     delete[] output;
397     return b.product.v[0][1];
398 }
399 
400 /////////////////////////// Main ////////////////////////////////////////////////////
401 
402 //! A closed range of int.
403 struct IntRange {
404     int low;
405     int high;
406     void set_from_string(const char *s);
407     IntRange(int low_, int high_) : low(low_), high(high_) {}
408 };
409 
410 void IntRange::set_from_string(const char *s) {
411     char *end;
412     high = low = strtol(s, &end, 0);
413     switch (*end) {
414         case ':': high = strtol(end + 1, 0, 0); break;
415         case '\0': break;
416         default: printf("unexpected character = %c\n", *end);
417     }
418 }
419 
420 //! Tick count for start
421 static oneapi::tbb::tick_count t0;
422 
423 //! Verbose output flag
424 static bool Verbose = false;
425 
426 typedef value (*MeasureFunc)(int);
427 //! Measure ticks count in loop [2..n]
428 value Measure(const char *name, MeasureFunc func, int n) {
429     value result;
430     if (Verbose)
431         printf("%s", name);
432     t0 = oneapi::tbb::tick_count::now();
433     for (int number = 2; number <= n; number++)
434         result = func(number);
435     if (Verbose)
436         printf("\t- in %f msec\n", (oneapi::tbb::tick_count::now() - t0).seconds() * 1000);
437     return result;
438 }
439 
440 //! program entry
441 int main(int argc, char *argv[]) {
442     if (argc > 1)
443         Verbose = true;
444     int NumbersCount = argc > 1 ? strtol(argv[1], 0, 0) : 500;
445     IntRange NThread(1, 4); // Number of threads to use.
446     if (argc > 2)
447         NThread.set_from_string(argv[2]);
448     unsigned long ntrial = argc > 3 ? (unsigned long)strtoul(argv[3], 0, 0) : 1;
449     value result, sum;
450 
451     if (Verbose)
452         printf("Fibonacci numbers example. Generating %d numbers..\n", NumbersCount);
453 
454     result = Measure("Serial loop", SerialFib, NumbersCount);
455     sum = Measure("Serial matrix", SerialMatrixFib, NumbersCount);
456     assert(result == sum);
457     sum = Measure("Serial vector", SerialVectorFib, NumbersCount);
458     assert(result == sum);
459     sum = Measure("Serial queue", SerialQueueFib, NumbersCount);
460     assert(result == sum);
461     // now in parallel
462     for (unsigned long i = 0; i < ntrial; ++i) {
463         for (int threads = NThread.low; threads <= NThread.high; threads *= 2) {
464             oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism,
465                                           threads);
466             if (Verbose)
467                 printf("\nThreads number is %d\n", threads);
468 
469             sum = Measure("Shared serial (mutex)\t", SharedSerialFib<std::mutex>, NumbersCount);
470             assert(result == sum);
471             sum = Measure("Shared serial (spin_mutex)",
472                           SharedSerialFib<oneapi::tbb::spin_mutex>,
473                           NumbersCount);
474             assert(result == sum);
475             sum = Measure("Shared serial (queuing_mutex)",
476                           SharedSerialFib<oneapi::tbb::queuing_mutex>,
477                           NumbersCount);
478             assert(result == sum);
479             sum = Measure("Shared serial (Conc.HashTable)", ConcurrentHashSerialFib, NumbersCount);
480             assert(result == sum);
481             sum = Measure("Parallel pipe/queue\t", ParallelPipeFib, NumbersCount);
482             assert(result == sum);
483             sum = Measure("Parallel reduce\t\t", parallel_reduceFib, NumbersCount);
484             assert(result == sum);
485             sum = Measure("Parallel scan\t\t", parallel_scanFib, NumbersCount);
486             assert(result == sum);
487         }
488 
489 #ifdef __GNUC__
490         if (Verbose)
491             printf("Fibonacci number #%d modulo 2^64 is %lld\n\n", NumbersCount, result);
492 #else
493         if (Verbose)
494             printf("Fibonacci number #%d modulo 2^64 is %I64d\n\n", NumbersCount, result);
495 #endif
496     }
497     if (!Verbose)
498         printf("TEST PASSED\n");
499     return 0;
500 }
501 
502 // Utils
503 
504 void Matrix2x2Multiply(const value a[2][2], const value b[2][2], value c[2][2]) {
505     for (int i = 0; i <= 1; i++)
506         for (int j = 0; j <= 1; j++)
507             c[i][j] = a[i][0] * b[0][j] + a[i][1] * b[1][j];
508 }
509 
510 Matrix2x2 Matrix2x2::operator*(const Matrix2x2 &to) const {
511     Matrix2x2 result;
512     Matrix2x2Multiply(v, to.v, result.v);
513     return result;
514 }
515