1d86ed7fbStbbdev /*
2b15aabb3Stbbdev Copyright (c) 2005-2021 Intel Corporation
3d86ed7fbStbbdev
4d86ed7fbStbbdev Licensed under the Apache License, Version 2.0 (the "License");
5d86ed7fbStbbdev you may not use this file except in compliance with the License.
6d86ed7fbStbbdev You may obtain a copy of the License at
7d86ed7fbStbbdev
8d86ed7fbStbbdev http://www.apache.org/licenses/LICENSE-2.0
9d86ed7fbStbbdev
10d86ed7fbStbbdev Unless required by applicable law or agreed to in writing, software
11d86ed7fbStbbdev distributed under the License is distributed on an "AS IS" BASIS,
12d86ed7fbStbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13d86ed7fbStbbdev See the License for the specific language governing permissions and
14d86ed7fbStbbdev limitations under the License.
15d86ed7fbStbbdev */
16d86ed7fbStbbdev
17d86ed7fbStbbdev //
18d86ed7fbStbbdev // Example program that reads a file of decimal integers in text format
19d86ed7fbStbbdev // and changes each to its square.
20d86ed7fbStbbdev //
21*478de5b1Stbbdev
#include <cassert>
#include <cctype>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

#include "oneapi/tbb/parallel_pipeline.h"
#include "oneapi/tbb/tick_count.h"
#include "oneapi/tbb/tbb_allocator.h"
#include "oneapi/tbb/global_control.h"

