xref: /oneTBB/python/tbb/__init__.py (revision 4eda0f93)
1#!/usr/bin/env python3
2#
3# Copyright (c) 2016-2022 Intel Corporation
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import multiprocessing.pool
18import ctypes
19import atexit
20import sys
21import os
22
23import platform
24
if "Windows" in platform.system():
    # oneTBB DLLs are installed under <env>/Library/bin (conda-style layout);
    # make that directory visible to the Windows DLL loader before the
    # extension modules are imported.
    import site
    path_to_env = site.getsitepackages()[0]
    path_to_libs = os.path.join(path_to_env, "Library", "bin")
    # Python 3.8+ no longer searches PATH for extension-module dependencies;
    # os.add_dll_directory is the supported mechanism there.  Compare the
    # full version tuple rather than only `minor`, which would misfire on a
    # future major-version bump.
    if sys.version_info >= (3, 8):
        os.add_dll_directory(path_to_libs)
    # Also extend PATH for older interpreters and for child processes.
    os.environ['PATH'] += os.pathsep + path_to_libs
32
33
34from .api import  *
35from .api import __all__ as api__all
36from .pool import *
37from .pool import __all__ as pool__all
38
39__all__ = ["Monkey", "is_active"] + api__all + pool__all
40
__doc__ = """
Python API for Intel(R) oneAPI Threading Building Blocks (oneTBB)
extended with standard Python's pools implementation and monkey-patching.

Command-line interface example:
$  python3 -m tbb $your_script.py
Runs your_script.py in context of tbb.Monkey
"""

# Module-level state: toggled by Monkey.__enter__/__exit__ below.
is_active = False
""" Indicates whether oneTBB context is activated """

# Set by _main() when --ipc is requested on Linux; consulted by the pool
# workers and the atexit hook below.
ipc_enabled = False
""" Indicates whether IPC mode is enabled """

# Soname of the IPC resource-management library loaded on demand via ctypes.
libirml = "libirml.so.1"
57
58
def _test(arg=None):
    """Run the package's self-tests (invoked via `python3 -m tbb test`).

    On Linux additionally verifies that libirml can be loaded and asserts
    that the compiled extension does not link against Intel compiler
    runtime libraries (libimf/libsvml/libintlc).

    :param arg: optional argument forwarded to tbb.test.test()
    """
    import platform
    if platform.system() == "Linux":
        ctypes.CDLL(libirml)
        # NOTE(review): `_api` is not defined in this module's visible scope;
        # presumably it is exported by `from .api import *` — confirm.
        # grep exit status 1 (no match) surfaces as 256 from os.system.
        assert 256 == os.system("ldd "+_api.__file__+"| grep -E 'libimf|libsvml|libintlc'") # nosec
    from .test import test
    test(arg)
    print("done")
68
69
def tbb_process_pool_worker27(inqueue, outqueue, initializer=None, initargs=(),
                            maxtasks=None):
    """Pool worker entry point with the Python 2.7 signature (no
    wrap_exception argument).

    Runs the standard multiprocessing worker loop; when IPC mode is enabled,
    afterwards loads libirml and releases this process's RML resources.
    """
    from multiprocessing.pool import worker
    worker(inqueue, outqueue, initializer, initargs, maxtasks)
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_resources()
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # are not swallowed; load/symbol failures only produce a warning.
            print("Warning: Can not load ", libirml, file=sys.stderr)
80
81
class TBBProcessPool27(multiprocessing.pool.Pool):
    """Process pool whose workers run tbb_process_pool_worker27 so that, in
    IPC mode, each worker releases its libirml resources before exiting."""

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        from multiprocessing.util import debug

        shortfall = self._processes - len(self._pool)
        for _ in range(shortfall):
            proc = self.Process(
                target=tbb_process_pool_worker27,
                args=(self._inqueue, self._outqueue, self._initializer,
                      self._initargs, self._maxtasksperchild),
            )
            self._pool.append(proc)
            proc.name = proc.name.replace('Process', 'PoolWorker')
            proc.daemon = True
            proc.start()
            debug('added worker')

    def __del__(self):
        # Shut the pool down and reap every worker when it is collected.
        self.close()
        for proc in self._pool:
            proc.join()

    def __exit__(self, *exc_info):
        # Same teardown on context-manager exit.
        self.close()
        for proc in self._pool:
            proc.join()
110
111
def tbb_process_pool_worker3(inqueue, outqueue, initializer=None, initargs=(),
                            maxtasks=None, wrap_exception=False):
    """Pool worker entry point with the Python 3 signature (wrap_exception).

    Runs the standard multiprocessing worker loop; when IPC mode is enabled,
    afterwards loads libirml and releases this process's RML resources.
    """
    from multiprocessing.pool import worker
    worker(inqueue, outqueue, initializer, initargs, maxtasks, wrap_exception)
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_resources()
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # are not swallowed; load/symbol failures only produce a warning.
            print("Warning: Can not load ", libirml, file=sys.stderr)
