1 /*
2 Copyright (c) 2005-2023 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #ifndef __TBB_flow_graph_H
18 #define __TBB_flow_graph_H
19
20 #include <atomic>
21 #include <memory>
22 #include <type_traits>
23
24 #include "detail/_config.h"
25 #include "detail/_namespace_injection.h"
26 #include "spin_mutex.h"
27 #include "null_mutex.h"
28 #include "spin_rw_mutex.h"
29 #include "null_rw_mutex.h"
30 #include "detail/_pipeline_filters.h"
31 #include "detail/_task.h"
32 #include "detail/_small_object_pool.h"
33 #include "cache_aligned_allocator.h"
34 #include "detail/_exception.h"
35 #include "detail/_template_helpers.h"
36 #include "detail/_aggregator.h"
37 #include "detail/_allocator_traits.h"
38 #include "detail/_utils.h"
39 #include "profiling.h"
40 #include "task_arena.h"
41
42 #if TBB_USE_PROFILING_TOOLS && ( __unix__ || __APPLE__ )
43 #if __INTEL_COMPILER
44 // Disabled warning "routine is both inline and noinline"
45 #pragma warning (push)
46 #pragma warning( disable: 2196 )
47 #endif
48 #define __TBB_NOINLINE_SYM __attribute__((noinline))
49 #else
50 #define __TBB_NOINLINE_SYM
51 #endif
52
53 #include <tuple>
54 #include <list>
55 #include <queue>
56 #if __TBB_CPP20_CONCEPTS_PRESENT
57 #include <concepts>
58 #endif
59
60 /** @file
61 \brief The graph related classes and functions
62
63 There are some applications that best express dependencies as messages
64 passed between nodes in a graph. These messages may contain data or
simply act as signals that a predecessor has completed. The graph
66 class and its associated node classes can be used to express such
67 applications.
68 */
69
70 namespace tbb {
71 namespace detail {
72
73 namespace d1 {
74
//! An enumeration that provides the two most common concurrency levels: unlimited and serial
enum concurrency { unlimited = 0, serial = 1 };

//! A generic null type (used, e.g., as the input type of nodes that have no input)
struct null_type {};

//! An empty class used for messages that mean "I'm done"
class continue_msg {};
83
84 } // namespace d1
85
86 #if __TBB_CPP20_CONCEPTS_PRESENT
87 namespace d0 {
88
// Accepts a body whose invocation result is compatible with the node's output:
// either the node outputs continue_msg, or the types are convertible.
template <typename ReturnType, typename OutputType>
concept node_body_return_type = std::same_as<OutputType, tbb::detail::d1::continue_msg> ||
                                std::convertible_to<OutputType, ReturnType>;

// TODO: consider using std::invocable here
// Constrains the body of a continue_node: copyable and callable with a continue_msg.
template <typename Body, typename Output>
concept continue_node_body = std::copy_constructible<Body> &&
                             requires( Body& body, const tbb::detail::d1::continue_msg& v ) {
                                 { body(v) } -> node_body_return_type<Output>;
                             };

// Constrains the body of a function_node: copyable, callable with the input type,
// and producing a result compatible with the output type.
template <typename Body, typename Input, typename Output>
concept function_node_body = std::copy_constructible<Body> &&
                             std::invocable<Body&, const Input&> &&
                             node_body_return_type<std::invoke_result_t<Body&, const Input&>, Output>;

// Constrains the key-extraction functor used by key-matching join_node ports.
template <typename FunctionObject, typename Input, typename Key>
concept join_node_function_object = std::copy_constructible<FunctionObject> &&
                                    std::invocable<FunctionObject&, const Input&> &&
                                    std::convertible_to<std::invoke_result_t<FunctionObject&, const Input&>, Key>;

// Constrains the body of an input_node: copyable, callable with a flow_control&,
// and returning exactly the node's output type.
template <typename Body, typename Output>
concept input_node_body = std::copy_constructible<Body> &&
                          requires( Body& body, tbb::detail::d1::flow_control& fc ) {
                              { body(fc) } -> adaptive_same_as<Output>;
                          };

// Constrains the body of a multifunction_node: copyable and callable with
// the input plus the tuple of output ports.
template <typename Body, typename Input, typename OutputPortsType>
concept multifunction_node_body = std::copy_constructible<Body> &&
                                  std::invocable<Body&, const Input&, OutputPortsType&>;

// Constrains the sequencer functor of a sequencer_node: maps a value to its
// sequence index (std::size_t).
template <typename Sequencer, typename Value>
concept sequencer = std::copy_constructible<Sequencer> &&
                    std::invocable<Sequencer&, const Value&> &&
                    std::convertible_to<std::invoke_result_t<Sequencer&, const Value&>, std::size_t>;

// Constrains the body of an async_node: copyable and callable with the input
// plus the node's gateway.
template <typename Body, typename Input, typename GatewayType>
concept async_node_body = std::copy_constructible<Body> &&
                          std::invocable<Body&, const Input&, GatewayType&>;
128
129 } // namespace d0
130 #endif // __TBB_CPP20_CONCEPTS_PRESENT
131
132 namespace d1 {
133
134 //! Forward declaration section
135 template< typename T > class sender;
136 template< typename T > class receiver;
137 class continue_receiver;
138
139 template< typename T, typename U > class limiter_node; // needed for resetting decrementer
140
141 template<typename T, typename M> class successor_cache;
142 template<typename T, typename M> class broadcast_cache;
143 template<typename T, typename M> class round_robin_cache;
144 template<typename T, typename M> class predecessor_cache;
145 template<typename T, typename M> class reservable_predecessor_cache;
146
147 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
148 namespace order {
149 struct following;
150 struct preceding;
151 }
152 template<typename Order, typename... Args> struct node_set;
153 #endif
154
155
156 } // namespace d1
157 } // namespace detail
158 } // namespace tbb
159
160 //! The graph class
161 #include "detail/_flow_graph_impl.h"
162
163 namespace tbb {
164 namespace detail {
165 namespace d1 {
166
order_tasks(graph_task * first,graph_task * second)167 static inline std::pair<graph_task*, graph_task*> order_tasks(graph_task* first, graph_task* second) {
168 if (second->priority > first->priority)
169 return std::make_pair(second, first);
170 return std::make_pair(first, second);
171 }
172
173 // submit task if necessary. Returns the non-enqueued task if there is one.
combine_tasks(graph & g,graph_task * left,graph_task * right)174 static inline graph_task* combine_tasks(graph& g, graph_task* left, graph_task* right) {
175 // if no RHS task, don't change left.
176 if (right == nullptr) return left;
177 // right != nullptr
178 if (left == nullptr) return right;
179 if (left == SUCCESSFULLY_ENQUEUED) return right;
180 // left contains a task
181 if (right != SUCCESSFULLY_ENQUEUED) {
182 // both are valid tasks
183 auto tasks_pair = order_tasks(left, right);
184 spawn_in_graph_arena(g, *tasks_pair.first);
185 return tasks_pair.second;
186 }
187 return left;
188 }
189
190 //! Pure virtual template class that defines a sender of messages of type T
191 template< typename T >
192 class sender {
193 public:
~sender()194 virtual ~sender() {}
195
196 //! Request an item from the sender
try_get(T &)197 virtual bool try_get( T & ) { return false; }
198
199 //! Reserves an item in the sender
try_reserve(T &)200 virtual bool try_reserve( T & ) { return false; }
201
202 //! Releases the reserved item
try_release()203 virtual bool try_release( ) { return false; }
204
205 //! Consumes the reserved item
try_consume()206 virtual bool try_consume( ) { return false; }
207
208 protected:
209 //! The output type of this sender
210 typedef T output_type;
211
212 //! The successor type for this node
213 typedef receiver<T> successor_type;
214
215 //! Add a new successor to this node
216 virtual bool register_successor( successor_type &r ) = 0;
217
218 //! Removes a successor from this node
219 virtual bool remove_successor( successor_type &r ) = 0;
220
221 template<typename C>
222 friend bool register_successor(sender<C>& s, receiver<C>& r);
223
224 template<typename C>
225 friend bool remove_successor (sender<C>& s, receiver<C>& r);
226 }; // class sender<T>
227
//! Connects sender s to receiver r; forwards to the sender's protected virtual.
template<typename C>
bool register_successor(sender<C>& s, receiver<C>& r) {
    return s.register_successor(r);
}
232
//! Disconnects receiver r from sender s; forwards to the sender's protected virtual.
template<typename C>
bool remove_successor(sender<C>& s, receiver<C>& r) {
    return s.remove_successor(r);
}
237
//! Pure virtual template class that defines a receiver of messages of type T
/** Concrete receivers implement try_put_task(); try_put() is the public
    blocking-free entry point that spawns any returned task. */
template< typename T >
class receiver {
public:
    //! Destructor
    virtual ~receiver() {}

    //! Put an item to the receiver
    /** Returns false if the item was rejected. If try_put_task returned a real
        task (not the SUCCESSFULLY_ENQUEUED sentinel), it is spawned in the
        owning graph's arena. */
    bool try_put( const T& t ) {
        graph_task *res = try_put_task(t);
        if (!res) return false;
        if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res);
        return true;
    }

    //! put item to successor; return task to run the successor if possible.
protected:
    //! The input type of this receiver
    typedef T input_type;

    //! The predecessor type for this node
    typedef sender<T> predecessor_type;

    template< typename R, typename B > friend class run_and_put_task;
    template< typename X, typename Y > friend class broadcast_cache;
    template< typename X, typename Y > friend class round_robin_cache;
    //! Core acceptance hook: returns nullptr (rejected), SUCCESSFULLY_ENQUEUED,
    //! or a task the caller is responsible for running/spawning.
    virtual graph_task *try_put_task(const T& t) = 0;
    //! The graph this receiver belongs to (used for task spawning).
    virtual graph& graph_reference() const = 0;

    template<typename TT, typename M> friend class successor_cache;
    virtual bool is_continue_receiver() { return false; }

    // TODO revamp: reconsider the inheritance and move node priority out of receiver
    virtual node_priority_t priority() const { return no_priority; }

    //! Add a predecessor to the node
    virtual bool register_predecessor( predecessor_type & ) { return false; }

    //! Remove a predecessor from the node
    virtual bool remove_predecessor( predecessor_type & ) { return false; }

    // Free-function edge builders are the public entry points to the virtuals above.
    template <typename C>
    friend bool register_predecessor(receiver<C>& r, sender<C>& s);
    template <typename C>
    friend bool remove_predecessor (receiver<C>& r, sender<C>& s);
};  // class receiver<T>
284
//! Connects sender s as a predecessor of receiver r; forwards to the receiver's virtual.
template <typename C>
bool register_predecessor(receiver<C>& r, sender<C>& s) {
    return r.register_predecessor(s);
}
289
//! Disconnects sender s from receiver r; forwards to the receiver's virtual.
template <typename C>
bool remove_predecessor(receiver<C>& r, sender<C>& s) {
    return r.remove_predecessor(s);
}
294
//! Base class for receivers of completion messages
/** These receivers automatically reset, but cannot be explicitly waited on.
    A continue_receiver fires (calls execute()) once it has received one
    continue_msg from each of its registered predecessors, then starts
    counting again from zero. */
class continue_receiver : public receiver< continue_msg > {
protected:

    //! Constructor
    /** number_of_predecessors is the initial trigger threshold. */
    explicit continue_receiver( int number_of_predecessors, node_priority_t a_priority ) {
        my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
        my_current_count = 0;
        my_priority = a_priority;
    }

    //! Copy constructor
    /** Copies the initial threshold and priority; counting restarts at zero. */
    continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
        my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
        my_current_count = 0;
        my_priority = src.my_priority;
    }

    //! Increments the trigger threshold
    bool register_predecessor( predecessor_type & ) override {
        spin_mutex::scoped_lock l(my_mutex);
        ++my_predecessor_count;
        return true;
    }

    //! Decrements the trigger threshold
    /** Does not check to see if the removal of the predecessor now makes the current count
        exceed the new threshold. So removing a predecessor while the graph is active can cause
        unexpected results. */
    bool remove_predecessor( predecessor_type & ) override {
        spin_mutex::scoped_lock l(my_mutex);
        --my_predecessor_count;
        return true;
    }

    //! The input type
    typedef continue_msg input_type;

    //! The predecessor type for this node
    typedef receiver<input_type>::predecessor_type predecessor_type;

    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    // execute body is supposed to be too small to create a task for.
    graph_task* try_put_task( const input_type & ) override {
        {
            spin_mutex::scoped_lock l(my_mutex);
            if ( ++my_current_count < my_predecessor_count )
                return SUCCESSFULLY_ENQUEUED;  // still waiting for more predecessors
            else
                my_current_count = 0;          // threshold reached; rearm for the next wave
        }
        // execute() runs outside the lock, while the sender is blocked in try_put().
        graph_task* res = execute();
        return res? res : SUCCESSFULLY_ENQUEUED;
    }

    spin_mutex my_mutex;                // guards the three counters below
    int my_predecessor_count;           // current trigger threshold
    int my_current_count;               // signals received since the last firing
    int my_initial_predecessor_count;   // threshold restored by reset_receiver()
    node_priority_t my_priority;
    // the friend declaration in the base class did not eliminate the "protected class"
    // error in gcc 4.1.2
    template<typename U, typename V> friend class limiter_node;

    //! Restarts counting; restores the original threshold when edges are cleared
    virtual void reset_receiver( reset_flags f ) {
        my_current_count = 0;
        if (f & rf_clear_edges) {
            my_predecessor_count = my_initial_predecessor_count;
        }
    }

    //! Does whatever should happen when the threshold is reached
    /** This should be very fast or else spawn a task. This is
        called while the sender is blocked in the try_put(). */
    virtual graph_task* execute() = 0;
    template<typename TT, typename M> friend class successor_cache;
    bool is_continue_receiver() override { return true; }

    node_priority_t priority() const override { return my_priority; }
}; // class continue_receiver
378
379 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
//! Default key extractor for message-based key matching: forwards to t.key().
//! Specialize or overload for message types that expose their key differently.
template <typename K, typename T>
K key_from_message( const T &t ) {
    return t.key();
}
384 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
385
386 } // d1
387 } // detail
388 } // tbb
389
390 #include "detail/_flow_graph_trace_impl.h"
391 #include "detail/_hash_compare.h"
392
393 namespace tbb {
394 namespace detail {
395 namespace d1 {
396
397 #include "detail/_flow_graph_body_impl.h"
398 #include "detail/_flow_graph_cache_impl.h"
399 #include "detail/_flow_graph_types_impl.h"
400
401 using namespace graph_policy_namespace;
402
403 template <typename C, typename N>
graph_iterator(C * g,bool begin)404 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(nullptr)
405 {
406 if (begin) current_node = my_graph->my_nodes;
407 //else it is an end iterator by default
408 }
409
//! Dereference; asserts the iterator is not at end.
template <typename C, typename N>
typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
    __TBB_ASSERT(current_node, "graph_iterator at end");
    return *operator->();
}

//! Member access; returns the current node pointer (nullptr at end).
template <typename C, typename N>
typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
    return current_node;
}

//! Advances to the next registered node; advancing an end iterator is a no-op.
template <typename C, typename N>
void graph_iterator<C,N>::internal_forward() {
    if (current_node) current_node = current_node->next;
}
425
//! Constructs a graph with isolated task_group_context
/** The graph owns the context (own_context = true) and destroys it in ~graph.
    The context is placement-constructed in cache-aligned storage. */
inline graph::graph() : my_wait_context(0), my_nodes(nullptr), my_nodes_last(nullptr), my_task_arena(nullptr) {
    prepare_task_arena();
    own_context = true;
    cancelled = false;
    caught_exception = false;
    my_context = new (r1::cache_aligned_allocate(sizeof(task_group_context))) task_group_context(FLOW_TASKS);
    fgt_graph(this);   // register with the profiling/tracing tools
    my_is_active = true;
}
436
//! Constructs a graph that uses a caller-provided task_group_context
/** The graph does not own the context (own_context = false) and will not
    destroy it. */
inline graph::graph(task_group_context& use_this_context) :
    my_wait_context(0), my_context(&use_this_context), my_nodes(nullptr), my_nodes_last(nullptr), my_task_arena(nullptr) {
    prepare_task_arena();
    own_context = false;
    cancelled = false;
    caught_exception = false;
    fgt_graph(this);   // register with the profiling/tracing tools
    my_is_active = true;
}
446
//! Destructor: drains outstanding work, then releases owned resources.
/** wait_for_all() must complete before tearing down the context and arena. */
inline graph::~graph() {
    wait_for_all();
    if (own_context) {
        // Context was placement-constructed; destroy and deallocate manually.
        my_context->~task_group_context();
        r1::cache_aligned_deallocate(my_context);
    }
    delete my_task_arena;
}
455
//! Registers one external unit of work; wait_for_all() will not return until
//! it is matched by release_wait(). The tracing hook follows the reservation.
inline void graph::reserve_wait() {
    my_wait_context.reserve();
    fgt_reserve_wait(this);
}

//! Releases one unit reserved by reserve_wait(); tracing hook fires first,
//! mirroring the order used in reserve_wait().
inline void graph::release_wait() {
    fgt_release_wait(this);
    my_wait_context.release();
}
465
register_node(graph_node * n)466 inline void graph::register_node(graph_node *n) {
467 n->next = nullptr;
468 {
469 spin_mutex::scoped_lock lock(nodelist_mutex);
470 n->prev = my_nodes_last;
471 if (my_nodes_last) my_nodes_last->next = n;
472 my_nodes_last = n;
473 if (!my_nodes) my_nodes = n;
474 }
475 }
476
//! Unlinks node n from the graph's node list (called from ~graph_node).
inline void graph::remove_node(graph_node *n) {
    {
        spin_mutex::scoped_lock lock(nodelist_mutex);
        __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
        // Splice n out of the doubly-linked list, patching head/tail as needed.
        if (n->prev) n->prev->next = n->next;
        if (n->next) n->next->prev = n->prev;
        if (my_nodes_last == n) my_nodes_last = n->prev;
        if (my_nodes == n) my_nodes = n->next;
    }
    // Clear n's links outside the lock; n is no longer reachable from the list.
    n->prev = n->next = nullptr;
}
488
//! Resets the graph and all of its nodes according to the flags f.
/** The graph is deactivated during the reset so no new tasks are spawned
    while node state is being rebuilt. */
inline void graph::reset( reset_flags f ) {
    // reset context
    deactivate_graph(*this);

    my_context->reset();
    cancelled = false;
    caught_exception = false;
    // reset all the nodes comprising the graph
    for(iterator ii = begin(); ii != end(); ++ii) {
        graph_node *my_p = &(*ii);
        my_p->reset_node(f);
    }
    // Reattach the arena. Might be useful to run the graph in a particular task_arena
    // while not limiting graph lifetime to a single task_arena::execute() call.
    prepare_task_arena( /*reinit=*/true );
    activate_graph(*this);
}
506
//! Cancels all tasks in the graph's task_group_context.
inline void graph::cancel() {
    my_context->cancel_group_execution();
}

// Iterator factories over the graph's registered nodes; a "begin" iterator
// starts at the first node, an "end" iterator is null.
inline graph::iterator graph::begin() { return iterator(this, true); }

inline graph::iterator graph::end() { return iterator(this, false); }

inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }

