1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB__flow_graph_cache_impl_H
18 #define __TBB__flow_graph_cache_impl_H
19 
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
22 #endif
23 
24 // included in namespace tbb::detail::d1 (in flow_graph.h)
25 
26 //! A node_cache maintains a std::queue of elements of type T.  Each operation is protected by a lock.
27 template< typename T, typename M=spin_mutex >
28 class node_cache {
29     public:
30 
31     typedef size_t size_type;
32 
33     bool empty() {
34         typename mutex_type::scoped_lock lock( my_mutex );
35         return internal_empty();
36     }
37 
38     void add( T &n ) {
39         typename mutex_type::scoped_lock lock( my_mutex );
40         internal_push(n);
41     }
42 
43     void remove( T &n ) {
44         typename mutex_type::scoped_lock lock( my_mutex );
45         for ( size_t i = internal_size(); i != 0; --i ) {
46             T &s = internal_pop();
47             if ( &s == &n )
48                 break;  // only remove one predecessor per request
49             internal_push(s);
50         }
51     }
52 
53     void clear() {
54         while( !my_q.empty()) (void)my_q.pop();
55     }
56 
57 protected:
58 
59     typedef M mutex_type;
60     mutex_type my_mutex;
61     std::queue< T * > my_q;
62 
63     // Assumes lock is held
64     inline bool internal_empty( )  {
65         return my_q.empty();
66     }
67 
68     // Assumes lock is held
69     inline size_type internal_size( )  {
70         return my_q.size();
71     }
72 
73     // Assumes lock is held
74     inline void internal_push( T &n )  {
75         my_q.push(&n);
76     }
77 
78     // Assumes lock is held
79     inline T &internal_pop() {
80         T *v = my_q.front();
81         my_q.pop();
82         return *v;
83     }
84 
85 };
86 
87 //! A cache of predecessors that only supports try_get
88 template< typename T, typename M=spin_mutex >
89 class predecessor_cache : public node_cache< sender<T>, M > {
90 public:
91     typedef M mutex_type;
92     typedef T output_type;
93     typedef sender<output_type> predecessor_type;
94     typedef receiver<output_type> successor_type;
95 
96     predecessor_cache( successor_type* owner ) : my_owner( owner ) {
97         __TBB_ASSERT( my_owner, "predecessor_cache should have an owner." );
98         // Do not work with the passed pointer here as it may not be fully initialized yet
99     }
100 
101     bool get_item( output_type& v ) {
102 
103         bool msg = false;
104 
105         do {
106             predecessor_type *src;
107             {
108                 typename mutex_type::scoped_lock lock(this->my_mutex);
109                 if ( this->internal_empty() ) {
110                     break;
111                 }
112                 src = &this->internal_pop();
113             }
114 
115             // Try to get from this sender
116             msg = src->try_get( v );
117 
118             if (msg == false) {
119                 // Relinquish ownership of the edge
120                 register_successor(*src, *my_owner);
121             } else {
122                 // Retain ownership of the edge
123                 this->add(*src);
124             }
125         } while ( msg == false );
126         return msg;
127     }
128 
129     // If we are removing arcs (rf_clear_edges), call clear() rather than reset().
130     void reset() {
131         for(;;) {
132             predecessor_type *src;
133             {
134                 if (this->internal_empty()) break;
135                 src = &this->internal_pop();
136             }
137             register_successor(*src, *my_owner);
138         }
139     }
140 
141 protected:
142     successor_type* my_owner;
143 };
144 
145 //! An cache of predecessors that supports requests and reservations
146 template< typename T, typename M=spin_mutex >
147 class reservable_predecessor_cache : public predecessor_cache< T, M > {
148 public:
149     typedef M mutex_type;
150     typedef T output_type;
151     typedef sender<T> predecessor_type;
152     typedef receiver<T> successor_type;
153 
154     reservable_predecessor_cache( successor_type* owner )
155         : predecessor_cache<T,M>(owner), reserved_src(NULL)
156     {
157         // Do not work with the passed pointer here as it may not be fully initialized yet
158     }
159 
160     bool
161     try_reserve( output_type &v ) {
162         bool msg = false;
163 
164         do {
165             {
166                 typename mutex_type::scoped_lock lock(this->my_mutex);
167                 if ( reserved_src || this->internal_empty() )
168                     return false;
169 
170                 reserved_src = &this->internal_pop();
171             }
172 
173             // Try to get from this sender
174             msg = reserved_src->try_reserve( v );
175 
176             if (msg == false) {
177                 typename mutex_type::scoped_lock lock(this->my_mutex);
178                 // Relinquish ownership of the edge
179                 register_successor( *reserved_src, *this->my_owner );
180                 reserved_src = NULL;
181             } else {
182                 // Retain ownership of the edge
183                 this->add( *reserved_src );
184             }
185         } while ( msg == false );
186 
187         return msg;
188     }
189 
190     bool
191     try_release( ) {
192         reserved_src->try_release( );
193         reserved_src = NULL;
194         return true;
195     }
196 
197     bool
198     try_consume( ) {
199         reserved_src->try_consume( );
200         reserved_src = NULL;
201         return true;
202     }
203 
204     void reset( ) {
205         reserved_src = NULL;
206         predecessor_cache<T,M>::reset( );
207     }
208 
209     void clear() {
210         reserved_src = NULL;
211         predecessor_cache<T,M>::clear();
212     }
213 
214 private:
215     predecessor_type *reserved_src;
216 };
217 
218 
219 //! An abstract cache of successors
220 template<typename T, typename M=spin_rw_mutex >
221 class successor_cache : no_copy {
222 protected:
223 
224     typedef M mutex_type;
225     mutex_type my_mutex;
226 
227     typedef receiver<T> successor_type;
228     typedef receiver<T>* pointer_type;
229     typedef sender<T> owner_type;
230     // TODO revamp: introduce heapified collection of successors for strict priorities
231     typedef std::list< pointer_type > successors_type;
232     successors_type my_successors;
233 
234     owner_type* my_owner;
235 
236 public:
237     successor_cache( owner_type* owner ) : my_owner(owner) {
238         // Do not work with the passed pointer here as it may not be fully initialized yet
239     }
240 
241     virtual ~successor_cache() {}
242 
243     void register_successor( successor_type& r ) {
244         typename mutex_type::scoped_lock l(my_mutex, true);
245         if( r.priority() != no_priority )
246             my_successors.push_front( &r );
247         else
248             my_successors.push_back( &r );
249     }
250 
251     void remove_successor( successor_type& r ) {
252         typename mutex_type::scoped_lock l(my_mutex, true);
253         for ( typename successors_type::iterator i = my_successors.begin();
254               i != my_successors.end(); ++i ) {
255             if ( *i == & r ) {
256                 my_successors.erase(i);
257                 break;
258             }
259         }
260     }
261 
262     bool empty() {
263         typename mutex_type::scoped_lock l(my_mutex, false);
264         return my_successors.empty();
265     }
266 
267     void clear() {
268         my_successors.clear();
269     }
270 
271     virtual graph_task* try_put_task( const T& t ) = 0;
272 };  // successor_cache<T>
273 
274 //! An abstract cache of successors, specialized to continue_msg
275 template<typename M>
276 class successor_cache< continue_msg, M > : no_copy {
277 protected:
278 
279     typedef M mutex_type;
280     mutex_type my_mutex;
281 
282     typedef receiver<continue_msg> successor_type;
283     typedef receiver<continue_msg>* pointer_type;
284     typedef sender<continue_msg> owner_type;
285     typedef std::list< pointer_type > successors_type;
286     successors_type my_successors;
287     owner_type* my_owner;
288 
289 public:
290     successor_cache( sender<continue_msg>* owner ) : my_owner(owner) {
291         // Do not work with the passed pointer here as it may not be fully initialized yet
292     }
293 
294     virtual ~successor_cache() {}
295 
296     void register_successor( successor_type& r ) {
297         typename mutex_type::scoped_lock l(my_mutex, true);
298         if( r.priority() != no_priority )
299             my_successors.push_front( &r );
300         else
301             my_successors.push_back( &r );
302         __TBB_ASSERT( my_owner, "Cache of successors must have an owner." );
303         if ( r.is_continue_receiver() ) {
304             r.register_predecessor( *my_owner );
305         }
306     }
307 
308     void remove_successor( successor_type& r ) {
309         typename mutex_type::scoped_lock l(my_mutex, true);
310         for ( successors_type::iterator i = my_successors.begin(); i != my_successors.end(); ++i ) {
311             if ( *i == &r ) {
312                 __TBB_ASSERT(my_owner, "Cache of successors must have an owner.");
313                 // TODO: check if we need to test for continue_receiver before removing from r.
314                 r.remove_predecessor( *my_owner );
315                 my_successors.erase(i);
316                 break;
317             }
318         }
319     }
320 
321     bool empty() {
322         typename mutex_type::scoped_lock l(my_mutex, false);
323         return my_successors.empty();
324     }
325 
326     void clear() {
327         my_successors.clear();
328     }
329 
330     virtual graph_task* try_put_task( const continue_msg& t ) = 0;
331 };  // successor_cache< continue_msg >
332 
333 //! A cache of successors that are broadcast to
334 template<typename T, typename M=spin_rw_mutex>
335 class broadcast_cache : public successor_cache<T, M> {
336     typedef successor_cache<T, M> base_type;
337     typedef M mutex_type;
338     typedef typename successor_cache<T,M>::successors_type successors_type;
339 
340 public:
341 
342     broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) {
343         // Do not work with the passed pointer here as it may not be fully initialized yet
344     }
345 
346     // as above, but call try_put_task instead, and return the last task we received (if any)
347     graph_task* try_put_task( const T &t ) override {
348         graph_task * last_task = nullptr;
349         typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
350         typename successors_type::iterator i = this->my_successors.begin();
351         while ( i != this->my_successors.end() ) {
352             graph_task *new_task = (*i)->try_put_task(t);
353             // workaround for icc bug
354             graph& graph_ref = (*i)->graph_reference();
355             last_task = combine_tasks(graph_ref, last_task, new_task);  // enqueue if necessary
356             if(new_task) {
357                 ++i;
358             }
359             else {  // failed
360                 if ( (*i)->register_predecessor(*this->my_owner) ) {
361                     i = this->my_successors.erase(i);
362                 } else {
363                     ++i;
364                 }
365             }
366         }
367         return last_task;
368     }
369 
370     // call try_put_task and return list of received tasks
371     bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) {
372         bool is_at_least_one_put_successful = false;
373         typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
374         typename successors_type::iterator i = this->my_successors.begin();
375         while ( i != this->my_successors.end() ) {
376             graph_task * new_task = (*i)->try_put_task(t);
377             if(new_task) {
378                 ++i;
379                 if(new_task != SUCCESSFULLY_ENQUEUED) {
380                     tasks.push_back(*new_task);
381                 }
382                 is_at_least_one_put_successful = true;
383             }
384             else {  // failed
385                 if ( (*i)->register_predecessor(*this->my_owner) ) {
386                     i = this->my_successors.erase(i);
387                 } else {
388                     ++i;
389                 }
390             }
391         }
392         return is_at_least_one_put_successful;
393     }
394 };
395 
396 //! A cache of successors that are put in a round-robin fashion
397 template<typename T, typename M=spin_rw_mutex >
398 class round_robin_cache : public successor_cache<T, M> {
399     typedef successor_cache<T, M> base_type;
400     typedef size_t size_type;
401     typedef M mutex_type;
402     typedef typename successor_cache<T,M>::successors_type successors_type;
403 
404 public:
405 
406     round_robin_cache( typename base_type::owner_type* owner ): base_type(owner) {
407         // Do not work with the passed pointer here as it may not be fully initialized yet
408     }
409 
410     size_type size() {
411         typename mutex_type::scoped_lock l(this->my_mutex, false);
412         return this->my_successors.size();
413     }
414 
415     graph_task* try_put_task( const T &t ) override {
416         typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
417         typename successors_type::iterator i = this->my_successors.begin();
418         while ( i != this->my_successors.end() ) {
419             graph_task* new_task = (*i)->try_put_task(t);
420             if ( new_task ) {
421                 return new_task;
422             } else {
423                if ( (*i)->register_predecessor(*this->my_owner) ) {
424                    i = this->my_successors.erase(i);
425                }
426                else {
427                    ++i;
428                }
429             }
430         }
431         return NULL;
432     }
433 };
434 
435 #endif // __TBB__flow_graph_cache_impl_H
436