xref: /oneTBB/python/tbb/__init__.py (revision 9acde482)
1# Copyright (c) 2016-2023 Intel Corporation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import multiprocessing.pool
16import ctypes
17import atexit
18import sys
19import os
20
21import platform
22
if "Windows" in platform.system():
    # Conda-style environments place native DLLs (tbb, tbbmalloc) under
    # <env>\Library\bin, which is not searched by default.
    import site
    path_to_env = site.getsitepackages()[0]
    path_to_libs = os.path.join(path_to_env, "Library", "bin")
    # Compare the full version tuple: checking only `minor >= 8` would break
    # on any future major bump. Python 3.8+ ignores PATH for DLL resolution,
    # so the directory must be registered explicitly.
    if sys.version_info >= (3, 8):
        os.add_dll_directory(path_to_libs)
    # Still extend PATH for older interpreters and for child processes.
    # Use .get() so a missing PATH variable does not raise KeyError.
    os.environ['PATH'] = os.environ.get('PATH', '') + os.pathsep + path_to_libs
30
31
32from .api import  *
33from .api import __all__ as api__all
34from .pool import *
35from .pool import __all__ as pool__all
36
# Public API: the Monkey context manager and the activation flag, plus
# every name re-exported from the api and pool submodules.
__all__ = ["Monkey", "is_active"] + api__all + pool__all

__doc__ = """
Python API for Intel(R) oneAPI Threading Building Blocks (oneTBB)
extended with standard Python's pools implementation and monkey-patching.

Command-line interface example:
$  python3 -m tbb $your_script.py
Runs your_script.py in context of tbb.Monkey
"""

is_active = False
""" Indicates whether oneTBB context is activated """

ipc_enabled = False
""" Indicates whether IPC mode is enabled """

# Soname of the IPC resource-manager library, loaded on demand via ctypes
# (Linux only; see tbb_process_pool_worker*/init_sem_name/tbb_atexit below).
libirml = "libirml.so.1"
56
def _test(arg=None):
    """Run the package self-tests.

    :param arg: optional value forwarded to tbb.test.test()

    On Linux additionally verifies that libirml can be loaded and that the
    extension module is not linked against Intel compiler runtime libraries.
    """
    # `platform` is already imported at module level; no local import needed.
    if platform.system() == "Linux":
        ctypes.CDLL(libirml)
        # grep finding none of libimf/libsvml/libintlc exits non-zero (256
        # after os.system's wait-status encoding) — that is the success case.
        # NOTE(review): `_api` is assumed to be provided by the star-import
        # from .api — verify it is exported there.
        assert 256 == os.system("ldd "+_api.__file__+"| grep -E 'libimf|libsvml|libintlc'") # nosec
    from .test import test
    test(arg)
    print("done")
66
67
def tbb_process_pool_worker27(inqueue, outqueue, initializer=None, initargs=(),
                            maxtasks=None):
    """Pool worker wrapper (Python 2.7-era `worker` signature) that releases
    oneTBB IPC resources via libirml when the worker loop finishes.

    Parameters mirror multiprocessing.pool.worker and are forwarded as-is.
    """
    from multiprocessing.pool import worker
    worker(inqueue, outqueue, initializer, initargs, maxtasks)
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_resources()
        # Best effort: keep the broad handler, but a bare `except:` would
        # also swallow SystemExit/KeyboardInterrupt during shutdown.
        except Exception:
            print("Warning: Can not load ", libirml, file=sys.stderr)