inline graph::const_iterator graph::end() const { return const_iterator(this, false); }

inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }

inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
522
//! Every graph_node registers itself with its owning graph on construction...
inline graph_node::graph_node(graph& g) : my_graph(g) {
    my_graph.register_node(this);
}

//! ...and unregisters itself on destruction.
inline graph_node::~graph_node() {
    my_graph.remove_node(this);
}
530
531 #include "detail/_flow_graph_node_impl.h"
532
533
534 //! An executable node that acts as a source, i.e. it has no predecessors
535
template < typename Output >
    __TBB_requires(std::copyable<Output>)
class input_node : public graph_node, public sender< Output > {
public:
    //! The type of the output message, which is complete
    typedef Output output_type;

    //! The type of successors of this node
    typedef typename sender<output_type>::successor_type successor_type;

    // Input node has no input type
    typedef null_type input_type;

    //! Constructor for a node with a successor
    /** The node is created inactive; call activate() before it produces items.
        The body is cloned twice so that reset(rf_reset_bodies) can restore the
        pristine copy held in my_init_body. */
    template< typename Body >
        __TBB_requires(input_node_body<Body, Output>)
    __TBB_NOINLINE_SYM input_node( graph &g, Body body )
        : graph_node(g), my_active(false)
        , my_body( new input_body_leaf< output_type, Body>(body) )
        , my_init_body( new input_body_leaf< output_type, Body>(body) )
        , my_successors(this), my_reserved(false), my_has_cached_item(false)
    {
        fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
                           static_cast<sender<output_type> *>(this), this->my_body);
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructor that also wires the node to a set of successors.
    template <typename Body, typename... Successors>
        __TBB_requires(input_node_body<Body, Output>)
    input_node( const node_set<order::preceding, Successors...>& successors, Body body )
        : input_node(successors.graph_reference(), body)
    {
        make_edges(*this, successors);
    }
#endif

    //! Copy constructor
    /** Copies from src's initial body (not its possibly-advanced live body);
        the copy starts inactive with no cached item or reservation. */
    __TBB_NOINLINE_SYM input_node( const input_node& src )
        : graph_node(src.my_graph), sender<Output>()
        , my_active(false)
        , my_body(src.my_init_body->clone()), my_init_body(src.my_init_body->clone())
        , my_successors(this), my_reserved(false), my_has_cached_item(false)
    {
        fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
                           static_cast<sender<output_type> *>(this), this->my_body);
    }

    //! The destructor
    ~input_node() { delete my_body; delete my_init_body; }

    //! Add a new successor to this node
    /** If the node is already active, immediately spawns a task to feed the
        new successor. */
    bool register_successor( successor_type &r ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.register_successor(r);
        if ( my_active )
            spawn_put();
        return true;
    }

    //! Removes a successor from this node
    bool remove_successor( successor_type &r ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.remove_successor(r);
        return true;
    }

    //! Request an item from the node
    /** Hands out the cached item if there is one; otherwise (when active)
        enqueues a task to produce one and reports failure for now. */
    bool try_get( output_type &v ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved )
            return false;

        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_has_cached_item = false;
            return true;
        }
        // we've been asked to provide an item, but we have none.  enqueue a task to
        // provide one.
        if ( my_active )
            spawn_put();
        return false;
    }

    //! Reserves an item.
    /** Succeeds only when an unreserved cached item exists; the item stays in
        the node until try_release()/try_consume(). */
    bool try_reserve( output_type &v ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved ) {
            return false;
        }

        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_reserved = true;
            return true;
        } else {
            return false;
        }
    }

    //! Release a reserved item.
    /** true = item has been released and so remains in sender, dest must request or reserve future items */
    bool try_release( ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
        my_reserved = false;
        if(!my_successors.empty())
            spawn_put();
        return true;
    }

    //! Consumes a reserved item
    /** The reserved item is removed from the node; a new one may be produced
        for the remaining successors. */
    bool try_consume( ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
        my_reserved = false;
        my_has_cached_item = false;
        if ( !my_successors.empty() ) {
            spawn_put();
        }
        return true;
    }

    //! Activates a node that was created in the inactive state
    void activate() {
        spin_mutex::scoped_lock lock(my_mutex);
        my_active = true;
        if (!my_successors.empty())
            spawn_put();
    }

    //! Returns a copy of the node's body; throws std::bad_cast if Body is not
    //! the type the node was constructed with.
    template<typename Body>
    Body copy_function_object() {
        input_body<output_type> &body_ref = *this->my_body;
        return dynamic_cast< input_body_leaf<output_type, Body> & >(body_ref).get_body();
    }

protected:

    //! resets the input_node to its initial state
    void reset_node( reset_flags f) override {
        my_active = false;
        my_reserved = false;
        my_has_cached_item = false;

        if(f & rf_clear_edges) my_successors.clear();
        if(f & rf_reset_bodies) {
            // Replace the live body with a fresh clone of the original.
            input_body<output_type> *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
    }

private:
    spin_mutex my_mutex;                         // guards all mutable state below
    bool my_active;                              // node produces items only when active
    input_body<output_type> *my_body;            // live (possibly advanced) body
    input_body<output_type> *my_init_body;       // pristine copy used for resets
    broadcast_cache< output_type > my_successors;
    bool my_reserved;                            // cached item currently reserved
    bool my_has_cached_item;                     // my_cached_item holds a valid value
    output_type my_cached_item;

    // used by apply_body_bypass, can invoke body of node.
    /** Like try_reserve, but runs the body to produce an item when none is
        cached. The body signals end-of-stream through the flow_control. */
    bool try_reserve_apply_body(output_type &v) {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved ) {
            return false;
        }
        if ( !my_has_cached_item ) {
            flow_control control;

            fgt_begin_body( my_body );

            my_cached_item = (*my_body)(control);
            my_has_cached_item = !control.is_pipeline_stopped;

            fgt_end_body( my_body );
        }
        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_reserved = true;
            return true;
        } else {
            return false;
        }
    }

    //! Allocates the task that will run apply_body_bypass; reserves a wait on the graph.
    graph_task* create_put_task() {
        small_object_allocator allocator{};
        typedef input_node_task_bypass< input_node<output_type> > task_type;
        graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
        my_graph.reserve_wait();
        return t;
    }

    //! Spawns a task that applies the body
    void spawn_put( ) {
        if(is_graph_active(this->my_graph)) {
            spawn_in_graph_arena(this->my_graph, *create_put_task());
        }
    }

    friend class input_node_task_bypass< input_node<output_type> >;
    //! Applies the body.  Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
    graph_task* apply_body_bypass( ) {
        output_type v;
        if ( !try_reserve_apply_body(v) )
            return nullptr;

        // If a successor accepted the item, consume it; otherwise keep it cached.
        graph_task *last_task = my_successors.try_put_task(v);
        if ( last_task )
            try_consume();
        else
            try_release();
        return last_task;
    }
};  // class input_node
754
//! Implements a function node that supports Input -> Output
/** The function_input base provides the concurrency-limited input side; the
    function_output base provides the successor cache. */
template<typename Input, typename Output = continue_msg, typename Policy = queueing>
    __TBB_requires(std::default_initializable<Input> &&
                   std::copy_constructible<Input> &&
                   std::copy_constructible<Output>)