122
123
class TBBProcessPool3(multiprocessing.pool.Pool):
    """Process pool whose workers run tbb_process_pool_worker3 so that, in
    IPC mode, each worker releases its libirml resources before exiting."""

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        from multiprocessing.util import debug

        shortfall = self._processes - len(self._pool)
        for _ in range(shortfall):
            proc = self.Process(
                target=tbb_process_pool_worker3,
                args=(self._inqueue, self._outqueue, self._initializer,
                      self._initargs, self._maxtasksperchild,
                      self._wrap_exception),
            )
            self._pool.append(proc)
            proc.name = proc.name.replace('Process', 'PoolWorker')
            proc.daemon = True
            proc.start()
            debug('added worker')

    def __del__(self):
        # Shut the pool down and reap every worker when it is collected.
        self.close()
        for proc in self._pool:
            proc.join()

    def __exit__(self, *exc_info):
        # Same teardown on context-manager exit.
        self.close()
        for proc in self._pool:
            proc.join()
153
154
class Monkey:
    """
    Context manager which replaces standard multiprocessing.pool
    implementations with tbb.pool using monkey-patching. It also enables oneTBB
    threading for Intel(R) oneAPI Math Kernel Library (oneMKL). For example:

        with tbb.Monkey():
            run_my_numpy_code()

    It allows multiple parallel tasks to be executed on the same thread pool
    and coordinate number of threads across multiple processes thus avoiding
    overheads from oversubscription.
    """
    # Class-level registries shared by all instances: originals saved by
    # _patch() and the modules they were taken from (used for restore).
    _items   = {}
    _modules = {}

    def __init__(self, max_num_threads=None, benchmark=False):
        """
        Create context manager for running under TBB scheduler.
        :param max_num_threads: if specified, limits maximal number of threads
        :param benchmark: if specified, blocks in initialization until requested number of threads are ready
        """
        if max_num_threads:
            # Keep the handle alive on the instance: dropping the
            # global_control object would lift the parallelism limit.
            self.ctl = global_control(global_control.max_allowed_parallelism, int(max_num_threads))
        if benchmark:
            if not max_num_threads:
               max_num_threads = default_num_threads()
            from .api import _concurrency_barrier
            _concurrency_barrier(int(max_num_threads))

    def _patch(self, class_name, module_name, obj):
        """Replace module_name.class_name with obj, remembering the original
        attribute in _items so __exit__ can restore it."""
        m = self._modules[class_name] = __import__(module_name, globals(),
                                                   locals(), [class_name])
        if m is None:
            return
        oldattr = getattr(m, class_name, None)
        if oldattr is None:
            self._modules[class_name] = None
            return
        self._items[class_name] = oldattr
        setattr(m, class_name, obj)

    def __enter__(self):
        global is_active
        assert is_active == False, "tbb.Monkey does not support nesting yet"
        is_active = True
        # Remember the previous environment so __exit__ restores it exactly.
        self.env_mkl = os.getenv('MKL_THREADING_LAYER')
        os.environ['MKL_THREADING_LAYER'] = 'TBB'
        self.env_numba = os.getenv('NUMBA_THREADING_LAYER')
        os.environ['NUMBA_THREADING_LAYER'] = 'TBB'

        if ipc_enabled:
            if sys.version_info.major == 2 and sys.version_info.minor >= 7:
                self._patch("Pool", "multiprocessing.pool", TBBProcessPool27)
            elif sys.version_info.major == 3 and sys.version_info.minor >= 5:
                self._patch("Pool", "multiprocessing.pool", TBBProcessPool3)
        self._patch("ThreadPool", "multiprocessing.pool", Pool)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        global is_active
        assert is_active == True, "modified?"
        is_active = False
        # Restore (or remove) the environment variables saved in __enter__.
        if self.env_mkl is None:
            del os.environ['MKL_THREADING_LAYER']
        else:
            os.environ['MKL_THREADING_LAYER'] = self.env_mkl
        if self.env_numba is None:
            del os.environ['NUMBA_THREADING_LAYER']
        else:
            os.environ['NUMBA_THREADING_LAYER'] = self.env_numba
        # Undo every monkey-patch applied via _patch().
        for name, oldattr in self._items.items():
            setattr(self._modules[name], name, oldattr)
228
229
def init_sem_name():
    """Ask libirml to initialize the names of the shared IPC semaphores.

    Failures (e.g. the library is absent) are reported to stderr but never
    propagated to the caller.
    """
    try:
        rml = ctypes.CDLL(libirml)
        rml.set_active_sem_name()
        rml.set_stop_sem_name()
    except Exception as err:
        print("Warning: Can not initialize name of shared semaphores:", err,
              file=sys.stderr)
238
239
def tbb_atexit():
    """atexit hook: in IPC mode, ask libirml to release the shared
    semaphores owned by this process; best-effort, warn on failure."""
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_semaphores()
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # are not swallowed during interpreter shutdown.
            print("Warning: Can not release shared semaphores",
                  file=sys.stderr)
