1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 //
18 // Example program that reads a file of decimal integers in text format
19 // and changes each to its square.
20 //
#include <cassert>
#include <cctype>
#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <new>
#include <stdexcept>
#include <string>

#include "oneapi/tbb/parallel_pipeline.h"
#include "oneapi/tbb/tick_count.h"
#include "oneapi/tbb/tbb_allocator.h"
#include "oneapi/tbb/global_control.h"

#include "common/utility/utility.hpp"
#include "common/utility/get_default_num_threads.hpp"
33 
34 extern void generate_if_needed(const char*);
35 
36 //! Holds a slice of text.
37 /** Instances *must* be allocated/freed using methods herein, because the C++ declaration
38     represents only the header of a much larger object in memory. */
39 class TextSlice {
40     //! Pointer to one past last character in sequence
41     char* logical_end;
42     //! Pointer to one past last available byte in sequence.
43     char* physical_end;
44 
45 public:
46     //! Allocate a TextSlice object that can hold up to max_size characters.
47     static TextSlice* allocate(std::size_t max_size) {
48         // +1 leaves room for a terminating null character.
49         TextSlice* t = (TextSlice*)oneapi::tbb::tbb_allocator<char>().allocate(sizeof(TextSlice) +
50                                                                                max_size + 1);
51         t->logical_end = t->begin();
52         t->physical_end = t->begin() + max_size;
53         return t;
54     }
55     //! Free a TextSlice object
56     void free() {
57         oneapi::tbb::tbb_allocator<char>().deallocate(
58             (char*)this, sizeof(TextSlice) + (physical_end - begin()) + 1);
59     }
60     //! Pointer to beginning of sequence
61     char* begin() {
62         return (char*)(this + 1);
63     }
64     //! Pointer to one past last character in sequence
65     char* end() {
66         return logical_end;
67     }
68     //! Length of sequence
69     std::size_t size() const {
70         return logical_end - (char*)(this + 1);
71     }
72     //! Maximum number of characters that can be appended to sequence
73     std::size_t avail() const {
74         return physical_end - logical_end;
75     }
76     //! Append sequence [first,last) to this sequence.
77     void append(char* first, char* last) {
78         memcpy(logical_end, first, last - first);
79         logical_end += last - first;
80     }
81     //! Set end() to given value.
82     void set_end(char* p) {
83         logical_end = p;
84     }
85 };
86 
//! Capacity, in characters, of each input slice (overridable from the command line).
std::size_t MAX_CHAR_PER_INPUT_SLICE{4000};
//! Name of the file whose numbers are squared (overridable from the command line).
std::string InputFileName{"input.txt"};
//! Name of the file that receives the transformed text (overridable from the command line).
std::string OutputFileName{"output.txt"};
90 
91 TextSlice* next_slice = nullptr;
92 
93 class MyInputFunc {
94 public:
95     MyInputFunc(FILE* input_file_);
96     MyInputFunc(const MyInputFunc& f) : input_file(f.input_file) {}
97     ~MyInputFunc();
98     TextSlice* operator()(oneapi::tbb::flow_control& fc) const;
99 
100 private:
101     FILE* input_file;
102 };
103 
104 MyInputFunc::MyInputFunc(FILE* input_file_) : input_file(input_file_) {}
105 
106 MyInputFunc::~MyInputFunc() {}
107 
108 TextSlice* MyInputFunc::operator()(oneapi::tbb::flow_control& fc) const {
109     // Read characters into space that is available in the next slice.
110     if (!next_slice)
111         next_slice = TextSlice::allocate(MAX_CHAR_PER_INPUT_SLICE);
112     std::size_t m = next_slice->avail();
113     std::size_t n = fread(next_slice->end(), 1, m, input_file);
114     if (!n && next_slice->size() == 0) {
115         // No more characters to process
116         fc.stop();
117         return nullptr;
118     }
119     else {
120         // Have more characters to process.
121         TextSlice* t = next_slice;
122         next_slice = TextSlice::allocate(MAX_CHAR_PER_INPUT_SLICE);
123         char* p = t->end() + n;
124         if (n == m) {
125             // Might have read partial number.
126             // If so, transfer characters of partial number to next slice.
127             while (p > t->begin() && isdigit(p[-1]))
128                 --p;
129             assert(p > t->begin()); // Number too large to fit in buffer
130             next_slice->append(p, t->end() + n);
131         }
132         t->set_end(p);
133         return t;
134     }
135 }
136 
137 // Functor that changes each decimal number to its square.
138 class MyTransformFunc {
139 public:
140     TextSlice* operator()(TextSlice* input) const;
141 };
142 
143 TextSlice* MyTransformFunc::operator()(TextSlice* input) const {
144     // Add terminating null so that strtol works right even if number is at end of the input.
145     *input->end() = '\0';
146     char* p = input->begin();
147     TextSlice* out = TextSlice::allocate(2 * MAX_CHAR_PER_INPUT_SLICE);
148     char* q = out->begin();
149     for (;;) {
150         while (p < input->end() && !isdigit(*p))
151             *q++ = *p++;
152         if (p == input->end())
153             break;
154         long x = strtol(p, &p, 10);
155         // Note: no overflow checking is needed here, as we have twice the
156         // input string length, but the square of a non-negative integer n
157         // cannot have more than twice as many digits as n.
158         long y = x * x;
159         sprintf(q, "%ld", y);
160         q = strchr(q, 0);
161     }
162     out->set_end(q);
163     input->free();
164     return out;
165 }
166 
167 // Functor that writes a TextSlice to a file.
168 class MyOutputFunc {
169     FILE* my_output_file;
170 
171 public:
172     MyOutputFunc(FILE* output_file);
173     void operator()(TextSlice* item) const;
174 };
175 
176 MyOutputFunc::MyOutputFunc(FILE* output_file) : my_output_file(output_file) {}
177 
178 void MyOutputFunc::operator()(TextSlice* out) const {
179     std::size_t n = fwrite(out->begin(), 1, out->size(), my_output_file);
180     if (n != out->size()) {
181         fprintf(stderr, "Can't write into file '%s'\n", OutputFileName.c_str());
182         std::exit(-1);
183     }
184     out->free();
185 }
186 
//! When true, suppresses progress and timing output (set from the "silent" CLI argument).
bool silent{false};
188 
189 int run_pipeline(int nthreads) {
190     FILE* input_file = fopen(InputFileName.c_str(), "r");
191     if (!input_file) {
192         throw std::invalid_argument(("Invalid input file name: " + InputFileName).c_str());
193         return 0;
194     }
195     FILE* output_file = fopen(OutputFileName.c_str(), "w");
196     if (!output_file) {
197         throw std::invalid_argument(("Invalid output file name: " + OutputFileName).c_str());
198         return 0;
199     }
200 
201     oneapi::tbb::tick_count t0 = oneapi::tbb::tick_count::now();
202 
203     // Need more than one token in flight per thread to keep all threads
204     // busy; 2-4 works
205     oneapi::tbb::parallel_pipeline(
206         nthreads * 4,
207         oneapi::tbb::make_filter<void, TextSlice*>(oneapi::tbb::filter_mode::serial_in_order,
208                                                    MyInputFunc(input_file)) &
209             oneapi::tbb::make_filter<TextSlice*, TextSlice*>(oneapi::tbb::filter_mode::parallel,
210                                                              MyTransformFunc()) &
211             oneapi::tbb::make_filter<TextSlice*, void>(oneapi::tbb::filter_mode::serial_in_order,
212                                                        MyOutputFunc(output_file)));
213 
214     oneapi::tbb::tick_count t1 = oneapi::tbb::tick_count::now();
215 
216     fclose(output_file);
217     fclose(input_file);
218 
219     if (!silent)
220         printf("time = %g\n", (t1 - t0).seconds());
221 
222     return 1;
223 }
224 
225 int main(int argc, char* argv[]) {
226     oneapi::tbb::tick_count mainStartTime = oneapi::tbb::tick_count::now();
227 
228     // The 1st argument is the function to obtain 'auto' value; the 2nd is the default value
229     // The example interprets 0 threads as "run serially, then fully subscribed"
230     utility::thread_number_range threads(utility::get_default_num_threads, 0);
231 
232     utility::parse_cli_arguments(
233         argc,
234         argv,
235         utility::cli_argument_pack()
236             //"-h" option for displaying help is present implicitly
237             .positional_arg(threads, "n-of-threads", utility::thread_number_range_desc)
238             .positional_arg(InputFileName, "input-file", "input file name")
239             .positional_arg(OutputFileName, "output-file", "output file name")
240             .positional_arg(MAX_CHAR_PER_INPUT_SLICE,
241                             "max-slice-size",
242                             "the maximum number of characters in one slice")
243             .arg(silent, "silent", "no output except elapsed time"));
244     generate_if_needed(InputFileName.c_str());
245 
246     if (threads.first) {
247         for (int p = threads.first; p <= threads.last; p = threads.step(p)) {
248             if (!silent)
249                 printf("threads = %d ", p);
250             oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism, p);
251             if (!run_pipeline(p))
252                 return -1;
253         }
254     }
255     else { // Number of threads wasn't set explicitly. Run serial and parallel version
256         { // serial run
257             if (!silent)
258                 printf("serial run   ");
259             oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism, 1);
260             if (!run_pipeline(1))
261                 return -1;
262         }
263         { // parallel run (number of threads is selected automatically)
264             if (!silent)
265                 printf("parallel run ");
266             oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism,
267                                           utility::get_default_num_threads());
268             if (!run_pipeline(utility::get_default_num_threads()))
269                 return -1;
270         }
271     }
272 
273     utility::report_elapsed_time((oneapi::tbb::tick_count::now() - mainStartTime).seconds());
274 
275     return 0;
276 }
277