1d86ed7fbStbbdev /*
2b15aabb3Stbbdev     Copyright (c) 2005-2021 Intel Corporation
3d86ed7fbStbbdev 
4d86ed7fbStbbdev     Licensed under the Apache License, Version 2.0 (the "License");
5d86ed7fbStbbdev     you may not use this file except in compliance with the License.
6d86ed7fbStbbdev     You may obtain a copy of the License at
7d86ed7fbStbbdev 
8d86ed7fbStbbdev         http://www.apache.org/licenses/LICENSE-2.0
9d86ed7fbStbbdev 
10d86ed7fbStbbdev     Unless required by applicable law or agreed to in writing, software
11d86ed7fbStbbdev     distributed under the License is distributed on an "AS IS" BASIS,
12d86ed7fbStbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13d86ed7fbStbbdev     See the License for the specific language governing permissions and
14d86ed7fbStbbdev     limitations under the License.
15d86ed7fbStbbdev */
16d86ed7fbStbbdev 
17d86ed7fbStbbdev //
18d86ed7fbStbbdev // Example program that reads a file of decimal integers in text format
19d86ed7fbStbbdev // and changes each to its square.
20d86ed7fbStbbdev //
21*478de5b1Stbbdev 
#include <cassert>
#include <cctype>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <new>
#include <stdexcept>
#include <string>

#include "oneapi/tbb/parallel_pipeline.h"
#include "oneapi/tbb/tick_count.h"
#include "oneapi/tbb/tbb_allocator.h"
#include "oneapi/tbb/global_control.h"

