151c0b2f7Stbbdev /*
2*c21e688aSSergey Zheltov Copyright (c) 2005-2022 Intel Corporation
351c0b2f7Stbbdev
451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev you may not use this file except in compliance with the License.
651c0b2f7Stbbdev You may obtain a copy of the License at
751c0b2f7Stbbdev
851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev
1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev See the License for the specific language governing permissions and
1451c0b2f7Stbbdev limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev
1749e08aacStbbdev #include "oneapi/tbb/parallel_pipeline.h"
1849e08aacStbbdev #include "oneapi/tbb/spin_mutex.h"
1949e08aacStbbdev #include "oneapi/tbb/tbb_allocator.h"
2049e08aacStbbdev #include "oneapi/tbb/cache_aligned_allocator.h"
2151c0b2f7Stbbdev #include "itt_notify.h"
2251c0b2f7Stbbdev #include "tls.h"
2349e08aacStbbdev #include "oneapi/tbb/detail/_exception.h"
2449e08aacStbbdev #include "oneapi/tbb/detail/_small_object_pool.h"
2551c0b2f7Stbbdev
2651c0b2f7Stbbdev namespace tbb {
2751c0b2f7Stbbdev namespace detail {
2851c0b2f7Stbbdev namespace r1 {
2951c0b2f7Stbbdev
//! Error-reporting hook used below when TLS creation/destruction returns a non-zero status.
/** Only declared in this file; the definition lives in another translation unit. */
void handle_perror(int error_code, const char* aux_info);

