/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/
16 
17 #ifndef __TBB__flow_graph_cache_impl_H
18 #define __TBB__flow_graph_cache_impl_H
19 
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
22 #endif
23 
24 // included in namespace tbb::detail::d1 (in flow_graph.h)
25 
26 //! A node_cache maintains a std::queue of elements of type T.  Each operation is protected by a lock.
27 template< typename T, typename M=spin_mutex >
28 class node_cache {
29     public:
30 
31     typedef size_t size_type;
32 
empty()33     bool empty() {
34         typename mutex_type::scoped_lock lock( my_mutex );
35         return internal_empty();
36     }
37 
add(T & n)38     void add( T &n ) {
39         typename mutex_type::scoped_lock lock( my_mutex );
40         internal_push(n);
41     }
42 
remove(T & n)43     void remove( T &n ) {
44         typename mutex_type::scoped_lock lock( my_mutex );
45         for ( size_t i = internal_size(); i != 0; --i ) {
46             T &s = internal_pop();
47             if ( &s == &n )
48                 break;  // only remove one predecessor per request
49             internal_push(s);
50         }
51     }
52 
clear()53     void clear() {
54         while( !my_q.empty()) (void)my_q.pop();
55     }
56 
57 protected:
58 
59     typedef M mutex_type;
60     mutex_type my_mutex;
61     std::queue< T * > my_q;
62 
63     // Assumes lock is held
internal_empty()64     inline bool internal_empty( )  {
65         return my_q.empty();
66     }
67 
68     // Assumes lock is held
internal_size()69     inline size_type internal_size( )  {
70         return my_q.size();
71     }
72 
73     // Assumes lock is held
internal_push(T & n)74     inline void internal_push( T &n )  {
75         my_q.push(&n);
76     }
77 
78     // Assumes lock is held
internal_pop()79     inline T &internal_pop() {
80         T *v = my_q.front();
81         my_q.pop();
82         return *v;
83     }
84 
85 };
86 
87 //! A cache of predecessors that only supports try_get
88 template< typename T, typename M=spin_mutex >
89 class predecessor_cache : public node_cache< sender<T>, M > {
90 public:
91     typedef M mutex_type;
92     typedef T output_type;
93     typedef sender<output_type> predecessor_type;
94     typedef receiver<output_type> successor_type;
95 
predecessor_cache(successor_type * owner)96     predecessor_cache( successor_type* owner ) : my_owner( owner ) {
97         __TBB_ASSERT( my_owner, "predecessor_cache should have an owner." );
98         // Do not work with the passed pointer here as it may not be fully initialized yet
99     }
100 
get_item(output_type & v)101     bool get_item( output_type& v ) {
102 
103         bool msg = false;
104 
105         do {
106             predecessor_type *src;
107             {
108                 typename mutex_type::scoped_lock lock(this->my_mutex);
109                 if ( this->internal_empty() ) {
110                     break;
111                 }
112                 src = &this->internal_pop();
113             }
114 
115             // Try to get from this sender
116             msg = src->try_get( v );
117 
118             if (msg == false) {
119                 // Relinquish ownership of the edge
120                 register_successor(*src, *my_owner);
121             } else {
122                 // Retain ownership of the edge
123                 this->add(*src);
124             }
125         } while ( msg == false );
126         return msg;
127     }
128 
129     // If we are removing arcs (rf_clear_edges), call clear() rather than reset().
reset()130     void reset() {
131         for(;;) {
132             predecessor_type *src;
133             {
134                 if (this->internal_empty()) break;
135                 src = &this->internal_pop();
136             }
137             register_successor(*src, *my_owner);
138         }
139     }
140 
141 protected:
142     successor_type* my_owner;
143 };
144 
145 //! An cache of predecessors that supports requests and reservations
146 template< typename T, typename M=spin_mutex >
147 class reservable_predecessor_cache : public predecessor_cache< T, M > {
148 public:
149     typedef M mutex_type;
150     typedef T output_type;
151     typedef sender<T> predecessor_type;
152     typedef receiver<T> successor_type;
153 
reservable_predecessor_cache(successor_type * owner)154     reservable_predecessor_cache( successor_type* owner )
155         : predecessor_cache<T,M>(owner), reserved_src(nullptr)
156     {
157         // Do not work with the passed pointer here as it may not be fully initialized yet
158     }
159 
try_reserve(output_type & v)160     bool try_reserve( output_type &v ) {
161         bool msg = false;
162 
163         do {
164             predecessor_type* pred = nullptr;
165             {
166                 typename mutex_type::scoped_lock lock(this->my_mutex);
167                 if ( reserved_src.load(std::memory_order_relaxed) || this->internal_empty() )
168                     return false;
169 
170                 pred = &this->internal_pop();
171                 reserved_src.store(pred, std::memory_order_relaxed);
172             }
173 
174             // Try to get from this sender
175             msg = pred->try_reserve( v );
176 
177             if (msg == false) {
178                 typename mutex_type::scoped_lock lock(this->my_mutex);
179                 // Relinquish ownership of the edge
180                 register_successor( *pred, *this->my_owner );
181                 reserved_src.store(nullptr, std::memory_order_relaxed);
182             } else {
183                 // Retain ownership of the edge
184                 this->add( *pred);
185             }
186         } while ( msg == false );
187 
188         return msg;
189     }
190 
try_release()191     bool try_release() {
192         reserved_src.load(std::memory_order_relaxed)->try_release();
193         reserved_src.store(nullptr, std::memory_order_relaxed);
194         return true;
195     }
196 
try_consume()197     bool try_consume() {
198         reserved_src.load(std::memory_order_relaxed)->try_consume();
199         reserved_src.store(nullptr, std::memory_order_relaxed);
200         return true;
201     }
202 
reset()203     void reset() {
204         reserved_src.store(nullptr, std::memory_order_relaxed);
205         predecessor_cache<T, M>::reset();
206     }
207 
clear()208     void clear() {
209         reserved_src.store(nullptr, std::memory_order_relaxed);
210         predecessor_cache<T, M>::clear();
211     }
212 
213 private:
214     std::atomic<predecessor_type*> reserved_src;
215 };
216 
217 
218 //! An abstract cache of successors
219 template<typename T, typename M=spin_rw_mutex >
220 class successor_cache : no_copy {
221 protected:
222 
223     typedef M mutex_type;
224     mutex_type my_mutex;
225 
226     typedef receiver<T> successor_type;
227     typedef receiver<T>* pointer_type;
228     typedef sender<T> owner_type;
229     // TODO revamp: introduce heapified collection of successors for strict priorities
230     typedef std::list< pointer_type > successors_type;
231     successors_type my_successors;
232 
233     owner_type* my_owner;
234 
235 public:
successor_cache(owner_type * owner)236     successor_cache( owner_type* owner ) : my_owner(owner) {
237         // Do not work with the passed pointer here as it may not be fully initialized yet
238     }
239 
~successor_cache()240     virtual ~successor_cache() {}
241 
register_successor(successor_type & r)242     void register_successor( successor_type& r ) {
243         typename mutex_type::scoped_lock l(my_mutex, true);
244         if( r.priority() != no_priority )
245             my_successors.push_front( &r );
246         else
247             my_successors.push_back( &r );
248     }
249 
remove_successor(successor_type & r)250     void remove_successor( successor_type& r ) {
251         typename mutex_type::scoped_lock l(my_mutex, true);
252         for ( typename successors_type::iterator i = my_successors.begin();
253               i != my_successors.end(); ++i ) {
254             if ( *i == & r ) {
255                 my_successors.erase(i);
256                 break;
257             }
258         }
259     }
260 
empty()261     bool empty() {
262         typename mutex_type::scoped_lock l(my_mutex, false);
263         return my_successors.empty();
264     }
265 
clear()266     void clear() {
267         my_successors.clear();
268     }
269 
270     virtual graph_task* try_put_task( const T& t ) = 0;
271 };  // successor_cache<T>
272 
273 //! An abstract cache of successors, specialized to continue_msg
274 template<typename M>
275 class successor_cache< continue_msg, M > : no_copy {
276 protected:
277 
278     typedef M mutex_type;
279     mutex_type my_mutex;
280 
281     typedef receiver<continue_msg> successor_type;
282     typedef receiver<continue_msg>* pointer_type;
283     typedef sender<continue_msg> owner_type;
284     typedef std::list< pointer_type > successors_type;
285     successors_type my_successors;
286     owner_type* my_owner;
287 
288 public:
successor_cache(sender<continue_msg> * owner)289     successor_cache( sender<continue_msg>* owner ) : my_owner(owner) {
290         // Do not work with the passed pointer here as it may not be fully initialized yet
291     }
292 
~successor_cache()293     virtual ~successor_cache() {}
294 
register_successor(successor_type & r)295     void register_successor( successor_type& r ) {
296         typename mutex_type::scoped_lock l(my_mutex, true);
297         if( r.priority() != no_priority )
298             my_successors.push_front( &r );
299         else
300             my_successors.push_back( &r );
301         __TBB_ASSERT( my_owner, "Cache of successors must have an owner." );
302         if ( r.is_continue_receiver() ) {
303             r.register_predecessor( *my_owner );
304         }
305     }
306 
remove_successor(successor_type & r)307     void remove_successor( successor_type& r ) {
308         typename mutex_type::scoped_lock l(my_mutex, true);
309         for ( successors_type::iterator i = my_successors.begin(); i != my_successors.end(); ++i ) {
310             if ( *i == &r ) {
311                 __TBB_ASSERT(my_owner, "Cache of successors must have an owner.");
312                 // TODO: check if we need to test for continue_receiver before removing from r.
313                 r.remove_predecessor( *my_owner );
314                 my_successors.erase(i);
315                 break;
316             }
317         }
318     }
319 
empty()320     bool empty() {
321         typename mutex_type::scoped_lock l(my_mutex, false);
322         return my_successors.empty();
323     }
324 
clear()325     void clear() {
326         my_successors.clear();
327     }
328 
329     virtual graph_task* try_put_task( const continue_msg& t ) = 0;
330 };  // successor_cache< continue_msg >
331 
332 //! A cache of successors that are broadcast to
333 template<typename T, typename M=spin_rw_mutex>
334 class broadcast_cache : public successor_cache<T, M> {
335     typedef successor_cache<T, M> base_type;
336     typedef M mutex_type;
337     typedef typename successor_cache<T,M>::successors_type successors_type;
338 
339 public:
340 
broadcast_cache(typename base_type::owner_type * owner)341     broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) {
342         // Do not work with the passed pointer here as it may not be fully initialized yet
343     }
344 
345     // as above, but call try_put_task instead, and return the last task we received (if any)
try_put_task(const T & t)346     graph_task* try_put_task( const T &t ) override {
347         graph_task * last_task = nullptr;
348         typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
349         typename successors_type::iterator i = this->my_successors.begin();
350         while ( i != this->my_successors.end() ) {
351             graph_task *new_task = (*i)->try_put_task(t);
352             // workaround for icc bug
353             graph& graph_ref = (*i)->graph_reference();
354             last_task = combine_tasks(graph_ref, last_task, new_task);  // enqueue if necessary
355             if(new_task) {
356                 ++i;
357             }
358             else {  // failed
359                 if ( (*i)->register_predecessor(*this->my_owner) ) {
360                     i = this->my_successors.erase(i);
361                 } else {
362                     ++i;
363                 }
364             }
365         }
366         return last_task;
367     }
368 
369     // call try_put_task and return list of received tasks
gather_successful_try_puts(const T & t,graph_task_list & tasks)370     bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) {
371         bool is_at_least_one_put_successful = false;
372         typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
373         typename successors_type::iterator i = this->my_successors.begin();
374         while ( i != this->my_successors.end() ) {
375             graph_task * new_task = (*i)->try_put_task(t);
376             if(new_task) {
377                 ++i;
378                 if(new_task != SUCCESSFULLY_ENQUEUED) {
379                     tasks.push_back(*new_task);
380                 }
381                 is_at_least_one_put_successful = true;
382             }
383             else {  // failed
384                 if ( (*i)->register_predecessor(*this->my_owner) ) {
385                     i = this->my_successors.erase(i);
386                 } else {
387                     ++i;
388                 }
389             }
390         }
391         return is_at_least_one_put_successful;
392     }
393 };
394 
395 //! A cache of successors that are put in a round-robin fashion
396 template<typename T, typename M=spin_rw_mutex >
397 class round_robin_cache : public successor_cache<T, M> {
398     typedef successor_cache<T, M> base_type;
399     typedef size_t size_type;
400     typedef M mutex_type;
401     typedef typename successor_cache<T,M>::successors_type successors_type;
402 
403 public:
404 
round_robin_cache(typename base_type::owner_type * owner)405     round_robin_cache( typename base_type::owner_type* owner ): base_type(owner) {
406         // Do not work with the passed pointer here as it may not be fully initialized yet
407     }
408 
size()409     size_type size() {
410         typename mutex_type::scoped_lock l(this->my_mutex, false);
411         return this->my_successors.size();
412     }
413 
try_put_task(const T & t)414     graph_task* try_put_task( const T &t ) override {
415         typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
416         typename successors_type::iterator i = this->my_successors.begin();
417         while ( i != this->my_successors.end() ) {
418             graph_task* new_task = (*i)->try_put_task(t);
419             if ( new_task ) {
420                 return new_task;
421             } else {
422                if ( (*i)->register_predecessor(*this->my_owner) ) {
423                    i = this->my_successors.erase(i);
424                }
425                else {
426                    ++i;
427                }
428             }
429         }
430         return nullptr;
431     }
432 };
433 
434 #endif // __TBB__flow_graph_cache_impl_H
435