class function_node
    : public graph_node
    , public function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
    , public function_output<Output>
{
    typedef cache_aligned_allocator<Input> internals_allocator;

public:
    typedef Input input_type;
    typedef Output output_type;
    typedef function_input<input_type,output_type,Policy,internals_allocator> input_impl_type;
    typedef function_input_queue<input_type, internals_allocator> input_queue_type;
    typedef function_output<output_type> fOutput_type;
    typedef typename input_impl_type::predecessor_type predecessor_type;
    typedef typename fOutput_type::successor_type successor_type;

    using input_impl_type::my_predecessors;

    //! Constructor
    // input_queue_type is allocated here, but destroyed in the function_input_base.
    // TODO: pass the graph_buffer_policy to the function_input_base so it can all
    // be done in one place.  This would be an interface-breaking change.
    template< typename Body >
        __TBB_requires(function_node_body<Body, Input, Output>)
    __TBB_NOINLINE_SYM function_node( graph &g, size_t concurrency,
                   Body body, Policy = Policy(), node_priority_t a_priority = no_priority )
        : graph_node(g), input_impl_type(g, concurrency, body, a_priority),
          fOutput_type(g) {
        fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
                static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Convenience constructor: default policy with an explicit priority.
    template <typename Body>
        __TBB_requires(function_node_body<Body, Input, Output>)
    function_node( graph& g, size_t concurrency, Body body, node_priority_t a_priority )
        : function_node(g, concurrency, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructors that also wire the node into a node_set.
    template <typename Body, typename... Args>
        __TBB_requires(function_node_body<Body, Input, Output>)
    function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
                   Policy p = Policy(), node_priority_t a_priority = no_priority )
        : function_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
        __TBB_requires(function_node_body<Body, Input, Output>)
    function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority )
        : function_node(nodes, concurrency, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Copy constructor
    /** Copies the input side (body, concurrency, priority) from src; edges are
        not copied. */
    __TBB_NOINLINE_SYM function_node( const function_node& src ) :
        graph_node(src.my_graph),
        input_impl_type(src),
        fOutput_type(src.my_graph) {
        fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
                static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
    }

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    using input_impl_type::try_put_task;

    broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }

    //! Resets input side and (optionally) clears edges on both sides.
    void reset_node(reset_flags f) override {
        input_impl_type::reset_function_input(f);
        // TODO: use clear() instead.
        if(f & rf_clear_edges) {
            successors().clear();
            my_predecessors.clear();
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
        __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
    }

};  // class function_node
841
//! implements a function node that supports Input -> (set of outputs)
// Output is a tuple of output types.
/** Each element of the Output tuple gets its own multifunction_output port;
    the body receives the input plus a reference to the tuple of ports and may
    put to any subset of them. */
template<typename Input, typename Output, typename Policy = queueing>
    __TBB_requires(std::default_initializable<Input> &&
                   std::copy_constructible<Input>)
class multifunction_node :
    public graph_node,
    public multifunction_input
    <
        Input,
        typename wrap_tuple_elements<
            std::tuple_size<Output>::value,  // #elements in tuple
            multifunction_output,            // wrap this around each element
            Output                           // the tuple providing the types
        >::type,
        Policy,
        cache_aligned_allocator<Input>
    >
{
    typedef cache_aligned_allocator<Input> internals_allocator;

protected:
    //! Number of output ports (one per tuple element).
    static const int N = std::tuple_size<Output>::value;
public:
    typedef Input input_type;
    typedef null_type output_type;
    typedef typename wrap_tuple_elements<N,multifunction_output, Output>::type output_ports_type;
    typedef multifunction_input<
        input_type, output_ports_type, Policy, internals_allocator> input_impl_type;
    typedef function_input_queue<input_type, internals_allocator> input_queue_type;
private:
    using input_impl_type::my_predecessors;
public:
    //! Constructor
    template<typename Body>
        __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
    __TBB_NOINLINE_SYM multifunction_node(
        graph &g, size_t concurrency,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : graph_node(g), input_impl_type(g, concurrency, body, a_priority) {
        fgt_multioutput_node_with_body<N>(
            CODEPTR(), FLOW_MULTIFUNCTION_NODE,
            &this->my_graph, static_cast<receiver<input_type> *>(this),
            this->output_ports(), this->my_body
        );
    }

    //! Convenience constructor: default policy with an explicit priority.
    template <typename Body>
        __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
    __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
        : multifunction_node(g, concurrency, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructors that also wire the node into a node_set.
    template <typename Body, typename... Args>
        __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
    __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
                       Policy p = Policy(), node_priority_t a_priority = no_priority)
        : multifunction_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
        __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
    __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
        : multifunction_node(nodes, concurrency, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Copy constructor: copies the input side from other; edges are not copied.
    __TBB_NOINLINE_SYM multifunction_node( const multifunction_node &other) :
        graph_node(other.my_graph), input_impl_type(other) {
        fgt_multioutput_node_with_body<N>( CODEPTR(), FLOW_MULTIFUNCTION_NODE,
                &this->my_graph, static_cast<receiver<input_type> *>(this),
                this->output_ports(), this->my_body );
    }

    // all the guts are in multifunction_input...
protected:
    void reset_node(reset_flags f) override { input_impl_type::reset(f); }
};  // multifunction_node
919
920 //! split_node: accepts a tuple as input, forwards each element of the tuple to its
921 // successors. The node has unlimited concurrency, so it does not reject inputs.
template<typename TupleType>
class split_node : public graph_node, public receiver<TupleType> {
    // Number of elements in the input tuple == number of output ports.
    static const int N = std::tuple_size<TupleType>::value;
    typedef receiver<TupleType> base_type;
public:
    //! The node receives a whole tuple; each element goes out of its own port.
    typedef TupleType input_type;
    //! A multifunction_output port for each element type of TupleType.
    typedef typename wrap_tuple_elements<
        N,                    // #elements in tuple
        multifunction_output, // wrap this around each element
        TupleType             // the tuple providing the types
    >::type output_ports_type;

    //! Constructs a split_node that belongs to graph g.
    __TBB_NOINLINE_SYM explicit split_node(graph &g)
        : graph_node(g),
          my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports))
    {
        fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
                                static_cast<receiver<input_type> *>(this), this->output_ports());
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs a split_node in the graph of the given node set and wires edges to it.
    template <typename... Args>
    __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor.
    /** The output ports are freshly initialized (via init_output_ports) rather than
        copied from the source node. */
    __TBB_NOINLINE_SYM split_node(const split_node& other)
        : graph_node(other.my_graph), base_type(other),
          my_output_ports(init_output_ports<output_ports_type>::call(other.my_graph, my_output_ports))
    {
        fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
                                static_cast<receiver<input_type> *>(this), this->output_ports());
    }

    //! Accessor for the tuple of output ports.
    output_ports_type &output_ports() { return my_output_ports; }

protected:
    //! Forwards each element of t through the corresponding output port.
    graph_task *try_put_task(const TupleType& t) override {
        // Sending split messages in parallel is not justified, as overheads would prevail.
        // Also, we do not have successors here. So we just tell the task returned here is successful.
        return emit_element<N>::emit_this(this->my_graph, t, output_ports());
    }

    //! Clears the output ports' edges when rf_clear_edges is requested.
    void reset_node(reset_flags f) override {
        if (f & rf_clear_edges)
            clear_element<N>::clear_this(my_output_ports);

        __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "split_node reset failed");
    }

    graph& graph_reference() const override {
        return my_graph;
    }

private:
    output_ports_type my_output_ports;
};
978
979 //! Implements an executable node that supports continue_msg -> Output
template <typename Output, typename Policy = Policy<void> >
__TBB_requires(std::copy_constructible<Output>)
class continue_node : public graph_node, public continue_input<Output, Policy>,
                      public function_output<Output> {
public:
    //! Always triggered by continue_msg signals; produces Output values.
    typedef continue_msg input_type;
    typedef Output output_type;
    typedef continue_input<Output, Policy> input_impl_type;
    typedef function_output<output_type> fOutput_type;
    typedef typename input_impl_type::predecessor_type predecessor_type;
    typedef typename fOutput_type::successor_type successor_type;

    //! Constructor for executable node with continue_msg -> Output
    template <typename Body >
    __TBB_requires(continue_node_body<Body, Output>)
    __TBB_NOINLINE_SYM continue_node(
        graph &g,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : graph_node(g), input_impl_type( g, body, a_priority ),
        fOutput_type(g) {
        fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
                            static_cast<receiver<input_type> *>(this),
                            static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Convenience overload: priority without an explicit Policy argument.
    template <typename Body>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( graph& g, Body body, node_priority_t a_priority )
        : continue_node(g, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of the given node set and wires edges to it.
    template <typename Body, typename... Args>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, Body body,
                   Policy p = Policy(), node_priority_t a_priority = no_priority )
        : continue_node(nodes.graph_reference(), body, p, a_priority ) {
        make_edges_in_order(nodes, *this);
    }
    template <typename Body, typename... Args>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, Body body, node_priority_t a_priority)
        : continue_node(nodes, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Constructor for executable node with continue_msg -> Output
    /** number_of_predecessors presets how many continue_msg signals must arrive
        before the body runs (in addition to predecessors added via edges). */
    template <typename Body >
    __TBB_requires(continue_node_body<Body, Output>)
    __TBB_NOINLINE_SYM continue_node(
        graph &g, int number_of_predecessors,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : graph_node(g)
      , input_impl_type(g, number_of_predecessors, body, a_priority),
        fOutput_type(g) {
        fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
                            static_cast<receiver<input_type> *>(this),
                            static_cast<sender<output_type> *>(this), this->my_body );
    }

    template <typename Body>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t a_priority)
        : continue_node(g, number_of_predecessors, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename Body, typename... Args>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
                   Body body, Policy p = Policy(), node_priority_t a_priority = no_priority )
        : continue_node(nodes.graph_reference(), number_of_predecessors, body, p, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
                   Body body, node_priority_t a_priority )
        : continue_node(nodes, number_of_predecessors, body, Policy(), a_priority) {}
#endif

    //! Copy constructor
    /** Joins the same graph as src; input state comes from input_impl_type(src). */
    __TBB_NOINLINE_SYM continue_node( const continue_node& src ) :
        graph_node(src.my_graph), input_impl_type(src),
        function_output<Output>(src.my_graph) {
        fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
                            static_cast<receiver<input_type> *>(this),
                            static_cast<sender<output_type> *>(this), this->my_body );
    }

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    using input_impl_type::try_put_task;

    broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }

    //! Resets the receiver side and, on request, drops all successor edges.
    void reset_node(reset_flags f) override {
        input_impl_type::reset_receiver(f);
        if(f & rf_clear_edges)successors().clear();
        __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
    }
}; // continue_node
1082
1083 //! Forwards messages of type T to all successors
1084 template <typename T>
1085 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1086 public:
1087 typedef T input_type;
1088 typedef T output_type;
1089 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1090 typedef typename sender<output_type>::successor_type successor_type;
1091 private:
1092 broadcast_cache<input_type> my_successors;
1093 public:
1094
broadcast_node(graph & g)1095 __TBB_NOINLINE_SYM explicit broadcast_node(graph& g) : graph_node(g), my_successors(this) {
1096 fgt_node( CODEPTR(), FLOW_BROADCAST_NODE, &this->my_graph,
1097 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1098 }
1099
1100 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1101 template <typename... Args>
broadcast_node(const node_set<Args...> & nodes)1102 broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
1103 make_edges_in_order(nodes, *this);
1104 }
1105 #endif
1106
1107 // Copy constructor
broadcast_node(const broadcast_node & src)1108 __TBB_NOINLINE_SYM broadcast_node( const broadcast_node& src ) : broadcast_node(src.my_graph) {}
1109
1110 //! Adds a successor
register_successor(successor_type & r)1111 bool register_successor( successor_type &r ) override {
1112 my_successors.register_successor( r );
1113 return true;
1114 }
1115
1116 //! Removes s as a successor
remove_successor(successor_type & r)1117 bool remove_successor( successor_type &r ) override {
1118 my_successors.remove_successor( r );
1119 return true;
1120 }
1121
1122 protected:
1123 template< typename R, typename B > friend class run_and_put_task;
1124 template<typename X, typename Y> friend class broadcast_cache;
1125 template<typename X, typename Y> friend class round_robin_cache;
1126 //! build a task to run the successor if possible. Default is old behavior.
try_put_task(const T & t)1127 graph_task *try_put_task(const T& t) override {
1128 graph_task *new_task = my_successors.try_put_task(t);
1129 if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
1130 return new_task;
1131 }
1132
graph_reference()1133 graph& graph_reference() const override {
1134 return my_graph;
1135 }
1136
reset_node(reset_flags f)1137 void reset_node(reset_flags f) override {
1138 if (f&rf_clear_edges) {
1139 my_successors.clear();
1140 }
1141 __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
1142 }
1143 }; // broadcast_node
1144
1145 //! Forwards messages in arbitrary order
template <typename T>
class buffer_node
    : public graph_node
    , public reservable_item_buffer< T, cache_aligned_allocator<T> >
    , public receiver<T>, public sender<T>
{
    typedef cache_aligned_allocator<T> internals_allocator;

public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;
    typedef buffer_node<T> class_type;

protected:
    typedef size_t size_type;
    // Successors are offered items in round-robin order when forwarding.
    round_robin_cache< T, null_rw_mutex > my_successors;

    friend class forward_task_bypass< class_type >;

    //! Operation codes dispatched through the aggregator.
    enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
    };

    // implements the aggregator_operation concept
    class buffer_operation : public aggregated_operation< buffer_operation > {
    public:
        char type;            // an op_type value, narrowed to char
        T* elem;              // item to push, or destination for pop/reserve
        graph_task* ltask;    // task produced while handling this operation, if any
        successor_type *r;    // successor to add/remove (reg_succ/rem_succ only)

        buffer_operation(const T& e, op_type t) : type(char(t))
            , elem(const_cast<T*>(&e)) , ltask(nullptr)
            , r(nullptr)
        {}
        buffer_operation(op_type t) : type(char(t)), elem(nullptr), ltask(nullptr), r(nullptr) {}
    };

    // True while a forwarding task is outstanding; prevents spawning a second one.
    bool forwarder_busy;
    typedef aggregating_functor<class_type, buffer_operation> handler_type;
    friend class aggregating_functor<class_type, buffer_operation>;
    aggregator< handler_type, buffer_operation> my_aggregator;

    //! Aggregator callback: processes a batch of queued operations.
    virtual void handle_operations(buffer_operation *op_list) {
        handle_operations_impl(op_list, this);
    }

    //! Dispatches each operation in op_list, then possibly spawns a forwarding task.
    /** derived must be this object; the template lets derived classes (e.g. priority_queue_node)
        reuse the loop while derived->order() is statically dispatched. */
    template<typename derived_type>
    void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
        __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");

        buffer_operation *tmp = nullptr;
        bool try_forwarding = false;
        while (op_list) {
            tmp = op_list;
            op_list = op_list->next;
            switch (tmp->type) {
            // Operations that may make an item forwardable set try_forwarding.
            case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
            case rem_succ: internal_rem_succ(tmp); break;
            case req_item: internal_pop(tmp); break;
            case res_item: internal_reserve(tmp); break;
            case rel_res: internal_release(tmp); try_forwarding = true; break;
            case con_res: internal_consume(tmp); try_forwarding = true; break;
            case put_item: try_forwarding = internal_push(tmp); break;
            case try_fwd_task: internal_forward_task(tmp); break;
            }
        }

        // Hook for derived classes to restore ordering invariants after the batch.
        derived->order();

        if (try_forwarding && !forwarder_busy) {
            if(is_graph_active(this->my_graph)) {
                forwarder_busy = true;
                typedef forward_task_bypass<class_type> task_type;
                small_object_allocator allocator{};
                graph_task* new_task = allocator.new_object<task_type>(graph_reference(), allocator, *this);
                my_graph.reserve_wait();
                // tmp should point to the last item handled by the aggregator. This is the operation
                // the handling thread enqueued. So modifying that record will be okay.
                // TODO revamp: check that the issue is still present
                // workaround for icc bug (at least 12.0 and 13.0)
                // error: function "tbb::flow::interfaceX::combine_tasks" cannot be called with the given argument list
                //        argument types are: (graph, graph_task *, graph_task *)
                graph_task *z = tmp->ltask;
                graph &g = this->my_graph;
                tmp->ltask = combine_tasks(g, z, new_task); // in case the op generated a task
            }
        }
    } // handle_operations

    //! Returns (without clearing) the task recorded on op_data, if any.
    inline graph_task *grab_forwarding_task( buffer_operation &op_data) {
        return op_data.ltask;
    }

    //! Spawns the task recorded on op_data, if any; returns whether one was spawned.
    inline bool enqueue_forwarding_task(buffer_operation &op_data) {
        graph_task *ft = grab_forwarding_task(op_data);
        if(ft) {
            spawn_in_graph_arena(graph_reference(), *ft);
            return true;
        }
        return false;
    }

    //! This is executed by an enqueued task, the "forwarder"
    /** Issues try_fwd_task operations until one fails, combining any produced
        tasks into a single task to return (task-bypass scheme). */
    virtual graph_task *forward_task() {
        buffer_operation op_data(try_fwd_task);
        graph_task *last_task = nullptr;
        do {
            op_data.status = WAIT;
            op_data.ltask = nullptr;
            my_aggregator.execute(&op_data);

            // workaround for icc bug
            graph_task *xtask = op_data.ltask;
            graph& g = this->my_graph;
            last_task = combine_tasks(g, last_task, xtask);
        } while (op_data.status ==SUCCEEDED);
        return last_task;
    }

    //! Register successor
    virtual void internal_reg_succ(buffer_operation *op) {
        __TBB_ASSERT(op->r, nullptr);
        my_successors.register_successor(*(op->r));
        op->status.store(SUCCEEDED, std::memory_order_release);
    }

    //! Remove successor
    virtual void internal_rem_succ(buffer_operation *op) {
        __TBB_ASSERT(op->r, nullptr);
        my_successors.remove_successor(*(op->r));
        op->status.store(SUCCEEDED, std::memory_order_release);
    }

