1 /* 2 Copyright (c) 2022-2023 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "thread_dispatcher.h" 18 #include "threading_control.h" 19 20 namespace tbb { 21 namespace detail { 22 namespace r1 { 23 24 thread_dispatcher::thread_dispatcher(threading_control& tc, unsigned hard_limit, std::size_t stack_size) 25 : my_threading_control(tc) 26 , my_num_workers_hard_limit(hard_limit) 27 , my_stack_size(stack_size) 28 { 29 my_server = governor::create_rml_server( *this ); 30 __TBB_ASSERT( my_server, "Failed to create RML server" ); 31 } 32 33 thread_dispatcher::~thread_dispatcher() { 34 poison_pointer(my_server); 35 } 36 37 thread_dispatcher_client* thread_dispatcher::select_next_client(thread_dispatcher_client* hint) { 38 unsigned next_client_priority_level = num_priority_levels; 39 if (hint) { 40 next_client_priority_level = hint->priority_level(); 41 } 42 43 for (unsigned idx = 0; idx < next_client_priority_level; ++idx) { 44 if (!my_client_list[idx].empty()) { 45 return &*my_client_list[idx].begin(); 46 } 47 } 48 49 return hint; 50 } 51 52 thread_dispatcher_client* thread_dispatcher::create_client(arena& a) { 53 return new (cache_aligned_allocate(sizeof(thread_dispatcher_client))) thread_dispatcher_client(a, my_clients_aba_epoch); 54 } 55 56 57 void thread_dispatcher::register_client(thread_dispatcher_client* client) { 58 client_list_mutex_type::scoped_lock lock(my_list_mutex); 59 insert_client(*client); 60 } 61 62 bool thread_dispatcher::try_unregister_client(thread_dispatcher_client* client, std::uint64_t aba_epoch, unsigned priority) { 63 __TBB_ASSERT(client, nullptr); 64 // we hold reference to the server, so market cannot be destroyed at any moment here 65 __TBB_ASSERT(!is_poisoned(my_server), nullptr); 66 my_list_mutex.lock(); 67 for (auto& it : my_client_list[priority]) { 68 if (client == &it) { 69 if (it.get_aba_epoch() == aba_epoch) { 70 // Client is alive 71 // Acquire my_references to sync with threads that just left the arena 72 // Pay attention that references should be read before workers_requested because 73 // if references is no zero some other thread might call adjust_demand and lead to 74 // a race over workers_requested 75 if (!client->references() && !client->has_request()) { 76 // Client is abandoned. Destroy it. 77 remove_client(*client); 78 ++my_clients_aba_epoch; 79 80 my_list_mutex.unlock(); 81 destroy_client(client); 82 83 return true; 84 } 85 } 86 break; 87 } 88 } 89 my_list_mutex.unlock(); 90 return false; 91 } 92 93 void thread_dispatcher::destroy_client(thread_dispatcher_client* client) { 94 client->~thread_dispatcher_client(); 95 cache_aligned_deallocate(client); 96 } 97 98 // Should be called under lock 99 void thread_dispatcher::insert_client(thread_dispatcher_client& client) { 100 __TBB_ASSERT(client.priority_level() < num_priority_levels, nullptr); 101 my_client_list[client.priority_level()].push_front(client); 102 103 __TBB_ASSERT(!my_next_client || my_next_client->priority_level() < num_priority_levels, nullptr); 104 my_next_client = select_next_client(my_next_client); 105 } 106 107 // Should be called under lock 108 void thread_dispatcher::remove_client(thread_dispatcher_client& client) { 109 __TBB_ASSERT(client.priority_level() < num_priority_levels, nullptr); 110 my_client_list[client.priority_level()].remove(client); 111 112 if (my_next_client == &client) { 113 my_next_client = nullptr; 114 } 115 my_next_client = select_next_client(my_next_client); 116 } 117 118 bool thread_dispatcher::is_client_alive(thread_dispatcher_client* client) { 119 if (!client) { 120 return false; 121 } 122 123 // Still cannot access internals of the client since the object itself might be destroyed. 124 for (auto& priority_list : my_client_list) { 125 for (auto& c : priority_list) { 126 if (client == &c) { 127 return true; 128 } 129 } 130 } 131 return false; 132 } 133 134 thread_dispatcher_client* thread_dispatcher::client_in_need(client_list_type* clients, thread_dispatcher_client* hint) { 135 // TODO: make sure client with higher priority returned only if there are available slots in it. 136 hint = select_next_client(hint); 137 if (!hint) { 138 return nullptr; 139 } 140 141 client_list_type::iterator it = hint; 142 unsigned curr_priority_level = hint->priority_level(); 143 __TBB_ASSERT(it != clients[curr_priority_level].end(), nullptr); 144 do { 145 thread_dispatcher_client& t = *it; 146 if (++it == clients[curr_priority_level].end()) { 147 do { 148 ++curr_priority_level %= num_priority_levels; 149 } while (clients[curr_priority_level].empty()); 150 it = clients[curr_priority_level].begin(); 151 } 152 if (t.try_join()) { 153 return &t; 154 } 155 } while (it != hint); 156 return nullptr; 157 } 158 159 thread_dispatcher_client* thread_dispatcher::client_in_need(thread_dispatcher_client* prev) { 160 client_list_mutex_type::scoped_lock lock(my_list_mutex, /*is_writer=*/false); 161 if (is_client_alive(prev)) { 162 return client_in_need(my_client_list, prev); 163 } 164 return client_in_need(my_client_list, my_next_client); 165 } 166 167 void thread_dispatcher::adjust_job_count_estimate(int delta) { 168 my_server->adjust_job_count_estimate(delta); 169 } 170 171 void thread_dispatcher::release(bool blocking_terminate) { 172 my_join_workers = blocking_terminate; 173 my_server->request_close_connection(); 174 } 175 176 void thread_dispatcher::process(job& j) { 177 thread_data& td = static_cast<thread_data&>(j); 178 // td.my_last_client can be dead. Don't access it until client_in_need is called 179 thread_dispatcher_client* client = td.my_last_client; 180 for (int i = 0; i < 2; ++i) { 181 while ((client = client_in_need(client)) ) { 182 td.my_last_client = client; 183 client->process(td); 184 } 185 // Workers leave thread_dispatcher because there is no client in need. It can happen earlier than 186 // adjust_job_count_estimate() decreases my_slack and RML can put this thread to sleep. 187 // It might result in a busy-loop checking for my_slack<0 and calling this method instantly. 188 // the yield refines this spinning. 189 if ( !i ) { 190 yield(); 191 } 192 } 193 } 194 195 196 //! Used when RML asks for join mode during workers termination. 197 bool thread_dispatcher::must_join_workers() const { return my_join_workers; } 198 199 //! Returns the requested stack size of worker threads. 200 std::size_t thread_dispatcher::worker_stack_size() const { return my_stack_size; } 201 202 void thread_dispatcher::acknowledge_close_connection() { 203 my_threading_control.destroy(); 204 } 205 206 ::rml::job* thread_dispatcher::create_one_job() { 207 unsigned short index = ++my_first_unused_worker_idx; 208 __TBB_ASSERT(index > 0, nullptr); 209 ITT_THREAD_SET_NAME(_T("TBB Worker Thread")); 210 // index serves as a hint decreasing conflicts between workers when they migrate between arenas 211 thread_data* td = new (cache_aligned_allocate(sizeof(thread_data))) thread_data{ index, true }; 212 __TBB_ASSERT(index <= my_num_workers_hard_limit, nullptr); 213 my_threading_control.register_thread(*td); 214 return td; 215 } 216 217 void thread_dispatcher::cleanup(job& j) { 218 my_threading_control.unregister_thread(static_cast<thread_data&>(j)); 219 governor::auto_terminate(&j); 220 } 221 222 } // namespace r1 223 } // namespace detail 224 } // namespace tbb 225