1 /*
2     Copyright (c) 2005-2023 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_parallel_filters_H
18 #define __TBB_parallel_filters_H
19 
20 #include "_config.h"
21 #include "_task.h"
22 #include "_pipeline_filters_deduction.h"
23 #include "../tbb_allocator.h"
24 
25 #include <cstddef>
26 #include <cstdint>
27 
28 namespace tbb {
29 namespace detail {
30 
31 namespace d1 {
32 class base_filter;
33 }
34 
35 namespace r1 {
36 TBB_EXPORT void __TBB_EXPORTED_FUNC set_end_of_input(d1::base_filter&);
37 class pipeline;
38 class stage_task;
39 class input_buffer;
40 }
41 
42 namespace d1 {
43 class filter_node;
44 
45 //! A stage in a pipeline.
46 /** @ingroup algorithms */
47 class base_filter{
48 private:
49     //! Value used to mark "not in pipeline"
not_in_pipeline()50     static base_filter* not_in_pipeline() { return reinterpret_cast<base_filter*>(std::intptr_t(-1)); }
51 public:
52     //! The lowest bit 0 is for parallel vs serial
53     static constexpr  unsigned int filter_is_serial = 0x1;
54 
55     //! 2nd bit distinguishes ordered vs unordered filters.
56     static constexpr  unsigned int filter_is_out_of_order = 0x1<<1;
57 
58     //! 3rd bit marks input filters emitting small objects
59     static constexpr  unsigned int filter_may_emit_null = 0x1<<2;
60 
61     base_filter(const base_filter&) = delete;
62     base_filter& operator=(const base_filter&) = delete;
63 
64 protected:
base_filter(unsigned int m)65     explicit base_filter( unsigned int m ) :
66         next_filter_in_pipeline(not_in_pipeline()),
67         my_input_buffer(nullptr),
68         my_filter_mode(m),
69         my_pipeline(nullptr)
70     {}
71 
72     // signal end-of-input for concrete_filters
set_end_of_input()73     void set_end_of_input() {
74         r1::set_end_of_input(*this);
75     }
76 
77 public:
78     //! True if filter is serial.
is_serial()79     bool is_serial() const {
80         return bool( my_filter_mode & filter_is_serial );
81     }
82 
83     //! True if filter must receive stream in order.
is_ordered()84     bool is_ordered() const {
85         return (my_filter_mode & filter_is_serial) && !(my_filter_mode & filter_is_out_of_order);
86     }
87 
88     //! true if an input filter can emit null
object_may_be_null()89     bool object_may_be_null() {
90         return ( my_filter_mode & filter_may_emit_null ) == filter_may_emit_null;
91     }
92 
93     //! Operate on an item from the input stream, and return item for output stream.
94     /** Returns nullptr if filter is a sink. */
95     virtual void* operator()( void* item ) = 0;
96 
97     //! Destroy filter.
~base_filter()98     virtual ~base_filter() {};
99 
100     //! Destroys item if pipeline was cancelled.
101     /** Required to prevent memory leaks.
102         Note it can be called concurrently even for serial filters.*/
finalize(void *)103     virtual void finalize( void* /*item*/ ) {}
104 
105 private:
106     //! Pointer to next filter in the pipeline.
107     base_filter* next_filter_in_pipeline;
108 
109     //! Buffer for incoming tokens, or nullptr if not required.
110     /** The buffer is required if the filter is serial. */
111     r1::input_buffer* my_input_buffer;
112 
113     friend class r1::stage_task;
114     friend class r1::pipeline;
115     friend void r1::set_end_of_input(d1::base_filter&);
116 
117     //! Storage for filter mode and dynamically checked implementation version.
118     const unsigned int my_filter_mode;
119 
120     //! Pointer to the pipeline.
121     r1::pipeline* my_pipeline;
122 };
123 
//! Filter that adapts a user body to the void*-based base_filter interface.
/** Forward declaration; the parameter names follow the order used by the definition. */
template<typename InputType, typename OutputType, typename Body>
class concrete_filter;
126 
127 //! input_filter control to signal end-of-input for parallel_pipeline
128 class flow_control {
129     bool is_pipeline_stopped = false;
130     flow_control() = default;
131     template<typename Body, typename InputType, typename OutputType > friend class concrete_filter;
132     template<typename Output>
133     __TBB_requires(std::copyable<Output>)
134     friend class input_node;
135 public:
stop()136     void stop() { is_pipeline_stopped = true; }
137 };
138 
// Emulate std::is_trivially_copyable (false positives not allowed, false negatives suboptimal but safe).
#if __TBB_CPP11_TYPE_PROPERTIES_PRESENT
// The standard trait is available: simply alias it.
template<typename T>
using tbb_trivially_copyable = std::is_trivially_copyable<T>;
#else
// Fallback: a whitelist of types. Anything not listed conservatively reports false.
template<typename T>
struct tbb_trivially_copyable { enum { value = false }; };

