1 /* 2 Copyright (c) 2022-2023 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef _TBB_thread_dispatcher_H 18 #define _TBB_thread_dispatcher_H 19 20 #include "oneapi/tbb/detail/_config.h" 21 #include "oneapi/tbb/detail/_utils.h" 22 #include "oneapi/tbb/rw_mutex.h" 23 #include "oneapi/tbb/task_arena.h" 24 25 #include "arena.h" 26 #include "governor.h" 27 #include "thread_data.h" 28 #include "rml_tbb.h" 29 #include "thread_dispatcher_client.h" 30 31 namespace tbb { 32 namespace detail { 33 namespace r1 { 34 35 class threading_control_impl; 36 37 class thread_dispatcher : no_copy, rml::tbb_client { 38 using client_list_type = intrusive_list<thread_dispatcher_client>; 39 using client_list_mutex_type = d1::rw_mutex; 40 public: 41 thread_dispatcher(threading_control& tc, unsigned hard_limit, std::size_t stack_size); 42 ~thread_dispatcher(); 43 44 thread_dispatcher_client* create_client(arena& a); 45 void register_client(thread_dispatcher_client* client); 46 bool try_unregister_client(thread_dispatcher_client* client, std::uint64_t aba_epoch, unsigned priority); 47 48 void adjust_job_count_estimate(int delta); 49 void release(bool blocking_terminate); 50 void process(job& j) override; 51 //! Used when RML asks for join mode during workers termination. 52 bool must_join_workers() const; 53 //! Returns the requested stack size of worker threads. 54 std::size_t worker_stack_size() const; 55 56 private: version()57 version_type version () const override { return 0; } max_job_count()58 unsigned max_job_count () const override { return my_num_workers_hard_limit; } min_stack_size()59 std::size_t min_stack_size () const override { return worker_stack_size(); } 60 void cleanup(job& j) override; 61 void acknowledge_close_connection() override; 62 ::rml::job* create_one_job() override; 63 64 thread_dispatcher_client* select_next_client(thread_dispatcher_client* hint); 65 void destroy_client(thread_dispatcher_client* client); 66 void insert_client(thread_dispatcher_client& client); 67 void remove_client(thread_dispatcher_client& client); 68 bool is_client_alive(thread_dispatcher_client* client); 69 thread_dispatcher_client* client_in_need(client_list_type* clients, thread_dispatcher_client* hint); 70 thread_dispatcher_client* client_in_need(thread_dispatcher_client* prev); 71 72 friend class threading_control_impl; 73 static constexpr unsigned num_priority_levels = d1::num_priority_levels; 74 client_list_mutex_type my_list_mutex; 75 client_list_type my_client_list[num_priority_levels]; 76 77 thread_dispatcher_client* my_next_client{nullptr}; 78 79 //! Shutdown mode 80 bool my_join_workers{false}; 81 82 threading_control& my_threading_control; 83 84 //! ABA prevention marker to assign to newly created clients 85 std::atomic<std::uint64_t> my_clients_aba_epoch{0}; 86 87 //! Maximal number of workers allowed for use by the underlying resource manager 88 /** It can't be changed after thread_dispatcher creation. **/ 89 unsigned my_num_workers_hard_limit{0}; 90 91 //! Stack size of worker threads 92 std::size_t my_stack_size{0}; 93 94 //! First unused index of worker 95 /** Used to assign indices to the new workers coming from RML **/ 96 std::atomic<unsigned> my_first_unused_worker_idx{0}; 97 98 //! Pointer to the RML server object that services this TBB instance. 99 rml::tbb_server* my_server{nullptr}; 100 }; 101 102 } // namespace r1 103 } // namespace detail 104 } // namespace tbb 105 106 #endif // _TBB_thread_dispatcher_H 107