/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_parallel_filters_H
#define __TBB_parallel_filters_H

#include "_config.h"
#include "_task.h"
#include "_pipeline_filters_deduction.h"
#include "../tbb_allocator.h"

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <type_traits>
#include <utility>

namespace tbb {
namespace detail {

namespace d1 {
class base_filter;
}

namespace r1 {
TBB_EXPORT void __TBB_EXPORTED_FUNC set_end_of_input(d1::base_filter&);
class pipeline;
class stage_task;
class input_buffer;
}

namespace d1 {
class filter_node;

//! A stage in a pipeline.
/** @ingroup algorithms */
class base_filter{
private:
    //! Value used to mark "not in pipeline"
    static base_filter* not_in_pipeline() { return reinterpret_cast<base_filter*>(std::intptr_t(-1)); }
public:
    //! The lowest bit 0 is for parallel vs serial
    static constexpr unsigned int filter_is_serial = 0x1;

    //! 2nd bit distinguishes ordered vs unordered filters.
    static constexpr unsigned int filter_is_out_of_order = 0x1<<1;

    //! 3rd bit marks input filters that may emit a null object
    static constexpr unsigned int filter_may_emit_null = 0x1<<2;

    base_filter(const base_filter&) = delete;
    base_filter& operator=(const base_filter&) = delete;

protected:
    explicit base_filter( unsigned int m ) :
        next_filter_in_pipeline(not_in_pipeline()),
        my_input_buffer(nullptr),
        my_filter_mode(m),
        my_pipeline(nullptr)
    {}

    // signal end-of-input for concrete_filters
    void set_end_of_input() {
        r1::set_end_of_input(*this);
    }

public:
    //! True if filter is serial.
    bool is_serial() const {
        return bool( my_filter_mode & filter_is_serial );
    }

    //! True if filter must receive stream in order.
    bool is_ordered() const {
        return (my_filter_mode & filter_is_serial) && !(my_filter_mode & filter_is_out_of_order);
    }

    //! True if an input filter can emit null.
    bool object_may_be_null() {
        return ( my_filter_mode & filter_may_emit_null ) == filter_may_emit_null;
    }

    //! Operate on an item from the input stream, and return item for output stream.
    /** Returns nullptr if filter is a sink. */
    virtual void* operator()( void* item ) = 0;

    //! Destroy filter.
    virtual ~base_filter() {}

    //! Destroys item if pipeline was cancelled.
    /** Required to prevent memory leaks.
        Note it can be called concurrently even for serial filters.*/
    virtual void finalize( void* /*item*/ ) {}

private:
    //! Pointer to next filter in the pipeline.
    base_filter* next_filter_in_pipeline;

    //! Buffer for incoming tokens, or nullptr if not required.
    /** The buffer is required if the filter is serial. */
    r1::input_buffer* my_input_buffer;

    friend class r1::stage_task;
    friend class r1::pipeline;
    friend void r1::set_end_of_input(d1::base_filter&);

    //! Storage for filter mode and dynamically checked implementation version.
    const unsigned int my_filter_mode;

    //! Pointer to the pipeline.
    r1::pipeline* my_pipeline;
};

template<typename Body, typename InputType, typename OutputType >
class concrete_filter;

//! input_filter control to signal end-of-input for parallel_pipeline
class flow_control {
    bool is_pipeline_stopped = false;
    flow_control() = default;
    template<typename Body, typename InputType, typename OutputType > friend class concrete_filter;
    template<typename Output>
    __TBB_requires(std::copyable<Output>)
    friend class input_node;
public:
    void stop() { is_pipeline_stopped = true; }
};
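
// Illustrative sketch (not part of the library header): from the user's side, the body
// of an input filter in tbb::parallel_pipeline receives a flow_control reference and
// calls stop() once the stream is exhausted; the value returned on that final call is
// discarded. The item type and the read_next() source below are hypothetical placeholders.
//
//     tbb::parallel_pipeline( /*max_number_of_live_tokens=*/ 8,
//         tbb::make_filter<void, int>( tbb::filter_mode::serial_in_order,
//             [&]( tbb::flow_control& fc ) -> int {
//                 int item;
//                 if( !read_next(item) ) {   // hypothetical input source
//                     fc.stop();             // signal end-of-input
//                     return 0;              // returned value is ignored
//                 }
//                 return item;
//             } )
//         & tbb::make_filter<int, void>( tbb::filter_mode::parallel,
//             []( int v ) { /* consume v */ } ) );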

// Emulate std::is_trivially_copyable (false positives not allowed, false negatives suboptimal but safe).
#if __TBB_CPP11_TYPE_PROPERTIES_PRESENT
template<typename T> using tbb_trivially_copyable = std::is_trivially_copyable<T>;
#else
template<typename T> struct tbb_trivially_copyable { enum { value = false }; };
template<typename T> struct tbb_trivially_copyable < T* > { enum { value = true }; };
template<> struct tbb_trivially_copyable < bool > { enum { value = true }; };
template<> struct tbb_trivially_copyable < char > { enum { value = true }; };
template<> struct tbb_trivially_copyable < signed char > { enum { value = true }; };
template<> struct tbb_trivially_copyable <unsigned char > { enum { value = true }; };
template<> struct tbb_trivially_copyable < short > { enum { value = true }; };
template<> struct tbb_trivially_copyable <unsigned short > { enum { value = true }; };
template<> struct tbb_trivially_copyable < int > { enum { value = true }; };
template<> struct tbb_trivially_copyable <unsigned int > { enum { value = true }; };
template<> struct tbb_trivially_copyable < long > { enum { value = true }; };
template<> struct tbb_trivially_copyable <unsigned long > { enum { value = true }; };
template<> struct tbb_trivially_copyable < long long> { enum { value = true }; };
template<> struct tbb_trivially_copyable <unsigned long long> { enum { value = true }; };
template<> struct tbb_trivially_copyable < float > { enum { value = true }; };
template<> struct tbb_trivially_copyable < double > { enum { value = true }; };
template<> struct tbb_trivially_copyable < long double > { enum { value = true }; };
#endif // __TBB_CPP11_TYPE_PROPERTIES_PRESENT

template<typename T>
struct use_allocator {
    static constexpr bool value = sizeof(T) > sizeof(void *) || !tbb_trivially_copyable<T>::value;
};
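
// Illustrative note (not part of the library header): with the trait above, a token is
// passed between stages without a heap allocation only if it both fits into a void* and
// is trivially copyable. For example, on a typical 64-bit platform use_allocator<int>::value
// is false (the value is packed directly into the void* slot), while for a non-trivially
// copyable type such as std::string it is true (the token is created in memory obtained
// from r1::allocate_memory).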

// A helper class to customize how a type is passed between filters.
// Usage: token_helper<T, use_allocator<T>::value>
template<typename T, bool Allocate> struct token_helper;

// using tbb_allocator
template<typename T>
struct token_helper<T, true> {
    using pointer = T*;
    using value_type = T;
    static pointer create_token(value_type && source) {
        return new (r1::allocate_memory(sizeof(T))) T(std::move(source));
    }
    static value_type & token(pointer & t) { return *t; }
    static void * cast_to_void_ptr(pointer ref) { return reinterpret_cast<void *>(ref); }
    static pointer cast_from_void_ptr(void * ref) { return reinterpret_cast<pointer>(ref); }
    static void destroy_token(pointer token) {
        token->~value_type();
        r1::deallocate_memory(token);
    }
};

// pointer specialization
template<typename T>
struct token_helper<T*, false> {
    using pointer = T*;
    using value_type = T*;
    static pointer create_token(const value_type & source) { return source; }
    static value_type & token(pointer & t) { return t; }
    static void * cast_to_void_ptr(pointer ref) { return reinterpret_cast<void *>(ref); }
    static pointer cast_from_void_ptr(void * ref) { return reinterpret_cast<pointer>(ref); }
    static void destroy_token( pointer /*token*/) {}
};

// converting type to and from void*, passing objects directly
template<typename T>
struct token_helper<T, false> {
    typedef union {
        T actual_value;
        void * void_overlay;
    } type_to_void_ptr_map;
    using pointer = T; // not really a pointer in this case.
    using value_type = T;
    static pointer create_token(const value_type & source) { return source; }
    static value_type & token(pointer & t) { return t; }
    static void * cast_to_void_ptr(pointer ref) {
        type_to_void_ptr_map mymap;
        mymap.void_overlay = nullptr;
        mymap.actual_value = ref;
        return mymap.void_overlay;
    }
    static pointer cast_from_void_ptr(void * ref) {
        type_to_void_ptr_map mymap;
        mymap.void_overlay = ref;
        return mymap.actual_value;
    }
    static void destroy_token( pointer /*token*/) {}
};
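
// Illustrative note (not part of the library header): the three specializations above cover
// the three ways a token crosses the void*-based stage interface:
//   * allocated tokens (token_helper<T, true>)  : T is placement-new constructed in memory from
//     r1::allocate_memory, its address travels as void*, and destroy_token runs ~T() and frees
//     the block;
//   * raw pointers     (token_helper<T*, false>): the user's pointer itself is reinterpreted as
//     void*; ownership stays with the user, so destroy_token is a no-op;
//   * small values     (token_helper<T, false>) : the object's bytes are overlaid onto a void*
//     through the union, e.g.
//         void* p = token_helper<int, false>::cast_to_void_ptr(42);
//         int   v = token_helper<int, false>::cast_from_void_ptr(p);  // v == 42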

// intermediate
template<typename InputType, typename OutputType, typename Body>
class concrete_filter: public base_filter {
    const Body& my_body;
    using input_helper = token_helper<InputType, use_allocator<InputType >::value>;
    using input_pointer = typename input_helper::pointer;
    using output_helper = token_helper<OutputType, use_allocator<OutputType>::value>;
    using output_pointer = typename output_helper::pointer;

    void* operator()(void* input) override {
        input_pointer temp_input = input_helper::cast_from_void_ptr(input);
        output_pointer temp_output = output_helper::create_token(tbb::detail::invoke(my_body, std::move(input_helper::token(temp_input))));
        input_helper::destroy_token(temp_input);
        return output_helper::cast_to_void_ptr(temp_output);
    }

    void finalize(void * input) override {
        input_pointer temp_input = input_helper::cast_from_void_ptr(input);
        input_helper::destroy_token(temp_input);
    }

public:
    concrete_filter(unsigned int m, const Body& body) : base_filter(m), my_body(body) {}
};

// input
template<typename OutputType, typename Body>
class concrete_filter<void, OutputType, Body>: public base_filter {
    const Body& my_body;
    using output_helper = token_helper<OutputType, use_allocator<OutputType>::value>;
    using output_pointer = typename output_helper::pointer;

    void* operator()(void*) override {
        flow_control control;
        output_pointer temp_output = output_helper::create_token(my_body(control));
        if(control.is_pipeline_stopped) {
            output_helper::destroy_token(temp_output);
            set_end_of_input();
            return nullptr;
        }
        return output_helper::cast_to_void_ptr(temp_output);
    }

public:
    concrete_filter(unsigned int m, const Body& body) :
        base_filter(m | filter_may_emit_null),
        my_body(body)
    {}
};

// output
template<typename InputType, typename Body>
class concrete_filter<InputType, void, Body>: public base_filter {
    const Body& my_body;
    using input_helper = token_helper<InputType, use_allocator<InputType >::value>;
    using input_pointer = typename input_helper::pointer;

    void* operator()(void* input) override {
        input_pointer temp_input = input_helper::cast_from_void_ptr(input);
        tbb::detail::invoke(my_body, std::move(input_helper::token(temp_input)));
        input_helper::destroy_token(temp_input);
        return nullptr;
    }
    void finalize(void* input) override {
        input_pointer temp_input = input_helper::cast_from_void_ptr(input);
        input_helper::destroy_token(temp_input);
    }

public:
    concrete_filter(unsigned int m, const Body& body) : base_filter(m), my_body(body) {}
};

template<typename Body>
class concrete_filter<void, void, Body>: public base_filter {
    const Body& my_body;

    void* operator()(void*) override {
        flow_control control;
        my_body(control);
        void* output = control.is_pipeline_stopped ? nullptr : (void*)(std::intptr_t)-1;
        return output;
    }
public:
    concrete_filter(unsigned int m, const Body& body) : base_filter(m), my_body(body) {}
};

class filter_node_ptr {
    filter_node * my_node;

public:
    filter_node_ptr() : my_node(nullptr) {}
    filter_node_ptr(filter_node *);
    ~filter_node_ptr();
    filter_node_ptr(const filter_node_ptr &);
    filter_node_ptr(filter_node_ptr &&);
    void operator=(filter_node *);
    void operator=(const filter_node_ptr &);
    void operator=(filter_node_ptr &&);
    filter_node& operator*() const;
    operator bool() const;
};

//! Abstract base class that represents a node in a parse tree underlying a filter class.
/** These nodes are always heap-allocated and can be shared by filter objects. */
class filter_node {
    /** Count must be atomic because it is hidden state for user, but might be shared by threads. */
    std::atomic<std::intptr_t> ref_count;
public:
    filter_node_ptr left;
    filter_node_ptr right;
protected:
    filter_node() : ref_count(0), left(nullptr), right(nullptr) {
#ifdef __TBB_TEST_FILTER_NODE_COUNT
        ++(__TBB_TEST_FILTER_NODE_COUNT);
#endif
    }
public:
    filter_node(const filter_node_ptr& x, const filter_node_ptr& y) : filter_node(){
        left = x;
        right = y;
    }
    filter_node(const filter_node&) = delete;
    filter_node& operator=(const filter_node&) = delete;

    //! Add concrete_filter to pipeline
    virtual base_filter* create_filter() const {
        __TBB_ASSERT(false, "method of non-leaf was called");
        return nullptr;
    }

    //! Increment reference count
    void add_ref() { ref_count.fetch_add(1, std::memory_order_relaxed); }

    //! Decrement reference count and delete if it becomes zero.
    void remove_ref() {
        __TBB_ASSERT(ref_count>0,"ref_count underflow");
        if( ref_count.fetch_sub(1, std::memory_order_relaxed) == 1 ) {
            this->~filter_node();
            r1::deallocate_memory(this);
        }
    }

    virtual ~filter_node() {
#ifdef __TBB_TEST_FILTER_NODE_COUNT
        --(__TBB_TEST_FILTER_NODE_COUNT);
#endif
    }
};

inline filter_node_ptr::filter_node_ptr(filter_node * nd) : my_node(nd) {
    if (my_node) {
        my_node->add_ref();
    }
}

inline filter_node_ptr::~filter_node_ptr() {
    if (my_node) {
        my_node->remove_ref();
    }
}

inline filter_node_ptr::filter_node_ptr(const filter_node_ptr & rhs) : my_node(rhs.my_node) {
    if (my_node) {
        my_node->add_ref();
    }
}

inline filter_node_ptr::filter_node_ptr(filter_node_ptr && rhs) : my_node(rhs.my_node) {
    rhs.my_node = nullptr;
}

inline void filter_node_ptr::operator=(filter_node * rhs) {
    // Order of operations below carefully chosen so that reference counts remain correct
    // in the unlikely event that remove_ref throws an exception.
    filter_node* old = my_node;
    my_node = rhs;
    if (my_node) {
        my_node->add_ref();
    }
    if (old) {
        old->remove_ref();
    }
}

inline void filter_node_ptr::operator=(const filter_node_ptr & rhs) {
    *this = rhs.my_node;
}

inline void filter_node_ptr::operator=(filter_node_ptr && rhs) {
    filter_node* old = my_node;
    my_node = rhs.my_node;
    rhs.my_node = nullptr;
    if (old) {
        old->remove_ref();
    }
}

inline filter_node& filter_node_ptr::operator*() const{
    __TBB_ASSERT(my_node,"nullptr node is used");
    return *my_node;
}

inline filter_node_ptr::operator bool() const {
    return my_node != nullptr;
}

//! Node in parse tree representing result of make_filter.
template<typename InputType, typename OutputType, typename Body>
class filter_node_leaf: public filter_node {
    const unsigned int my_mode;
    const Body my_body;
    base_filter* create_filter() const override {
        return new(r1::allocate_memory(sizeof(concrete_filter<InputType, OutputType, Body>))) concrete_filter<InputType, OutputType, Body>(my_mode,my_body);
    }
public:
    filter_node_leaf( unsigned int m, const Body& b ) : my_mode(m), my_body(b) {}
};

//! Deduce a filter's input type from its body: a body whose argument is flow_control
//! is an input filter, so its pipeline input type collapses to void.
template <typename Body, typename Input = typename filter_body_types<decltype(&Body::operator())>::input_type>
using filter_input = typename std::conditional<std::is_same<Input, flow_control>::value, void, Input>::type;

//! Deduce a filter's output type from the return type of its body.
template <typename Body>
using filter_output = typename filter_body_types<decltype(&Body::operator())>::output_type;

} // namespace d1
} // namespace detail
} // namespace tbb

#endif /* __TBB_parallel_filters_H */