/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB__flow_graph_cache_impl_H
#define __TBB__flow_graph_cache_impl_H

#ifndef __TBB_flow_graph_H
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

// included in namespace tbb::detail::d1 (in flow_graph.h)

//! A node_cache maintains a std::queue of elements of type T.  Each operation is protected by a lock.
template< typename T, typename M=spin_mutex >
class node_cache {
public:

    typedef size_t size_type;

    bool empty() {
        typename mutex_type::scoped_lock lock( my_mutex );
        return internal_empty();
    }

    void add( T &n ) {
        typename mutex_type::scoped_lock lock( my_mutex );
        internal_push(n);
    }

    void remove( T &n ) {
        typename mutex_type::scoped_lock lock( my_mutex );
        for ( size_t i = internal_size(); i != 0; --i ) {
            T &s = internal_pop();
            if ( &s == &n )
                break;  // only remove one predecessor per request
            internal_push(s);
        }
    }

    void clear() {
        while( !my_q.empty()) (void)my_q.pop();
    }

protected:

    typedef M mutex_type;
    mutex_type my_mutex;
    std::queue< T * > my_q;

    // Assumes lock is held
    inline bool internal_empty( ) {
        return my_q.empty();
    }

    // Assumes lock is held
    inline size_type internal_size( ) {
        return my_q.size();
    }

    // Assumes lock is held
    inline void internal_push( T &n ) {
        my_q.push(&n);
    }

    // Assumes lock is held
    inline T &internal_pop() {
        T *v = my_q.front();
        my_q.pop();
        return *v;
    }

};
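
// A minimal usage sketch for node_cache (illustrative only, not part of the library;
// the names "waiting_senders" and "s" are assumptions made for this example):
//
//     node_cache< sender<int> > waiting_senders;
//     waiting_senders.add(s);              // remember a sender that could not be serviced yet
//     if ( !waiting_senders.empty() ) { /* ... */ }
//     waiting_senders.remove(s);           // forget it again, e.g. when the edge is removed
//
// Note that remove() pops and re-pushes elements until it finds &n, so it removes at
// most one occurrence and may leave the remaining elements rotated.
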
//! A cache of predecessors that only supports try_get
template< typename T, typename M=spin_mutex >
class predecessor_cache : public node_cache< sender<T>, M > {
public:
    typedef M mutex_type;
    typedef T output_type;
    typedef sender<output_type> predecessor_type;
    typedef receiver<output_type> successor_type;

    predecessor_cache( successor_type* owner ) : my_owner( owner ) {
        __TBB_ASSERT( my_owner, "predecessor_cache should have an owner." );
        // Do not work with the passed pointer here as it may not be fully initialized yet
    }

    bool get_item( output_type& v ) {

        bool msg = false;

        do {
            predecessor_type *src;
            {
                typename mutex_type::scoped_lock lock(this->my_mutex);
                if ( this->internal_empty() ) {
                    break;
                }
                src = &this->internal_pop();
            }

            // Try to get from this sender
            msg = src->try_get( v );

            if (msg == false) {
                // Relinquish ownership of the edge
                register_successor(*src, *my_owner);
            } else {
                // Retain ownership of the edge
                this->add(*src);
            }
        } while ( msg == false );
        return msg;
    }

    // If we are removing arcs (rf_clear_edges), call clear() rather than reset().
    void reset() {
        for(;;) {
            predecessor_type *src;
            {
                if (this->internal_empty()) break;
                src = &this->internal_pop();
            }
            register_successor(*src, *my_owner);
        }
    }

protected:
    successor_type* my_owner;
};

//! A cache of predecessors that supports requests and reservations
template< typename T, typename M=spin_mutex >
class reservable_predecessor_cache : public predecessor_cache< T, M > {
public:
    typedef M mutex_type;
    typedef T output_type;
    typedef sender<T> predecessor_type;
    typedef receiver<T> successor_type;

    reservable_predecessor_cache( successor_type* owner )
        : predecessor_cache<T,M>(owner), reserved_src(nullptr)
    {
        // Do not work with the passed pointer here as it may not be fully initialized yet
    }

    bool try_reserve( output_type &v ) {
        bool msg = false;

        do {
            predecessor_type* pred = nullptr;
            {
                typename mutex_type::scoped_lock lock(this->my_mutex);
                if ( reserved_src.load(std::memory_order_relaxed) || this->internal_empty() )
                    return false;

                pred = &this->internal_pop();
                reserved_src.store(pred, std::memory_order_relaxed);
            }

            // Try to reserve from this sender
            msg = pred->try_reserve( v );

            if (msg == false) {
                typename mutex_type::scoped_lock lock(this->my_mutex);
                // Relinquish ownership of the edge
                register_successor( *pred, *this->my_owner );
                reserved_src.store(nullptr, std::memory_order_relaxed);
            } else {
                // Retain ownership of the edge
                this->add( *pred );
            }
        } while ( msg == false );

        return msg;
    }

    // Assumes a reservation is currently held (reserved_src != nullptr)
    bool try_release() {
        reserved_src.load(std::memory_order_relaxed)->try_release();
        reserved_src.store(nullptr, std::memory_order_relaxed);
        return true;
    }

    // Assumes a reservation is currently held (reserved_src != nullptr)
    bool try_consume() {
        reserved_src.load(std::memory_order_relaxed)->try_consume();
        reserved_src.store(nullptr, std::memory_order_relaxed);
        return true;
    }

    void reset() {
        reserved_src.store(nullptr, std::memory_order_relaxed);
        predecessor_cache<T, M>::reset();
    }

    void clear() {
        reserved_src.store(nullptr, std::memory_order_relaxed);
        predecessor_cache<T, M>::clear();
    }

private:
    std::atomic<predecessor_type*> reserved_src;
};
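
// Reservation protocol sketch (illustrative only; "cache" names some
// reservable_predecessor_cache<int> owned by a receiver node, "v" a local value):
//
//     int v;
//     if ( cache.try_reserve(v) ) {        // on success, v holds the reserved value
//         if ( /* v was successfully forwarded */ )
//             cache.try_consume();         // finalize: the predecessor may discard the item
//         else
//             cache.try_release();         // abandon: the predecessor keeps the item
//     }
//
// try_release() and try_consume() assume a reservation is currently held; calling
// them while reserved_src is nullptr would dereference a null pointer.
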
//! An abstract cache of successors
template<typename T, typename M=spin_rw_mutex >
class successor_cache : no_copy {
protected:

    typedef M mutex_type;
    mutex_type my_mutex;

    typedef receiver<T> successor_type;
    typedef receiver<T>* pointer_type;
    typedef sender<T> owner_type;
    // TODO revamp: introduce heapified collection of successors for strict priorities
    typedef std::list< pointer_type > successors_type;
    successors_type my_successors;

    owner_type* my_owner;

public:
    successor_cache( owner_type* owner ) : my_owner(owner) {
        // Do not work with the passed pointer here as it may not be fully initialized yet
    }

    virtual ~successor_cache() {}

    void register_successor( successor_type& r ) {
        typename mutex_type::scoped_lock l(my_mutex, true);
        if( r.priority() != no_priority )
            my_successors.push_front( &r );
        else
            my_successors.push_back( &r );
    }

    void remove_successor( successor_type& r ) {
        typename mutex_type::scoped_lock l(my_mutex, true);
        for ( typename successors_type::iterator i = my_successors.begin();
              i != my_successors.end(); ++i ) {
            if ( *i == &r ) {
                my_successors.erase(i);
                break;
            }
        }
    }

    bool empty() {
        typename mutex_type::scoped_lock l(my_mutex, false);
        return my_successors.empty();
    }

    void clear() {
        my_successors.clear();
    }

    virtual graph_task* try_put_task( const T& t ) = 0;
};  // successor_cache<T>

//! An abstract cache of successors, specialized to continue_msg
template<typename M>
class successor_cache< continue_msg, M > : no_copy {
protected:

    typedef M mutex_type;
    mutex_type my_mutex;

    typedef receiver<continue_msg> successor_type;
    typedef receiver<continue_msg>* pointer_type;
    typedef sender<continue_msg> owner_type;
    typedef std::list< pointer_type > successors_type;
    successors_type my_successors;
    owner_type* my_owner;

public:
    successor_cache( sender<continue_msg>* owner ) : my_owner(owner) {
        // Do not work with the passed pointer here as it may not be fully initialized yet
    }

    virtual ~successor_cache() {}

    void register_successor( successor_type& r ) {
        typename mutex_type::scoped_lock l(my_mutex, true);
        if( r.priority() != no_priority )
            my_successors.push_front( &r );
        else
            my_successors.push_back( &r );
        __TBB_ASSERT( my_owner, "Cache of successors must have an owner." );
        if ( r.is_continue_receiver() ) {
            r.register_predecessor( *my_owner );
        }
    }

    void remove_successor( successor_type& r ) {
        typename mutex_type::scoped_lock l(my_mutex, true);
        for ( successors_type::iterator i = my_successors.begin(); i != my_successors.end(); ++i ) {
            if ( *i == &r ) {
                __TBB_ASSERT(my_owner, "Cache of successors must have an owner.");
                // TODO: check if we need to test for continue_receiver before removing from r.
                r.remove_predecessor( *my_owner );
                my_successors.erase(i);
                break;
            }
        }
    }

    bool empty() {
        typename mutex_type::scoped_lock l(my_mutex, false);
        return my_successors.empty();
    }

    void clear() {
        my_successors.clear();
    }

    virtual graph_task* try_put_task( const continue_msg& t ) = 0;
};  // successor_cache< continue_msg >
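
// Successor registration sketch (illustrative only; "cache" names a successor_cache
// owned by a sender node and "r" a receiver of the same message type):
//
//     cache.register_successor(r);   // a successor with r.priority() != no_priority is
//                                    // pushed to the front and offered messages first
//     cache.remove_successor(r);     // e.g. when the edge is removed
//
// The continue_msg specialization above additionally calls r.register_predecessor(*my_owner)
// for continue receivers, so that the receiver can account for this edge.
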
//! A cache of successors that are broadcast to
template<typename T, typename M=spin_rw_mutex>
class broadcast_cache : public successor_cache<T, M> {
    typedef successor_cache<T, M> base_type;
    typedef M mutex_type;
    typedef typename successor_cache<T,M>::successors_type successors_type;

public:

    broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) {
        // Do not work with the passed pointer here as it may not be fully initialized yet
    }

    // Offer t to every successor via try_put_task; return the last task received (if any)
    graph_task* try_put_task( const T &t ) override {
        graph_task * last_task = nullptr;
        typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
        typename successors_type::iterator i = this->my_successors.begin();
        while ( i != this->my_successors.end() ) {
            graph_task *new_task = (*i)->try_put_task(t);
            // workaround for icc bug
            graph& graph_ref = (*i)->graph_reference();
            last_task = combine_tasks(graph_ref, last_task, new_task);  // enqueue if necessary
            if(new_task) {
                ++i;
            }
            else {  // failed
                if ( (*i)->register_predecessor(*this->my_owner) ) {
                    i = this->my_successors.erase(i);
                } else {
                    ++i;
                }
            }
        }
        return last_task;
    }

    // Call try_put_task on each successor; collect the received tasks in `tasks`
    // and report whether at least one put succeeded
    bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) {
        bool is_at_least_one_put_successful = false;
        typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
        typename successors_type::iterator i = this->my_successors.begin();
        while ( i != this->my_successors.end() ) {
            graph_task * new_task = (*i)->try_put_task(t);
            if(new_task) {
                ++i;
                if(new_task != SUCCESSFULLY_ENQUEUED) {
                    tasks.push_back(*new_task);
                }
                is_at_least_one_put_successful = true;
            }
            else {  // failed
                if ( (*i)->register_predecessor(*this->my_owner) ) {
                    i = this->my_successors.erase(i);
                } else {
                    ++i;
                }
            }
        }
        return is_at_least_one_put_successful;
    }
};
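
// Delivery-policy sketch (illustrative only; "bc" and "rr" name caches owned by some
// sender node, "msg" a value of the cached message type):
//
//     graph_task* t1 = bc.try_put_task(msg);   // broadcast_cache: offers msg to every successor
//     graph_task* t2 = rr.try_put_task(msg);   // round_robin_cache (below): stops at the first
//                                              // successor that accepts msg
//
// In both caches a successor that rejects the message and successfully registers the
// owner as its predecessor is erased from the cache, i.e. the edge is reversed.
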
//! A cache of successors to which messages are put in round-robin fashion
template<typename T, typename M=spin_rw_mutex >
class round_robin_cache : public successor_cache<T, M> {
    typedef successor_cache<T, M> base_type;
    typedef size_t size_type;
    typedef M mutex_type;
    typedef typename successor_cache<T,M>::successors_type successors_type;

public:

    round_robin_cache( typename base_type::owner_type* owner ): base_type(owner) {
        // Do not work with the passed pointer here as it may not be fully initialized yet
    }

    size_type size() {
        typename mutex_type::scoped_lock l(this->my_mutex, false);
        return this->my_successors.size();
    }

    graph_task* try_put_task( const T &t ) override {
        typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
        typename successors_type::iterator i = this->my_successors.begin();
        while ( i != this->my_successors.end() ) {
            graph_task* new_task = (*i)->try_put_task(t);
            if ( new_task ) {
                return new_task;
            } else {
                if ( (*i)->register_predecessor(*this->my_owner) ) {
                    i = this->my_successors.erase(i);
                }
                else {
                    ++i;
                }
            }
        }
        return nullptr;
    }
};

#endif // __TBB__flow_graph_cache_impl_H