1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 // 18 // Example program that reads a file of decimal integers in text format 19 // and changes each to its square. 20 // 21 22 #include <cstring> 23 #include <cstdlib> 24 #include <cstdio> 25 #include <cctype> 26 27 #include "oneapi/tbb/parallel_pipeline.h" 28 #include "oneapi/tbb/tick_count.h" 29 #include "oneapi/tbb/tbb_allocator.h" 30 #include "oneapi/tbb/global_control.h" 31 32 #include "common/utility/utility.hpp" 33 #include "common/utility/get_default_num_threads.hpp" 34 35 extern void generate_if_needed(const char*); 36 37 //! Holds a slice of text. 38 /** Instances *must* be allocated/freed using methods herein, because the C++ declaration 39 represents only the header of a much larger object in memory. */ 40 class TextSlice { 41 //! Pointer to one past last character in sequence 42 char* logical_end; 43 //! Pointer to one past last available byte in sequence. 44 char* physical_end; 45 46 public: 47 //! Allocate a TextSlice object that can hold up to max_size characters. 48 static TextSlice* allocate(std::size_t max_size) { 49 // +1 leaves room for a terminating null character. 50 TextSlice* t = (TextSlice*)oneapi::tbb::tbb_allocator<char>().allocate(sizeof(TextSlice) + 51 max_size + 1); 52 t->logical_end = t->begin(); 53 t->physical_end = t->begin() + max_size; 54 return t; 55 } 56 //! Free a TextSlice object 57 void free() { 58 oneapi::tbb::tbb_allocator<char>().deallocate( 59 (char*)this, sizeof(TextSlice) + (physical_end - begin()) + 1); 60 } 61 //! Pointer to beginning of sequence 62 char* begin() { 63 return (char*)(this + 1); 64 } 65 //! Pointer to one past last character in sequence 66 char* end() { 67 return logical_end; 68 } 69 //! Length of sequence 70 std::size_t size() const { 71 return logical_end - (char*)(this + 1); 72 } 73 //! Maximum number of characters that can be appended to sequence 74 std::size_t avail() const { 75 return physical_end - logical_end; 76 } 77 //! Append sequence [first,last) to this sequence. 78 void append(char* first, char* last) { 79 memcpy(logical_end, first, last - first); 80 logical_end += last - first; 81 } 82 //! Set end() to given value. 83 void set_end(char* p) { 84 logical_end = p; 85 } 86 }; 87 88 std::size_t MAX_CHAR_PER_INPUT_SLICE = 4000; 89 std::string InputFileName = "input.txt"; 90 std::string OutputFileName = "output.txt"; 91 92 TextSlice* next_slice = nullptr; 93 94 class MyInputFunc { 95 public: 96 MyInputFunc(FILE* input_file_); 97 MyInputFunc(const MyInputFunc& f) : input_file(f.input_file) {} 98 ~MyInputFunc(); 99 TextSlice* operator()(oneapi::tbb::flow_control& fc) const; 100 101 private: 102 FILE* input_file; 103 }; 104 105 MyInputFunc::MyInputFunc(FILE* input_file_) : input_file(input_file_) {} 106 107 MyInputFunc::~MyInputFunc() {} 108 109 TextSlice* MyInputFunc::operator()(oneapi::tbb::flow_control& fc) const { 110 // Read characters into space that is available in the next slice. 111 if (!next_slice) 112 next_slice = TextSlice::allocate(MAX_CHAR_PER_INPUT_SLICE); 113 std::size_t m = next_slice->avail(); 114 std::size_t n = fread(next_slice->end(), 1, m, input_file); 115 if (!n && next_slice->size() == 0) { 116 // No more characters to process 117 fc.stop(); 118 return nullptr; 119 } 120 else { 121 // Have more characters to process. 122 TextSlice* t = next_slice; 123 next_slice = TextSlice::allocate(MAX_CHAR_PER_INPUT_SLICE); 124 char* p = t->end() + n; 125 if (n == m) { 126 // Might have read partial number. 127 // If so, transfer characters of partial number to next slice. 128 while (p > t->begin() && isdigit(p[-1])) 129 --p; 130 assert(p > t->begin()); // Number too large to fit in buffer 131 next_slice->append(p, t->end() + n); 132 } 133 t->set_end(p); 134 return t; 135 } 136 } 137 138 // Functor that changes each decimal number to its square. 139 class MyTransformFunc { 140 public: 141 TextSlice* operator()(TextSlice* input) const; 142 }; 143 144 TextSlice* MyTransformFunc::operator()(TextSlice* input) const { 145 // Add terminating null so that strtol works right even if number is at end of the input. 146 *input->end() = '\0'; 147 char* p = input->begin(); 148 TextSlice* out = TextSlice::allocate(2 * MAX_CHAR_PER_INPUT_SLICE); 149 char* q = out->begin(); 150 for (;;) { 151 while (p < input->end() && !isdigit(*p)) 152 *q++ = *p++; 153 if (p == input->end()) 154 break; 155 long x = strtol(p, &p, 10); 156 // Note: no overflow checking is needed here, as we have twice the 157 // input string length, but the square of a non-negative integer n 158 // cannot have more than twice as many digits as n. 159 long y = x * x; 160 sprintf(q, "%ld", y); 161 q = strchr(q, 0); 162 } 163 out->set_end(q); 164 input->free(); 165 return out; 166 } 167 168 // Functor that writes a TextSlice to a file. 169 class MyOutputFunc { 170 FILE* my_output_file; 171 172 public: 173 MyOutputFunc(FILE* output_file); 174 void operator()(TextSlice* item) const; 175 }; 176 177 MyOutputFunc::MyOutputFunc(FILE* output_file) : my_output_file(output_file) {} 178 179 void MyOutputFunc::operator()(TextSlice* out) const { 180 std::size_t n = fwrite(out->begin(), 1, out->size(), my_output_file); 181 if (n != out->size()) { 182 fprintf(stderr, "Can't write into file '%s'\n", OutputFileName.c_str()); 183 std::exit(-1); 184 } 185 out->free(); 186 } 187 188 bool silent = false; 189 190 int run_pipeline(int nthreads) { 191 FILE* input_file = fopen(InputFileName.c_str(), "r"); 192 if (!input_file) { 193 throw std::invalid_argument(("Invalid input file name: " + InputFileName).c_str()); 194 return 0; 195 } 196 FILE* output_file = fopen(OutputFileName.c_str(), "w"); 197 if (!output_file) { 198 throw std::invalid_argument(("Invalid output file name: " + OutputFileName).c_str()); 199 return 0; 200 } 201 202 oneapi::tbb::tick_count t0 = oneapi::tbb::tick_count::now(); 203 204 // Need more than one token in flight per thread to keep all threads 205 // busy; 2-4 works 206 oneapi::tbb::parallel_pipeline( 207 nthreads * 4, 208 oneapi::tbb::make_filter<void, TextSlice*>(oneapi::tbb::filter_mode::serial_in_order, 209 MyInputFunc(input_file)) & 210 oneapi::tbb::make_filter<TextSlice*, TextSlice*>(oneapi::tbb::filter_mode::parallel, 211 MyTransformFunc()) & 212 oneapi::tbb::make_filter<TextSlice*, void>(oneapi::tbb::filter_mode::serial_in_order, 213 MyOutputFunc(output_file))); 214 215 oneapi::tbb::tick_count t1 = oneapi::tbb::tick_count::now(); 216 217 fclose(output_file); 218 fclose(input_file); 219 220 if (!silent) 221 printf("time = %g\n", (t1 - t0).seconds()); 222 223 return 1; 224 } 225 226 int main(int argc, char* argv[]) { 227 oneapi::tbb::tick_count mainStartTime = oneapi::tbb::tick_count::now(); 228 229 // The 1st argument is the function to obtain 'auto' value; the 2nd is the default value 230 // The example interprets 0 threads as "run serially, then fully subscribed" 231 utility::thread_number_range threads(utility::get_default_num_threads, 0); 232 233 utility::parse_cli_arguments( 234 argc, 235 argv, 236 utility::cli_argument_pack() 237 //"-h" option for displaying help is present implicitly 238 .positional_arg(threads, "n-of-threads", utility::thread_number_range_desc) 239 .positional_arg(InputFileName, "input-file", "input file name") 240 .positional_arg(OutputFileName, "output-file", "output file name") 241 .positional_arg(MAX_CHAR_PER_INPUT_SLICE, 242 "max-slice-size", 243 "the maximum number of characters in one slice") 244 .arg(silent, "silent", "no output except elapsed time")); 245 generate_if_needed(InputFileName.c_str()); 246 247 if (threads.first) { 248 for (int p = threads.first; p <= threads.last; p = threads.step(p)) { 249 if (!silent) 250 printf("threads = %d ", p); 251 oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism, p); 252 if (!run_pipeline(p)) 253 return -1; 254 } 255 } 256 else { // Number of threads wasn't set explicitly. Run serial and parallel version 257 { // serial run 258 if (!silent) 259 printf("serial run "); 260 oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism, 1); 261 if (!run_pipeline(1)) 262 return -1; 263 } 264 { // parallel run (number of threads is selected automatically) 265 if (!silent) 266 printf("parallel run "); 267 oneapi::tbb::global_control c(oneapi::tbb::global_control::max_allowed_parallelism, 268 utility::get_default_num_threads()); 269 if (!run_pipeline(utility::get_default_num_threads())) 270 return -1; 271 } 272 } 273 274 utility::report_elapsed_time((oneapi::tbb::tick_count::now() - mainStartTime).seconds()); 275 276 return 0; 277 } 278