78
79
class TBBProcessPool27(multiprocessing.pool.Pool):
    """Process pool whose workers run tbb_process_pool_worker27, so each one
    releases oneTBB IPC resources when it exits (Python 2.7-style worker)."""

    def _repopulate_pool(self):
        """Spawn workers until the pool again holds self._processes entries
        (called after dead workers have been reaped)."""
        from multiprocessing.util import debug

        shortfall = self._processes - len(self._pool)
        for _ in range(shortfall):
            proc = self.Process(
                target=tbb_process_pool_worker27,
                args=(self._inqueue, self._outqueue, self._initializer,
                      self._initargs, self._maxtasksperchild))
            proc.name = proc.name.replace('Process', 'PoolWorker')
            proc.daemon = True
            self._pool.append(proc)
            proc.start()
            debug('added worker')

    def _close_and_join(self):
        # Shared shutdown path for __del__ and __exit__.
        self.close()
        for proc in self._pool:
            proc.join()

    def __del__(self):
        self._close_and_join()

    def __exit__(self, *args):
        self._close_and_join()
108
109
def tbb_process_pool_worker3(inqueue, outqueue, initializer=None, initargs=(),
                            maxtasks=None, wrap_exception=False):
    """Pool worker wrapper (Python 3 `worker` signature, incl. wrap_exception)
    that releases oneTBB IPC resources via libirml when the worker loop ends.

    Parameters mirror multiprocessing.pool.worker and are forwarded as-is.
    """
    from multiprocessing.pool import worker
    worker(inqueue, outqueue, initializer, initargs, maxtasks, wrap_exception)
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_resources()
        # Best effort: keep the broad handler, but a bare `except:` would
        # also swallow SystemExit/KeyboardInterrupt during shutdown.
        except Exception:
            print("Warning: Can not load ", libirml, file=sys.stderr)
120
121
class TBBProcessPool3(multiprocessing.pool.Pool):
    """Process pool whose workers run tbb_process_pool_worker3, so each one
    releases oneTBB IPC resources when it exits (Python 3-style worker)."""

    def _repopulate_pool(self):
        """Spawn workers until the pool again holds self._processes entries
        (called after dead workers have been reaped)."""
        from multiprocessing.util import debug

        shortfall = self._processes - len(self._pool)
        for _ in range(shortfall):
            proc = self.Process(
                target=tbb_process_pool_worker3,
                args=(self._inqueue, self._outqueue, self._initializer,
                      self._initargs, self._maxtasksperchild,
                      self._wrap_exception))
            proc.name = proc.name.replace('Process', 'PoolWorker')
            proc.daemon = True
            self._pool.append(proc)
            proc.start()
            debug('added worker')

    def _close_and_join(self):
        # Shared shutdown path for __del__ and __exit__.
        self.close()
        for proc in self._pool:
            proc.join()

    def __del__(self):
        self._close_and_join()

    def __exit__(self, *args):
        self._close_and_join()
