1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 // 18 // Example program that reads a file of decimal integers in text format 19 // and changes each to its square. 20 // 21 #include <cstring> 22 #include <cstdlib> 23 #include <cstdio> 24 #include <cctype> 25 26 #include "oneapi/tbb/parallel_pipeline.h" 27 #include "oneapi/tbb/tick_count.h" 28 #include "oneapi/tbb/tbb_allocator.h" 29 #include "oneapi/tbb/global_control.h" 30 31 #include "common/utility/utility.hpp" 32 #include "common/utility/get_default_num_threads.hpp" 33 34 extern void generate_if_needed(const char*); 35 36 //! Holds a slice of text. 37 /** Instances *must* be allocated/freed using methods herein, because the C++ declaration 38 represents only the header of a much larger object in memory. */ 39 class TextSlice { 40 //! Pointer to one past last character in sequence 41 char* logical_end; 42 //! Pointer to one past last available byte in sequence. 43 char* physical_end; 44 45 public: 46 //! Allocate a TextSlice object that can hold up to max_size characters. 47 static TextSlice* allocate(std::size_t max_size) { 48 // +1 leaves room for a terminating null character. 49 TextSlice* t = (TextSlice*)oneapi::tbb::tbb_allocator<char>().allocate(sizeof(TextSlice) + 50 max_size + 1); 51 t->logical_end = t->begin(); 52 t->physical_end = t->begin() + max_size; 53 return t; 54 } 55 //! Free a TextSlice object 56 void free() { 57 oneapi::tbb::tbb_allocator<char>().deallocate( 58 (char*)this, sizeof(TextSlice) + (physical_end - begin()) + 1); 59 } 60 //! Pointer to beginning of sequence 61 char* begin() { 62 return (char*)(this + 1); 63 } 64 //! Pointer to one past last character in sequence 65 char* end() { 66 return logical_end; 67 } 68 //! Length of sequence 69 std::size_t size() const { 70 return logical_end - (char*)(this + 1); 71 } 72 //! Maximum number of characters that can be appended to sequence 73 std::size_t avail() const { 74 return physical_end - logical_end; 75 } 76 //! Append sequence [first,last) to this sequence. 77 void append(char* first, char* last) { 78 memcpy(logical_end, first, last - first); 79 logical_end += last - first; 80 } 81 //! Set end() to given value. 82 void set_end(char* p) { 83 logical_end = p; 84 } 85 }; 86 87 std::size_t MAX_CHAR_PER_INPUT_SLICE = 4000; 88 std::string InputFileName = "input.txt"; 89 std::string OutputFileName = "output.txt"; 90 91 TextSlice* next_slice = nullptr; 92 93 class MyInputFunc { 94 public: 95 MyInputFunc(FILE* input_file_); 96 MyInputFunc(const MyInputFunc& f) : input_file(f.input_file) {} 97 ~MyInputFunc(); 98 TextSlice* operator()(oneapi::tbb::flow_control& fc) const; 99 100 private: 101 FILE* input_file; 102 }; 103 104 MyInputFunc::MyInputFunc(FILE* input_file_) : input_file(input_file_) {} 105 106 MyInputFunc::~MyInputFunc() {} 107 108 TextSlice* MyInputFunc::operator()(oneapi::tbb::flow_control& fc) const { 109 // Read characters into space that is available in the next slice. 110 if (!next_slice) 111 next_slice = TextSlice::allocate(MAX_CHAR_PER_INPUT_SLICE); 112 std::size_t m = next_slice->avail(); 113 std::size_t n = fread(next_slice->end(), 1, m, input_file); 114 if (!n && next_slice->size() == 0) { 115 // No more characters to process 116 fc.stop(); 117 return nullptr; 118 } 119 else { 120 // Have more characters to process. 121 TextSlice* t = next_slice; 122 next_slice = TextSlice::allocate(MAX_CHAR_PER_INPUT_SLICE); 123 char* p = t->end() + n; 124 if (n == m) { 125 // Might have read partial number. 126 // If so, transfer characters of partial number to next slice. 127 while (p > t->begin() && isdigit(p[-1])) 128 --p; 129 assert(p > t->begin()); // Number too large to fit in buffer 130 next_slice->append(p, t->end() + n); 131 } 132 t->set_end(p); 133 return t; 134 } 135 } 136 137 // Functor that changes each decimal number to its square. 138 class MyTransformFunc { 139 public: 140 TextSlice* operator()(TextSlice* input) const; 141 }; 142 143 TextSlice* MyTransformFunc::operator()(TextSlice* input) const { 144 // Add terminating null so that strtol works right even if number is at end of the input. 145 *input->end() = '\0'; 146 char* p = input->begin(); 147 TextSlice* out = TextSlice::allocate(2 * MAX_CHAR_PER_INPUT_SLICE); 148 char* q = out->begin(); 149 for (;;) { 150 while (p < input->end() && !isdigit(*p)) 151 *q++ = *p++; 152 if (p == input->end()) 153 break; 154 long x = strtol(p, &p, 10); 155 // Note: no overflow checking is needed here, as we have twice the 156 // input string length, but the square of a non-negative integer n 157 // cannot have more than twice as many digits as n. 158 long y = x * x; 159 sprintf(q, "%ld", y); 160 q = strchr(q, 0); 161 } 162 out->set_end(q); 163 input->free(); 164 return out; 165 } 166 167 // Functor that writes a TextSlice to a file. 168 class MyOutputFunc { 169 FILE* my_output_file; 170 171 public: 172 MyOutputFunc(FILE* output_file); 173 void operator()(TextSlice* item) const; 174 }; 175 176 MyOutputFunc::MyOutputFunc(FILE* output_file) : my_output_file(output_file) {} 177 178 void MyOutputFunc::operator()(TextSlice* out) const { 179 std::size_t n = fwrite(out->begin(), 1, out->size(), my_output_file); 180 if (n != out->size()) { 181 fprintf(stderr, "Can't write into file '%s'\n", OutputFileName.c_str()); 182 std::exit(-1); 183 } 184 out->free(); 185 } 186 187 bool silent = false; 188 189 int run_pipeline(int nthreads) { 190 FILE* input_file = fopen(InputFileName.c_str(), "r"); 191 if (!input_file) { 192 throw std::invalid_argument(("Invalid input file name: " + InputFileName).c_str()); 193 return 0; 194 } 195 FILE* output_file = fopen(OutputFileName.c_str(), "w"); 196 if (!output_file) { 197 throw std::invalid_argument(("Invalid output file name: " + OutputFileName).c_str()); 198 return 0; 199 } 200 201 oneapi::tbb::tick_count t0 = oneapi::tbb::tick_count::now(); 202 203 // Need more than one token in flight per thread to keep all threads 204 // busy; 2-4 works 205 oneapi::tbb::parallel_pipeline( 206 nthreads * 4, 207 oneapi::tbb::make_filter<void, TextSlice*>(oneapi::tbb::filter_mode::serial_in_order, 208 MyInputFunc(input_file)) & 209 oneapi::tbb::make_filter<TextSlice*, TextSlice*>(oneapi::tbb::filter_mode::parallel, 210 MyTransformFunc()) & 211 oneapi::tbb::make_filter<TextSlice*, void>(oneapi::tbb::filter_mode::serial_in_order, 212 MyOutputFunc(output_file))); 213 214 oneapi::tbb::tick_count t1 = oneapi::tbb::tick_count::now(); 215 216 fclose(output_file); 217 fclose(input_file); 218 219 if (!silent) 220 printf("time = %g\n", (t1 - t0).seconds()); 221 222 return 1; 223 } 224 225 int main(int argc, char* argv[]) { 226 oneapi::tbb::tick_count mainStartTime = oneapi::tbb::tick_count::now(); 227 228 // The 1st argument is the function to obtain 'auto' value; the 2nd is the default value 229 // The example interprets 0 threads as "run serially, then fully subscribed" 230 utility::thread_number_range threads(utility::get_default_num_threads, 0); 231 232 utility::parse_cli_arguments( 233 argc, 234 argv, 235 utility::cli_argument_pack() 236 //"-h" option for displaying help is present implicitly 237 .positional_arg(threads, "n-of-threads", utility::thread_number_range_desc) 238 .positional_arg(InputFileName, "input-file", "input file name") 239 .positional_arg(OutputFileName, "output-file", "output file name") 240 .positional_arg(MAX_CHAR_PER_INPUT_SLICE, 241 "max-slice-size", 242 "the maximum number of characters in one slice") 243 .arg(silent, "silent", "no output except elapsed time")); 244 generate_if_needed(InputFileName.c_str()); 245 246 if (threads.first) { 247 for (int p = threads.first; p <= threads.last; p = threads.step(p)) { 248 if (!silent) 249 printf("threads = %d ", p); 250 oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism, p); 251 if (!run_pipeline(p)) 252 return -1; 253 } 254 } 255 else { // Number of threads wasn't set explicitly. Run serial and parallel version 256 { // serial run 257 if (!silent) 258 printf("serial run "); 259 oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism, 1); 260 if (!run_pipeline(1)) 261 return -1; 262 } 263 { // parallel run (number of threads is selected automatically) 264 if (!silent) 265 printf("parallel run "); 266 oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism, 267 utility::get_default_num_threads()); 268 if (!run_pipeline(utility::get_default_num_threads())) 269 return -1; 270 } 271 } 272 273 utility::report_elapsed_time((oneapi::tbb::tick_count::now() - mainStartTime).seconds()); 274 275 return 0; 276 } 277