1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 //
18 // Example program that reads a file of decimal integers in text format
19 // and changes each to its square.
20 //
21 
#include <cassert>
#include <cctype>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <exception>
#include <stdexcept>
#include <string>
26 
27 #include "oneapi/tbb/parallel_pipeline.h"
28 #include "oneapi/tbb/tick_count.h"
29 #include "oneapi/tbb/tbb_allocator.h"
30 #include "oneapi/tbb/global_control.h"
31 
32 #include "common/utility/utility.hpp"
33 #include "common/utility/get_default_num_threads.hpp"
34 
35 extern void generate_if_needed(const char*);
36 
37 //! Holds a slice of text.
38 /** Instances *must* be allocated/freed using methods herein, because the C++ declaration
39     represents only the header of a much larger object in memory. */
40 class TextSlice {
41     //! Pointer to one past last character in sequence
42     char* logical_end;
43     //! Pointer to one past last available byte in sequence.
44     char* physical_end;
45 
46 public:
47     //! Allocate a TextSlice object that can hold up to max_size characters.
48     static TextSlice* allocate(std::size_t max_size) {
49         // +1 leaves room for a terminating null character.
50         TextSlice* t = (TextSlice*)oneapi::tbb::tbb_allocator<char>().allocate(sizeof(TextSlice) +
51                                                                                max_size + 1);
52         t->logical_end = t->begin();
53         t->physical_end = t->begin() + max_size;
54         return t;
55     }
56     //! Free a TextSlice object
57     void free() {
58         oneapi::tbb::tbb_allocator<char>().deallocate(
59             (char*)this, sizeof(TextSlice) + (physical_end - begin()) + 1);
60     }
61     //! Pointer to beginning of sequence
62     char* begin() {
63         return (char*)(this + 1);
64     }
65     //! Pointer to one past last character in sequence
66     char* end() {
67         return logical_end;
68     }
69     //! Length of sequence
70     std::size_t size() const {
71         return logical_end - (char*)(this + 1);
72     }
73     //! Maximum number of characters that can be appended to sequence
74     std::size_t avail() const {
75         return physical_end - logical_end;
76     }
77     //! Append sequence [first,last) to this sequence.
78     void append(char* first, char* last) {
79         memcpy(logical_end, first, last - first);
80         logical_end += last - first;
81     }
82     //! Set end() to given value.
83     void set_end(char* p) {
84         logical_end = p;
85     }
86 };
87 
//! Upper bound on the characters the input stage puts into one slice (max-slice-size CLI argument).
std::size_t MAX_CHAR_PER_INPUT_SLICE{4000};
//! File whose numbers are squared (input-file CLI argument).
std::string InputFileName{"input.txt"};
//! File that receives the transformed text (output-file CLI argument).
std::string OutputFileName{"output.txt"};
91 
92 TextSlice* next_slice = nullptr;
93 
94 class MyInputFunc {
95 public:
96     MyInputFunc(FILE* input_file_);
97     MyInputFunc(const MyInputFunc& f) : input_file(f.input_file) {}
98     ~MyInputFunc();
99     TextSlice* operator()(oneapi::tbb::flow_control& fc) const;
100 
101 private:
102     FILE* input_file;
103 };
104 
105 MyInputFunc::MyInputFunc(FILE* input_file_) : input_file(input_file_) {}
106 
107 MyInputFunc::~MyInputFunc() {}
108 
109 TextSlice* MyInputFunc::operator()(oneapi::tbb::flow_control& fc) const {
110     // Read characters into space that is available in the next slice.
111     if (!next_slice)
112         next_slice = TextSlice::allocate(MAX_CHAR_PER_INPUT_SLICE);
113     std::size_t m = next_slice->avail();
114     std::size_t n = fread(next_slice->end(), 1, m, input_file);
115     if (!n && next_slice->size() == 0) {
116         // No more characters to process
117         fc.stop();
118         return nullptr;
119     }
120     else {
121         // Have more characters to process.
122         TextSlice* t = next_slice;
123         next_slice = TextSlice::allocate(MAX_CHAR_PER_INPUT_SLICE);
124         char* p = t->end() + n;
125         if (n == m) {
126             // Might have read partial number.
127             // If so, transfer characters of partial number to next slice.
128             while (p > t->begin() && isdigit(p[-1]))
129                 --p;
130             assert(p > t->begin()); // Number too large to fit in buffer
131             next_slice->append(p, t->end() + n);
132         }
133         t->set_end(p);
134         return t;
135     }
136 }
137 
138 // Functor that changes each decimal number to its square.
139 class MyTransformFunc {
140 public:
141     TextSlice* operator()(TextSlice* input) const;
142 };
143 
144 TextSlice* MyTransformFunc::operator()(TextSlice* input) const {
145     // Add terminating null so that strtol works right even if number is at end of the input.
146     *input->end() = '\0';
147     char* p = input->begin();
148     TextSlice* out = TextSlice::allocate(2 * MAX_CHAR_PER_INPUT_SLICE);
149     char* q = out->begin();
150     for (;;) {
151         while (p < input->end() && !isdigit(*p))
152             *q++ = *p++;
153         if (p == input->end())
154             break;
155         long x = strtol(p, &p, 10);
156         // Note: no overflow checking is needed here, as we have twice the
157         // input string length, but the square of a non-negative integer n
158         // cannot have more than twice as many digits as n.
159         long y = x * x;
160         sprintf(q, "%ld", y);
161         q = strchr(q, 0);
162     }
163     out->set_end(q);
164     input->free();
165     return out;
166 }
167 
168 // Functor that writes a TextSlice to a file.
169 class MyOutputFunc {
170     FILE* my_output_file;
171 
172 public:
173     MyOutputFunc(FILE* output_file);
174     void operator()(TextSlice* item) const;
175 };
176 
177 MyOutputFunc::MyOutputFunc(FILE* output_file) : my_output_file(output_file) {}
178 
179 void MyOutputFunc::operator()(TextSlice* out) const {
180     std::size_t n = fwrite(out->begin(), 1, out->size(), my_output_file);
181     if (n != out->size()) {
182         fprintf(stderr, "Can't write into file '%s'\n", OutputFileName.c_str());
183         std::exit(-1);
184     }
185     out->free();
186 }
187 
//! Set by the "silent" command-line flag; suppresses progress/timing printouts.
bool silent{false};
189 
190 int run_pipeline(int nthreads) {
191     FILE* input_file = fopen(InputFileName.c_str(), "r");
192     if (!input_file) {
193         throw std::invalid_argument(("Invalid input file name: " + InputFileName).c_str());
194         return 0;
195     }
196     FILE* output_file = fopen(OutputFileName.c_str(), "w");
197     if (!output_file) {
198         throw std::invalid_argument(("Invalid output file name: " + OutputFileName).c_str());
199         return 0;
200     }
201 
202     oneapi::tbb::tick_count t0 = oneapi::tbb::tick_count::now();
203 
204     // Need more than one token in flight per thread to keep all threads
205     // busy; 2-4 works
206     oneapi::tbb::parallel_pipeline(
207         nthreads * 4,
208         oneapi::tbb::make_filter<void, TextSlice*>(oneapi::tbb::filter_mode::serial_in_order,
209                                                    MyInputFunc(input_file)) &
210             oneapi::tbb::make_filter<TextSlice*, TextSlice*>(oneapi::tbb::filter_mode::parallel,
211                                                              MyTransformFunc()) &
212             oneapi::tbb::make_filter<TextSlice*, void>(oneapi::tbb::filter_mode::serial_in_order,
213                                                        MyOutputFunc(output_file)));
214 
215     oneapi::tbb::tick_count t1 = oneapi::tbb::tick_count::now();
216 
217     fclose(output_file);
218     fclose(input_file);
219 
220     if (!silent)
221         printf("time = %g\n", (t1 - t0).seconds());
222 
223     return 1;
224 }
225 
226 int main(int argc, char* argv[]) {
227     oneapi::tbb::tick_count mainStartTime = oneapi::tbb::tick_count::now();
228 
229     // The 1st argument is the function to obtain 'auto' value; the 2nd is the default value
230     // The example interprets 0 threads as "run serially, then fully subscribed"
231     utility::thread_number_range threads(utility::get_default_num_threads, 0);
232 
233     utility::parse_cli_arguments(
234         argc,
235         argv,
236         utility::cli_argument_pack()
237             //"-h" option for displaying help is present implicitly
238             .positional_arg(threads, "n-of-threads", utility::thread_number_range_desc)
239             .positional_arg(InputFileName, "input-file", "input file name")
240             .positional_arg(OutputFileName, "output-file", "output file name")
241             .positional_arg(MAX_CHAR_PER_INPUT_SLICE,
242                             "max-slice-size",
243                             "the maximum number of characters in one slice")
244             .arg(silent, "silent", "no output except elapsed time"));
245     generate_if_needed(InputFileName.c_str());
246 
247     if (threads.first) {
248         for (int p = threads.first; p <= threads.last; p = threads.step(p)) {
249             if (!silent)
250                 printf("threads = %d ", p);
251             oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism, p);
252             if (!run_pipeline(p))
253                 return -1;
254         }
255     }
256     else { // Number of threads wasn't set explicitly. Run serial and parallel version
257         { // serial run
258             if (!silent)
259                 printf("serial run   ");
260             oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism, 1);
261             if (!run_pipeline(1))
262                 return -1;
263         }
264         { // parallel run (number of threads is selected automatically)
265             if (!silent)
266                 printf("parallel run ");
267             oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism,
268                                           utility::get_default_num_threads());
269             if (!run_pipeline(utility::get_default_num_threads()))
270                 return -1;
271         }
272     }
273 
274     utility::report_elapsed_time((oneapi::tbb::tick_count::now() - mainStartTime).seconds());
275 
276     return 0;
277 }
278