151
152
class Monkey:
    """
    Context manager which replaces standard multiprocessing.pool
    implementations with tbb.pool using monkey-patching. It also enables oneTBB
    threading for Intel(R) oneAPI Math Kernel Library (oneMKL). For example:

        with tbb.Monkey():
            run_my_numpy_code()

    It allows multiple parallel tasks to be executed on the same thread pool
    and coordinate number of threads across multiple processes thus avoiding
    overheads from oversubscription.
    """
    # Saved original attributes and their owning modules, keyed by class name.
    # NOTE: class-level (shared) state; nesting is rejected in __enter__.
    _items   = {}
    _modules = {}

    def __init__(self, max_num_threads=None, benchmark=False):
        """
        Create context manager for running under TBB scheduler.
        :param max_num_threads: if specified, limits maximal number of threads
        :param benchmark: if specified, blocks in initialization until requested number of threads are ready
        """
        if max_num_threads:
            # Keep a reference: the limit stays active for this object's lifetime.
            self.ctl = global_control(global_control.max_allowed_parallelism, int(max_num_threads))
        if benchmark:
            if not max_num_threads:
                max_num_threads = default_num_threads()
            from .api import _concurrency_barrier
            _concurrency_barrier(int(max_num_threads))

    def _patch(self, class_name, module_name, obj):
        """Replace module_name.class_name with obj, remembering the original
        attribute in self._items so __exit__ can restore it."""
        m = self._modules[class_name] = __import__(module_name, globals(),
                                                   locals(), [class_name])
        if m is None:  # `is None`, not `== None` (identity check)
            return
        oldattr = getattr(m, class_name, None)
        if oldattr is None:
            self._modules[class_name] = None
            return
        self._items[class_name] = oldattr
        setattr(m, class_name, obj)

    def __enter__(self):
        global is_active
        assert is_active == False, "tbb.Monkey does not support nesting yet"
        is_active = True
        # Redirect oneMKL and Numba onto the TBB threading layer; the previous
        # values are remembered and restored in __exit__.
        self.env_mkl = os.getenv('MKL_THREADING_LAYER')
        os.environ['MKL_THREADING_LAYER'] = 'TBB'
        self.env_numba = os.getenv('NUMBA_THREADING_LAYER')
        os.environ['NUMBA_THREADING_LAYER'] = 'TBB'

        if ipc_enabled:
            # Pick the process-pool flavor matching the worker() signature of
            # the running interpreter.
            if sys.version_info.major == 2 and sys.version_info.minor >= 7:
                self._patch("Pool", "multiprocessing.pool", TBBProcessPool27)
            elif sys.version_info.major == 3 and sys.version_info.minor >= 5:
                self._patch("Pool", "multiprocessing.pool", TBBProcessPool3)
        self._patch("ThreadPool", "multiprocessing.pool", Pool)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        global is_active
        assert is_active == True, "modified?"
        is_active = False
        # Restore (or remove) the threading-layer environment overrides.
        if self.env_mkl is None:
            del os.environ['MKL_THREADING_LAYER']
        else:
            os.environ['MKL_THREADING_LAYER'] = self.env_mkl
        if self.env_numba is None:
            del os.environ['NUMBA_THREADING_LAYER']
        else:
            os.environ['NUMBA_THREADING_LAYER'] = self.env_numba
        # Undo the monkey-patching; iterate items() to avoid re-lookups.
        for name, oldattr in self._items.items():
            setattr(self._modules[name], name, oldattr)
226
227
def init_sem_name():
    """Best-effort setup of the shared-semaphore names through libirml.

    Failures (e.g. the library is absent) are reported on stderr but never
    raised, so IPC setup degrades gracefully.
    """
    try:
        rml = ctypes.CDLL(libirml)
        rml.set_active_sem_name()
        rml.set_stop_sem_name()
    except Exception as e:
        print("Warning: Can not initialize name of shared semaphores:", e,
              file=sys.stderr)
236
237
def tbb_atexit():
    """atexit hook: release shared IPC semaphores via libirml (best effort).

    Registered in _main() only when IPC mode is enabled.
    """
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_semaphores()
        # Best effort: keep the broad handler, but a bare `except:` would
        # also swallow SystemExit raised during interpreter shutdown.
        except Exception:
            print("Warning: Can not release shared semaphores",
                  file=sys.stderr)
