1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_parallel_pipeline_H
18 #define __TBB_parallel_pipeline_H
19 
20 #include "detail/_pipeline_filters.h"
21 #include "detail/_config.h"
22 #include "detail/_namespace_injection.h"
23 #include "task_group.h"
24 
25 #include <cstddef>
26 #include <atomic>
27 #include <type_traits>
28 
29 namespace tbb {
30 namespace detail {
31 
32 namespace r1 {
33 TBB_EXPORT void __TBB_EXPORTED_FUNC parallel_pipeline(task_group_context&, std::size_t, const d1::filter_node&);
34 }
35 
36 namespace d1 {
37 
38 enum class filter_mode : unsigned int
39 {
40     //! processes multiple items in parallel and in no particular order
41     parallel = base_filter::filter_is_out_of_order,
42     //! processes items one at a time; all such filters process items in the same order
43     serial_in_order =  base_filter::filter_is_serial,
44     //! processes items one at a time and in no particular order
45     serial_out_of_order = base_filter::filter_is_serial | base_filter::filter_is_out_of_order
46 };
47 //! Class representing a chain of type-safe pipeline filters
48 /** @ingroup algorithms */
49 template<typename InputType, typename OutputType>
50 class filter {
51     filter_node_ptr my_root;
filter(filter_node_ptr root)52     filter( filter_node_ptr root ) : my_root(root) {}
53     friend void parallel_pipeline( size_t, const filter<void,void>&, task_group_context& );
54     template<typename T_, typename U_, typename Body>
55     friend filter<T_,U_> make_filter( filter_mode, const Body& );
56     template<typename T_, typename V_, typename U_>
57     friend filter<T_,U_> operator&( const filter<T_,V_>&, const filter<V_,U_>& );
58 public:
59     filter() = default;
filter(const filter & rhs)60     filter( const filter& rhs ) : my_root(rhs.my_root) {}
filter(filter && rhs)61     filter( filter&& rhs ) : my_root(std::move(rhs.my_root)) {}
62 
63     void operator=(const filter& rhs) {
64         my_root = rhs.my_root;
65     }
66     void operator=( filter&& rhs ) {
67         my_root = std::move(rhs.my_root);
68     }
69 
70     template<typename Body>
filter(filter_mode mode,const Body & body)71     filter( filter_mode mode, const Body& body ) :
72         my_root( new(r1::allocate_memory(sizeof(filter_node_leaf<InputType, OutputType, Body>)))
73                     filter_node_leaf<InputType, OutputType, Body>(static_cast<unsigned int>(mode), body) ) {
74     }
75 
76     filter& operator&=( const filter<OutputType,OutputType>& right ) {
77         *this = *this & right;
78         return *this;
79     }
80 
clear()81     void clear() {
82         // Like operator= with filter() on right side.
83         my_root = nullptr;
84     }
85 };
86 
87 //! Create a filter to participate in parallel_pipeline
88 /** @ingroup algorithms */
89 template<typename InputType, typename OutputType, typename Body>
make_filter(filter_mode mode,const Body & body)90 filter<InputType, OutputType> make_filter( filter_mode mode, const Body& body ) {
91     return filter_node_ptr( new(r1::allocate_memory(sizeof(filter_node_leaf<InputType, OutputType, Body>)))
92                                 filter_node_leaf<InputType, OutputType, Body>(static_cast<unsigned int>(mode), body) );
93 }
94 
95 //! Create a filter to participate in parallel_pipeline
96 /** @ingroup algorithms */
97 template<typename Body>
make_filter(filter_mode mode,const Body & body)98 filter<filter_input<Body>, filter_output<Body>> make_filter( filter_mode mode, const Body& body ) {
99     return make_filter<filter_input<Body>, filter_output<Body>>(mode, body);
100 }
101 
102 //! Composition of filters left and right.
103 /** @ingroup algorithms */
104 template<typename T, typename V, typename U>
105 filter<T,U> operator&( const filter<T,V>& left, const filter<V,U>& right ) {
106     __TBB_ASSERT(left.my_root,"cannot use default-constructed filter as left argument of '&'");
107     __TBB_ASSERT(right.my_root,"cannot use default-constructed filter as right argument of '&'");
108     return filter_node_ptr( new (r1::allocate_memory(sizeof(filter_node))) filter_node(left.my_root,right.my_root) );
109 }
110 
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Deduction guide: filter(mode, body) takes its input and output types from Body.
template<typename Body>
filter(filter_mode, Body) -> filter<filter_input<Body>, filter_output<Body>>;
#endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
116 
117 //! Parallel pipeline over chain of filters with user-supplied context.
118 /** @ingroup algorithms **/
parallel_pipeline(size_t max_number_of_live_tokens,const filter<void,void> & filter_chain,task_group_context & context)119 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter<void,void>& filter_chain, task_group_context& context) {
120     r1::parallel_pipeline(context, max_number_of_live_tokens, *filter_chain.my_root);
121 }
122 
123 //! Parallel pipeline over chain of filters.
124 /** @ingroup algorithms **/
parallel_pipeline(size_t max_number_of_live_tokens,const filter<void,void> & filter_chain)125 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter<void,void>& filter_chain) {
126     task_group_context context;
127     parallel_pipeline(max_number_of_live_tokens, filter_chain, context);
128 }
129 
//! Parallel pipeline over sequence of filters.
/** Joins the first two filters with operator& and calls parallel_pipeline again with the
    joined filter followed by the remaining arguments, which are perfectly forwarded.
    @ingroup algorithms **/
template<typename F1, typename F2, typename... FiltersContext>
void parallel_pipeline(size_t max_number_of_live_tokens,
                              const F1& filter1,
                              const F2& filter2,
                              FiltersContext&&... filters) {
    // Fold the two leading filters into one; the rest of the pack is handled by the next call.
    const auto& joined_head = filter1 & filter2;
    parallel_pipeline(max_number_of_live_tokens, joined_head, std::forward<FiltersContext>(filters)...);
}
139 
140 } // namespace d1
141 } // namespace detail
142 
143 inline namespace v1
144 {
145 using detail::d1::parallel_pipeline;
146 using detail::d1::filter;
147 using detail::d1::make_filter;
148 using detail::d1::filter_mode;
149 using detail::d1::flow_control;
150 }
151 } // tbb
152 
153 #endif /* __TBB_parallel_pipeline_H */
154