private:
    //! Post-batch ordering hook; a plain buffer needs none.
    void order() {}

    //! The forwarding candidate is the item at the back (my_tail - 1).
    bool is_item_valid() {
        return this->my_item_valid(this->my_tail - 1);
    }

    //! Offers the back item to successors; on acceptance, records the task and destroys the item.
    void try_put_and_add_task(graph_task*& last_task) {
        graph_task *new_task = my_successors.try_put_task(this->back());
        if (new_task) {
            // workaround for icc bug
            graph& g = this->my_graph;
            last_task = combine_tasks(g, last_task, new_task);
            this->destroy_back();
        }
    }

protected:
    //! Tries to forward valid items to successors
    virtual void internal_forward_task(buffer_operation *op) {
        internal_forward_task_impl(op, this);
    }

    //! Forwarding loop shared with derived classes (statically dispatched via derived).
    template<typename derived_type>
    void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
        __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");

        if (this->my_reserved || !derived->is_item_valid()) {
            op->status.store(FAILED, std::memory_order_release);
            this->forwarder_busy = false;
            return;
        }
        // Try forwarding, giving each successor a chance
        graph_task* last_task = nullptr;
        size_type counter = my_successors.size();
        for (; counter > 0 && derived->is_item_valid(); --counter)
            derived->try_put_and_add_task(last_task);

        op->ltask = last_task; // return task
        // Succeed only if a task was produced and every successor got its chance;
        // otherwise report failure and let a new forwarder be spawned later.
        if (last_task && !counter) {
            op->status.store(SUCCEEDED, std::memory_order_release);
        }
        else {
            op->status.store(FAILED, std::memory_order_release);
            forwarder_busy = false;
        }
    }

    //! Appends the item; a plain buffer always accepts.
    virtual bool internal_push(buffer_operation *op) {
        __TBB_ASSERT(op->elem, nullptr);
        this->push_back(*(op->elem));
        op->status.store(SUCCEEDED, std::memory_order_release);
        return true;
    }

    //! Pops from the back into op->elem; fails when nothing can be popped.
    virtual void internal_pop(buffer_operation *op) {
        __TBB_ASSERT(op->elem, nullptr);
        if(this->pop_back(*(op->elem))) {
            op->status.store(SUCCEEDED, std::memory_order_release);
        }
        else {
            op->status.store(FAILED, std::memory_order_release);
        }
    }

    //! Reserves the front item, copying it into op->elem; fails if reservation is impossible.
    virtual void internal_reserve(buffer_operation *op) {
        __TBB_ASSERT(op->elem, nullptr);
        if(this->reserve_front(*(op->elem))) {
            op->status.store(SUCCEEDED, std::memory_order_release);
        }
        else {
            op->status.store(FAILED, std::memory_order_release);
        }
    }

    //! Removes the reserved item from the buffer.
    virtual void internal_consume(buffer_operation *op) {
        this->consume_front();
        op->status.store(SUCCEEDED, std::memory_order_release);
    }

    //! Returns the reserved item to the buffer.
    virtual void internal_release(buffer_operation *op) {
        this->release_front();
        op->status.store(SUCCEEDED, std::memory_order_release);
    }

public:
    //! Constructor
    __TBB_NOINLINE_SYM explicit buffer_node( graph &g )
        : graph_node(g), reservable_item_buffer<T, internals_allocator>(), receiver<T>(),
          sender<T>(), my_successors(this), forwarder_busy(false)
    {
        my_aggregator.initialize_handler(handler_type(this));
        fgt_node( CODEPTR(), FLOW_BUFFER_NODE, &this->my_graph,
                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of the given node set and wires edges to it.
    template <typename... Args>
    buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: a fresh, empty node in the same graph (contents/edges not copied).
    __TBB_NOINLINE_SYM buffer_node( const buffer_node& src ) : buffer_node(src.my_graph) {}

    //
    // message sender implementation
    //

    //! Adds a new successor.
    /** Adds successor r to the list of successors; may forward tasks. */
    bool register_successor( successor_type &r ) override {
        buffer_operation op_data(reg_succ);
        op_data.r = &r;
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return true;
    }

    //! Removes a successor.
    /** Removes successor r from the list of successors.
        It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
    bool remove_successor( successor_type &r ) override {
        // TODO revamp: investigate why full qualification is necessary here
        tbb::detail::d1::remove_predecessor(r, *this);
        buffer_operation op_data(rem_succ);
        op_data.r = &r;
        my_aggregator.execute(&op_data);
        // even though this operation does not cause a forward, if we are the handler, and
        // a forward is scheduled, we may be the first to reach this point after the aggregator,
        // and so should check for the task.
        (void)enqueue_forwarding_task(op_data);
        return true;
    }

    //! Request an item from the buffer_node
    /** true = v contains the returned item<BR>
        false = no item has been returned */
    bool try_get( T &v ) override {
        buffer_operation op_data(req_item);
        op_data.elem = &v;
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return (op_data.status==SUCCEEDED);
    }

    //! Reserves an item.
    /** false = no item can be reserved<BR>
        true = an item is reserved */
    bool try_reserve( T &v ) override {
        buffer_operation op_data(res_item);
        op_data.elem = &v;
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return (op_data.status==SUCCEEDED);
    }

    //! Release a reserved item.
    /** true = item has been released and so remains in sender */
    bool try_release() override {
        buffer_operation op_data(rel_res);
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return true;
    }

    //! Consumes a reserved item.
    /** true = item is removed from sender and reservation removed */
    bool try_consume() override {
        buffer_operation op_data(con_res);
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return true;
    }

protected:

    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    //! receive an item, return a task *if possible
    graph_task *try_put_task(const T &t) override {
        buffer_operation op_data(t, put_item);
        my_aggregator.execute(&op_data);
        graph_task *ft = grab_forwarding_task(op_data);
        // sequencer_nodes can return failure (if an item has been previously inserted)
        // We have to spawn the returned task if our own operation fails.

        if(ft && op_data.status ==FAILED) {
            // we haven't succeeded queueing the item, but for some reason the
            // call returned a task (if another request resulted in a successful
            // forward this could happen.)  Queue the task and reset the pointer.
            spawn_in_graph_arena(graph_reference(), *ft); ft = nullptr;
        }
        else if(!ft && op_data.status ==SUCCEEDED) {
            ft = SUCCESSFULLY_ENQUEUED;
        }
        return ft;
    }

    graph& graph_reference() const override {
        return my_graph;
    }

protected:
    //! Resets the underlying item buffer and, on request, drops successor edges.
    void reset_node( reset_flags f) override {
        reservable_item_buffer<T, internals_allocator>::reset();
        // TODO: just clear structures
        if (f&rf_clear_edges) {
            my_successors.clear();
        }
        forwarder_busy = false;
    }
}; // buffer_node
1496
1497 //! Forwards messages in FIFO order
1498 template <typename T>
1499 class queue_node : public buffer_node<T> {
1500 protected:
1501 typedef buffer_node<T> base_type;
1502 typedef typename base_type::size_type size_type;
1503 typedef typename base_type::buffer_operation queue_operation;
1504 typedef queue_node class_type;
1505
1506 private:
1507 template<typename> friend class buffer_node;
1508
is_item_valid()1509 bool is_item_valid() {
1510 return this->my_item_valid(this->my_head);
1511 }
1512
try_put_and_add_task(graph_task * & last_task)1513 void try_put_and_add_task(graph_task*& last_task) {
1514 graph_task *new_task = this->my_successors.try_put_task(this->front());
1515 if (new_task) {
1516 // workaround for icc bug
1517 graph& graph_ref = this->graph_reference();
1518 last_task = combine_tasks(graph_ref, last_task, new_task);
1519 this->destroy_front();
1520 }
1521 }
1522
1523 protected:
internal_forward_task(queue_operation * op)1524 void internal_forward_task(queue_operation *op) override {
1525 this->internal_forward_task_impl(op, this);
1526 }
1527
internal_pop(queue_operation * op)1528 void internal_pop(queue_operation *op) override {
1529 if ( this->my_reserved || !this->my_item_valid(this->my_head)){
1530 op->status.store(FAILED, std::memory_order_release);
1531 }
1532 else {
1533 this->pop_front(*(op->elem));
1534 op->status.store(SUCCEEDED, std::memory_order_release);
1535 }
1536 }
internal_reserve(queue_operation * op)1537 void internal_reserve(queue_operation *op) override {
1538 if (this->my_reserved || !this->my_item_valid(this->my_head)) {
1539 op->status.store(FAILED, std::memory_order_release);
1540 }
1541 else {
1542 this->reserve_front(*(op->elem));
1543 op->status.store(SUCCEEDED, std::memory_order_release);
1544 }
1545 }
internal_consume(queue_operation * op)1546 void internal_consume(queue_operation *op) override {
1547 this->consume_front();
1548 op->status.store(SUCCEEDED, std::memory_order_release);
1549 }
1550
1551 public:
1552 typedef T input_type;
1553 typedef T output_type;
1554 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1555 typedef typename sender<output_type>::successor_type successor_type;
1556
1557 //! Constructor
queue_node(graph & g)1558 __TBB_NOINLINE_SYM explicit queue_node( graph &g ) : base_type(g) {
1559 fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
1560 static_cast<receiver<input_type> *>(this),
1561 static_cast<sender<output_type> *>(this) );
1562 }
1563
1564 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1565 template <typename... Args>
queue_node(const node_set<Args...> & nodes)1566 queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
1567 make_edges_in_order(nodes, *this);
1568 }
1569 #endif
1570
1571 //! Copy constructor
queue_node(const queue_node & src)1572 __TBB_NOINLINE_SYM queue_node( const queue_node& src) : base_type(src) {
1573 fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
1574 static_cast<receiver<input_type> *>(this),
1575 static_cast<sender<output_type> *>(this) );
1576 }
1577
1578
1579 protected:
reset_node(reset_flags f)1580 void reset_node( reset_flags f) override {
1581 base_type::reset_node(f);
1582 }
1583 }; // queue_node
1584
1585 //! Forwards messages in sequence order
template <typename T>
__TBB_requires(std::copyable<T>)
class sequencer_node : public queue_node<T> {
    //! Owning pointer to the user-supplied sequencer body; cloned on copy, deleted in the destructor.
    function_body< T, size_t > *my_sequencer;
    // my_sequencer should be a benign function and must be callable
    // from a parallel context.  Does this mean it needn't be reset?
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    //! Constructor
    /** s maps each item to its sequence number (size_t); the buffer slot for an
        item is chosen from that number (see internal_push). */
    template< typename Sequencer >
    __TBB_requires(sequencer<Sequencer, T>)
    __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T>(g),
        my_sequencer(new function_body_leaf< T, size_t, Sequencer>(s) ) {
        fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
                  static_cast<receiver<input_type> *>(this),
                  static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of the given node set and wires edges to it.
    template <typename Sequencer, typename... Args>
    __TBB_requires(sequencer<Sequencer, T>)
    sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
        : sequencer_node(nodes.graph_reference(), s) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor
    /** Deep-copies the sequencer body via clone() so each node owns its own copy. */
    __TBB_NOINLINE_SYM sequencer_node( const sequencer_node& src ) : queue_node<T>(src),
        my_sequencer( src.my_sequencer->clone() ) {
        fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
                  static_cast<receiver<input_type> *>(this),
                  static_cast<sender<output_type> *>(this) );
    }