//! Integer type used both for token counts and for token sequence numbers.
using Token = unsigned long;
3351c0b2f7Stbbdev
3451c0b2f7Stbbdev //! A processing pipeline that applies filters to items.
3551c0b2f7Stbbdev /** @ingroup algorithms */
3651c0b2f7Stbbdev class pipeline {
3751c0b2f7Stbbdev friend void parallel_pipeline(d1::task_group_context&, std::size_t, const d1::filter_node&);
3851c0b2f7Stbbdev public:
3951c0b2f7Stbbdev
4051c0b2f7Stbbdev //! Construct empty pipeline.
pipeline(d1::task_group_context & cxt,std::size_t max_token)4151c0b2f7Stbbdev pipeline(d1::task_group_context& cxt, std::size_t max_token) :
4251c0b2f7Stbbdev my_context(cxt),
4351c0b2f7Stbbdev first_filter(nullptr),
4451c0b2f7Stbbdev last_filter(nullptr),
4551c0b2f7Stbbdev input_tokens(Token(max_token)),
4651c0b2f7Stbbdev end_of_input(false),
4751c0b2f7Stbbdev wait_ctx(0) {
4851c0b2f7Stbbdev __TBB_ASSERT( max_token>0, "pipeline::run must have at least one token" );
4951c0b2f7Stbbdev }
5051c0b2f7Stbbdev
5151c0b2f7Stbbdev ~pipeline();
5251c0b2f7Stbbdev
5351c0b2f7Stbbdev //! Add filter to end of pipeline.
5451c0b2f7Stbbdev void add_filter( d1::base_filter& );
5551c0b2f7Stbbdev
5651c0b2f7Stbbdev //! Traverse tree of fitler-node in-order and add filter for each leaf
fill_pipeline(const d1::filter_node & root)5751c0b2f7Stbbdev void fill_pipeline(const d1::filter_node& root) {
5851c0b2f7Stbbdev if( root.left && root.right ) {
5951c0b2f7Stbbdev fill_pipeline(*root.left);
6051c0b2f7Stbbdev fill_pipeline(*root.right);
6151c0b2f7Stbbdev }
6251c0b2f7Stbbdev else {
6351c0b2f7Stbbdev __TBB_ASSERT(!root.left && !root.right, "tree should be full");
6451c0b2f7Stbbdev add_filter(*root.create_filter());
6551c0b2f7Stbbdev }
6651c0b2f7Stbbdev }
6751c0b2f7Stbbdev
6851c0b2f7Stbbdev private:
6951c0b2f7Stbbdev friend class stage_task;
7051c0b2f7Stbbdev friend class base_filter;
7151c0b2f7Stbbdev friend void set_end_of_input(d1::base_filter& bf);
7251c0b2f7Stbbdev
7351c0b2f7Stbbdev task_group_context& my_context;
7451c0b2f7Stbbdev
7551c0b2f7Stbbdev //! Pointer to first filter in the pipeline.
7651c0b2f7Stbbdev d1::base_filter* first_filter;
7751c0b2f7Stbbdev
7851c0b2f7Stbbdev //! Pointer to last filter in the pipeline.
7951c0b2f7Stbbdev d1::base_filter* last_filter;
8051c0b2f7Stbbdev
8151c0b2f7Stbbdev //! Number of idle tokens waiting for input stage.
8251c0b2f7Stbbdev std::atomic<Token> input_tokens;
8351c0b2f7Stbbdev
8451c0b2f7Stbbdev //! False until flow_control::stop() is called.
8551c0b2f7Stbbdev std::atomic<bool> end_of_input;
8651c0b2f7Stbbdev
8751c0b2f7Stbbdev d1::wait_context wait_ctx;
8851c0b2f7Stbbdev };
8951c0b2f7Stbbdev
90478de5b1Stbbdev //! This structure is used to store task information in an input buffer
9151c0b2f7Stbbdev struct task_info {
9251c0b2f7Stbbdev void* my_object = nullptr;
9351c0b2f7Stbbdev //! Invalid unless a task went through an ordered stage.
9451c0b2f7Stbbdev Token my_token = 0;
9551c0b2f7Stbbdev //! False until my_token is set.
9651c0b2f7Stbbdev bool my_token_ready = false;
9751c0b2f7Stbbdev //! True if my_object is valid.
9851c0b2f7Stbbdev bool is_valid = false;
9951c0b2f7Stbbdev //! Set to initial state (no object, no token)
resettbb::detail::r1::task_info10051c0b2f7Stbbdev void reset() {
10151c0b2f7Stbbdev my_object = nullptr;
10251c0b2f7Stbbdev my_token = 0;
10351c0b2f7Stbbdev my_token_ready = false;
10451c0b2f7Stbbdev is_valid = false;
10551c0b2f7Stbbdev }
10651c0b2f7Stbbdev };
10751c0b2f7Stbbdev
10851c0b2f7Stbbdev //! A buffer of input items for a filter.
10951c0b2f7Stbbdev /** Each item is a task_info, inserted into a position in the buffer corresponding to a Token. */
11051c0b2f7Stbbdev class input_buffer {
11151c0b2f7Stbbdev friend class base_filter;
11251c0b2f7Stbbdev friend class stage_task;
11351c0b2f7Stbbdev friend class pipeline;
11451c0b2f7Stbbdev friend void set_end_of_input(d1::base_filter& bf);
11551c0b2f7Stbbdev
11651c0b2f7Stbbdev using size_type = Token;
11751c0b2f7Stbbdev
11851c0b2f7Stbbdev //! Array of deferred tasks that cannot yet start executing.
11951c0b2f7Stbbdev task_info* array;
12051c0b2f7Stbbdev
12151c0b2f7Stbbdev //! Size of array
12251c0b2f7Stbbdev /** Always 0 or a power of 2 */
12351c0b2f7Stbbdev size_type array_size;
12451c0b2f7Stbbdev
12551c0b2f7Stbbdev //! Lowest token that can start executing.
12651c0b2f7Stbbdev /** All prior Token have already been seen. */
12751c0b2f7Stbbdev Token low_token;
12851c0b2f7Stbbdev
12951c0b2f7Stbbdev //! Serializes updates.
13051c0b2f7Stbbdev spin_mutex array_mutex;
13151c0b2f7Stbbdev
13251c0b2f7Stbbdev //! Resize "array".
13351c0b2f7Stbbdev /** Caller is responsible to acquiring a lock on "array_mutex". */
13451c0b2f7Stbbdev void grow( size_type minimum_size );
13551c0b2f7Stbbdev
13651c0b2f7Stbbdev //! Initial size for "array"
13751c0b2f7Stbbdev /** Must be a power of 2 */
13851c0b2f7Stbbdev static const size_type initial_buffer_size = 4;
13951c0b2f7Stbbdev
14051c0b2f7Stbbdev //! Used for out of order buffer, and for assigning my_token if is_ordered and my_token not already assigned
14151c0b2f7Stbbdev Token high_token;
14251c0b2f7Stbbdev
14351c0b2f7Stbbdev //! True for ordered filter, false otherwise.
14451c0b2f7Stbbdev const bool is_ordered;
14551c0b2f7Stbbdev
14657f524caSIlya Isaev //! for parallel filters that accepts nullptrs, thread-local flag for reaching end_of_input
1475a643c23Svlserov using end_of_input_tls_t = basic_tls<input_buffer*>;
14851c0b2f7Stbbdev end_of_input_tls_t end_of_input_tls;
14951c0b2f7Stbbdev bool end_of_input_tls_allocated; // no way to test pthread creation of TLS
15051c0b2f7Stbbdev
15151c0b2f7Stbbdev public:
15251c0b2f7Stbbdev input_buffer(const input_buffer&) = delete;
15351c0b2f7Stbbdev input_buffer& operator=(const input_buffer&) = delete;
15451c0b2f7Stbbdev
15551c0b2f7Stbbdev //! Construct empty buffer.
input_buffer(bool ordered)15651c0b2f7Stbbdev input_buffer( bool ordered) :
15751c0b2f7Stbbdev array(nullptr),
15851c0b2f7Stbbdev array_size(0),
15951c0b2f7Stbbdev low_token(0),
16051c0b2f7Stbbdev high_token(0),
16151c0b2f7Stbbdev is_ordered(ordered),
16251c0b2f7Stbbdev end_of_input_tls(),
16351c0b2f7Stbbdev end_of_input_tls_allocated(false) {
16451c0b2f7Stbbdev grow(initial_buffer_size);
16551c0b2f7Stbbdev __TBB_ASSERT( array, nullptr );
16651c0b2f7Stbbdev }
16751c0b2f7Stbbdev
16851c0b2f7Stbbdev //! Destroy the buffer.
~input_buffer()16951c0b2f7Stbbdev ~input_buffer() {
17051c0b2f7Stbbdev __TBB_ASSERT( array, nullptr );
17151c0b2f7Stbbdev cache_aligned_allocator<task_info>().deallocate(array,array_size);
17251c0b2f7Stbbdev poison_pointer( array );
17351c0b2f7Stbbdev if( end_of_input_tls_allocated ) {
17451c0b2f7Stbbdev destroy_my_tls();
17551c0b2f7Stbbdev }
17651c0b2f7Stbbdev }
17751c0b2f7Stbbdev
17851c0b2f7Stbbdev //! Define order when the first filter is serial_in_order.
get_ordered_token()17951c0b2f7Stbbdev Token get_ordered_token(){
18051c0b2f7Stbbdev return high_token++;
18151c0b2f7Stbbdev }
18251c0b2f7Stbbdev
18351c0b2f7Stbbdev //! Put a token into the buffer.
18451c0b2f7Stbbdev /** If task information was placed into buffer, returns true;
18551c0b2f7Stbbdev otherwise returns false, informing the caller to create and spawn a task.
18651c0b2f7Stbbdev */
try_put_token(task_info & info)18751c0b2f7Stbbdev bool try_put_token( task_info& info ) {
18851c0b2f7Stbbdev info.is_valid = true;
18951c0b2f7Stbbdev spin_mutex::scoped_lock lock( array_mutex );
19051c0b2f7Stbbdev Token token;
19151c0b2f7Stbbdev if( is_ordered ) {
19251c0b2f7Stbbdev if( !info.my_token_ready ) {
19351c0b2f7Stbbdev info.my_token = high_token++;
19451c0b2f7Stbbdev info.my_token_ready = true;
19551c0b2f7Stbbdev }
19651c0b2f7Stbbdev token = info.my_token;
19751c0b2f7Stbbdev } else
19851c0b2f7Stbbdev token = high_token++;
19951c0b2f7Stbbdev __TBB_ASSERT( (long)(token-low_token)>=0, nullptr );
20051c0b2f7Stbbdev if( token!=low_token ) {
20151c0b2f7Stbbdev // Trying to put token that is beyond low_token.
20251c0b2f7Stbbdev // Need to wait until low_token catches up before dispatching.
20351c0b2f7Stbbdev if( token-low_token>=array_size )
20451c0b2f7Stbbdev grow( token-low_token+1 );
20551c0b2f7Stbbdev ITT_NOTIFY( sync_releasing, this );
20651c0b2f7Stbbdev array[token&(array_size-1)] = info;
20751c0b2f7Stbbdev return true;
20851c0b2f7Stbbdev }
20951c0b2f7Stbbdev return false;
21051c0b2f7Stbbdev }
21151c0b2f7Stbbdev
21251c0b2f7Stbbdev //! Note that processing of a token is finished.
21351c0b2f7Stbbdev /** Fires up processing of the next token, if processing was deferred. */
21451c0b2f7Stbbdev // Uses template to avoid explicit dependency on stage_task.
21551c0b2f7Stbbdev template<typename StageTask>
try_to_spawn_task_for_next_token(StageTask & spawner,d1::execution_data & ed)21651c0b2f7Stbbdev void try_to_spawn_task_for_next_token(StageTask& spawner, d1::execution_data& ed) {
21751c0b2f7Stbbdev task_info wakee;
21851c0b2f7Stbbdev {
21951c0b2f7Stbbdev spin_mutex::scoped_lock lock( array_mutex );
22051c0b2f7Stbbdev // Wake the next task
22151c0b2f7Stbbdev task_info& item = array[++low_token & (array_size-1)];
22251c0b2f7Stbbdev ITT_NOTIFY( sync_acquired, this );
22351c0b2f7Stbbdev wakee = item;
22451c0b2f7Stbbdev item.is_valid = false;
22551c0b2f7Stbbdev }
22651c0b2f7Stbbdev if( wakee.is_valid )
22751c0b2f7Stbbdev spawner.spawn_stage_task(wakee, ed);
22851c0b2f7Stbbdev }
22951c0b2f7Stbbdev
23051c0b2f7Stbbdev // end_of_input signal for parallel_pipeline, parallel input filters with 0 tokens allowed.
create_my_tls()23151c0b2f7Stbbdev void create_my_tls() {
23251c0b2f7Stbbdev int status = end_of_input_tls.create();
23351c0b2f7Stbbdev if(status)
23451c0b2f7Stbbdev handle_perror(status, "TLS not allocated for filter");
23551c0b2f7Stbbdev end_of_input_tls_allocated = true;
23651c0b2f7Stbbdev }
destroy_my_tls()23751c0b2f7Stbbdev void destroy_my_tls() {
23851c0b2f7Stbbdev int status = end_of_input_tls.destroy();
23951c0b2f7Stbbdev if(status)
24051c0b2f7Stbbdev handle_perror(status, "Failed to destroy filter TLS");
24151c0b2f7Stbbdev }
my_tls_end_of_input()24251c0b2f7Stbbdev bool my_tls_end_of_input() {
2435a643c23Svlserov return end_of_input_tls.get() != nullptr;
24451c0b2f7Stbbdev }
set_my_tls_end_of_input()24551c0b2f7Stbbdev void set_my_tls_end_of_input() {
2465a643c23Svlserov end_of_input_tls.set(this);
24751c0b2f7Stbbdev }
24851c0b2f7Stbbdev };
24951c0b2f7Stbbdev
grow(size_type minimum_size)25051c0b2f7Stbbdev void input_buffer::grow( size_type minimum_size ) {
25151c0b2f7Stbbdev size_type old_size = array_size;
25251c0b2f7Stbbdev size_type new_size = old_size ? 2*old_size : initial_buffer_size;
25351c0b2f7Stbbdev while( new_size<minimum_size )
25451c0b2f7Stbbdev new_size*=2;
25551c0b2f7Stbbdev task_info* new_array = cache_aligned_allocator<task_info>().allocate(new_size);
25651c0b2f7Stbbdev task_info* old_array = array;
25751c0b2f7Stbbdev for( size_type i=0; i<new_size; ++i )
25851c0b2f7Stbbdev new_array[i].is_valid = false;
25951c0b2f7Stbbdev Token t=low_token;
26051c0b2f7Stbbdev for( size_type i=0; i<old_size; ++i, ++t )
26151c0b2f7Stbbdev new_array[t&(new_size-1)] = old_array[t&(old_size-1)];
26251c0b2f7Stbbdev array = new_array;
26351c0b2f7Stbbdev array_size = new_size;
26451c0b2f7Stbbdev if( old_array )
26551c0b2f7Stbbdev cache_aligned_allocator<task_info>().deallocate(old_array,old_size);
26651c0b2f7Stbbdev }
26751c0b2f7Stbbdev
26851c0b2f7Stbbdev class stage_task : public d1::task, public task_info {
26951c0b2f7Stbbdev private:
27051c0b2f7Stbbdev friend class pipeline;
27151c0b2f7Stbbdev pipeline& my_pipeline;
27251c0b2f7Stbbdev d1::base_filter* my_filter;
27351c0b2f7Stbbdev d1::small_object_allocator m_allocator;
27451c0b2f7Stbbdev //! True if this task has not yet read the input.
27551c0b2f7Stbbdev bool my_at_start;
27651c0b2f7Stbbdev
27751c0b2f7Stbbdev //! True if this can be executed again.
27851c0b2f7Stbbdev bool execute_filter(d1::execution_data& ed);
27951c0b2f7Stbbdev
28051c0b2f7Stbbdev //! Spawn task if token is available.
try_spawn_stage_task(d1::execution_data & ed)28151c0b2f7Stbbdev void try_spawn_stage_task(d1::execution_data& ed) {
28251c0b2f7Stbbdev ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
283478de5b1Stbbdev if( (my_pipeline.input_tokens.fetch_sub(1, std::memory_order_release)) > 1 ) {
28451c0b2f7Stbbdev d1::small_object_allocator alloc{};
28551c0b2f7Stbbdev r1::spawn( *alloc.new_object<stage_task>(ed, my_pipeline, alloc ), my_pipeline.my_context );
28651c0b2f7Stbbdev }
28751c0b2f7Stbbdev }
28851c0b2f7Stbbdev
28951c0b2f7Stbbdev public:
29051c0b2f7Stbbdev
29151c0b2f7Stbbdev //! Construct stage_task for first stage in a pipeline.
29251c0b2f7Stbbdev /** Such a stage has not read any input yet. */
stage_task(pipeline & pipeline,d1::small_object_allocator & alloc)29351c0b2f7Stbbdev stage_task(pipeline& pipeline, d1::small_object_allocator& alloc ) :
29451c0b2f7Stbbdev my_pipeline(pipeline),
29551c0b2f7Stbbdev my_filter(pipeline.first_filter),
29651c0b2f7Stbbdev m_allocator(alloc),
29751c0b2f7Stbbdev my_at_start(true)
29851c0b2f7Stbbdev {
29951c0b2f7Stbbdev task_info::reset();
30051c0b2f7Stbbdev my_pipeline.wait_ctx.reserve();
30151c0b2f7Stbbdev }
30251c0b2f7Stbbdev //! Construct stage_task for a subsequent stage in a pipeline.
stage_task(pipeline & pipeline,d1::base_filter * filter,const task_info & info,d1::small_object_allocator & alloc)30351c0b2f7Stbbdev stage_task(pipeline& pipeline, d1::base_filter* filter, const task_info& info, d1::small_object_allocator& alloc) :
30451c0b2f7Stbbdev task_info(info),
30551c0b2f7Stbbdev my_pipeline(pipeline),
30651c0b2f7Stbbdev my_filter(filter),
30751c0b2f7Stbbdev m_allocator(alloc),
30851c0b2f7Stbbdev my_at_start(false)
30951c0b2f7Stbbdev {
31051c0b2f7Stbbdev my_pipeline.wait_ctx.reserve();
31151c0b2f7Stbbdev }
31251c0b2f7Stbbdev //! Roughly equivalent to the constructor of input stage task
reset()31351c0b2f7Stbbdev void reset() {
31451c0b2f7Stbbdev task_info::reset();
31551c0b2f7Stbbdev my_filter = my_pipeline.first_filter;
31651c0b2f7Stbbdev my_at_start = true;
31751c0b2f7Stbbdev }
finalize(d1::execution_data & ed)31851c0b2f7Stbbdev void finalize(d1::execution_data& ed) {
31951c0b2f7Stbbdev m_allocator.delete_object(this, ed);
32051c0b2f7Stbbdev }
32151c0b2f7Stbbdev //! The virtual task execution method
execute(d1::execution_data & ed)32251c0b2f7Stbbdev task* execute(d1::execution_data& ed) override {
32351c0b2f7Stbbdev if(!execute_filter(ed)) {
32451c0b2f7Stbbdev finalize(ed);
32551c0b2f7Stbbdev return nullptr;
32651c0b2f7Stbbdev }
32751c0b2f7Stbbdev return this;
32851c0b2f7Stbbdev }
cancel(d1::execution_data & ed)32951c0b2f7Stbbdev task* cancel(d1::execution_data& ed) override {
33051c0b2f7Stbbdev finalize(ed);
33151c0b2f7Stbbdev return nullptr;
33251c0b2f7Stbbdev }
33351c0b2f7Stbbdev
~stage_task()334ba947f18SIlya Isaev ~stage_task() override {
33551c0b2f7Stbbdev if ( my_filter && my_object ) {
33651c0b2f7Stbbdev my_filter->finalize(my_object);
33751c0b2f7Stbbdev my_object = nullptr;
33851c0b2f7Stbbdev }
33951c0b2f7Stbbdev my_pipeline.wait_ctx.release();
34051c0b2f7Stbbdev }
34151c0b2f7Stbbdev //! Creates and spawns stage_task from task_info
spawn_stage_task(const task_info & info,d1::execution_data & ed)34251c0b2f7Stbbdev void spawn_stage_task(const task_info& info, d1::execution_data& ed) {
34351c0b2f7Stbbdev d1::small_object_allocator alloc{};
34451c0b2f7Stbbdev stage_task* clone = alloc.new_object<stage_task>(ed, my_pipeline, my_filter, info, alloc);
34551c0b2f7Stbbdev r1::spawn(*clone, my_pipeline.my_context);
34651c0b2f7Stbbdev }
34751c0b2f7Stbbdev };
34851c0b2f7Stbbdev
execute_filter(d1::execution_data & ed)34951c0b2f7Stbbdev bool stage_task::execute_filter(d1::execution_data& ed) {
35051c0b2f7Stbbdev __TBB_ASSERT( !my_at_start || !my_object, "invalid state of task" );
35151c0b2f7Stbbdev if( my_at_start ) {
35251c0b2f7Stbbdev if( my_filter->is_serial() ) {
35351c0b2f7Stbbdev my_object = (*my_filter)(my_object);
35451c0b2f7Stbbdev if( my_object || ( my_filter->object_may_be_null() && !my_pipeline.end_of_input.load(std::memory_order_relaxed)) ) {
35551c0b2f7Stbbdev if( my_filter->is_ordered() ) {
35651c0b2f7Stbbdev my_token = my_filter->my_input_buffer->get_ordered_token();
35751c0b2f7Stbbdev my_token_ready = true;
35851c0b2f7Stbbdev }
35951c0b2f7Stbbdev if( !my_filter->next_filter_in_pipeline ) { // we're only filter in pipeline
36051c0b2f7Stbbdev reset();
36151c0b2f7Stbbdev return true;
36251c0b2f7Stbbdev } else {
36351c0b2f7Stbbdev try_spawn_stage_task(ed);
36451c0b2f7Stbbdev }
36551c0b2f7Stbbdev } else {
36651c0b2f7Stbbdev my_pipeline.end_of_input.store(true, std::memory_order_relaxed);
36751c0b2f7Stbbdev return false;
36851c0b2f7Stbbdev }
36951c0b2f7Stbbdev } else /*not is_serial*/ {
37051c0b2f7Stbbdev if ( my_pipeline.end_of_input.load(std::memory_order_relaxed) ) {
37151c0b2f7Stbbdev return false;
37251c0b2f7Stbbdev }
37351c0b2f7Stbbdev
37451c0b2f7Stbbdev try_spawn_stage_task(ed);
37551c0b2f7Stbbdev
37651c0b2f7Stbbdev my_object = (*my_filter)(my_object);
37751c0b2f7Stbbdev if( !my_object && (!my_filter->object_may_be_null() || my_filter->my_input_buffer->my_tls_end_of_input()) ){
37851c0b2f7Stbbdev my_pipeline.end_of_input.store(true, std::memory_order_relaxed);
37951c0b2f7Stbbdev return false;
38051c0b2f7Stbbdev }
38151c0b2f7Stbbdev }
38251c0b2f7Stbbdev my_at_start = false;
38351c0b2f7Stbbdev } else {
38451c0b2f7Stbbdev my_object = (*my_filter)(my_object);
38551c0b2f7Stbbdev if( my_filter->is_serial() )
38651c0b2f7Stbbdev my_filter->my_input_buffer->try_to_spawn_task_for_next_token(*this, ed);
38751c0b2f7Stbbdev }
38851c0b2f7Stbbdev my_filter = my_filter->next_filter_in_pipeline;
38951c0b2f7Stbbdev if( my_filter ) {
39051c0b2f7Stbbdev // There is another filter to execute.
39151c0b2f7Stbbdev if( my_filter->is_serial() ) {
39251c0b2f7Stbbdev // The next filter must execute tokens when they are available (in order for serial_in_order)
39351c0b2f7Stbbdev if( my_filter->my_input_buffer->try_put_token(*this) ){
39451c0b2f7Stbbdev my_filter = nullptr; // To prevent deleting my_object twice if exception occurs
39551c0b2f7Stbbdev return false;
39651c0b2f7Stbbdev }
39751c0b2f7Stbbdev }
39851c0b2f7Stbbdev } else {
39951c0b2f7Stbbdev // Reached end of the pipe.
400478de5b1Stbbdev std::size_t ntokens_avail = my_pipeline.input_tokens.fetch_add(1, std::memory_order_acquire);
40151c0b2f7Stbbdev
40251c0b2f7Stbbdev if( ntokens_avail>0 // Only recycle if there is one available token
40351c0b2f7Stbbdev || my_pipeline.end_of_input.load(std::memory_order_relaxed) ) {
40451c0b2f7Stbbdev return false; // No need to recycle for new input
40551c0b2f7Stbbdev }
40651c0b2f7Stbbdev ITT_NOTIFY( sync_acquired, &my_pipeline.input_tokens );
40751c0b2f7Stbbdev // Recycle as an input stage task.
40851c0b2f7Stbbdev reset();
40951c0b2f7Stbbdev }
41051c0b2f7Stbbdev return true;
41151c0b2f7Stbbdev }
41251c0b2f7Stbbdev
~pipeline()41351c0b2f7Stbbdev pipeline::~pipeline() {
41451c0b2f7Stbbdev while( first_filter ) {
41551c0b2f7Stbbdev d1::base_filter* f = first_filter;
41651c0b2f7Stbbdev if( input_buffer* b = f->my_input_buffer ) {
41751c0b2f7Stbbdev b->~input_buffer();
41851c0b2f7Stbbdev deallocate_memory(b);
41951c0b2f7Stbbdev }
42051c0b2f7Stbbdev first_filter = f->next_filter_in_pipeline;
42151c0b2f7Stbbdev f->~base_filter();
42251c0b2f7Stbbdev deallocate_memory(f);
42351c0b2f7Stbbdev }
42451c0b2f7Stbbdev }
42551c0b2f7Stbbdev
add_filter(d1::base_filter & new_fitler)42651c0b2f7Stbbdev void pipeline::add_filter( d1::base_filter& new_fitler ) {
42751c0b2f7Stbbdev __TBB_ASSERT( new_fitler.next_filter_in_pipeline==d1::base_filter::not_in_pipeline(), "filter already part of pipeline?" );
42851c0b2f7Stbbdev new_fitler.my_pipeline = this;
42951c0b2f7Stbbdev if ( first_filter == nullptr )
43051c0b2f7Stbbdev first_filter = &new_fitler;
43151c0b2f7Stbbdev else
43251c0b2f7Stbbdev last_filter->next_filter_in_pipeline = &new_fitler;
43351c0b2f7Stbbdev new_fitler.next_filter_in_pipeline = nullptr;
43451c0b2f7Stbbdev last_filter = &new_fitler;
43551c0b2f7Stbbdev if( new_fitler.is_serial() ) {
43651c0b2f7Stbbdev new_fitler.my_input_buffer = new (allocate_memory(sizeof(input_buffer))) input_buffer( new_fitler.is_ordered() );
43751c0b2f7Stbbdev } else {
43851c0b2f7Stbbdev if( first_filter == &new_fitler && new_fitler.object_may_be_null() ) {
43951c0b2f7Stbbdev //TODO: buffer only needed to hold TLS; could improve
44051c0b2f7Stbbdev new_fitler.my_input_buffer = new (allocate_memory(sizeof(input_buffer))) input_buffer( /*is_ordered*/false );
44151c0b2f7Stbbdev new_fitler.my_input_buffer->create_my_tls();
44251c0b2f7Stbbdev }
44351c0b2f7Stbbdev }
44451c0b2f7Stbbdev }
44551c0b2f7Stbbdev
parallel_pipeline(d1::task_group_context & cxt,std::size_t max_token,const d1::filter_node & fn)44651c0b2f7Stbbdev void __TBB_EXPORTED_FUNC parallel_pipeline(d1::task_group_context& cxt, std::size_t max_token, const d1::filter_node& fn) {
44751c0b2f7Stbbdev pipeline pipe(cxt, max_token);
44851c0b2f7Stbbdev
44951c0b2f7Stbbdev pipe.fill_pipeline(fn);
45051c0b2f7Stbbdev
45151c0b2f7Stbbdev d1::small_object_allocator alloc{};
45251c0b2f7Stbbdev stage_task& st = *alloc.new_object<stage_task>(pipe, alloc);
45351c0b2f7Stbbdev
45451c0b2f7Stbbdev // Start execution of tasks
45551c0b2f7Stbbdev r1::execute_and_wait(st, cxt, pipe.wait_ctx, cxt);
45651c0b2f7Stbbdev }
45751c0b2f7Stbbdev
set_end_of_input(d1::base_filter & bf)45851c0b2f7Stbbdev void __TBB_EXPORTED_FUNC set_end_of_input(d1::base_filter& bf) {
45951c0b2f7Stbbdev __TBB_ASSERT(bf.my_input_buffer, nullptr);
46051c0b2f7Stbbdev __TBB_ASSERT(bf.object_may_be_null(), nullptr);
46151c0b2f7Stbbdev if(bf.is_serial() ) {
46251c0b2f7Stbbdev bf.my_pipeline->end_of_input.store(true, std::memory_order_relaxed);
46351c0b2f7Stbbdev } else {
46451c0b2f7Stbbdev __TBB_ASSERT(bf.my_input_buffer->end_of_input_tls_allocated, nullptr);
46551c0b2f7Stbbdev bf.my_input_buffer->set_my_tls_end_of_input();
46651c0b2f7Stbbdev }
46751c0b2f7Stbbdev }
46851c0b2f7Stbbdev
46951c0b2f7Stbbdev } // namespace r1
47051c0b2f7Stbbdev } // namespace detail
47151c0b2f7Stbbdev } // namespace tbb
472