/*
    Copyright (c) 2022-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/
16 
17 #ifndef _TBB_thread_serializer_handlers_H
18 #define _TBB_thread_serializer_handlers_H
19 
#include <atomic>
#include <cstdint>

#include "oneapi/tbb/mutex.h"
#include "oneapi/tbb/rw_mutex.h"

#include "thread_dispatcher.h"
24 
25 namespace tbb {
26 namespace detail {
27 namespace r1 {
28 
// Abstract observer interface with a single notification hook, update().
// Both request serializer classes in this header implement it.
class thread_request_observer {
protected:
    // Protected: an implementation cannot be destroyed through a pointer or
    // reference to this interface from outside the class hierarchy.
    virtual ~thread_request_observer() = default;
public:
    // Notifies the observer of a change.
    // NOTE(review): `delta` is presumably the signed change in the number of
    // requested worker threads - confirm against the implementations.
    virtual void update(int delta) = 0;
};
35 
36 
37 class thread_request_serializer : public thread_request_observer {
38     using mutex_type = d1::mutex;
39 public:
40     thread_request_serializer(thread_dispatcher& td, int soft_limit);
41     void set_active_num_workers(int soft_limit);
is_no_workers_avaliable()42     bool is_no_workers_avaliable() { return my_soft_limit == 0; }
43 
44 private:
45     friend class thread_request_serializer_proxy;
46     void update(int delta) override;
47     static int limit_delta(int delta, int limit, int new_value);
48 
49     thread_dispatcher& my_thread_dispatcher;
50     int my_soft_limit{ 0 };
51     int my_total_request{ 0 };
52     // my_pending_delta is set to pending_delta_base to have ability to hold negative values
53     // consider increase base since thead number will be bigger than 1 << 15
54     static constexpr std::uint64_t pending_delta_base = 1 << 15;
55     std::atomic<std::uint64_t> my_pending_delta{ pending_delta_base };
56     mutex_type my_mutex;
57 };
58 
59 // Handles mandatory concurrency i.e. enables worker threads for enqueue tasks
60 class thread_request_serializer_proxy : public thread_request_observer {
61     using mutex_type = d1::rw_mutex;
62 public:
63     thread_request_serializer_proxy(thread_dispatcher& td, int soft_limit);
64     void register_mandatory_request(int mandatory_delta);
65     void set_active_num_workers(int soft_limit);
66 
67 private:
68     void update(int delta) override;
69     void enable_mandatory_concurrency(mutex_type::scoped_lock& lock);
70     void disable_mandatory_concurrency(mutex_type::scoped_lock& lock);
71 
72     std::atomic<int> my_num_mandatory_requests{0};
73     bool my_is_mandatory_concurrency_enabled{false};
74     thread_request_serializer my_serializer;
75     mutex_type my_mutex;
76 };
77 
78 } // namespace r1
79 } // namespace detail
80 } // namespace tbb
81 
82 #endif // _TBB_thread_serializer_handlers_H
83