246
247
def _main():
    """Entry point for `python3 -m tbb`: parse the command line, configure the
    environment (allocator preload, IPC, threading knobs), then execute the
    user's script or module inside tbb.Monkey.

    Returns the value of an internal `_<name>` command if one matches,
    otherwise runs the target via runpy and returns None.
    """
    global ipc_enabled

    # `platform` is already imported at module level; only argparse is local.
    import argparse
    parser = argparse.ArgumentParser(prog="python3 -m tbb", description="""
                Run your Python script in context of tbb.Monkey, which
                replaces standard Python pools and threading layer of
                Intel(R) oneAPI Math Kernel Library (oneMKL) by implementation based on
                Intel(R) oneAPI Threading Building Blocks (oneTBB). It enables multiple parallel
                tasks to be executed on the same thread pool and coordinate
                number of threads across multiple processes thus avoiding
                overheads from oversubscription.
             """, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    if platform.system() == "Linux":
        parser.add_argument('--ipc', action='store_true',
                        help="Enable inter-process (IPC) coordination between oneTBB schedulers")
        parser.add_argument('-a', '--allocator', action='store_true',
                        help="Enable oneTBB scalable allocator as a replacement for standard memory allocator")
        parser.add_argument('--allocator-huge-pages', action='store_true',
                        help="Enable huge pages for oneTBB allocator (implies: -a)")
    parser.add_argument('-p', '--max-num-threads', default=default_num_threads(), type=int,
                        help="Initialize oneTBB with P max number of threads per process", metavar='P')
    parser.add_argument('-b', '--benchmark', action='store_true',
                        help="Block oneTBB initialization until all the threads are created before continue the script. "
                        "This is necessary for performance benchmarks that want to exclude lazy scheduler initialization effects from the measurements")
    parser.add_argument('-v', '--verbose', action='store_true',
                        help="Request verbose and version information")
    parser.add_argument('-m', action='store_true', dest='module',
                        help="Executes following as a module")
    parser.add_argument('name', help="Script or module name")
    parser.add_argument('args', nargs=argparse.REMAINDER,
                        help="Command line arguments")
    args = parser.parse_args()

    if args.verbose:
        os.environ["TBB_VERSION"] = "1"
    if platform.system() == "Linux":
        if args.allocator_huge_pages:
            args.allocator = True  # huge pages require the TBB allocator
        if args.allocator and not os.environ.get("_TBB_MALLOC_PRELOAD"):
            libtbbmalloc_lib = 'libtbbmalloc_proxy.so.2'
            ld_preload = 'LD_PRELOAD'
            # Guard variable prevents an endless re-exec loop below.
            os.environ["_TBB_MALLOC_PRELOAD"] = "1"
            # Materialize the list: the previous filter() iterator was
            # consumed by the membership test, which silently dropped any
            # pre-existing LD_PRELOAD entries from the join.
            preload_list = [v for v in os.environ.get(ld_preload, "").split(':') if v]
            if libtbbmalloc_lib in preload_list:
                print('Info:', ld_preload, "contains", libtbbmalloc_lib, "already\n")
            else:
                os.environ[ld_preload] = ':'.join([libtbbmalloc_lib] + preload_list)

            if args.allocator_huge_pages:
                try:
                    with open('/proc/sys/vm/nr_hugepages', 'r') as f:
                        pages = int(f.read())
                    if pages == 0:
                        print("oneTBB: Pre-allocated huge pages are not currently reserved in the system. To reserve, run e.g.:\n"
                              "\tsudo sh -c 'echo 2000 > /proc/sys/vm/nr_hugepages'")
                    os.environ["TBB_MALLOC_USE_HUGE_PAGES"] = "1"
                # OSError: /proc entry unreadable; ValueError: non-integer
                # content. A bare except would also hide programming errors.
                except (OSError, ValueError):
                    print("oneTBB: Failed to read number of pages from /proc/sys/vm/nr_hugepages\n"
                          "\tIs the Linux kernel configured with the huge pages feature?")
                    sys.exit(1)

            # Re-exec the interpreter so LD_PRELOAD takes effect for this
            # very process; execl does not return on success.
            os.execl(sys.executable, sys.executable, '-m', 'tbb', *sys.argv[1:])
            assert False, "Re-execution failed"

    # Present the target script with its own argv.
    sys.argv = [args.name] + args.args
    ipc_enabled = platform.system() == "Linux" and args.ipc
    os.environ["IPC_ENABLE"] = "1" if ipc_enabled else "0"
    if ipc_enabled:
        atexit.register(tbb_atexit)
        init_sem_name()
    if not os.environ.get("KMP_BLOCKTIME"): # TODO move
        os.environ["KMP_BLOCKTIME"] = "0"
    if '_' + args.name in globals():
        # Internal command, e.g. `python3 -m tbb test` dispatches to _test().
        return globals()['_' + args.name](*args.args)
    else:
        import runpy
        runf = runpy.run_module if args.module else runpy.run_path
        with Monkey(max_num_threads=args.max_num_threads, benchmark=args.benchmark):
            runf(args.name, run_name='__main__')
332