    //! Destructor
    ~sequencer_node() { delete my_sequencer; }
    // NOTE(review): copy assignment is not explicitly deleted here; presumably a base
    // class already prohibits assignment (otherwise my_sequencer could leak or be
    // double-deleted on assignment) -- confirm.

protected:
    typedef typename buffer_node<T>::size_type size_type;
    typedef typename buffer_node<T>::buffer_operation sequencer_operation;

private:
    //! Places the item at the buffer slot given by its sequence number, growing the array if needed.
    bool internal_push(sequencer_operation *op) override {
        size_type tag = (*my_sequencer)(*(op->elem));
#if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
        if (tag < this->my_head) {
            // have already emitted a message with this tag
            op->status.store(FAILED, std::memory_order_release);
            return false;
        }
#endif
        // cannot modify this->my_tail now; the buffer would be inconsistent.
        size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;

        if (this->size(new_tail) > this->capacity()) {
            this->grow_my_array(this->size(new_tail));
        }
        this->my_tail = new_tail;

        // place_item returning false (presumably the slot was already occupied --
        // e.g. a duplicate tag; confirm against reservable_item_buffer) maps to FAILED.
        const op_stat res = this->place_item(tag, *(op->elem)) ? SUCCEEDED : FAILED;
        op->status.store(res, std::memory_order_release);
        return res ==SUCCEEDED;
    }
}; // sequencer_node
1655
1656 //! Forwards messages in priority order
1657 template<typename T, typename Compare = std::less<T>>
1658 class priority_queue_node : public buffer_node<T> {
1659 public:
1660 typedef T input_type;
1661 typedef T output_type;
1662 typedef buffer_node<T> base_type;
1663 typedef priority_queue_node class_type;
1664 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1665 typedef typename sender<output_type>::successor_type successor_type;
1666
1667 //! Constructor
1668 __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
1669 : buffer_node<T>(g), compare(comp), mark(0) {
1670 fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
1671 static_cast<receiver<input_type> *>(this),
1672 static_cast<sender<output_type> *>(this) );
1673 }
1674
1675 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1676 template <typename... Args>
1677 priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
1678 : priority_queue_node(nodes.graph_reference(), comp) {
1679 make_edges_in_order(nodes, *this);
1680 }
1681 #endif
1682
1683 //! Copy constructor
priority_queue_node(const priority_queue_node & src)1684 __TBB_NOINLINE_SYM priority_queue_node( const priority_queue_node &src )
1685 : buffer_node<T>(src), mark(0)
1686 {
1687 fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
1688 static_cast<receiver<input_type> *>(this),
1689 static_cast<sender<output_type> *>(this) );
1690 }
1691
1692 protected:
1693
reset_node(reset_flags f)1694 void reset_node( reset_flags f) override {
1695 mark = 0;
1696 base_type::reset_node(f);
1697 }
1698
1699 typedef typename buffer_node<T>::size_type size_type;
1700 typedef typename buffer_node<T>::item_type item_type;
1701 typedef typename buffer_node<T>::buffer_operation prio_operation;
1702
1703 //! Tries to forward valid items to successors
internal_forward_task(prio_operation * op)1704 void internal_forward_task(prio_operation *op) override {
1705 this->internal_forward_task_impl(op, this);
1706 }
1707
handle_operations(prio_operation * op_list)1708 void handle_operations(prio_operation *op_list) override {
1709 this->handle_operations_impl(op_list, this);
1710 }
1711
internal_push(prio_operation * op)1712 bool internal_push(prio_operation *op) override {
1713 prio_push(*(op->elem));
1714 op->status.store(SUCCEEDED, std::memory_order_release);
1715 return true;
1716 }
1717
internal_pop(prio_operation * op)1718 void internal_pop(prio_operation *op) override {
1719 // if empty or already reserved, don't pop
1720 if ( this->my_reserved == true || this->my_tail == 0 ) {
1721 op->status.store(FAILED, std::memory_order_release);
1722 return;
1723 }
1724
1725 *(op->elem) = prio();
1726 op->status.store(SUCCEEDED, std::memory_order_release);
1727 prio_pop();
1728
1729 }
1730
1731 // pops the highest-priority item, saves copy
internal_reserve(prio_operation * op)1732 void internal_reserve(prio_operation *op) override {
1733 if (this->my_reserved == true || this->my_tail == 0) {
1734 op->status.store(FAILED, std::memory_order_release);
1735 return;
1736 }
1737 this->my_reserved = true;
1738 *(op->elem) = prio();
1739 reserved_item = *(op->elem);
1740 op->status.store(SUCCEEDED, std::memory_order_release);
1741 prio_pop();
1742 }
1743
internal_consume(prio_operation * op)1744 void internal_consume(prio_operation *op) override {
1745 op->status.store(SUCCEEDED, std::memory_order_release);
1746 this->my_reserved = false;
1747 reserved_item = input_type();
1748 }
1749
internal_release(prio_operation * op)1750 void internal_release(prio_operation *op) override {
1751 op->status.store(SUCCEEDED, std::memory_order_release);
1752 prio_push(reserved_item);
1753 this->my_reserved = false;
1754 reserved_item = input_type();
1755 }
1756
1757 private:
1758 template<typename> friend class buffer_node;
1759
order()1760 void order() {
1761 if (mark < this->my_tail) heapify();
1762 __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
1763 }
1764
is_item_valid()1765 bool is_item_valid() {
1766 return this->my_tail > 0;
1767 }
1768
try_put_and_add_task(graph_task * & last_task)1769 void try_put_and_add_task(graph_task*& last_task) {
1770 graph_task * new_task = this->my_successors.try_put_task(this->prio());
1771 if (new_task) {
1772 // workaround for icc bug
1773 graph& graph_ref = this->graph_reference();
1774 last_task = combine_tasks(graph_ref, last_task, new_task);
1775 prio_pop();
1776 }
1777 }
1778
1779 private:
1780 Compare compare;
1781 size_type mark;
1782
1783 input_type reserved_item;
1784
1785 // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
prio_use_tail()1786 bool prio_use_tail() {
1787 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
1788 return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
1789 }
1790
1791 // prio_push: checks that the item will fit, expand array if necessary, put at end
prio_push(const T & src)1792 void prio_push(const T &src) {
1793 if ( this->my_tail >= this->my_array_size )
1794 this->grow_my_array( this->my_tail + 1 );
1795 (void) this->place_item(this->my_tail, src);
1796 ++(this->my_tail);
1797 __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
1798 }
1799
1800 // prio_pop: deletes highest priority item from the array, and if it is item
1801 // 0, move last item to 0 and reheap. If end of array, just destroy and decrement tail
1802 // and mark. Assumes the array has already been tested for emptiness; no failure.
prio_pop()1803 void prio_pop() {
1804 if (prio_use_tail()) {
1805 // there are newly pushed elements; last one higher than top
1806 // copy the data
1807 this->destroy_item(this->my_tail-1);
1808 --(this->my_tail);
1809 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
1810 return;
1811 }
1812 this->destroy_item(0);
1813 if(this->my_tail > 1) {
1814 // push the last element down heap
1815 __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), nullptr);
1816 this->move_item(0,this->my_tail - 1);
1817 }
1818 --(this->my_tail);
1819 if(mark > this->my_tail) --mark;
1820 if (this->my_tail > 1) // don't reheap for heap of size 1
1821 reheap();
1822 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
1823 }
1824
prio()1825 const T& prio() {
1826 return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
1827 }
1828
1829 // turn array into heap
heapify()1830 void heapify() {
1831 if(this->my_tail == 0) {
1832 mark = 0;
1833 return;
1834 }
1835 if (!mark) mark = 1;
1836 for (; mark<this->my_tail; ++mark) { // for each unheaped element
1837 size_type cur_pos = mark;
1838 input_type to_place;
1839 this->fetch_item(mark,to_place);
1840 do { // push to_place up the heap
1841 size_type parent = (cur_pos-1)>>1;
1842 if (!compare(this->get_my_item(parent), to_place))
1843 break;
1844 this->move_item(cur_pos, parent);
1845 cur_pos = parent;
1846 } while( cur_pos );
1847 (void) this->place_item(cur_pos, to_place);
1848 }
1849 }
1850
1851 // otherwise heapified array with new root element; rearrange to heap
reheap()1852 void reheap() {
1853 size_type cur_pos=0, child=1;
1854 while (child < mark) {
1855 size_type target = child;
1856 if (child+1<mark &&
1857 compare(this->get_my_item(child),
1858 this->get_my_item(child+1)))
1859 ++target;
1860 // target now has the higher priority child
1861 if (compare(this->get_my_item(target),
1862 this->get_my_item(cur_pos)))
1863 break;
1864 // swap
1865 this->swap_items(cur_pos, target);
1866 cur_pos = target;
1867 child = (cur_pos<<1)+1;
1868 }
1869 }
1870 }; // priority_queue_node
1871
1872 //! Forwards messages only if the threshold has not been reached
1873 /** This node forwards items until its threshold is reached.
1874 It contains no buffering. If the downstream node rejects, the
1875 message is dropped. */
1876 template< typename T, typename DecrementType=continue_msg >
1877 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
1878 public:
1879 typedef T input_type;
1880 typedef T output_type;
1881 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1882 typedef typename sender<output_type>::successor_type successor_type;
1883 //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.
1884
1885 private:
1886 size_t my_threshold;
1887 size_t my_count; // number of successful puts
1888 size_t my_tries; // number of active put attempts
1889 size_t my_future_decrement; // number of active decrement
1890 reservable_predecessor_cache< T, spin_mutex > my_predecessors;
1891 spin_mutex my_mutex;
1892 broadcast_cache< T > my_successors;
1893
1894 //! The internal receiver< DecrementType > that adjusts the count
1895 threshold_regulator< limiter_node<T, DecrementType>, DecrementType > decrement;
1896
decrement_counter(long long delta)1897 graph_task* decrement_counter( long long delta ) {
1898 if ( delta > 0 && size_t(delta) > my_threshold ) {
1899 delta = my_threshold;
1900 }
1901
1902 {
1903 spin_mutex::scoped_lock lock(my_mutex);
1904 if ( delta > 0 && size_t(delta) > my_count ) {
1905 if( my_tries > 0 ) {
1906 my_future_decrement += (size_t(delta) - my_count);
1907 }
1908 my_count = 0;
1909 }
1910 else if ( delta < 0 && size_t(-delta) > my_threshold - my_count ) {
1911 my_count = my_threshold;
1912 }
1913 else {
1914 my_count -= size_t(delta); // absolute value of delta is sufficiently small
1915 }
1916 __TBB_ASSERT(my_count <= my_threshold, "counter values are truncated to be inside the [0, threshold] interval");
1917 }
1918 return forward_task();
1919 }
1920
1921 // Let threshold_regulator call decrement_counter()
1922 friend class threshold_regulator< limiter_node<T, DecrementType>, DecrementType >;
1923
1924 friend class forward_task_bypass< limiter_node<T,DecrementType> >;
1925
check_conditions()1926 bool check_conditions() { // always called under lock
1927 return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
1928 }
1929
1930 // only returns a valid task pointer or nullptr, never SUCCESSFULLY_ENQUEUED
forward_task()1931 graph_task* forward_task() {
1932 input_type v;
1933 graph_task* rval = nullptr;
1934 bool reserved = false;
1935
1936 {
1937 spin_mutex::scoped_lock lock(my_mutex);
1938 if ( check_conditions() )
1939 ++my_tries;
1940 else
1941 return nullptr;
1942 }
1943
1944 //SUCCESS
1945 // if we can reserve and can put, we consume the reservation
1946 // we increment the count and decrement the tries
1947 if ( (my_predecessors.try_reserve(v)) == true ) {
1948 reserved = true;
1949 if ( (rval = my_successors.try_put_task(v)) != nullptr ) {
1950 {
1951 spin_mutex::scoped_lock lock(my_mutex);
1952 ++my_count;
1953 if ( my_future_decrement ) {
1954 if ( my_count > my_future_decrement ) {
1955 my_count -= my_future_decrement;
1956 my_future_decrement = 0;
1957 }
1958 else {
1959 my_future_decrement -= my_count;
1960 my_count = 0;
1961 }
1962 }
1963 --my_tries;
1964 my_predecessors.try_consume();
1965 if ( check_conditions() ) {
1966 if ( is_graph_active(this->my_graph) ) {
1967 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
1968 small_object_allocator allocator{};
1969 graph_task* rtask = allocator.new_object<task_type>( my_graph, allocator, *this );
1970 my_graph.reserve_wait();
1971 spawn_in_graph_arena(graph_reference(), *rtask);
1972 }
1973 }
1974 }
1975 return rval;
1976 }
1977 }
1978 //FAILURE
1979 //if we can't reserve, we decrement the tries
1980 //if we can reserve but can't put, we decrement the tries and release the reservation
1981 {
1982 spin_mutex::scoped_lock lock(my_mutex);
1983 --my_tries;
1984 if (reserved) my_predecessors.try_release();
1985 if ( check_conditions() ) {
1986 if ( is_graph_active(this->my_graph) ) {
1987 small_object_allocator allocator{};
1988 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
1989 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
1990 my_graph.reserve_wait();
1991 __TBB_ASSERT(!rval, "Have two tasks to handle");
1992 return t;
1993 }
1994 }
1995 return rval;
1996 }
1997 }
1998
initialize()1999 void initialize() {
2000 fgt_node(
2001 CODEPTR(), FLOW_LIMITER_NODE, &this->my_graph,
2002 static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
2003 static_cast<sender<output_type> *>(this)
2004 );
2005 }
2006
2007 public:
2008 //! Constructor
limiter_node(graph & g,size_t threshold)2009 limiter_node(graph &g, size_t threshold)
2010 : graph_node(g), my_threshold(threshold), my_count(0), my_tries(0), my_future_decrement(0),
2011 my_predecessors(this), my_successors(this), decrement(this)
2012 {
2013 initialize();
2014 }
2015
2016 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2017 template <typename... Args>
limiter_node(const node_set<Args...> & nodes,size_t threshold)2018 limiter_node(const node_set<Args...>& nodes, size_t threshold)
2019 : limiter_node(nodes.graph_reference(), threshold) {
2020 make_edges_in_order(nodes, *this);
2021 }
2022 #endif
2023
2024 //! Copy constructor
limiter_node(const limiter_node & src)2025 limiter_node( const limiter_node& src ) : limiter_node(src.my_graph, src.my_threshold) {}
2026
2027 //! The interface for accessing internal receiver< DecrementType > that adjusts the count
decrementer()2028 receiver<DecrementType>& decrementer() { return decrement; }
2029
2030 //! Replace the current successor with this new successor
register_successor(successor_type & r)2031 bool register_successor( successor_type &r ) override {
2032 spin_mutex::scoped_lock lock(my_mutex);
2033 bool was_empty = my_successors.empty();
2034 my_successors.register_successor(r);
2035 //spawn a forward task if this is the only successor
2036 if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
2037 if ( is_graph_active(this->my_graph) ) {
2038 small_object_allocator allocator{};
2039 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2040 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
2041 my_graph.reserve_wait();
2042 spawn_in_graph_arena(graph_reference(), *t);
2043 }
2044 }
2045 return true;
2046 }
2047
2048 //! Removes a successor from this node
2049 /** r.remove_predecessor(*this) is also called. */
remove_successor(successor_type & r)2050 bool remove_successor( successor_type &r ) override {
2051 // TODO revamp: investigate why qualification is needed for remove_predecessor() call
2052 tbb::detail::d1::remove_predecessor(r, *this);
2053 my_successors.remove_successor(r);
2054 return true;
2055 }
2056
2057 //! Adds src to the list of cached predecessors.
register_predecessor(predecessor_type & src)2058 bool register_predecessor( predecessor_type &src ) override {
2059 spin_mutex::scoped_lock lock(my_mutex);
2060 my_predecessors.add( src );
2061 if ( my_count + my_tries < my_threshold && !my_successors.empty() && is_graph_active(this->my_graph) ) {
2062 small_object_allocator allocator{};
2063 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2064 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
2065 my_graph.reserve_wait();
2066 spawn_in_graph_arena(graph_reference(), *t);
2067 }
2068 return true;
2069 }
2070
2071 //! Removes src from the list of cached predecessors.
remove_predecessor(predecessor_type & src)2072 bool remove_predecessor( predecessor_type &src ) override {
2073 my_predecessors.remove( src );
2074 return true;
2075 }
2076
2077 protected:
2078
2079 template< typename R, typename B > friend class run_and_put_task;
2080 template<typename X, typename Y> friend class broadcast_cache;
2081 template<typename X, typename Y> friend class round_robin_cache;
2082 //! Puts an item to this receiver
try_put_task(const T & t)2083 graph_task* try_put_task( const T &t ) override {
2084 {
2085 spin_mutex::scoped_lock lock(my_mutex);
2086 if ( my_count + my_tries >= my_threshold )
2087 return nullptr;
2088 else
2089 ++my_tries;
2090 }
2091
2092 graph_task* rtask = my_successors.try_put_task(t);
2093 if ( !rtask ) { // try_put_task failed.
2094 spin_mutex::scoped_lock lock(my_mutex);
2095 --my_tries;
2096 if (check_conditions() && is_graph_active(this->my_graph)) {
2097 small_object_allocator allocator{};
2098 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2099 rtask = allocator.new_object<task_type>(my_graph, allocator, *this);
2100 my_graph.reserve_wait();
2101 }
2102 }
2103 else {
2104 spin_mutex::scoped_lock lock(my_mutex);
2105 ++my_count;
2106 if ( my_future_decrement ) {
2107 if ( my_count > my_future_decrement ) {
2108 my_count -= my_future_decrement;
2109 my_future_decrement = 0;
2110 }
2111 else {
2112 my_future_decrement -= my_count;
2113 my_count = 0;
2114 }
2115 }
2116 --my_tries;
2117 }
2118 return rtask;
2119 }
2120
graph_reference()2121 graph& graph_reference() const override { return my_graph; }
2122
reset_node(reset_flags f)2123 void reset_node( reset_flags f ) override {
2124 my_count = 0;
2125 if ( f & rf_clear_edges ) {
2126 my_predecessors.clear();
2127 my_successors.clear();
2128 }
2129 else {
2130 my_predecessors.reset();
2131 }
2132 decrement.reset_receiver(f);
2133 }
2134 }; // limiter_node
2135
2136 #include "detail/_flow_graph_join_impl.h"
2137
// Primary template; the JP parameter selects the join policy
// (reserving, queueing, or key_matching), each defined by a specialization below.
template<typename OutputTuple, typename JP=queueing> class join_node;

