/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB__flow_graph_cache_impl_H
#define __TBB__flow_graph_cache_impl_H

#ifndef __TBB_flow_graph_H
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

// included in namespace tbb::detail::d1 (in flow_graph.h)

//! A node_cache maintains a std::queue of elements of type T. Each operation is protected by a lock.
template< typename T, typename M=spin_mutex >
class node_cache {
public:

    typedef size_t size_type;

    bool empty() {
        typename mutex_type::scoped_lock lock( my_mutex );
        return internal_empty();
    }

    void add( T &n ) {
        typename mutex_type::scoped_lock lock( my_mutex );
        internal_push(n);
    }

    void remove( T &n ) {
        typename mutex_type::scoped_lock lock( my_mutex );
        for ( size_t i = internal_size(); i != 0; --i ) {
            T &s = internal_pop();
            if ( &s == &n )
                break;  // only remove one predecessor per request
            internal_push(s);
        }
    }

    void clear() {
        while( !my_q.empty()) (void)my_q.pop();
    }

protected:

    typedef M mutex_type;
    mutex_type my_mutex;
    std::queue< T * > my_q;

    // Assumes lock is held
    inline bool internal_empty( ) {
        return my_q.empty();
    }

    // Assumes lock is held
    inline size_type internal_size( ) {
        return my_q.size();
    }

    // Assumes lock is held
    inline void internal_push( T &n ) {
        my_q.push(&n);
    }

    // Assumes lock is held
    inline T &internal_pop() {
        T *v = my_q.front();
        my_q.pop();
        return *v;
    }

};

//! A cache of predecessors that only supports try_get
template< typename T, typename M=spin_mutex >
class predecessor_cache : public node_cache< sender<T>, M > {
public:
    typedef M mutex_type;
    typedef T output_type;
    typedef sender<output_type> predecessor_type;
    typedef receiver<output_type> successor_type;

    predecessor_cache( successor_type* owner ) : my_owner( owner ) {
        __TBB_ASSERT( my_owner, "predecessor_cache should have an owner." );
        // Do not work with the passed pointer here as it may not be fully initialized yet
    }

    bool get_item( output_type& v ) {

        bool msg = false;

        do {
            predecessor_type *src;
            {
                typename mutex_type::scoped_lock lock(this->my_mutex);
                if ( this->internal_empty() ) {
                    break;
                }
                src = &this->internal_pop();
            }

            // Try to get from this sender
            msg = src->try_get( v );

            if (msg == false) {
                // Relinquish ownership of the edge
                register_successor(*src, *my_owner);
            } else {
                // Retain ownership of the edge
                this->add(*src);
            }
        } while ( msg == false );
        return msg;
    }

    // If we are removing arcs (rf_clear_edges), call clear() rather than reset().
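    // Illustrative note (not part of the implementation): reset() hands every cached
    // predecessor back to push mode by re-registering this node as its successor, so the
    // edges survive a graph reset; clear() simply drops the cached pointers, which is what
    // edge removal wants. A rough sketch of the difference, with locking elided:
    //
    //     // reset(): keep the edges alive
    //     while ( !internal_empty() )
    //         register_successor( internal_pop(), *my_owner );
    //
    //     // clear() (inherited from node_cache): forget the edges
    //     while ( !my_q.empty() ) (void)my_q.pop();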
    void reset() {
        for(;;) {
            predecessor_type *src;
            {
                if (this->internal_empty()) break;
                src = &this->internal_pop();
            }
            register_successor(*src, *my_owner);
        }
    }

protected:
    successor_type* my_owner;
};

//! A cache of predecessors that supports requests and reservations
template< typename T, typename M=spin_mutex >
class reservable_predecessor_cache : public predecessor_cache< T, M > {
public:
    typedef M mutex_type;
    typedef T output_type;
    typedef sender<T> predecessor_type;
    typedef receiver<T> successor_type;

    reservable_predecessor_cache( successor_type* owner )
        : predecessor_cache<T,M>(owner), reserved_src(NULL)
    {
        // Do not work with the passed pointer here as it may not be fully initialized yet
    }

    bool
    try_reserve( output_type &v ) {
        bool msg = false;

        do {
            {
                typename mutex_type::scoped_lock lock(this->my_mutex);
                if ( reserved_src || this->internal_empty() )
                    return false;

                reserved_src = &this->internal_pop();
            }

            // Try to get from this sender
            msg = reserved_src->try_reserve( v );

            if (msg == false) {
                typename mutex_type::scoped_lock lock(this->my_mutex);
                // Relinquish ownership of the edge
                register_successor( *reserved_src, *this->my_owner );
                reserved_src = NULL;
            } else {
                // Retain ownership of the edge
                this->add( *reserved_src );
            }
        } while ( msg == false );

        return msg;
    }

    bool
    try_release( ) {
        reserved_src->try_release( );
        reserved_src = NULL;
        return true;
    }

    bool
    try_consume( ) {
        reserved_src->try_consume( );
        reserved_src = NULL;
        return true;
    }

    void reset( ) {
        reserved_src = NULL;
        predecessor_cache<T,M>::reset( );
    }

    void clear() {
        reserved_src = NULL;
        predecessor_cache<T,M>::clear();
    }

private:
    predecessor_type *reserved_src;
};


//! An abstract cache of successors
template<typename T, typename M=spin_rw_mutex >
class successor_cache : no_copy {
protected:

    typedef M mutex_type;
    mutex_type my_mutex;

    typedef receiver<T> successor_type;
    typedef receiver<T>* pointer_type;
    typedef sender<T> owner_type;
    // TODO revamp: introduce heapified collection of successors for strict priorities
    typedef std::list< pointer_type > successors_type;
    successors_type my_successors;

    owner_type* my_owner;

public:
    successor_cache( owner_type* owner ) : my_owner(owner) {
        // Do not work with the passed pointer here as it may not be fully initialized yet
    }

    virtual ~successor_cache() {}

    void register_successor( successor_type& r ) {
        typename mutex_type::scoped_lock l(my_mutex, true);
        if( r.priority() != no_priority )
            my_successors.push_front( &r );
        else
            my_successors.push_back( &r );
    }

    void remove_successor( successor_type& r ) {
        typename mutex_type::scoped_lock l(my_mutex, true);
        for ( typename successors_type::iterator i = my_successors.begin();
              i != my_successors.end(); ++i ) {
            if ( *i == &r ) {
                my_successors.erase(i);
                break;
            }
        }
    }

    bool empty() {
        typename mutex_type::scoped_lock l(my_mutex, false);
        return my_successors.empty();
    }

    void clear() {
        my_successors.clear();
    }

    virtual graph_task* try_put_task( const T& t ) = 0;
};  // successor_cache<T>
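// Illustrative sketch (not part of this header): successor_cache and its concrete
// subclasses below back the push side of the public flow graph nodes. The snippet uses
// only the public tbb::flow API; the graph and node names are arbitrary examples.
//
//     #include <tbb/flow_graph.h>
//     using namespace tbb::flow;
//
//     graph g;
//     broadcast_node<int> b(g);
//     function_node<int, int> f(g, unlimited, [](int v) { return v + 1; });
//     make_edge(b, f);     // f is stored in b's successor cache
//     b.try_put(42);       // delivered by calling try_put_task on each cached successor
//     g.wait_for_all();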
//! An abstract cache of successors, specialized to continue_msg
template<typename M>
class successor_cache< continue_msg, M > : no_copy {
protected:

    typedef M mutex_type;
    mutex_type my_mutex;

    typedef receiver<continue_msg> successor_type;
    typedef receiver<continue_msg>* pointer_type;
    typedef sender<continue_msg> owner_type;
    typedef std::list< pointer_type > successors_type;
    successors_type my_successors;
    owner_type* my_owner;

public:
    successor_cache( sender<continue_msg>* owner ) : my_owner(owner) {
        // Do not work with the passed pointer here as it may not be fully initialized yet
    }

    virtual ~successor_cache() {}

    void register_successor( successor_type& r ) {
        typename mutex_type::scoped_lock l(my_mutex, true);
        if( r.priority() != no_priority )
            my_successors.push_front( &r );
        else
            my_successors.push_back( &r );
        __TBB_ASSERT( my_owner, "Cache of successors must have an owner." );
        if ( r.is_continue_receiver() ) {
            r.register_predecessor( *my_owner );
        }
    }

    void remove_successor( successor_type& r ) {
        typename mutex_type::scoped_lock l(my_mutex, true);
        for ( successors_type::iterator i = my_successors.begin(); i != my_successors.end(); ++i ) {
            if ( *i == &r ) {
                __TBB_ASSERT(my_owner, "Cache of successors must have an owner.");
                // TODO: check if we need to test for continue_receiver before removing from r.
                r.remove_predecessor( *my_owner );
                my_successors.erase(i);
                break;
            }
        }
    }

    bool empty() {
        typename mutex_type::scoped_lock l(my_mutex, false);
        return my_successors.empty();
    }

    void clear() {
        my_successors.clear();
    }

    virtual graph_task* try_put_task( const continue_msg& t ) = 0;
};  // successor_cache< continue_msg >
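// Illustrative sketch (not part of this header): the continue_msg specialization above also
// registers the owning sender as a predecessor of any continue_receiver, which is how a
// continue_node learns how many signals it must collect before it fires. Public-API example;
// the node names are arbitrary.
//
//     #include <tbb/flow_graph.h>
//     using namespace tbb::flow;
//
//     graph g;
//     broadcast_node<continue_msg> start(g);
//     continue_node<continue_msg> work(g, [](const continue_msg&) { return continue_msg(); });
//     make_edge(start, work);   // work is cached as a successor; because it is a
//                               // continue_receiver, start is also registered as its predecessor
//     start.try_put(continue_msg());
//     g.wait_for_all();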
//! A cache of successors that are broadcast to
template<typename T, typename M=spin_rw_mutex>
class broadcast_cache : public successor_cache<T, M> {
    typedef successor_cache<T, M> base_type;
    typedef M mutex_type;
    typedef typename successor_cache<T,M>::successors_type successors_type;

public:

    broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) {
        // Do not work with the passed pointer here as it may not be fully initialized yet
    }

    // Call try_put_task on each cached successor and return the last task received (if any)
    graph_task* try_put_task( const T &t ) override {
        graph_task * last_task = nullptr;
        typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
        typename successors_type::iterator i = this->my_successors.begin();
        while ( i != this->my_successors.end() ) {
            graph_task *new_task = (*i)->try_put_task(t);
            // workaround for icc bug
            graph& graph_ref = (*i)->graph_reference();
            last_task = combine_tasks(graph_ref, last_task, new_task);  // enqueue if necessary
            if(new_task) {
                ++i;
            }
            else {  // failed
                if ( (*i)->register_predecessor(*this->my_owner) ) {
                    i = this->my_successors.erase(i);
                } else {
                    ++i;
                }
            }
        }
        return last_task;
    }

    // call try_put_task and return list of received tasks
    bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) {
        bool is_at_least_one_put_successful = false;
        typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
        typename successors_type::iterator i = this->my_successors.begin();
        while ( i != this->my_successors.end() ) {
            graph_task * new_task = (*i)->try_put_task(t);
            if(new_task) {
                ++i;
                if(new_task != SUCCESSFULLY_ENQUEUED) {
                    tasks.push_back(*new_task);
                }
                is_at_least_one_put_successful = true;
            }
            else {  // failed
                if ( (*i)->register_predecessor(*this->my_owner) ) {
                    i = this->my_successors.erase(i);
                } else {
                    ++i;
                }
            }
        }
        return is_at_least_one_put_successful;
    }
};

//! A cache of successors that are put in a round-robin fashion
template<typename T, typename M=spin_rw_mutex >
class round_robin_cache : public successor_cache<T, M> {
    typedef successor_cache<T, M> base_type;
    typedef size_t size_type;
    typedef M mutex_type;
    typedef typename successor_cache<T,M>::successors_type successors_type;

public:

    round_robin_cache( typename base_type::owner_type* owner ): base_type(owner) {
        // Do not work with the passed pointer here as it may not be fully initialized yet
    }

    size_type size() {
        typename mutex_type::scoped_lock l(this->my_mutex, false);
        return this->my_successors.size();
    }

    graph_task* try_put_task( const T &t ) override {
        typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
        typename successors_type::iterator i = this->my_successors.begin();
        while ( i != this->my_successors.end() ) {
            graph_task* new_task = (*i)->try_put_task(t);
            if ( new_task ) {
                return new_task;
            } else {
                if ( (*i)->register_predecessor(*this->my_owner) ) {
                    i = this->my_successors.erase(i);
                }
                else {
                    ++i;
                }
            }
        }
        return NULL;
    }
};

#endif // __TBB__flow_graph_cache_impl_H