#include "common/utility/utility.hpp"
#include "common/utility/get_default_num_threads.hpp"
34d86ed7fbStbbdev
//! Defined in another translation unit; called by main() with the input file name
//! before any pipeline runs (presumably it creates sample input when the file is missing).
void generate_if_needed(const char* file_name);
36d86ed7fbStbbdev
37d86ed7fbStbbdev //! Holds a slice of text.
38d86ed7fbStbbdev /** Instances *must* be allocated/freed using methods herein, because the C++ declaration
39d86ed7fbStbbdev represents only the header of a much larger object in memory. */
40d86ed7fbStbbdev class TextSlice {
41d86ed7fbStbbdev //! Pointer to one past last character in sequence
42d86ed7fbStbbdev char* logical_end;
43d86ed7fbStbbdev //! Pointer to one past last available byte in sequence.
44d86ed7fbStbbdev char* physical_end;
45d86ed7fbStbbdev
46d86ed7fbStbbdev public:
47d86ed7fbStbbdev //! Allocate a TextSlice object that can hold up to max_size characters.
allocate(std::size_t max_size)48d86ed7fbStbbdev static TextSlice* allocate(std::size_t max_size) {
49d86ed7fbStbbdev // +1 leaves room for a terminating null character.
50d86ed7fbStbbdev TextSlice* t = (TextSlice*)oneapi::tbb::tbb_allocator<char>().allocate(sizeof(TextSlice) +
51d86ed7fbStbbdev max_size + 1);
52d86ed7fbStbbdev t->logical_end = t->begin();
53d86ed7fbStbbdev t->physical_end = t->begin() + max_size;
54d86ed7fbStbbdev return t;
55d86ed7fbStbbdev }
56d86ed7fbStbbdev //! Free a TextSlice object
free()57d86ed7fbStbbdev void free() {
58d86ed7fbStbbdev oneapi::tbb::tbb_allocator<char>().deallocate(
59d86ed7fbStbbdev (char*)this, sizeof(TextSlice) + (physical_end - begin()) + 1);
60d86ed7fbStbbdev }
61d86ed7fbStbbdev //! Pointer to beginning of sequence
begin()62d86ed7fbStbbdev char* begin() {
63d86ed7fbStbbdev return (char*)(this + 1);
64d86ed7fbStbbdev }
65d86ed7fbStbbdev //! Pointer to one past last character in sequence
end()66d86ed7fbStbbdev char* end() {
67d86ed7fbStbbdev return logical_end;
68d86ed7fbStbbdev }
69d86ed7fbStbbdev //! Length of sequence
size() const70d86ed7fbStbbdev std::size_t size() const {
71d86ed7fbStbbdev return logical_end - (char*)(this + 1);
72d86ed7fbStbbdev }
73d86ed7fbStbbdev //! Maximum number of characters that can be appended to sequence
avail() const74d86ed7fbStbbdev std::size_t avail() const {
75d86ed7fbStbbdev return physical_end - logical_end;
76d86ed7fbStbbdev }
77d86ed7fbStbbdev //! Append sequence [first,last) to this sequence.
append(char * first,char * last)78d86ed7fbStbbdev void append(char* first, char* last) {
79d86ed7fbStbbdev memcpy(logical_end, first, last - first);
80d86ed7fbStbbdev logical_end += last - first;
81d86ed7fbStbbdev }
82d86ed7fbStbbdev //! Set end() to given value.
set_end(char * p)83d86ed7fbStbbdev void set_end(char* p) {
84d86ed7fbStbbdev logical_end = p;
85d86ed7fbStbbdev }
86d86ed7fbStbbdev };
87d86ed7fbStbbdev
//! Capacity, in characters, of each input slice (the "max-slice-size" argument).
std::size_t MAX_CHAR_PER_INPUT_SLICE{4000};
//! File the numbers are read from (the "input-file" argument).
std::string InputFileName{"input.txt"};
//! File the squared numbers are written to (the "output-file" argument).
std::string OutputFileName{"output.txt"};
91d86ed7fbStbbdev
92d86ed7fbStbbdev TextSlice* next_slice = nullptr;
93d86ed7fbStbbdev
94d86ed7fbStbbdev class MyInputFunc {
95d86ed7fbStbbdev public:
96d86ed7fbStbbdev MyInputFunc(FILE* input_file_);
MyInputFunc(const MyInputFunc & f)97d86ed7fbStbbdev MyInputFunc(const MyInputFunc& f) : input_file(f.input_file) {}
98d86ed7fbStbbdev ~MyInputFunc();
99d86ed7fbStbbdev TextSlice* operator()(oneapi::tbb::flow_control& fc) const;
100d86ed7fbStbbdev
101d86ed7fbStbbdev private:
102d86ed7fbStbbdev FILE* input_file;
103d86ed7fbStbbdev };
104d86ed7fbStbbdev
MyInputFunc(FILE * input_file_)105d86ed7fbStbbdev MyInputFunc::MyInputFunc(FILE* input_file_) : input_file(input_file_) {}
106d86ed7fbStbbdev
~MyInputFunc()107d86ed7fbStbbdev MyInputFunc::~MyInputFunc() {}
108d86ed7fbStbbdev
operator ()(oneapi::tbb::flow_control & fc) const109d86ed7fbStbbdev TextSlice* MyInputFunc::operator()(oneapi::tbb::flow_control& fc) const {
110d86ed7fbStbbdev // Read characters into space that is available in the next slice.
111d86ed7fbStbbdev if (!next_slice)
112d86ed7fbStbbdev next_slice = TextSlice::allocate(MAX_CHAR_PER_INPUT_SLICE);
113d86ed7fbStbbdev std::size_t m = next_slice->avail();
114d86ed7fbStbbdev std::size_t n = fread(next_slice->end(), 1, m, input_file);
115d86ed7fbStbbdev if (!n && next_slice->size() == 0) {
116d86ed7fbStbbdev // No more characters to process
117d86ed7fbStbbdev fc.stop();
118d86ed7fbStbbdev return nullptr;
119d86ed7fbStbbdev }
120d86ed7fbStbbdev else {
121d86ed7fbStbbdev // Have more characters to process.
122d86ed7fbStbbdev TextSlice* t = next_slice;
123d86ed7fbStbbdev next_slice = TextSlice::allocate(MAX_CHAR_PER_INPUT_SLICE);
124d86ed7fbStbbdev char* p = t->end() + n;
125d86ed7fbStbbdev if (n == m) {
126d86ed7fbStbbdev // Might have read partial number.
127d86ed7fbStbbdev // If so, transfer characters of partial number to next slice.
128d86ed7fbStbbdev while (p > t->begin() && isdigit(p[-1]))
129d86ed7fbStbbdev --p;
130d86ed7fbStbbdev assert(p > t->begin()); // Number too large to fit in buffer
131d86ed7fbStbbdev next_slice->append(p, t->end() + n);
132d86ed7fbStbbdev }
133d86ed7fbStbbdev t->set_end(p);
134d86ed7fbStbbdev return t;
135d86ed7fbStbbdev }
136d86ed7fbStbbdev }
137d86ed7fbStbbdev
138d86ed7fbStbbdev // Functor that changes each decimal number to its square.
139d86ed7fbStbbdev class MyTransformFunc {
140d86ed7fbStbbdev public:
141d86ed7fbStbbdev TextSlice* operator()(TextSlice* input) const;
142d86ed7fbStbbdev };
143d86ed7fbStbbdev
operator ()(TextSlice * input) const144d86ed7fbStbbdev TextSlice* MyTransformFunc::operator()(TextSlice* input) const {
145d86ed7fbStbbdev // Add terminating null so that strtol works right even if number is at end of the input.
146d86ed7fbStbbdev *input->end() = '\0';
147d86ed7fbStbbdev char* p = input->begin();
148d86ed7fbStbbdev TextSlice* out = TextSlice::allocate(2 * MAX_CHAR_PER_INPUT_SLICE);
149d86ed7fbStbbdev char* q = out->begin();
150d86ed7fbStbbdev for (;;) {
151d86ed7fbStbbdev while (p < input->end() && !isdigit(*p))
152d86ed7fbStbbdev *q++ = *p++;
153d86ed7fbStbbdev if (p == input->end())
154d86ed7fbStbbdev break;
155d86ed7fbStbbdev long x = strtol(p, &p, 10);
156d86ed7fbStbbdev // Note: no overflow checking is needed here, as we have twice the
157d86ed7fbStbbdev // input string length, but the square of a non-negative integer n
158d86ed7fbStbbdev // cannot have more than twice as many digits as n.
159d86ed7fbStbbdev long y = x * x;
160d86ed7fbStbbdev sprintf(q, "%ld", y);
161d86ed7fbStbbdev q = strchr(q, 0);
162d86ed7fbStbbdev }
163d86ed7fbStbbdev out->set_end(q);
164d86ed7fbStbbdev input->free();
165d86ed7fbStbbdev return out;
166d86ed7fbStbbdev }
167d86ed7fbStbbdev
168d86ed7fbStbbdev // Functor that writes a TextSlice to a file.
169d86ed7fbStbbdev class MyOutputFunc {
170d86ed7fbStbbdev FILE* my_output_file;
171d86ed7fbStbbdev
172d86ed7fbStbbdev public:
173d86ed7fbStbbdev MyOutputFunc(FILE* output_file);
174d86ed7fbStbbdev void operator()(TextSlice* item) const;
175d86ed7fbStbbdev };
176d86ed7fbStbbdev
MyOutputFunc(FILE * output_file)177d86ed7fbStbbdev MyOutputFunc::MyOutputFunc(FILE* output_file) : my_output_file(output_file) {}
178d86ed7fbStbbdev
operator ()(TextSlice * out) const179d86ed7fbStbbdev void MyOutputFunc::operator()(TextSlice* out) const {
180d86ed7fbStbbdev std::size_t n = fwrite(out->begin(), 1, out->size(), my_output_file);
181d86ed7fbStbbdev if (n != out->size()) {
182d86ed7fbStbbdev fprintf(stderr, "Can't write into file '%s'\n", OutputFileName.c_str());
183d86ed7fbStbbdev std::exit(-1);
184d86ed7fbStbbdev }
185d86ed7fbStbbdev out->free();
186d86ed7fbStbbdev }
187d86ed7fbStbbdev
//! Set by the "silent" option; when true, main() and run_pipeline() skip their printf output.
bool silent{false};
189d86ed7fbStbbdev
run_pipeline(int nthreads)190d86ed7fbStbbdev int run_pipeline(int nthreads) {
191d86ed7fbStbbdev FILE* input_file = fopen(InputFileName.c_str(), "r");
192d86ed7fbStbbdev if (!input_file) {
193d86ed7fbStbbdev throw std::invalid_argument(("Invalid input file name: " + InputFileName).c_str());
194d86ed7fbStbbdev return 0;
195d86ed7fbStbbdev }
196d86ed7fbStbbdev FILE* output_file = fopen(OutputFileName.c_str(), "w");
197d86ed7fbStbbdev if (!output_file) {
198d86ed7fbStbbdev throw std::invalid_argument(("Invalid output file name: " + OutputFileName).c_str());
199d86ed7fbStbbdev return 0;
200d86ed7fbStbbdev }
201d86ed7fbStbbdev
202d86ed7fbStbbdev oneapi::tbb::tick_count t0 = oneapi::tbb::tick_count::now();
203d86ed7fbStbbdev
204d86ed7fbStbbdev // Need more than one token in flight per thread to keep all threads
205d86ed7fbStbbdev // busy; 2-4 works
206d86ed7fbStbbdev oneapi::tbb::parallel_pipeline(
207d86ed7fbStbbdev nthreads * 4,
208d86ed7fbStbbdev oneapi::tbb::make_filter<void, TextSlice*>(oneapi::tbb::filter_mode::serial_in_order,
209d86ed7fbStbbdev MyInputFunc(input_file)) &
210d86ed7fbStbbdev oneapi::tbb::make_filter<TextSlice*, TextSlice*>(oneapi::tbb::filter_mode::parallel,
211d86ed7fbStbbdev MyTransformFunc()) &
212d86ed7fbStbbdev oneapi::tbb::make_filter<TextSlice*, void>(oneapi::tbb::filter_mode::serial_in_order,
213d86ed7fbStbbdev MyOutputFunc(output_file)));
214d86ed7fbStbbdev
215d86ed7fbStbbdev oneapi::tbb::tick_count t1 = oneapi::tbb::tick_count::now();
216d86ed7fbStbbdev
217d86ed7fbStbbdev fclose(output_file);
218d86ed7fbStbbdev fclose(input_file);
219d86ed7fbStbbdev
220d86ed7fbStbbdev if (!silent)
221d86ed7fbStbbdev printf("time = %g\n", (t1 - t0).seconds());
222d86ed7fbStbbdev
223d86ed7fbStbbdev return 1;
224d86ed7fbStbbdev }
225d86ed7fbStbbdev
main(int argc,char * argv[])226d86ed7fbStbbdev int main(int argc, char* argv[]) {
227d86ed7fbStbbdev oneapi::tbb::tick_count mainStartTime = oneapi::tbb::tick_count::now();
228d86ed7fbStbbdev
229d86ed7fbStbbdev // The 1st argument is the function to obtain 'auto' value; the 2nd is the default value
230d86ed7fbStbbdev // The example interprets 0 threads as "run serially, then fully subscribed"
231d86ed7fbStbbdev utility::thread_number_range threads(utility::get_default_num_threads, 0);
232d86ed7fbStbbdev
233d86ed7fbStbbdev utility::parse_cli_arguments(
234d86ed7fbStbbdev argc,
235d86ed7fbStbbdev argv,
236d86ed7fbStbbdev utility::cli_argument_pack()
237d86ed7fbStbbdev //"-h" option for displaying help is present implicitly
238d86ed7fbStbbdev .positional_arg(threads, "n-of-threads", utility::thread_number_range_desc)
239d86ed7fbStbbdev .positional_arg(InputFileName, "input-file", "input file name")
240d86ed7fbStbbdev .positional_arg(OutputFileName, "output-file", "output file name")
241d86ed7fbStbbdev .positional_arg(MAX_CHAR_PER_INPUT_SLICE,
242d86ed7fbStbbdev "max-slice-size",
243d86ed7fbStbbdev "the maximum number of characters in one slice")
244d86ed7fbStbbdev .arg(silent, "silent", "no output except elapsed time"));
245d86ed7fbStbbdev generate_if_needed(InputFileName.c_str());
246d86ed7fbStbbdev
247d86ed7fbStbbdev if (threads.first) {
248d86ed7fbStbbdev for (int p = threads.first; p <= threads.last; p = threads.step(p)) {
249d86ed7fbStbbdev if (!silent)
250d86ed7fbStbbdev printf("threads = %d ", p);
251d86ed7fbStbbdev oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism, p);
252d86ed7fbStbbdev if (!run_pipeline(p))
253d86ed7fbStbbdev return -1;
254d86ed7fbStbbdev }
255d86ed7fbStbbdev }
256d86ed7fbStbbdev else { // Number of threads wasn't set explicitly. Run serial and parallel version
257d86ed7fbStbbdev { // serial run
258d86ed7fbStbbdev if (!silent)
259d86ed7fbStbbdev printf("serial run ");
260d86ed7fbStbbdev oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism, 1);
261d86ed7fbStbbdev if (!run_pipeline(1))
262d86ed7fbStbbdev return -1;
263d86ed7fbStbbdev }
264d86ed7fbStbbdev { // parallel run (number of threads is selected automatically)
265d86ed7fbStbbdev if (!silent)
266d86ed7fbStbbdev printf("parallel run ");
267d86ed7fbStbbdev oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism,
268d86ed7fbStbbdev utility::get_default_num_threads());
269d86ed7fbStbbdev if (!run_pipeline(utility::get_default_num_threads()))
270d86ed7fbStbbdev return -1;
271d86ed7fbStbbdev }
272d86ed7fbStbbdev }
273d86ed7fbStbbdev
274d86ed7fbStbbdev utility::report_elapsed_time((oneapi::tbb::tick_count::now() - mainStartTime).seconds());
275d86ed7fbStbbdev
276d86ed7fbStbbdev return 0;
277d86ed7fbStbbdev }
278