248
249
def _main():
    """Implement ``python3 -m tbb <options> <script-or-module> [args...]``.

    Parses command-line options, optionally re-executes the interpreter with
    the oneTBB scalable allocator preloaded (Linux only), then runs the user
    script or module inside a tbb.Monkey context.
    """
    # Run the module specified as the next command line argument
    # python3 -m TBB user_app.py
    global ipc_enabled

    import platform
    import argparse
    parser = argparse.ArgumentParser(prog="python3 -m tbb", description="""
                Run your Python script in context of tbb.Monkey, which
                replaces standard Python pools and threading layer of
                Intel(R) oneAPI Math Kernel Library (oneMKL) by implementation based on
                Intel(R) oneAPI Threading Building Blocks (oneTBB). It enables multiple parallel
                tasks to be executed on the same thread pool and coordinate
                number of threads across multiple processes thus avoiding
                overheads from oversubscription.
             """, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    # IPC and allocator options are Linux-only: libirml/libtbbmalloc_proxy
    # are Linux shared objects.
    if platform.system() == "Linux":
        parser.add_argument('--ipc', action='store_true',
                        help="Enable inter-process (IPC) coordination between oneTBB schedulers")
        parser.add_argument('-a', '--allocator', action='store_true',
                        help="Enable oneTBB scalable allocator as a replacement for standard memory allocator")
        parser.add_argument('--allocator-huge-pages', action='store_true',
                        help="Enable huge pages for oneTBB allocator (implies: -a)")
    parser.add_argument('-p', '--max-num-threads', default=default_num_threads(), type=int,
                        help="Initialize oneTBB with P max number of threads per process", metavar='P')
    parser.add_argument('-b', '--benchmark', action='store_true',
                        help="Block oneTBB initialization until all the threads are created before continue the script. "
                        "This is necessary for performance benchmarks that want to exclude lazy scheduler initialization effects from the measurements")
    parser.add_argument('-v', '--verbose', action='store_true',
                        help="Request verbose and version information")
    parser.add_argument('-m', action='store_true', dest='module',
                        help="Executes following as a module")
    parser.add_argument('name', help="Script or module name")
    parser.add_argument('args', nargs=argparse.REMAINDER,
                        help="Command line arguments")
    args = parser.parse_args()

    if args.verbose:
        os.environ["TBB_VERSION"] = "1"
    if platform.system() == "Linux":
        if args.allocator_huge_pages:
            args.allocator = True
        # Preload the scalable allocator and re-exec the interpreter once;
        # _TBB_MALLOC_PRELOAD guards against re-exec recursion.
        if args.allocator and not os.environ.get("_TBB_MALLOC_PRELOAD"):
            libtbbmalloc_lib = 'libtbbmalloc_proxy.so.2'
            ld_preload = 'LD_PRELOAD'
            os.environ["_TBB_MALLOC_PRELOAD"] = "1"
            # Materialize the list: the previous lazy `filter` object was
            # exhausted by the membership test below, which silently dropped
            # any pre-existing LD_PRELOAD entries.
            preload_list = list(filter(None, os.environ.get(ld_preload, "").split(':')))
            if libtbbmalloc_lib in preload_list:
                print('Info:', ld_preload, "contains", libtbbmalloc_lib, "already\n")
            else:
                os.environ[ld_preload] = ':'.join([libtbbmalloc_lib] + preload_list)

            if args.allocator_huge_pages:
                assert platform.system() == "Linux"
                try:
                    with open('/proc/sys/vm/nr_hugepages', 'r') as f:
                        pages = int(f.read())
                    if pages == 0:
                        print("oneTBB: Pre-allocated huge pages are not currently reserved in the system. To reserve, run e.g.:\n"
                              "\tsudo sh -c 'echo 2000 > /proc/sys/vm/nr_hugepages'")
                    os.environ["TBB_MALLOC_USE_HUGE_PAGES"] = "1"
                except (OSError, ValueError):
                    # Narrowed from a bare `except:` (which would even catch
                    # KeyboardInterrupt): only I/O and parse failures matter.
                    print("oneTBB: Failed to read number of pages from /proc/sys/vm/nr_hugepages\n"
                          "\tIs the Linux kernel configured with the huge pages feature?")
                    sys.exit(1)

            # Replace the current process with a re-exec'ed interpreter so
            # LD_PRELOAD takes effect; anything past execl means it failed.
            os.execl(sys.executable, sys.executable, '-m', 'tbb', *sys.argv[1:])
            assert False, "Re-execution failed"

    # Let the target see its own argv, as if it were launched directly.
    sys.argv = [args.name] + args.args
    ipc_enabled = platform.system() == "Linux" and args.ipc
    os.environ["IPC_ENABLE"] = "1" if ipc_enabled else "0"
    if ipc_enabled:
        atexit.register(tbb_atexit)
        init_sem_name()
    if not os.environ.get("KMP_BLOCKTIME"): # TODO move
        os.environ["KMP_BLOCKTIME"] = "0"
    # Internal dispatch: e.g. `python3 -m tbb test` invokes _test().
    if '_' + args.name in globals():
        return globals()['_' + args.name](*args.args)
    else:
        import runpy
        runf = runpy.run_module if args.module else runpy.run_path
        with Monkey(max_num_threads=args.max_num_threads, benchmark=args.benchmark):
            runf(args.name, run_name='__main__')
334