//! join_node with the reserving policy: inputs are pulled (reserved) from
//! upstream only when every port can supply an item; all logic lives in
//! unfolded_join_node, this specialization only wires up profiling/tracing.
template<typename OutputTuple>
class join_node<OutputTuple,reserving>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
private:
    static const int N = std::tuple_size<OutputTuple>::value;
    typedef unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
public:
    typedef OutputTuple output_type;
    typedef typename unfolded_type::input_ports_type input_ports_type;
    //! Constructor: builds the ports and registers the node with the tracing layer.
    __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs from a node_set and connects the edges in order.
    template <typename... Args>
    __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: copies per unfolded_type semantics and re-registers tracing.
    __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
2166
//! join_node with the queueing policy: each port buffers arriving items and a
//! tuple is emitted once every port has one; all logic lives in
//! unfolded_join_node, this specialization only wires up profiling/tracing.
template<typename OutputTuple>
class join_node<OutputTuple,queueing>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
private:
    static const int N = std::tuple_size<OutputTuple>::value;
    typedef unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
public:
    typedef OutputTuple output_type;
    typedef typename unfolded_type::input_ports_type input_ports_type;
    //! Constructor: builds the ports and registers the node with the tracing layer.
    __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs from a node_set and connects the edges in order.
    template <typename... Args>
    __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: copies per unfolded_type semantics and re-registers tracing.
    __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
2193
#if __TBB_CPP20_CONCEPTS_PRESENT
// Helper function which is well-formed only if every element of OutputTuple
// satisfies join_node_function_object<body[i], tuple[i], K>.
// It is only declared, never defined: the concept below merely checks that
// the call expression is valid inside a requires-expression.
template <typename OutputTuple, typename K,
          typename... Functions, std::size_t... Idx>
void join_node_function_objects_helper( std::index_sequence<Idx...> )
    requires (std::tuple_size_v<OutputTuple> == sizeof...(Functions)) &&
             (... && join_node_function_object<Functions, std::tuple_element_t<Idx, OutputTuple>, K>);

// Satisfied when the number of Functions matches the tuple size and each
// function is a valid key-extraction body for the corresponding tuple element.
template <typename OutputTuple, typename K, typename... Functions>
concept join_node_functions = requires {
    join_node_function_objects_helper<OutputTuple, K, Functions...>(std::make_index_sequence<sizeof...(Functions)>{});
};

#endif
2209
// template for key_matching join_node
// tag_matching join_node is a specialization of key_matching, and is source-compatible.
// Each constructor takes one key-extraction body per input port (b0..bN-1);
// the bodies map an incoming message to its matching key K. The repetitive
// overloads (2..10 bodies) exist because this interface predates variadics
// (see the TODO near the indexer_node declaration below).
template<typename OutputTuple, typename K, typename KHash>
class join_node<OutputTuple, key_matching<K, KHash> > : public unfolded_join_node<std::tuple_size<OutputTuple>::value,
      key_matching_port, OutputTuple, key_matching<K,KHash> > {
private:
    static const int N = std::tuple_size<OutputTuple>::value;
    typedef unfolded_join_node<N, key_matching_port, OutputTuple, key_matching<K,KHash> > unfolded_type;
public:
    typedef OutputTuple output_type;
    typedef typename unfolded_type::input_ports_type input_ports_type;

#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
    //! Body-less constructor: keys are carried by the messages themselves.
    join_node(graph &g) : unfolded_type(g) {}
#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */

    //! Two-port constructor; bodies extract the key from each port's input.
    template<typename __TBB_B0, typename __TBB_B1>
    __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
    __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
    __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
    __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
            unfolded_type(g, b0, b1, b2, b3, b4) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#if __TBB_VARIADIC_MAX >= 6
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
             typename __TBB_B5>
    __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
            unfolded_type(g, b0, b1, b2, b3, b4, b5) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 7
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
             typename __TBB_B5, typename __TBB_B6>
    __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
            unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 8
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
             typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
    __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
                                 __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 9
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
             typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
    __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
                                 __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 10
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
             typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
    __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8, __TBB_B9>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
                                 __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs from a node_set (plus optional bodies) and connects the edges in order.
    template <
#if (__clang_major__ == 3 && __clang_minor__ == 4)
        // clang 3.4 misdeduces 'Args...' for 'node_set' while it can cope with template template parameter.
        template<typename...> class node_set,
#endif
        typename... Args, typename... Bodies
    >
    __TBB_requires((sizeof...(Bodies) == 0) || join_node_functions<OutputTuple, K, Bodies...>)
    __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
        : join_node(nodes.graph_reference(), bodies...) {
        make_edges_in_order(nodes, *this);
    }
#endif  // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Copy constructor: copies per unfolded_type semantics and re-registers tracing.
    __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
2323
2324 // indexer node
2325 #include "detail/_flow_graph_indexer_impl.h"
2326
// TODO: Implement interface with variadic template or tuple
// Primary template; null_type pads the unused trailing parameters, and one
// explicit specialization exists per supported arity below.
template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
         typename T4=null_type, typename T5=null_type, typename T6=null_type,
         typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;

//indexer node specializations
//! One-input indexer: tags each incoming message with the index (always 0 here)
//! of the port it arrived on; all logic lives in unfolded_indexer_node.
template<typename T0>
class indexer_node<T0> : public unfolded_indexer_node<std::tuple<T0> > {
private:
    static const int N = 1;
public:
    typedef std::tuple<T0> InputTuple;
    typedef tagged_msg<size_t, T0> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructor: builds the port and registers the node with the tracing layer.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs from a node_set and connects the edges in order.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
};
2359
//! Two-input indexer: tags each incoming message with the index of the port
//! it arrived on; all logic lives in unfolded_indexer_node.
template<typename T0, typename T1>
class indexer_node<T0, T1> : public unfolded_indexer_node<std::tuple<T0, T1> > {
private:
    static const int N = 2;
public:
    typedef std::tuple<T0, T1> InputTuple;
    typedef tagged_msg<size_t, T0, T1> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructor: builds the ports and registers the node with the tracing layer.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs from a node_set and connects the edges in order.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
2387
//! Three-input indexer: tags each incoming message with the index of the port
//! it arrived on; all logic lives in unfolded_indexer_node.
template<typename T0, typename T1, typename T2>
class indexer_node<T0, T1, T2> : public unfolded_indexer_node<std::tuple<T0, T1, T2> > {
private:
    static const int N = 3;
public:
    typedef std::tuple<T0, T1, T2> InputTuple;
    typedef tagged_msg<size_t, T0, T1, T2> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructor: builds the ports and registers the node with the tracing layer.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs from a node_set and connects the edges in order.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
2415
//! Four-input indexer: tags each incoming message with the index of the port
//! it arrived on; all logic lives in unfolded_indexer_node.
template<typename T0, typename T1, typename T2, typename T3>
class indexer_node<T0, T1, T2, T3> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3> > {
private:
    static const int N = 4;
public:
    typedef std::tuple<T0, T1, T2, T3> InputTuple;
    typedef tagged_msg<size_t, T0, T1, T2, T3> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructor: builds the ports and registers the node with the tracing layer.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs from a node_set and connects the edges in order.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
2443
//! indexer_node specialization with 5 input ports.
//  Output messages are tagged with the index of the input port they arrived on.
template<typename T0, typename T1, typename T2, typename T3, typename T4>
class indexer_node<T0, T1, T2, T3, T4> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4> > {
private:
    static const int N = 5;  // number of input ports
public:
    typedef std::tuple<T0, T1, T2, T3, T4> InputTuple;
    typedef tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs the node in graph g and registers it with the profiling tools.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of the given node set and wires the edges.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
2471
2472 #if __TBB_VARIADIC_MAX >= 6
//! indexer_node specialization with 6 input ports (available when __TBB_VARIADIC_MAX >= 6).
//  Output messages are tagged with the index of the input port they arrived on.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
class indexer_node<T0, T1, T2, T3, T4, T5> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5> > {
private:
    static const int N = 6;  // number of input ports
public:
    typedef std::tuple<T0, T1, T2, T3, T4, T5> InputTuple;
    typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs the node in graph g and registers it with the profiling tools.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of the given node set and wires the edges.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
2500 #endif //variadic max 6
2501
2502 #if __TBB_VARIADIC_MAX >= 7
//! indexer_node specialization with 7 input ports (available when __TBB_VARIADIC_MAX >= 7).
//  Output messages are tagged with the index of the input port they arrived on.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6>
class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6> > {
private:
    static const int N = 7;  // number of input ports
public:
    typedef std::tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
    typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs the node in graph g and registers it with the profiling tools.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of the given node set and wires the edges.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
2531 #endif //variadic max 7
2532
2533 #if __TBB_VARIADIC_MAX >= 8
2534 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
2535 typename T6, typename T7>
2536 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
2537 private:
2538 static const int N = 8;
2539 public:
2540 typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
2541 typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type;
2542 typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)2543 indexer_node(graph& g) : unfolded_type(g) {
2544 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2545 this->input_ports(), static_cast< sender< output_type > *>(this) );
2546 }
2547
2548 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2549 template <typename... Args>
indexer_node(const node_set<Args...> & nodes)2550 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2551 make_edges_in_order(nodes, *this);
2552 }
2553 #endif
2554
2555 // Copy constructor
indexer_node(const indexer_node & other)2556 indexer_node( const indexer_node& other ) : unfolded_type(other) {
2557 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2558 this->input_ports(), static_cast< sender< output_type > *>(this) );
2559 }
2560
2561 };
2562 #endif //variadic max 8
2563
2564 #if __TBB_VARIADIC_MAX >= 9
//! indexer_node specialization with 9 input ports (available when __TBB_VARIADIC_MAX >= 9).
//  Output messages are tagged with the index of the input port they arrived on.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8>
class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
private:
    static const int N = 9;  // number of input ports
public:
    typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
    typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs the node in graph g and registers it with the profiling tools.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of the given node set and wires the edges.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
2593 #endif //variadic max 9
2594
2595 #if __TBB_VARIADIC_MAX >= 10
//! Primary (10-port) indexer_node template (available when __TBB_VARIADIC_MAX >= 10).
//  Output messages are tagged with the index of the input port they arrived on.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8, typename T9>
class indexer_node/*default*/ : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
private:
    static const int N = 10;  // number of input ports
public:
    typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
    typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs the node in graph g and registers it with the profiling tools.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of the given node set and wires the edges.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
2624 #endif //variadic max 10
2625
//! Connects sender p to receiver s and reports the new edge to the profiling tools.
//  Helper shared by all public make_edge overloads.
template< typename T >
inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
    register_successor(p, s);
    fgt_make_edge( &p, &s );
}
2631
//! Makes an edge between a single predecessor and a single successor
template< typename T >
inline void make_edge( sender<T> &p, receiver<T> &s ) {
    internal_make_edge( p, s );
}
2637
//! Makes an edge from port 0 of a multi-output predecessor to port 0 of a
//! multi-input successor. Enabled only for types exposing both port tuples.
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void make_edge( T& output, V& input) {
    auto& first_output_port = std::get<0>(output.output_ports());
    auto& first_input_port  = std::get<0>(input.input_ports());
    make_edge(first_output_port, first_input_port);
}
2644
2645 //Makes an edge from port 0 of a multi-output predecessor to a receiver.
2646 template< typename T, typename R,
2647 typename = typename T::output_ports_type >
make_edge(T & output,receiver<R> & input)2648 inline void make_edge( T& output, receiver<R>& input) {
2649 make_edge(std::get<0>(output.output_ports()), input);
2650 }
2651
2652 //Makes an edge from a sender to port 0 of a multi-input successor.
2653 template< typename S, typename V,
2654 typename = typename V::input_ports_type >
make_edge(sender<S> & output,V & input)2655 inline void make_edge( sender<S>& output, V& input) {
2656 make_edge(output, std::get<0>(input.input_ports()));
2657 }
2658
//! Disconnects receiver s from sender p and reports the removal to the profiling tools.
//  Helper shared by all public remove_edge overloads.
template< typename T >
inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
    remove_successor( p, s );
    fgt_remove_edge( &p, &s );
}
2664
//! Removes an edge between a single predecessor and a single successor
template< typename T >
inline void remove_edge( sender<T> &p, receiver<T> &s ) {
    internal_remove_edge( p, s );
}
2670
//! Removes the edge between port 0 of a multi-output predecessor and port 0 of
//! a multi-input successor. Enabled only for types exposing both port tuples.
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void remove_edge( T& output, V& input) {
    auto& first_output_port = std::get<0>(output.output_ports());
    auto& first_input_port  = std::get<0>(input.input_ports());
    remove_edge(first_output_port, first_input_port);
}
2677
2678 //Removes an edge between port 0 of a multi-output predecessor and a receiver.
2679 template< typename T, typename R,
2680 typename = typename T::output_ports_type >
remove_edge(T & output,receiver<R> & input)2681 inline void remove_edge( T& output, receiver<R>& input) {
2682 remove_edge(std::get<0>(output.output_ports()), input);
2683 }
2684 //Removes an edge between a sender and port 0 of a multi-input successor.
2685 template< typename S, typename V,
2686 typename = typename V::input_ports_type >
remove_edge(sender<S> & output,V & input)2687 inline void remove_edge( sender<S>& output, V& input) {
2688 remove_edge(output, std::get<0>(input.input_ports()));
2689 }
2690
//! Returns a copy of the body from a function or continue node
//  Body must be the exact functor type the node was constructed with; the
//  node's copy_function_object performs the typed extraction.
template< typename Body, typename Node >
Body copy_body( Node &n ) {
    return n.template copy_function_object<Body>();
}
2696
2697 //composite_node
2698 template< typename InputTuple, typename OutputTuple > class composite_node;
2699
2700 template< typename... InputTypes, typename... OutputTypes>
2701 class composite_node <std::tuple<InputTypes...>, std::tuple<OutputTypes...> > : public graph_node {
2702
2703 public:
2704 typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
2705 typedef std::tuple< sender<OutputTypes>&... > output_ports_type;
2706
2707 private:
2708 std::unique_ptr<input_ports_type> my_input_ports;
2709 std::unique_ptr<output_ports_type> my_output_ports;
2710
2711 static const size_t NUM_INPUTS = sizeof...(InputTypes);
2712 static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
2713
2714 protected:
reset_node(reset_flags)2715 void reset_node(reset_flags) override {}
2716
2717 public:
composite_node(graph & g)2718 composite_node( graph &g ) : graph_node(g) {
2719 fgt_multiinput_multioutput_node( CODEPTR(), FLOW_COMPOSITE_NODE, this, &this->my_graph );
2720 }
2721
2722 template<typename T1, typename T2>
set_external_ports(T1 && input_ports_tuple,T2 && output_ports_tuple)2723 void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
2724 static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
2725 static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
2726
2727 fgt_internal_input_alias_helper<T1, NUM_INPUTS>::alias_port( this, input_ports_tuple);
2728 fgt_internal_output_alias_helper<T2, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
2729
2730 my_input_ports.reset( new input_ports_type(std::forward<T1>(input_ports_tuple)) );
2731 my_output_ports.reset( new output_ports_type(std::forward<T2>(output_ports_tuple)) );
2732 }
2733
2734 template< typename... NodeTypes >
add_visible_nodes(const NodeTypes &...n)2735 void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
2736
2737 template< typename... NodeTypes >
add_nodes(const NodeTypes &...n)2738 void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
2739
2740
input_ports()2741 input_ports_type& input_ports() {
2742 __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
2743 return *my_input_ports;
2744 }
2745
output_ports()2746 output_ports_type& output_ports() {
2747 __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
2748 return *my_output_ports;
2749 }
2750 }; // class composite_node
2751
2752 //composite_node with only input ports
2753 template< typename... InputTypes>
2754 class composite_node <std::tuple<InputTypes...>, std::tuple<> > : public graph_node {
2755 public:
2756 typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
2757
2758 private:
2759 std::unique_ptr<input_ports_type> my_input_ports;
2760 static const size_t NUM_INPUTS = sizeof...(InputTypes);
2761
2762 protected:
reset_node(reset_flags)2763 void reset_node(reset_flags) override {}
2764
2765 public:
composite_node(graph & g)2766 composite_node( graph &g ) : graph_node(g) {
2767 fgt_composite( CODEPTR(), this, &g );
2768 }
2769
2770 template<typename T>
set_external_ports(T && input_ports_tuple)2771 void set_external_ports(T&& input_ports_tuple) {
2772 static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
2773
2774 fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, input_ports_tuple);
2775
2776 my_input_ports.reset( new input_ports_type(std::forward<T>(input_ports_tuple)) );
2777 }
2778
2779 template< typename... NodeTypes >
add_visible_nodes(const NodeTypes &...n)2780 void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
2781
2782 template< typename... NodeTypes >
add_nodes(const NodeTypes &...n)2783 void add_nodes( const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
2784
2785
input_ports()2786 input_ports_type& input_ports() {
2787 __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
2788 return *my_input_ports;
2789 }
2790
2791 }; // class composite_node
2792
2793 //composite_nodes with only output_ports
//! composite_node specialization with only output ports.
template<typename... OutputTypes>
class composite_node <std::tuple<>, std::tuple<OutputTypes...> > : public graph_node {
public:
    typedef std::tuple< sender<OutputTypes>&... > output_ports_type;

private:
    // Ports are supplied after construction via set_external_ports().
    std::unique_ptr<output_ports_type> my_output_ports;
    static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);

