xref: /oneTBB/include/oneapi/tbb/detail/_aggregator.h (revision f2af7473)
1 /*
2     Copyright (c) 2005-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 
18 #ifndef __TBB_detail__aggregator_H
19 #define __TBB_detail__aggregator_H
20 
21 #include "_assert.h"
22 #include "_utils.h"
23 #include <atomic>
24 #if !__TBBMALLOC_BUILD // TODO: check this macro with TBB Malloc
25 #include "../profiling.h"
26 #endif
27 
28 namespace tbb {
29 namespace detail {
30 namespace d1 {
31 
// Common base for operations submitted to an aggregator.
// Derived is the concrete operation type; it is the element type of the
// intrusive singly-linked list formed through the "next" field.
template <typename Derived>
class aggregated_operation {
public:
    // A new operation starts in the "wait" state (status == 0) and is not linked
    // to any other operation.
    aggregated_operation() : status(uintptr_t(0)), next{nullptr} {}

    // Zero means "wait". Any non-zero value is a "user" value whose meaning is
    // defined by the class that makes use of "status".
    std::atomic<uintptr_t> status;

    // Link to the next operation of the same concrete type.
    std::atomic<Derived*> next;
}; // class aggregated_operation
43 
44 // Aggregator base class
45 /* An aggregator for collecting operations coming from multiple sources and executing
46    them serially on a single thread.  OperationType must be derived from
47    aggregated_operation. The parameter HandlerType is a functor that will be passed the
48    list of operations and is expected to handle each operation appropriately, setting the
49    status of each operation to non-zero. */
50 template <typename OperationType>
51 class aggregator_generic {
52 public:
aggregator_generic()53     aggregator_generic() : pending_operations(nullptr), handler_busy(false) {}
54 
55     // Execute an operation
56     /* Places an operation into the waitlist (pending_operations), and either handles the list,
57        or waits for the operation to complete, or returns.
58        The long_life_time parameter specifies the life time of the given operation object.
59        Operations with long_life_time == true may be accessed after execution.
60        A "short" life time operation (long_life_time == false) can be destroyed
61        during execution, and so any access to it after it was put into the waitlist,
62        including status check, is invalid. As a consequence, waiting for completion
63        of such operation causes undefined behavior. */
64     template <typename HandlerType>
65     void execute( OperationType* op, HandlerType& handle_operations, bool long_life_time = true ) {
66         // op->status should be read before inserting the operation into the
67         // aggregator waitlist since it can become invalid after executing a
68         // handler (if the operation has 'short' life time.)
69         const uintptr_t status = op->status.load(std::memory_order_relaxed);
70 
71         // ITT note: &(op->status) tag is used to cover accesses to this op node. This
72         // thread has created the operation, and now releases it so that the handler
73         // thread may handle the associated operation w/o triggering a race condition;
74         // thus this tag will be acquired just before the operation is handled in the
75         // handle_operations functor.
76         call_itt_notify(releasing, &(op->status));
77         // insert the operation in the queue.
78         OperationType* res = pending_operations.load(std::memory_order_relaxed);
79         do {
80             op->next.store(res, std::memory_order_relaxed);
81         } while (!pending_operations.compare_exchange_strong(res, op));
82         if (!res) { // first in the list; handle the operations
83             // ITT note: &pending_operations tag covers access to the handler_busy flag,
84             // which this waiting handler thread will try to set before entering
85             // handle_operations.
86             call_itt_notify(acquired, &pending_operations);
87             start_handle_operations(handle_operations);
88             // The operation with 'short' life time can already be destroyed
89             if (long_life_time)
90                 __TBB_ASSERT(op->status.load(std::memory_order_relaxed), nullptr);
91         }
92         // Not first; wait for op to be ready
93         else if (!status) { // operation is blocking here.
94             __TBB_ASSERT(long_life_time, "Waiting for an operation object that might be destroyed during processing");
95             call_itt_notify(prepare, &(op->status));
96             spin_wait_while_eq(op->status, uintptr_t(0));
97         }
98    }
99 
100 private:
101     // Trigger the handling of operations when the handler is free
102     template <typename HandlerType>
start_handle_operations(HandlerType & handle_operations)103     void start_handle_operations( HandlerType& handle_operations ) {
104         OperationType* op_list;
105 
106         // ITT note: &handler_busy tag covers access to pending_operations as it is passed
107         // between active and waiting handlers.  Below, the waiting handler waits until
108         // the active handler releases, and the waiting handler acquires &handler_busy as
109         // it becomes the active_handler. The release point is at the end of this
110         // function, when all operations in pending_operations have been handled by the
111         // owner of this aggregator.
112         call_itt_notify(prepare, &handler_busy);
113         // get the handler_busy:
114         // only one thread can possibly spin here at a time
115         spin_wait_until_eq(handler_busy, uintptr_t(0));
116         call_itt_notify(acquired, &handler_busy);
117         // acquire fence not necessary here due to causality rule and surrounding atomics
118         handler_busy.store(1, std::memory_order_relaxed);
119 
120         // ITT note: &pending_operations tag covers access to the handler_busy flag
121         // itself. Capturing the state of the pending_operations signifies that
122         // handler_busy has been set and a new active handler will now process that list's
123         // operations.
124         call_itt_notify(releasing, &pending_operations);
125         // grab pending_operations
126         op_list = pending_operations.exchange(nullptr);
127 
128         // handle all the operations
129         handle_operations(op_list);
130 
131         // release the handler
132         handler_busy.store(0, std::memory_order_release);
133     }
134 
135     // An atomically updated list (aka mailbox) of pending operations
136     std::atomic<OperationType*> pending_operations;
137     // Controls threads access to handle_operations
138     std::atomic<uintptr_t> handler_busy;
139 }; // class aggregator_generic
140 
141 template <typename HandlerType, typename OperationType>
142 class aggregator : public aggregator_generic<OperationType> {
143     HandlerType handle_operations;
144 public:
145     aggregator() = default;
146 
initialize_handler(HandlerType h)147     void initialize_handler( HandlerType h ) { handle_operations = h; }
148 
execute(OperationType * op)149     void execute(OperationType* op) {
150         aggregator_generic<OperationType>::execute(op, handle_operations);
151     }
152 }; // class aggregator
153 
// Functor that forwards a list of operations to the handle_operations method of the
// object it was constructed with.
// A class that wants to befriend this functor should use the most-compatible
// friend declaration (vs, gcc, icc):
// template<class U, class V> friend class aggregating_functor;
template <typename AggregatingClass, typename OperationList>
class aggregating_functor {
public:
    // A default-constructed functor has no target object; a target must be set
    // (by assigning a functor built from an object) before the functor is invoked.
    aggregating_functor() = default;

    // Bind the functor to object, which must not be null.
    aggregating_functor( AggregatingClass* object ) : my_object(object) {
        __TBB_ASSERT(my_object, nullptr);
    }

    // Pass op_list to the target object's handle_operations.
    void operator()( OperationList* op_list ) {
        __TBB_ASSERT(my_object, nullptr);
        my_object->handle_operations(op_list);
    }

private:
    // Target object; null until the functor is bound to one
    AggregatingClass* my_object{nullptr};
}; // class aggregating_functor
170 
171 
172 } // namespace d1
173 } // namespace detail
174 } // namespace tbb
175 
176 #endif // __TBB_detail__aggregator_H
177