// Any pointer type.
template<typename T>
struct tbb_trivially_copyable<T*> { enum { value = true }; };

// bool and the character types.
template<> struct tbb_trivially_copyable<bool>          { enum { value = true }; };
template<> struct tbb_trivially_copyable<char>          { enum { value = true }; };
template<> struct tbb_trivially_copyable<signed char>   { enum { value = true }; };
template<> struct tbb_trivially_copyable<unsigned char> { enum { value = true }; };

// Signed and unsigned integer types.
template<> struct tbb_trivially_copyable<short>              { enum { value = true }; };
template<> struct tbb_trivially_copyable<unsigned short>     { enum { value = true }; };
template<> struct tbb_trivially_copyable<int>                { enum { value = true }; };
template<> struct tbb_trivially_copyable<unsigned int>       { enum { value = true }; };
template<> struct tbb_trivially_copyable<long>               { enum { value = true }; };
template<> struct tbb_trivially_copyable<unsigned long>      { enum { value = true }; };
template<> struct tbb_trivially_copyable<long long>          { enum { value = true }; };
template<> struct tbb_trivially_copyable<unsigned long long> { enum { value = true }; };

// Floating-point types.
template<> struct tbb_trivially_copyable<float>       { enum { value = true }; };
template<> struct tbb_trivially_copyable<double>      { enum { value = true }; };
template<> struct tbb_trivially_copyable<long double> { enum { value = true }; };
#endif // __TBB_CPP11_TYPE_PROPERTIES_PRESENT
161 
162 template<typename T>
163 struct use_allocator {
164    static constexpr bool value = sizeof(T) > sizeof(void *) || !tbb_trivially_copyable<T>::value;
165 };
166 
// Customization point describing how a value of type T travels between filters
// as a void*. The second argument selects the strategy; it is normally spelled
// token_helper<T, use_allocator<T>::value>.
template<typename T, bool Allocate>
struct token_helper;
170 
171 // using tbb_allocator
172 template<typename T>
173 struct token_helper<T, true> {
174     using pointer = T*;
175     using value_type = T;
176     static pointer create_token(value_type && source) {
177         return new (r1::allocate_memory(sizeof(T))) T(std::move(source));
178     }
179     static value_type & token(pointer & t) { return *t; }
180     static void * cast_to_void_ptr(pointer ref) { return reinterpret_cast<void *>(ref); }
181     static pointer cast_from_void_ptr(void * ref) { return reinterpret_cast<pointer>(ref); }
182     static void destroy_token(pointer token) {
183         token->~value_type();
184         r1::deallocate_memory(token);
185     }
186 };
187 
188 // pointer specialization
189 template<typename T>
190 struct token_helper<T*, false> {
191     using pointer = T*;
192     using value_type = T*;
193     static pointer create_token(const value_type & source) { return source; }
194     static value_type & token(pointer & t) { return t; }
195     static void * cast_to_void_ptr(pointer ref) { return reinterpret_cast<void *>(ref); }
196     static pointer cast_from_void_ptr(void * ref) { return reinterpret_cast<pointer>(ref); }
197     static void destroy_token( pointer /*token*/) {}
198 };
199 
200 // converting type to and from void*, passing objects directly
201 template<typename T>
202 struct token_helper<T, false> {
203     typedef union {
204         T actual_value;
205         void * void_overlay;
206     } type_to_void_ptr_map;
207     using pointer = T;  // not really a pointer in this case.
208     using value_type = T;
209     static pointer create_token(const value_type & source) { return source; }
210     static value_type & token(pointer & t) { return t; }
211     static void * cast_to_void_ptr(pointer ref) {
212         type_to_void_ptr_map mymap;
213         mymap.void_overlay = nullptr;
214         mymap.actual_value = ref;
215         return mymap.void_overlay;
216     }
217     static pointer cast_from_void_ptr(void * ref) {
218         type_to_void_ptr_map mymap;
219         mymap.void_overlay = ref;
220         return mymap.actual_value;
221     }
222     static void destroy_token( pointer /*token*/) {}
223 };
224 
225 // intermediate
226 template<typename InputType,  typename OutputType, typename Body>
227 class concrete_filter: public base_filter {
228     const Body& my_body;
229     using input_helper = token_helper<InputType, use_allocator<InputType >::value>;
230     using input_pointer = typename input_helper::pointer;
231     using output_helper = token_helper<OutputType, use_allocator<OutputType>::value>;
232     using output_pointer = typename output_helper::pointer;
233 
234     void* operator()(void* input) override {
235         input_pointer temp_input = input_helper::cast_from_void_ptr(input);
236         output_pointer temp_output = output_helper::create_token(tbb::detail::invoke(my_body, std::move(input_helper::token(temp_input))));
237         input_helper::destroy_token(temp_input);
238         return output_helper::cast_to_void_ptr(temp_output);
239     }
240 
241     void finalize(void * input) override {
242         input_pointer temp_input = input_helper::cast_from_void_ptr(input);
243         input_helper::destroy_token(temp_input);
244     }
245 
246 public:
247     concrete_filter(unsigned int m, const Body& body) : base_filter(m), my_body(body) {}
248 };
249 
250 // input
251 template<typename OutputType, typename Body>
252 class concrete_filter<void, OutputType, Body>: public base_filter {
253     const Body& my_body;
254     using output_helper = token_helper<OutputType, use_allocator<OutputType>::value>;
255     using output_pointer = typename output_helper::pointer;
256 
257     void* operator()(void*) override {
258         flow_control control;
259         output_pointer temp_output = output_helper::create_token(my_body(control));
260         if(control.is_pipeline_stopped) {
261             output_helper::destroy_token(temp_output);
262             set_end_of_input();
263             return nullptr;
264         }
265         return output_helper::cast_to_void_ptr(temp_output);
266     }
267 
268 public:
269     concrete_filter(unsigned int m, const Body& body) :
270         base_filter(m | filter_may_emit_null),
271         my_body(body)
272     {}
273 };
274 
275 // output
276 template<typename InputType, typename Body>
277 class concrete_filter<InputType, void, Body>: public base_filter {
278     const Body& my_body;
279     using input_helper = token_helper<InputType, use_allocator<InputType >::value>;
280     using input_pointer = typename input_helper::pointer;
281 
282     void* operator()(void* input) override {
283         input_pointer temp_input = input_helper::cast_from_void_ptr(input);
284         tbb::detail::invoke(my_body, std::move(input_helper::token(temp_input)));
285         input_helper::destroy_token(temp_input);
286         return nullptr;
287     }
288     void finalize(void* input) override {
289         input_pointer temp_input = input_helper::cast_from_void_ptr(input);
290         input_helper::destroy_token(temp_input);
291     }
292 
293 public:
294     concrete_filter(unsigned int m, const Body& body) : base_filter(m), my_body(body) {}
295 };
296 
297 template<typename Body>
298 class concrete_filter<void, void, Body>: public base_filter {
299     const Body& my_body;
300 
301     void* operator()(void*) override {
302         flow_control control;
303         my_body(control);
304         void* output = control.is_pipeline_stopped ? nullptr : (void*)(std::intptr_t)-1;
305         return output;
306     }
307 public:
308     concrete_filter(unsigned int m, const Body& body) : base_filter(m), my_body(body) {}
309 };
310 
311 class filter_node_ptr {
312     filter_node * my_node;
313 
314 public:
315     filter_node_ptr() : my_node(nullptr) {}
316     filter_node_ptr(filter_node *);
317     ~filter_node_ptr();
318     filter_node_ptr(const filter_node_ptr &);
319     filter_node_ptr(filter_node_ptr &&);
320     void operator=(filter_node *);
321     void operator=(const filter_node_ptr &);
322     void operator=(filter_node_ptr &&);
323     filter_node& operator*() const;
324     operator bool() const;
325 };
326 
327 //! Abstract base class that represents a node in a parse tree underlying a filter class.
328 /** These nodes are always heap-allocated and can be shared by filter objects. */
329 class filter_node {
330     /** Count must be atomic because it is hidden state for user, but might be shared by threads. */
331     std::atomic<std::intptr_t> ref_count;
332 public:
333     filter_node_ptr left;
334     filter_node_ptr right;
335 protected:
336     filter_node() : ref_count(0), left(nullptr), right(nullptr) {
337 #ifdef __TBB_TEST_FILTER_NODE_COUNT
338         ++(__TBB_TEST_FILTER_NODE_COUNT);
339 #endif
340     }
341 public:
342     filter_node(const filter_node_ptr& x, const filter_node_ptr& y) : filter_node(){
343         left = x;
344         right = y;
345     }
346     filter_node(const filter_node&) = delete;
347     filter_node& operator=(const filter_node&) = delete;
348 
349     //! Add concrete_filter to pipeline
350     virtual base_filter* create_filter() const {
351         __TBB_ASSERT(false, "method of non-leaf was called");
352         return nullptr;
353     }
354 
355     //! Increment reference count
356     void add_ref() { ref_count.fetch_add(1, std::memory_order_relaxed); }
357 
358     //! Decrement reference count and delete if it becomes zero.
359     void remove_ref() {
360         __TBB_ASSERT(ref_count>0,"ref_count underflow");
361         if( ref_count.fetch_sub(1, std::memory_order_relaxed) == 1 ) {
362             this->~filter_node();
363             r1::deallocate_memory(this);
364         }
365     }
366 
367     virtual ~filter_node() {
368 #ifdef __TBB_TEST_FILTER_NODE_COUNT
369         --(__TBB_TEST_FILTER_NODE_COUNT);
370 #endif
371     }
372 };
373 
374 inline filter_node_ptr::filter_node_ptr(filter_node * nd) : my_node(nd) {
375     if (my_node) {
376         my_node->add_ref();
377     }
378 }
379 
380 inline filter_node_ptr::~filter_node_ptr() {
381     if (my_node) {
382         my_node->remove_ref();
383     }
384 }
385 
386 inline filter_node_ptr::filter_node_ptr(const filter_node_ptr & rhs) : my_node(rhs.my_node) {
387     if (my_node) {
388         my_node->add_ref();
389     }
390 }
391 
392 inline filter_node_ptr::filter_node_ptr(filter_node_ptr && rhs) : my_node(rhs.my_node) {
393     rhs.my_node = nullptr;
394 }
395 
396 inline void filter_node_ptr::operator=(filter_node * rhs) {
397     // Order of operations below carefully chosen so that reference counts remain correct
398     // in unlikely event that remove_ref throws exception.
399     filter_node* old = my_node;
400     my_node = rhs;
401     if (my_node) {
402         my_node->add_ref();
403     }
404     if (old) {
405         old->remove_ref();
406     }
407 }
408 
409 inline void filter_node_ptr::operator=(const filter_node_ptr & rhs) {
410     *this = rhs.my_node;
411 }
412 
413 inline void filter_node_ptr::operator=(filter_node_ptr && rhs) {
414     filter_node* old = my_node;
415     my_node = rhs.my_node;
416     rhs.my_node = nullptr;
417     if (old) {
418         old->remove_ref();
419     }
420 }
421 
422 inline filter_node& filter_node_ptr::operator*() const{
423     __TBB_ASSERT(my_node,"nullptr node is used");
424     return *my_node;
425 }
426 
427 inline filter_node_ptr::operator bool() const {
428     return my_node != nullptr;
429 }
430 
431 //! Node in parse tree representing result of make_filter.
432 template<typename InputType, typename OutputType, typename Body>
433 class filter_node_leaf: public filter_node {
434     const unsigned int my_mode;
435     const Body my_body;
436     base_filter* create_filter() const override {
437         return new(r1::allocate_memory(sizeof(concrete_filter<InputType, OutputType, Body>))) concrete_filter<InputType, OutputType, Body>(my_mode,my_body);
438     }
439 public:
440     filter_node_leaf( unsigned int m, const Body& b ) : my_mode(m), my_body(b) {}
441 };
442 
443 
444 template <typename Body, typename Input = typename filter_body_types<decltype(&Body::operator())>::input_type>
445 using filter_input = typename std::conditional<std::is_same<Input, flow_control>::value, void, Input>::type;
446 
447 template <typename Body>
448 using filter_output = typename filter_body_types<decltype(&Body::operator())>::output_type;
449 
450 } // namespace d1
451 } // namespace detail
452 } // namespace tbb
453 
454 
455 #endif /* __TBB_parallel_filters_H */
456