#include "common/utility/utility.hpp"
#include "common/utility/get_default_num_threads.hpp"
34d86ed7fbStbbdev 
// Implemented in another translation unit. main() calls it with the input file name
// before any pipeline run (presumably it creates the input file when missing — confirm).
extern void generate_if_needed(const char* file_name);
36d86ed7fbStbbdev 
37d86ed7fbStbbdev //! Holds a slice of text.
38d86ed7fbStbbdev /** Instances *must* be allocated/freed using methods herein, because the C++ declaration
39d86ed7fbStbbdev     represents only the header of a much larger object in memory. */
40d86ed7fbStbbdev class TextSlice {
41d86ed7fbStbbdev     //! Pointer to one past last character in sequence
42d86ed7fbStbbdev     char* logical_end;
43d86ed7fbStbbdev     //! Pointer to one past last available byte in sequence.
44d86ed7fbStbbdev     char* physical_end;
45d86ed7fbStbbdev 
46d86ed7fbStbbdev public:
47d86ed7fbStbbdev     //! Allocate a TextSlice object that can hold up to max_size characters.
allocate(std::size_t max_size)48d86ed7fbStbbdev     static TextSlice* allocate(std::size_t max_size) {
49d86ed7fbStbbdev         // +1 leaves room for a terminating null character.
50d86ed7fbStbbdev         TextSlice* t = (TextSlice*)oneapi::tbb::tbb_allocator<char>().allocate(sizeof(TextSlice) +
51d86ed7fbStbbdev                                                                                max_size + 1);
52d86ed7fbStbbdev         t->logical_end = t->begin();
53d86ed7fbStbbdev         t->physical_end = t->begin() + max_size;
54d86ed7fbStbbdev         return t;
55d86ed7fbStbbdev     }
56d86ed7fbStbbdev     //! Free a TextSlice object
free()57d86ed7fbStbbdev     void free() {
58d86ed7fbStbbdev         oneapi::tbb::tbb_allocator<char>().deallocate(
59d86ed7fbStbbdev             (char*)this, sizeof(TextSlice) + (physical_end - begin()) + 1);
60d86ed7fbStbbdev     }
61d86ed7fbStbbdev     //! Pointer to beginning of sequence
begin()62d86ed7fbStbbdev     char* begin() {
63d86ed7fbStbbdev         return (char*)(this + 1);
64d86ed7fbStbbdev     }
65d86ed7fbStbbdev     //! Pointer to one past last character in sequence
end()66d86ed7fbStbbdev     char* end() {
67d86ed7fbStbbdev         return logical_end;
68d86ed7fbStbbdev     }
69d86ed7fbStbbdev     //! Length of sequence
size() const70d86ed7fbStbbdev     std::size_t size() const {
71d86ed7fbStbbdev         return logical_end - (char*)(this + 1);
72d86ed7fbStbbdev     }
73d86ed7fbStbbdev     //! Maximum number of characters that can be appended to sequence
avail() const74d86ed7fbStbbdev     std::size_t avail() const {
75d86ed7fbStbbdev         return physical_end - logical_end;
76d86ed7fbStbbdev     }
77d86ed7fbStbbdev     //! Append sequence [first,last) to this sequence.
append(char * first,char * last)78d86ed7fbStbbdev     void append(char* first, char* last) {
79d86ed7fbStbbdev         memcpy(logical_end, first, last - first);
80d86ed7fbStbbdev         logical_end += last - first;
81d86ed7fbStbbdev     }
82d86ed7fbStbbdev     //! Set end() to given value.
set_end(char * p)83d86ed7fbStbbdev     void set_end(char* p) {
84d86ed7fbStbbdev         logical_end = p;
85d86ed7fbStbbdev     }
86d86ed7fbStbbdev };
87d86ed7fbStbbdev 
//! Capacity, in characters, of each slice read by the input stage (CLI: max-slice-size).
std::size_t MAX_CHAR_PER_INPUT_SLICE{ 4000 };
//! Path of the text file holding the numbers to square (CLI: input-file).
std::string InputFileName{ "input.txt" };
//! Path of the file the squared numbers are written to (CLI: output-file).
std::string OutputFileName{ "output.txt" };
91d86ed7fbStbbdev 
92d86ed7fbStbbdev TextSlice* next_slice = nullptr;
93d86ed7fbStbbdev 
94d86ed7fbStbbdev class MyInputFunc {
95d86ed7fbStbbdev public:
96d86ed7fbStbbdev     MyInputFunc(FILE* input_file_);
MyInputFunc(const MyInputFunc & f)97d86ed7fbStbbdev     MyInputFunc(const MyInputFunc& f) : input_file(f.input_file) {}
98d86ed7fbStbbdev     ~MyInputFunc();
99d86ed7fbStbbdev     TextSlice* operator()(oneapi::tbb::flow_control& fc) const;
100d86ed7fbStbbdev 
101d86ed7fbStbbdev private:
102d86ed7fbStbbdev     FILE* input_file;
103d86ed7fbStbbdev };
104d86ed7fbStbbdev 
MyInputFunc(FILE * input_file_)105d86ed7fbStbbdev MyInputFunc::MyInputFunc(FILE* input_file_) : input_file(input_file_) {}
106d86ed7fbStbbdev 
~MyInputFunc()107d86ed7fbStbbdev MyInputFunc::~MyInputFunc() {}
108d86ed7fbStbbdev 
operator ()(oneapi::tbb::flow_control & fc) const109d86ed7fbStbbdev TextSlice* MyInputFunc::operator()(oneapi::tbb::flow_control& fc) const {
110d86ed7fbStbbdev     // Read characters into space that is available in the next slice.
111d86ed7fbStbbdev     if (!next_slice)
112d86ed7fbStbbdev         next_slice = TextSlice::allocate(MAX_CHAR_PER_INPUT_SLICE);
113d86ed7fbStbbdev     std::size_t m = next_slice->avail();
114d86ed7fbStbbdev     std::size_t n = fread(next_slice->end(), 1, m, input_file);
115d86ed7fbStbbdev     if (!n && next_slice->size() == 0) {
116d86ed7fbStbbdev         // No more characters to process
117d86ed7fbStbbdev         fc.stop();
118d86ed7fbStbbdev         return nullptr;
119d86ed7fbStbbdev     }
120d86ed7fbStbbdev     else {
121d86ed7fbStbbdev         // Have more characters to process.
122d86ed7fbStbbdev         TextSlice* t = next_slice;
123d86ed7fbStbbdev         next_slice = TextSlice::allocate(MAX_CHAR_PER_INPUT_SLICE);
124d86ed7fbStbbdev         char* p = t->end() + n;
125d86ed7fbStbbdev         if (n == m) {
126d86ed7fbStbbdev             // Might have read partial number.
127d86ed7fbStbbdev             // If so, transfer characters of partial number to next slice.
128d86ed7fbStbbdev             while (p > t->begin() && isdigit(p[-1]))
129d86ed7fbStbbdev                 --p;
130d86ed7fbStbbdev             assert(p > t->begin()); // Number too large to fit in buffer
131d86ed7fbStbbdev             next_slice->append(p, t->end() + n);
132d86ed7fbStbbdev         }
133d86ed7fbStbbdev         t->set_end(p);
134d86ed7fbStbbdev         return t;
135d86ed7fbStbbdev     }
136d86ed7fbStbbdev }
137d86ed7fbStbbdev 
138d86ed7fbStbbdev // Functor that changes each decimal number to its square.
139d86ed7fbStbbdev class MyTransformFunc {
140d86ed7fbStbbdev public:
141d86ed7fbStbbdev     TextSlice* operator()(TextSlice* input) const;
142d86ed7fbStbbdev };
143d86ed7fbStbbdev 
operator ()(TextSlice * input) const144d86ed7fbStbbdev TextSlice* MyTransformFunc::operator()(TextSlice* input) const {
145d86ed7fbStbbdev     // Add terminating null so that strtol works right even if number is at end of the input.
146d86ed7fbStbbdev     *input->end() = '\0';
147d86ed7fbStbbdev     char* p = input->begin();
148d86ed7fbStbbdev     TextSlice* out = TextSlice::allocate(2 * MAX_CHAR_PER_INPUT_SLICE);
149d86ed7fbStbbdev     char* q = out->begin();
150d86ed7fbStbbdev     for (;;) {
151d86ed7fbStbbdev         while (p < input->end() && !isdigit(*p))
152d86ed7fbStbbdev             *q++ = *p++;
153d86ed7fbStbbdev         if (p == input->end())
154d86ed7fbStbbdev             break;
155d86ed7fbStbbdev         long x = strtol(p, &p, 10);
156d86ed7fbStbbdev         // Note: no overflow checking is needed here, as we have twice the
157d86ed7fbStbbdev         // input string length, but the square of a non-negative integer n
158d86ed7fbStbbdev         // cannot have more than twice as many digits as n.
159d86ed7fbStbbdev         long y = x * x;
160d86ed7fbStbbdev         sprintf(q, "%ld", y);
161d86ed7fbStbbdev         q = strchr(q, 0);
162d86ed7fbStbbdev     }
163d86ed7fbStbbdev     out->set_end(q);
164d86ed7fbStbbdev     input->free();
165d86ed7fbStbbdev     return out;
166d86ed7fbStbbdev }
167d86ed7fbStbbdev 
168d86ed7fbStbbdev // Functor that writes a TextSlice to a file.
169d86ed7fbStbbdev class MyOutputFunc {
170d86ed7fbStbbdev     FILE* my_output_file;
171d86ed7fbStbbdev 
172d86ed7fbStbbdev public:
173d86ed7fbStbbdev     MyOutputFunc(FILE* output_file);
174d86ed7fbStbbdev     void operator()(TextSlice* item) const;
175d86ed7fbStbbdev };
176d86ed7fbStbbdev 
MyOutputFunc(FILE * output_file)177d86ed7fbStbbdev MyOutputFunc::MyOutputFunc(FILE* output_file) : my_output_file(output_file) {}
178d86ed7fbStbbdev 
operator ()(TextSlice * out) const179d86ed7fbStbbdev void MyOutputFunc::operator()(TextSlice* out) const {
180d86ed7fbStbbdev     std::size_t n = fwrite(out->begin(), 1, out->size(), my_output_file);
181d86ed7fbStbbdev     if (n != out->size()) {
182d86ed7fbStbbdev         fprintf(stderr, "Can't write into file '%s'\n", OutputFileName.c_str());
183d86ed7fbStbbdev         std::exit(-1);
184d86ed7fbStbbdev     }
185d86ed7fbStbbdev     out->free();
186d86ed7fbStbbdev }
187d86ed7fbStbbdev 
//! Set by the "silent" command-line flag; suppresses the per-run messages printed by main()
//! and run_pipeline().
bool silent{ false };
189d86ed7fbStbbdev 
run_pipeline(int nthreads)190d86ed7fbStbbdev int run_pipeline(int nthreads) {
191d86ed7fbStbbdev     FILE* input_file = fopen(InputFileName.c_str(), "r");
192d86ed7fbStbbdev     if (!input_file) {
193d86ed7fbStbbdev         throw std::invalid_argument(("Invalid input file name: " + InputFileName).c_str());
194d86ed7fbStbbdev         return 0;
195d86ed7fbStbbdev     }
196d86ed7fbStbbdev     FILE* output_file = fopen(OutputFileName.c_str(), "w");
197d86ed7fbStbbdev     if (!output_file) {
198d86ed7fbStbbdev         throw std::invalid_argument(("Invalid output file name: " + OutputFileName).c_str());
199d86ed7fbStbbdev         return 0;
200d86ed7fbStbbdev     }
201d86ed7fbStbbdev 
202d86ed7fbStbbdev     oneapi::tbb::tick_count t0 = oneapi::tbb::tick_count::now();
203d86ed7fbStbbdev 
204d86ed7fbStbbdev     // Need more than one token in flight per thread to keep all threads
205d86ed7fbStbbdev     // busy; 2-4 works
206d86ed7fbStbbdev     oneapi::tbb::parallel_pipeline(
207d86ed7fbStbbdev         nthreads * 4,
208d86ed7fbStbbdev         oneapi::tbb::make_filter<void, TextSlice*>(oneapi::tbb::filter_mode::serial_in_order,
209d86ed7fbStbbdev                                                    MyInputFunc(input_file)) &
210d86ed7fbStbbdev             oneapi::tbb::make_filter<TextSlice*, TextSlice*>(oneapi::tbb::filter_mode::parallel,
211d86ed7fbStbbdev                                                              MyTransformFunc()) &
212d86ed7fbStbbdev             oneapi::tbb::make_filter<TextSlice*, void>(oneapi::tbb::filter_mode::serial_in_order,
213d86ed7fbStbbdev                                                        MyOutputFunc(output_file)));
214d86ed7fbStbbdev 
215d86ed7fbStbbdev     oneapi::tbb::tick_count t1 = oneapi::tbb::tick_count::now();
216d86ed7fbStbbdev 
217d86ed7fbStbbdev     fclose(output_file);
218d86ed7fbStbbdev     fclose(input_file);
219d86ed7fbStbbdev 
220d86ed7fbStbbdev     if (!silent)
221d86ed7fbStbbdev         printf("time = %g\n", (t1 - t0).seconds());
222d86ed7fbStbbdev 
223d86ed7fbStbbdev     return 1;
224d86ed7fbStbbdev }
225d86ed7fbStbbdev 
main(int argc,char * argv[])226d86ed7fbStbbdev int main(int argc, char* argv[]) {
227d86ed7fbStbbdev     oneapi::tbb::tick_count mainStartTime = oneapi::tbb::tick_count::now();
228d86ed7fbStbbdev 
229d86ed7fbStbbdev     // The 1st argument is the function to obtain 'auto' value; the 2nd is the default value
230d86ed7fbStbbdev     // The example interprets 0 threads as "run serially, then fully subscribed"
231d86ed7fbStbbdev     utility::thread_number_range threads(utility::get_default_num_threads, 0);
232d86ed7fbStbbdev 
233d86ed7fbStbbdev     utility::parse_cli_arguments(
234d86ed7fbStbbdev         argc,
235d86ed7fbStbbdev         argv,
236d86ed7fbStbbdev         utility::cli_argument_pack()
237d86ed7fbStbbdev             //"-h" option for displaying help is present implicitly
238d86ed7fbStbbdev             .positional_arg(threads, "n-of-threads", utility::thread_number_range_desc)
239d86ed7fbStbbdev             .positional_arg(InputFileName, "input-file", "input file name")
240d86ed7fbStbbdev             .positional_arg(OutputFileName, "output-file", "output file name")
241d86ed7fbStbbdev             .positional_arg(MAX_CHAR_PER_INPUT_SLICE,
242d86ed7fbStbbdev                             "max-slice-size",
243d86ed7fbStbbdev                             "the maximum number of characters in one slice")
244d86ed7fbStbbdev             .arg(silent, "silent", "no output except elapsed time"));
245d86ed7fbStbbdev     generate_if_needed(InputFileName.c_str());
246d86ed7fbStbbdev 
247d86ed7fbStbbdev     if (threads.first) {
248d86ed7fbStbbdev         for (int p = threads.first; p <= threads.last; p = threads.step(p)) {
249d86ed7fbStbbdev             if (!silent)
250d86ed7fbStbbdev                 printf("threads = %d ", p);
251d86ed7fbStbbdev             oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism, p);
252d86ed7fbStbbdev             if (!run_pipeline(p))
253d86ed7fbStbbdev                 return -1;
254d86ed7fbStbbdev         }
255d86ed7fbStbbdev     }
256d86ed7fbStbbdev     else { // Number of threads wasn't set explicitly. Run serial and parallel version
257d86ed7fbStbbdev         { // serial run
258d86ed7fbStbbdev             if (!silent)
259d86ed7fbStbbdev                 printf("serial run   ");
260d86ed7fbStbbdev             oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism, 1);
261d86ed7fbStbbdev             if (!run_pipeline(1))
262d86ed7fbStbbdev                 return -1;
263d86ed7fbStbbdev         }
264d86ed7fbStbbdev         { // parallel run (number of threads is selected automatically)
265d86ed7fbStbbdev             if (!silent)
266d86ed7fbStbbdev                 printf("parallel run ");
267d86ed7fbStbbdev             oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism,
268d86ed7fbStbbdev                                           utility::get_default_num_threads());
269d86ed7fbStbbdev             if (!run_pipeline(utility::get_default_num_threads()))
270d86ed7fbStbbdev                 return -1;
271d86ed7fbStbbdev         }
272d86ed7fbStbbdev     }
273d86ed7fbStbbdev 
274d86ed7fbStbbdev     utility::report_elapsed_time((oneapi::tbb::tick_count::now() - mainStartTime).seconds());
275d86ed7fbStbbdev 
276d86ed7fbStbbdev     return 0;
277d86ed7fbStbbdev }
278