1 /*
2 Copyright (c) 2005-2021 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 //
18 // Example program that reads a file of decimal integers in text format
19 // and changes each to its square.
20 //
21
#include <cctype>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <exception>
#include <memory>
#include <stdexcept>
#include <string>
26
27 #include "oneapi/tbb/parallel_pipeline.h"
28 #include "oneapi/tbb/tick_count.h"
29 #include "oneapi/tbb/tbb_allocator.h"
30 #include "oneapi/tbb/global_control.h"
31
32 #include "common/utility/utility.hpp"
33 #include "common/utility/get_default_num_threads.hpp"
34
//! Defined in another translation unit; main() calls it with InputFileName before any pipeline
//! run. NOTE(review): presumably creates the input file when it does not exist yet - confirm
//! against its definition.
void generate_if_needed(const char* file_name);
36
37 //! Holds a slice of text.
38 /** Instances *must* be allocated/freed using methods herein, because the C++ declaration
39 represents only the header of a much larger object in memory. */
40 class TextSlice {
41 //! Pointer to one past last character in sequence
42 char* logical_end;
43 //! Pointer to one past last available byte in sequence.
44 char* physical_end;
45
46 public:
47 //! Allocate a TextSlice object that can hold up to max_size characters.
allocate(std::size_t max_size)48 static TextSlice* allocate(std::size_t max_size) {
49 // +1 leaves room for a terminating null character.
50 TextSlice* t = (TextSlice*)oneapi::tbb::tbb_allocator<char>().allocate(sizeof(TextSlice) +
51 max_size + 1);
52 t->logical_end = t->begin();
53 t->physical_end = t->begin() + max_size;
54 return t;
55 }
56 //! Free a TextSlice object
free()57 void free() {
58 oneapi::tbb::tbb_allocator<char>().deallocate(
59 (char*)this, sizeof(TextSlice) + (physical_end - begin()) + 1);
60 }
61 //! Pointer to beginning of sequence
begin()62 char* begin() {
63 return (char*)(this + 1);
64 }
65 //! Pointer to one past last character in sequence
end()66 char* end() {
67 return logical_end;
68 }
69 //! Length of sequence
size() const70 std::size_t size() const {
71 return logical_end - (char*)(this + 1);
72 }
73 //! Maximum number of characters that can be appended to sequence
avail() const74 std::size_t avail() const {
75 return physical_end - logical_end;
76 }
77 //! Append sequence [first,last) to this sequence.
append(char * first,char * last)78 void append(char* first, char* last) {
79 memcpy(logical_end, first, last - first);
80 logical_end += last - first;
81 }
82 //! Set end() to given value.
set_end(char * p)83 void set_end(char* p) {
84 logical_end = p;
85 }
86 };
87
//! Capacity, in characters, of each slice built by the input stage (CLI: max-slice-size).
std::size_t MAX_CHAR_PER_INPUT_SLICE{4000};
//! Path of the text file of integers to read (CLI: input-file).
std::string InputFileName{"input.txt"};
//! Path of the file that receives the squared numbers (CLI: output-file).
std::string OutputFileName{"output.txt"};
91
92 TextSlice* next_slice = nullptr;
93
94 class MyInputFunc {
95 public:
96 MyInputFunc(FILE* input_file_);
MyInputFunc(const MyInputFunc & f)97 MyInputFunc(const MyInputFunc& f) : input_file(f.input_file) {}
98 ~MyInputFunc();
99 TextSlice* operator()(oneapi::tbb::flow_control& fc) const;
100
101 private:
102 FILE* input_file;
103 };
104
MyInputFunc(FILE * input_file_)105 MyInputFunc::MyInputFunc(FILE* input_file_) : input_file(input_file_) {}
106
~MyInputFunc()107 MyInputFunc::~MyInputFunc() {}
108
operator ()(oneapi::tbb::flow_control & fc) const109 TextSlice* MyInputFunc::operator()(oneapi::tbb::flow_control& fc) const {
110 // Read characters into space that is available in the next slice.
111 if (!next_slice)
112 next_slice = TextSlice::allocate(MAX_CHAR_PER_INPUT_SLICE);
113 std::size_t m = next_slice->avail();
114 std::size_t n = fread(next_slice->end(), 1, m, input_file);
115 if (!n && next_slice->size() == 0) {
116 // No more characters to process
117 fc.stop();
118 return nullptr;
119 }
120 else {
121 // Have more characters to process.
122 TextSlice* t = next_slice;
123 next_slice = TextSlice::allocate(MAX_CHAR_PER_INPUT_SLICE);
124 char* p = t->end() + n;
125 if (n == m) {
126 // Might have read partial number.
127 // If so, transfer characters of partial number to next slice.
128 while (p > t->begin() && isdigit(p[-1]))
129 --p;
130 assert(p > t->begin()); // Number too large to fit in buffer
131 next_slice->append(p, t->end() + n);
132 }
133 t->set_end(p);
134 return t;
135 }
136 }
137
138 // Functor that changes each decimal number to its square.
139 class MyTransformFunc {
140 public:
141 TextSlice* operator()(TextSlice* input) const;
142 };
143
operator ()(TextSlice * input) const144 TextSlice* MyTransformFunc::operator()(TextSlice* input) const {
145 // Add terminating null so that strtol works right even if number is at end of the input.
146 *input->end() = '\0';
147 char* p = input->begin();
148 TextSlice* out = TextSlice::allocate(2 * MAX_CHAR_PER_INPUT_SLICE);
149 char* q = out->begin();
150 for (;;) {
151 while (p < input->end() && !isdigit(*p))
152 *q++ = *p++;
153 if (p == input->end())
154 break;
155 long x = strtol(p, &p, 10);
156 // Note: no overflow checking is needed here, as we have twice the
157 // input string length, but the square of a non-negative integer n
158 // cannot have more than twice as many digits as n.
159 long y = x * x;
160 sprintf(q, "%ld", y);
161 q = strchr(q, 0);
162 }
163 out->set_end(q);
164 input->free();
165 return out;
166 }
167
168 // Functor that writes a TextSlice to a file.
169 class MyOutputFunc {
170 FILE* my_output_file;
171
172 public:
173 MyOutputFunc(FILE* output_file);
174 void operator()(TextSlice* item) const;
175 };
176
MyOutputFunc(FILE * output_file)177 MyOutputFunc::MyOutputFunc(FILE* output_file) : my_output_file(output_file) {}
178
operator ()(TextSlice * out) const179 void MyOutputFunc::operator()(TextSlice* out) const {
180 std::size_t n = fwrite(out->begin(), 1, out->size(), my_output_file);
181 if (n != out->size()) {
182 fprintf(stderr, "Can't write into file '%s'\n", OutputFileName.c_str());
183 std::exit(-1);
184 }
185 out->free();
186 }
187
//! Set by the "silent" command-line flag; suppresses the per-run "threads = ...",
//! "serial run", "parallel run" and "time = ..." lines.
bool silent{false};
189
run_pipeline(int nthreads)190 int run_pipeline(int nthreads) {
191 FILE* input_file = fopen(InputFileName.c_str(), "r");
192 if (!input_file) {
193 throw std::invalid_argument(("Invalid input file name: " + InputFileName).c_str());
194 return 0;
195 }
196 FILE* output_file = fopen(OutputFileName.c_str(), "w");
197 if (!output_file) {
198 throw std::invalid_argument(("Invalid output file name: " + OutputFileName).c_str());
199 return 0;
200 }
201
202 oneapi::tbb::tick_count t0 = oneapi::tbb::tick_count::now();
203
204 // Need more than one token in flight per thread to keep all threads
205 // busy; 2-4 works
206 oneapi::tbb::parallel_pipeline(
207 nthreads * 4,
208 oneapi::tbb::make_filter<void, TextSlice*>(oneapi::tbb::filter_mode::serial_in_order,
209 MyInputFunc(input_file)) &
210 oneapi::tbb::make_filter<TextSlice*, TextSlice*>(oneapi::tbb::filter_mode::parallel,
211 MyTransformFunc()) &
212 oneapi::tbb::make_filter<TextSlice*, void>(oneapi::tbb::filter_mode::serial_in_order,
213 MyOutputFunc(output_file)));
214
215 oneapi::tbb::tick_count t1 = oneapi::tbb::tick_count::now();
216
217 fclose(output_file);
218 fclose(input_file);
219
220 if (!silent)
221 printf("time = %g\n", (t1 - t0).seconds());
222
223 return 1;
224 }
225
main(int argc,char * argv[])226 int main(int argc, char* argv[]) {
227 oneapi::tbb::tick_count mainStartTime = oneapi::tbb::tick_count::now();
228
229 // The 1st argument is the function to obtain 'auto' value; the 2nd is the default value
230 // The example interprets 0 threads as "run serially, then fully subscribed"
231 utility::thread_number_range threads(utility::get_default_num_threads, 0);
232
233 utility::parse_cli_arguments(
234 argc,
235 argv,
236 utility::cli_argument_pack()
237 //"-h" option for displaying help is present implicitly
238 .positional_arg(threads, "n-of-threads", utility::thread_number_range_desc)
239 .positional_arg(InputFileName, "input-file", "input file name")
240 .positional_arg(OutputFileName, "output-file", "output file name")
241 .positional_arg(MAX_CHAR_PER_INPUT_SLICE,
242 "max-slice-size",
243 "the maximum number of characters in one slice")
244 .arg(silent, "silent", "no output except elapsed time"));
245 generate_if_needed(InputFileName.c_str());
246
247 if (threads.first) {
248 for (int p = threads.first; p <= threads.last; p = threads.step(p)) {
249 if (!silent)
250 printf("threads = %d ", p);
251 oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism, p);
252 if (!run_pipeline(p))
253 return -1;
254 }
255 }
256 else { // Number of threads wasn't set explicitly. Run serial and parallel version
257 { // serial run
258 if (!silent)
259 printf("serial run ");
260 oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism, 1);
261 if (!run_pipeline(1))
262 return -1;
263 }
264 { // parallel run (number of threads is selected automatically)
265 if (!silent)
266 printf("parallel run ");
267 oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism,
268 utility::get_default_num_threads());
269 if (!run_pipeline(utility::get_default_num_threads()))
270 return -1;
271 }
272 }
273
274 utility::report_elapsed_time((oneapi::tbb::tick_count::now() - mainStartTime).seconds());
275
276 return 0;
277 }
278