protected:
    // Nothing to reset here; the internal nodes are reset by the owning graph.
    void reset_node(reset_flags) override {}

public:
    //! Constructs the node in graph g and registers it with the profiling tools.
    __TBB_NOINLINE_SYM composite_node( graph &g ) : graph_node(g) {
        fgt_composite( CODEPTR(), this, &g );
    }

    //! Binds a tuple of internal senders as this node's external output ports.
    template<typename T>
    void set_external_ports(T&& output_ports_tuple) {
        static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");

        fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);

        my_output_ports.reset( new output_ports_type(std::forward<T>(output_ports_tuple)) );
    }

    //! Adds internal nodes, making them visible inside this composite to profiling tools.
    template<typename... NodeTypes >
    void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }

    //! Adds internal nodes without exposing them to profiling tools.
    template<typename... NodeTypes >
    void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }


    //! Returns the external output ports; set_external_ports must have been called.
    output_ports_type& output_ports() {
         __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
         return *my_output_ports;
    }

}; // class composite_node
2833
//! Common base for async bodies.
//  Holds a pointer to the owning node's gateway so that, after a node is copied,
//  the copied body can be re-pointed at the copy's own gateway via set_gateway()
//  (see the async_node copy constructor).
template<typename Gateway>
class async_body_base: no_assign {
public:
    typedef Gateway gateway_type;

    async_body_base(gateway_type *gateway): my_gateway(gateway) { }
    //! Re-binds this body to a different gateway (used when the node is copied).
    void set_gateway(gateway_type *gateway) {
        my_gateway = gateway;
    }

protected:
    gateway_type *my_gateway;  // not owned; points into the owning async_node
};
2847
//! Adapts a user-supplied body taking (input, gateway) to the (input, ports)
//  signature expected by multifunction_input. The ports argument is ignored:
//  results are submitted asynchronously through the gateway instead.
template<typename Input, typename Ports, typename Gateway, typename Body>
class async_body: public async_body_base<Gateway> {
private:
    Body my_body;

public:
    typedef async_body_base<Gateway> base_type;
    typedef Gateway gateway_type;

    async_body(const Body &body, gateway_type *gateway)
        : base_type(gateway), my_body(body) { }

    //! Invokes the user body with the input value and the node's gateway.
    //  The noexcept specification mirrors that of the user body invocation.
    void operator()( const Input &v, Ports & ) noexcept(noexcept(tbb::detail::invoke(my_body, v, std::declval<gateway_type&>()))) {
        tbb::detail::invoke(my_body, v, *this->my_gateway);
    }

    //! Returns a copy of the wrapped user body.
    Body get_body() { return my_body; }
};
2866
//! Implements async node
//  Accepts input messages, passes them to a user body together with a gateway
//  object through which an external activity can submit results back into the
//  graph; results are broadcast to successors of output port 0.
template < typename Input, typename Output,
           typename Policy = queueing_lightweight >
    __TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input>)
class async_node
    : public multifunction_node< Input, std::tuple< Output >, Policy >, public sender< Output >
{
    typedef multifunction_node< Input, std::tuple< Output >, Policy > base_type;
    typedef multifunction_input<
        Input, typename base_type::output_ports_type, Policy, cache_aligned_allocator<Input>> mfn_input_type;

public:
    typedef Input input_type;
    typedef Output output_type;
    typedef receiver<input_type> receiver_type;
    typedef receiver<output_type> successor_type;
    typedef sender<input_type> predecessor_type;
    typedef receiver_gateway<output_type> gateway_type;
    typedef async_body_base<gateway_type> async_body_base_type;
    typedef typename base_type::output_ports_type output_ports_type;

private:
    //! The gateway handed to the user body. It lets an external activity keep
    //  the graph from completing (reserve_wait/release_wait) and inject results
    //  back into the graph (try_put).
    class receiver_gateway_impl: public receiver_gateway<Output> {
    public:
        receiver_gateway_impl(async_node* node): my_node(node) {}
        //! Tells the graph to wait for outstanding asynchronous work.
        void reserve_wait() override {
            fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
            my_node->my_graph.reserve_wait();
        }

        //! Releases a previously reserved wait.
        void release_wait() override {
            async_node* n = my_node;
            graph* g = &n->my_graph;
            g->release_wait();
            fgt_async_commit(static_cast<typename async_node::receiver_type *>(n), g);
        }

        //! Implements gateway_type::try_put for an external activity to submit a message to FG
        bool try_put(const Output &i) override {
            return my_node->try_put_impl(i);
        }

    private:
        async_node* my_node;  // back-pointer to the owning node
    } my_gateway;

    //The substitute of 'this' for member construction, to prevent compiler warnings
    async_node* self() { return this; }

    //! Implements gateway_type::try_put for an external activity to submit a message to FG
    //  Gathers the tasks produced by successful puts to successors of output
    //  port 0 and enqueues them in the graph's arena.
    bool try_put_impl(const Output &i) {
        multifunction_output<Output> &port_0 = output_port<0>(*this);
        broadcast_cache<output_type>& port_successors = port_0.successors();
        fgt_async_try_put_begin(this, &port_0);
        // TODO revamp: change to std::list<graph_task*>
        graph_task_list tasks;
        bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
        __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
                      "Return status is inconsistent with the method operation." );

        while( !tasks.empty() ) {
            enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
        }
        fgt_async_try_put_end(this, &port_0);
        return is_at_least_one_put_successful;
    }

public:
    //! Constructs an async_node with the given concurrency limit, body and priority.
    //  NOTE: &my_gateway is passed to the base-class initializer before my_gateway
    //  is constructed; only the address is taken, the object is not used yet.
    template<typename Body>
        __TBB_requires(async_node_body<Body, input_type, gateway_type>)
    __TBB_NOINLINE_SYM async_node(
        graph &g, size_t concurrency,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : base_type(
        g, concurrency,
        async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
        (body, &my_gateway), a_priority ), my_gateway(self()) {
        fgt_multioutput_node_with_body<1>(
            CODEPTR(), FLOW_ASYNC_NODE,
            &this->my_graph, static_cast<receiver<input_type> *>(this),
            this->output_ports(), this->my_body
        );
    }

    //! Constructs an async_node with the default policy and an explicit priority.
    template <typename Body>
        __TBB_requires(async_node_body<Body, input_type, gateway_type>)
    __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
        : async_node(g, concurrency, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of the given node set and wires the edges.
    template <typename Body, typename... Args>
        __TBB_requires(async_node_body<Body, input_type, gateway_type>)
    __TBB_NOINLINE_SYM async_node(
        const node_set<Args...>& nodes, size_t concurrency, Body body,
        Policy = Policy(), node_priority_t a_priority = no_priority )
        : async_node(nodes.graph_reference(), concurrency, body, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
        __TBB_requires(async_node_body<Body, input_type, gateway_type>)
    __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
        : async_node(nodes, concurrency, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Copy constructor: the copied bodies still point at the source node's
    //  gateway, so both are re-bound to this node's gateway here.
    __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
        static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
        static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);

        fgt_multioutput_node_with_body<1>( CODEPTR(), FLOW_ASYNC_NODE,
                &this->my_graph, static_cast<receiver<input_type> *>(this),
                this->output_ports(), this->my_body );
    }

    //! Returns the gateway the external activity should use to talk to the graph.
    gateway_type& gateway() {
        return my_gateway;
    }

    // Define sender< Output >

    //! Add a new successor to this node
    bool register_successor(successor_type&) override {
        __TBB_ASSERT(false, "Successors must be registered only via ports");
        return false;
    }

    //! Removes a successor from this node
    bool remove_successor(successor_type&) override {
        __TBB_ASSERT(false, "Successors must be removed only via ports");
        return false;
    }

    //! Returns a copy of the user body; Body must be the exact constructed type.
    template<typename Body>
    Body copy_function_object() {
        typedef multifunction_body<input_type, typename base_type::output_ports_type> mfn_body_type;
        typedef async_body<Input, typename base_type::output_ports_type, gateway_type, Body> async_body_type;
        mfn_body_type &body_ref = *this->my_body;
        async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
        return ab.get_body();
    }

protected:

    void reset_node( reset_flags f) override {
        base_type::reset_node(f);
    }
};
3014
3015 #include "detail/_flow_graph_node_set_impl.h"
3016
//! A buffering node that holds a single item, overwriting it on each new
//  message. The buffered value is forwarded to newly registered successors and
//  broadcast (not consumed) on try_get/try_reserve.
template< typename T >
class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    //! Constructs the node in graph g and registers it with the profiling tools.
    __TBB_NOINLINE_SYM explicit overwrite_node(graph &g)
        : graph_node(g), my_successors(this), my_buffer_is_valid(false)
    {
        fgt_node( CODEPTR(), FLOW_OVERWRITE_NODE, &this->my_graph,
                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of the given node set and wires the edges.
    template <typename... Args>
    overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor; doesn't take anything from src; default won't work
    __TBB_NOINLINE_SYM overwrite_node( const overwrite_node& src ) : overwrite_node(src.my_graph) {}

    ~overwrite_node() {}

    //! Adds s as a successor; if a valid value is buffered and the graph is
    //  active, the value is pushed to s immediately.
    bool register_successor( successor_type &s ) override {
        spin_mutex::scoped_lock l( my_mutex );
        if (my_buffer_is_valid && is_graph_active( my_graph )) {
            // We have a valid value that must be forwarded immediately.
            bool ret = s.try_put( my_buffer );
            if ( ret ) {
                // We add the successor that accepted our put
                my_successors.register_successor( s );
            } else {
                // In case of reservation a race between the moment of reservation and register_successor can appear,
                // because failed reserve does not mean that register_successor is not ready to put a message immediately.
                // We have some sort of infinite loop: reserving node tries to set pull state for the edge,
                // but overwrite_node tries to return push state back. That is why we have to break this loop with task creation.
                small_object_allocator allocator{};
                typedef register_predecessor_task task_type;
                graph_task* t = allocator.new_object<task_type>(graph_reference(), allocator, *this, s);
                graph_reference().reserve_wait();
                spawn_in_graph_arena( my_graph, *t );
            }
        } else {
            // No valid value yet, just add as successor
            my_successors.register_successor( s );
        }
        return true;
    }

    bool remove_successor( successor_type &s ) override {
        spin_mutex::scoped_lock l( my_mutex );
        my_successors.remove_successor(s);
        return true;
    }

    //! Copies the buffered value into v; fails if no valid value is buffered.
    bool try_get( input_type &v ) override {
        spin_mutex::scoped_lock l( my_mutex );
        if ( my_buffer_is_valid ) {
            v = my_buffer;
            return true;
        }
        return false;
    }

    //! Reserves an item
    //  Reservation does not lock out other consumers; it behaves like try_get.
    bool try_reserve( T &v ) override {
        return try_get(v);
    }

    //! Releases the reserved item
    bool try_release() override { return true; }

    //! Consumes the reserved item
    //  The buffered value is not invalidated by consumption.
    bool try_consume() override { return true; }

    //! Returns true if a valid value is currently buffered.
    bool is_valid() {
        spin_mutex::scoped_lock l( my_mutex );
        return my_buffer_is_valid;
    }

    //! Invalidates the buffered value.
    void clear() {
        spin_mutex::scoped_lock l( my_mutex );
        my_buffer_is_valid = false;
    }

protected:

    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    //! Stores v (overwriting any previous value) and broadcasts it to successors.
    graph_task* try_put_task( const input_type &v ) override {
        spin_mutex::scoped_lock l( my_mutex );
        return try_put_task_impl(v);
    }

    //! Unsynchronized implementation of try_put_task; caller must hold my_mutex.
    graph_task * try_put_task_impl(const input_type &v) {
        my_buffer = v;
        my_buffer_is_valid = true;
        graph_task* rtask = my_successors.try_put_task(v);
        if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
        return rtask;
    }

    graph& graph_reference() const override {
        return my_graph;
    }

    //! Breaks an infinite loop between the node reservation and register_successor call
    struct register_predecessor_task : public graph_task {
        register_predecessor_task(
            graph& g, small_object_allocator& allocator, predecessor_type& owner, successor_type& succ)
            : graph_task(g, allocator), o(owner), s(succ) {};

        task* execute(execution_data& ed) override {
            // TODO revamp: investigate why qualification is needed for register_successor() call
            using tbb::detail::d1::register_predecessor;
            using tbb::detail::d1::register_successor;
            if ( !register_predecessor(s, o) ) {
                register_successor(o, s);
            }
            finalize<register_predecessor_task>(ed);
            return nullptr;
        }

        task* cancel(execution_data& ed) override {
            finalize<register_predecessor_task>(ed);
            return nullptr;
        }

        predecessor_type& o;  // the overwrite_node acting as predecessor
        successor_type& s;    // the successor to (re)connect
    };

    spin_mutex my_mutex;                                   // guards buffer and successor list
    broadcast_cache< input_type, null_rw_mutex > my_successors;
    input_type my_buffer;                                  // last value put to the node
    bool my_buffer_is_valid;                               // false until first put / after clear()

    void reset_node( reset_flags f) override {
        my_buffer_is_valid = false;
        if (f&rf_clear_edges) {
            my_successors.clear();
        }
    }
}; // overwrite_node
3166
3167 template< typename T >
3168 class write_once_node : public overwrite_node<T> {
3169 public:
3170 typedef T input_type;
3171 typedef T output_type;
3172 typedef overwrite_node<T> base_type;
3173 typedef typename receiver<input_type>::predecessor_type predecessor_type;
3174 typedef typename sender<output_type>::successor_type successor_type;
3175
3176 //! Constructor
write_once_node(graph & g)3177 __TBB_NOINLINE_SYM explicit write_once_node(graph& g) : base_type(g) {
3178 fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3179 static_cast<receiver<input_type> *>(this),
3180 static_cast<sender<output_type> *>(this) );
3181 }
3182
3183 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3184 template <typename... Args>
write_once_node(const node_set<Args...> & nodes)3185 write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
3186 make_edges_in_order(nodes, *this);
3187 }
3188 #endif
3189
3190 //! Copy constructor: call base class copy constructor
write_once_node(const write_once_node & src)3191 __TBB_NOINLINE_SYM write_once_node( const write_once_node& src ) : base_type(src) {
3192 fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3193 static_cast<receiver<input_type> *>(this),
3194 static_cast<sender<output_type> *>(this) );
3195 }
3196
3197 protected:
3198 template< typename R, typename B > friend class run_and_put_task;
3199 template<typename X, typename Y> friend class broadcast_cache;
3200 template<typename X, typename Y> friend class round_robin_cache;
try_put_task(const T & v)3201 graph_task *try_put_task( const T &v ) override {
3202 spin_mutex::scoped_lock l( this->my_mutex );
3203 return this->my_buffer_is_valid ? nullptr : this->try_put_task_impl(v);
3204 }
3205 }; // write_once_node
3206
//! Sets the name reported for the graph by profiling tools.
inline void set_name(const graph& g, const char *name) {
    fgt_graph_desc(&g, name);
}
3210
//! Sets the name reported for an input_node by profiling tools.
template <typename Output>
inline void set_name(const input_node<Output>& node, const char *name) {
    fgt_node_desc(&node, name);
}
3215
//! Sets the name reported for a function_node by profiling tools.
template <typename Input, typename Output, typename Policy>
inline void set_name(const function_node<Input, Output, Policy>& node, const char *name) {
    fgt_node_desc(&node, name);
}
3220
//! Sets the name reported for a continue_node by profiling tools.
template <typename Output, typename Policy>
inline void set_name(const continue_node<Output,Policy>& node, const char *name) {
    fgt_node_desc(&node, name);
}
3225
//! Sets the name reported for a broadcast_node by profiling tools.
template <typename T>
inline void set_name(const broadcast_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}
3230
//! Sets the name reported for a buffer_node by profiling tools.
template <typename T>
inline void set_name(const buffer_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}
3235
//! Sets the name reported for a queue_node by profiling tools.
template <typename T>
inline void set_name(const queue_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}
3240
//! Sets the name reported for a sequencer_node by profiling tools.
template <typename T>
inline void set_name(const sequencer_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}
3245
//! Sets the name reported for a priority_queue_node by profiling tools.
template <typename T, typename Compare>
inline void set_name(const priority_queue_node<T, Compare>& node, const char *name) {
    fgt_node_desc(&node, name);
}
3250
//! Sets the name reported for a limiter_node by profiling tools.
template <typename T, typename DecrementType>
inline void set_name(const limiter_node<T, DecrementType>& node, const char *name) {
    fgt_node_desc(&node, name);
}
3255
//! Sets the name reported for a join_node by profiling tools.
template <typename OutputTuple, typename JP>
inline void set_name(const join_node<OutputTuple, JP>& node, const char *name) {
    fgt_node_desc(&node, name);
}
3260
//! Sets the name reported for an indexer_node by profiling tools.
template <typename... Types>
inline void set_name(const indexer_node<Types...>& node, const char *name) {
    fgt_node_desc(&node, name);
}
3265
//! Sets the name reported for an overwrite_node by profiling tools.
template <typename T>
inline void set_name(const overwrite_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}
3270
//! Sets the name reported for a write_once_node by profiling tools.
template <typename T>
inline void set_name(const write_once_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}
3275
//! Sets the name reported for a multifunction_node by profiling tools (multi-output variant).
template<typename Input, typename Output, typename Policy>
inline void set_name(const multifunction_node<Input, Output, Policy>& node, const char *name) {
    fgt_multioutput_node_desc(&node, name);
}
3280
//! Sets the name reported for a split_node by profiling tools (multi-output variant).
template<typename TupleType>
inline void set_name(const split_node<TupleType>& node, const char *name) {
    fgt_multioutput_node_desc(&node, name);
}
3285
//! Sets the name reported for a composite_node by profiling tools (multi-input/multi-output variant).
template< typename InputTuple, typename OutputTuple >
inline void set_name(const composite_node<InputTuple, OutputTuple>& node, const char *name) {
    fgt_multiinput_multioutput_node_desc(&node, name);
}
3290
//! Sets the name reported for an async_node by profiling tools (multi-output variant).
template<typename Input, typename Output, typename Policy>
inline void set_name(const async_node<Input, Output, Policy>& node, const char *name)
{
    fgt_multioutput_node_desc(&node, name);
}
3296 } // d1
3297 } // detail
3298 } // tbb
3299
3300
3301 // Include deduction guides for node classes
3302 #include "detail/_flow_graph_nodes_deduction.h"
3303
3304 namespace tbb {
3305 namespace flow {
3306 inline namespace v1 {
3307 using detail::d1::receiver;
3308 using detail::d1::sender;
3309
3310 using detail::d1::serial;
3311 using detail::d1::unlimited;
3312
3313 using detail::d1::reset_flags;
3314 using detail::d1::rf_reset_protocol;
3315 using detail::d1::rf_reset_bodies;
3316 using detail::d1::rf_clear_edges;
3317
3318 using detail::d1::graph;
3319 using detail::d1::graph_node;
3320 using detail::d1::continue_msg;
3321
3322 using detail::d1::input_node;
3323 using detail::d1::function_node;
3324 using detail::d1::multifunction_node;
3325 using detail::d1::split_node;
3326 using detail::d1::output_port;
3327 using detail::d1::indexer_node;
3328 using detail::d1::tagged_msg;
3329 using detail::d1::cast_to;
3330 using detail::d1::is_a;
3331 using detail::d1::continue_node;
3332 using detail::d1::overwrite_node;
3333 using detail::d1::write_once_node;
3334 using detail::d1::broadcast_node;
3335 using detail::d1::buffer_node;
3336 using detail::d1::queue_node;
3337 using detail::d1::sequencer_node;
3338 using detail::d1::priority_queue_node;
3339 using detail::d1::limiter_node;
3340 using namespace detail::d1::graph_policy_namespace;
3341 using detail::d1::join_node;
3342 using detail::d1::input_port;
3343 using detail::d1::copy_body;
3344 using detail::d1::make_edge;
3345 using detail::d1::remove_edge;
3346 using detail::d1::tag_value;
3347 using detail::d1::composite_node;
3348 using detail::d1::async_node;
3349 using detail::d1::node_priority_t;
3350 using detail::d1::no_priority;
3351
3352 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3353 using detail::d1::follows;
3354 using detail::d1::precedes;
3355 using detail::d1::make_node_set;
3356 using detail::d1::make_edges;
3357 #endif
3358
3359 } // v1
3360 } // flow
3361
3362 using detail::d1::flow_control;
3363
3364 namespace profiling {
3365 using detail::d1::set_name;
3366 } // profiling
3367
3368 } // tbb
3369
3370
3371 #if TBB_USE_PROFILING_TOOLS && ( __unix__ || __APPLE__ )
3372 // We don't do pragma pop here, since it still gives warning on the USER side
3373 #undef __TBB_NOINLINE_SYM
3374 #endif
3375
3376 #endif // __TBB_flow_graph_H
3377