xref: /oneTBB/python/tbb/__init__.py (revision 6caecf96)
1#!/usr/bin/env python3
2#
3# Copyright (c) 2016-2021 Intel Corporation
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import multiprocessing.pool
18import ctypes
19import atexit
20import sys
21import os
22
23from .api import  *
24from .api import __all__ as api__all
25from .pool import *
26from .pool import __all__ as pool__all
27
28__all__ = ["Monkey", "is_active"] + api__all + pool__all
29
30__doc__ = """
31Python API for Intel(R) oneAPI Threading Building Blocks (oneTBB)
32extended with standard Python's pools implementation and monkey-patching.
33
34Command-line interface example:
35$  python3 -m tbb $your_script.py
36Runs your_script.py in context of tbb.Monkey
37"""
38
39is_active = False
40""" Indicates whether oneTBB context is activated """
41
42ipc_enabled = False
43""" Indicates whether IPC mode is enabled """
44
45libirml = "libirml.so.1"
46
47
def _test(arg=None):
    """Run the package self-tests.

    :param arg: optional argument forwarded to tbb.test.test()
    """
    import platform
    on_linux = platform.system() == "Linux"
    if on_linux:
        # Sanity-check that the IPC helper library can be loaded at all.
        ctypes.CDLL(libirml)
        # The extension must not link against Intel compiler runtimes;
        # grep finding none makes the pipeline exit non-zero (256).
        cmd = "ldd "+_api.__file__+"| grep -E 'libimf|libsvml|libintlc'"
        assert 256 == os.system(cmd)  # nosec
    from .test import test
    test(arg)
    print("done")
57
58
def tbb_process_pool_worker27(inqueue, outqueue, initializer=None, initargs=(),
                            maxtasks=None):
    """Worker entry point for TBBProcessPool27 (Python 2.7-era signature).

    Runs the standard multiprocessing worker loop, then — in IPC mode —
    asks libirml to release per-process resources before the worker exits.
    """
    from multiprocessing.pool import worker
    worker(inqueue, outqueue, initializer, initargs, maxtasks)
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_resources()
        # Narrowed from a bare 'except:' which also swallowed SystemExit
        # and KeyboardInterrupt during worker shutdown.
        except Exception:
            print("Warning: Can not load ", libirml, file=sys.stderr)
69
70
class TBBProcessPool27(multiprocessing.pool.Pool):
    # Pool subclass for the Python 2.7-era multiprocessing internals:
    # identical to the stock Pool except workers run
    # tbb_process_pool_worker27, which releases libirml IPC resources
    # when a worker process exits.
    # NOTE(review): relies on private Pool attributes (_processes, _pool,
    # _inqueue, ...) whose layout varies across Python versions — confirm
    # against the targeted interpreter.
    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        from multiprocessing.util import debug

        # Copy of Pool._repopulate_pool with the worker target swapped
        # for tbb_process_pool_worker27.
        for i in range(self._processes - len(self._pool)):
            w = self.Process(target=tbb_process_pool_worker27,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild)
                            )
            self._pool.append(w)
            # Rename so diagnostics show these as pool workers.
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

    def __del__(self):
        # Close and join workers so the IPC cleanup in the worker target
        # has a chance to run before the pool object disappears.
        self.close()
        for p in self._pool:
            p.join()

    def __exit__(self, *args):
        # Same teardown as __del__ for the context-manager exit path.
        self.close()
        for p in self._pool:
            p.join()
99
100
def tbb_process_pool_worker3(inqueue, outqueue, initializer=None, initargs=(),
                            maxtasks=None, wrap_exception=False):
    """Worker entry point for TBBProcessPool3 (Python 3 signature).

    Runs the standard multiprocessing worker loop, then — in IPC mode —
    asks libirml to release per-process resources before the worker exits.
    """
    from multiprocessing.pool import worker
    worker(inqueue, outqueue, initializer, initargs, maxtasks, wrap_exception)
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_resources()
        # Narrowed from a bare 'except:' which also swallowed SystemExit
        # and KeyboardInterrupt during worker shutdown.
        except Exception:
            print("Warning: Can not load ", libirml, file=sys.stderr)
111
112
class TBBProcessPool3(multiprocessing.pool.Pool):
    # Pool subclass for Python 3 multiprocessing internals: identical to
    # the stock Pool except workers run tbb_process_pool_worker3, which
    # releases libirml IPC resources when a worker process exits.
    # NOTE(review): relies on private Pool attributes (_processes, _pool,
    # _wrap_exception, ...); Python 3.8+ changed _repopulate_pool to a
    # static method, so confirm against the targeted interpreter.
    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        from multiprocessing.util import debug

        # Copy of Pool._repopulate_pool with the worker target swapped
        # for tbb_process_pool_worker3.
        for i in range(self._processes - len(self._pool)):
            w = self.Process(target=tbb_process_pool_worker3,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild,
                                   self._wrap_exception)
                            )
            self._pool.append(w)
            # Rename so diagnostics show these as pool workers.
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

    def __del__(self):
        # Close and join workers so the IPC cleanup in the worker target
        # has a chance to run before the pool object disappears.
        self.close()
        for p in self._pool:
            p.join()

    def __exit__(self, *args):
        # Same teardown as __del__ for the context-manager exit path.
        self.close()
        for p in self._pool:
            p.join()
142
143
class Monkey:
    """
    Context manager which replaces standard multiprocessing.pool
    implementations with tbb.pool using monkey-patching. It also enables oneTBB
    threading for Intel(R) oneAPI Math Kernel Library (oneMKL). For example:

        with tbb.Monkey():
            run_my_numpy_code()

    It allows multiple parallel tasks to be executed on the same thread pool
    and coordinate number of threads across multiple processes thus avoiding
    overheads from oversubscription.
    """
    # Saved originals for restoration in __exit__; class-level storage
    # (nesting is rejected in __enter__, so one active set is enough).
    _items   = {}
    _modules = {}

    def __init__(self, max_num_threads=None, benchmark=False):
        """
        Create context manager for running under TBB scheduler.
        :param max_num_threads: if specified, limits maximal number of threads
        :param benchmark: if specified, blocks in initialization until requested number of threads are ready
        """
        if max_num_threads:
            # Keep the global_control object alive for the manager's lifetime.
            self.ctl = global_control(global_control.max_allowed_parallelism, int(max_num_threads))
        if benchmark:
            if not max_num_threads:
                max_num_threads = default_num_threads()
            from .api import _concurrency_barrier
            _concurrency_barrier(int(max_num_threads))

    def _patch(self, class_name, module_name, obj):
        """Replace module_name.class_name with obj, remembering the original
        attribute so __exit__ can restore it."""
        m = self._modules[class_name] = __import__(module_name, globals(),
                                                   locals(), [class_name])
        if m is None:  # was '== None'; identity is the correct None test
            return
        oldattr = getattr(m, class_name, None)
        if oldattr is None:
            self._modules[class_name] = None
            return
        self._items[class_name] = oldattr
        setattr(m, class_name, obj)

    def __enter__(self):
        global is_active
        assert not is_active, "tbb.Monkey does not support nesting yet"
        is_active = True
        # Force the TBB threading layer for oneMKL and Numba; prior values
        # are saved for restoration in __exit__.
        self.env_mkl = os.getenv('MKL_THREADING_LAYER')
        os.environ['MKL_THREADING_LAYER'] = 'TBB'
        self.env_numba = os.getenv('NUMBA_THREADING_LAYER')
        os.environ['NUMBA_THREADING_LAYER'] = 'TBB'

        if ipc_enabled:
            if sys.version_info.major == 2 and sys.version_info.minor >= 7:
                self._patch("Pool", "multiprocessing.pool", TBBProcessPool27)
            elif sys.version_info.major == 3 and sys.version_info.minor >= 5:
                self._patch("Pool", "multiprocessing.pool", TBBProcessPool3)
        self._patch("ThreadPool", "multiprocessing.pool", Pool)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        global is_active
        assert is_active, "modified?"
        is_active = False
        # Restore the environment variables saved in __enter__.
        if self.env_mkl is None:
            del os.environ['MKL_THREADING_LAYER']
        else:
            os.environ['MKL_THREADING_LAYER'] = self.env_mkl
        if self.env_numba is None:
            del os.environ['NUMBA_THREADING_LAYER']
        else:
            os.environ['NUMBA_THREADING_LAYER'] = self.env_numba
        # Undo every monkey-patch recorded by _patch().
        for name in self._items:
            setattr(self._modules[name], name, self._items[name])
217
218
def init_sem_name():
    """Initialize the names of the shared semaphores managed by libirml.

    Best-effort: a failure only emits a warning on stderr.
    """
    try:
        rml = ctypes.CDLL(libirml)
        rml.set_active_sem_name()
        rml.set_stop_sem_name()
    except Exception as e:
        print("Warning: Can not initialize name of shared semaphores:", e,
              file=sys.stderr)
227
228
def tbb_atexit():
    """atexit hook: in IPC mode, ask libirml to release the shared semaphores."""
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_semaphores()
        # Narrowed from a bare 'except:' which also swallowed SystemExit
        # and KeyboardInterrupt raised during interpreter shutdown.
        except Exception:
            print("Warning: Can not release shared semaphores",
                  file=sys.stderr)
237
238
def _main():
    """Command-line entry point: run the script/module given on the command
    line inside tbb.Monkey (``python3 -m tbb user_app.py``)."""
    global ipc_enabled

    import platform
    import argparse
    parser = argparse.ArgumentParser(prog="python3 -m tbb", description="""
                Run your Python script in context of tbb.Monkey, which
                replaces standard Python pools and threading layer of
                Intel(R) oneAPI Math Kernel Library (oneMKL) by implementation based on
                Intel(R) oneAPI Threading Building Blocks (oneTBB). It enables multiple parallel
                tasks to be executed on the same thread pool and coordinate
                number of threads across multiple processes thus avoiding
                overheads from oversubscription.
             """, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    # IPC and allocator preloading rely on Linux-only mechanisms.
    if platform.system() == "Linux":
        parser.add_argument('--ipc', action='store_true',
                        help="Enable inter-process (IPC) coordination between oneTBB schedulers")
        parser.add_argument('-a', '--allocator', action='store_true',
                        help="Enable oneTBB scalable allocator as a replacement for standard memory allocator")
        parser.add_argument('--allocator-huge-pages', action='store_true',
                        help="Enable huge pages for oneTBB allocator (implies: -a)")
    parser.add_argument('-p', '--max-num-threads', default=default_num_threads(), type=int,
                        help="Initialize oneTBB with P max number of threads per process", metavar='P')
    parser.add_argument('-b', '--benchmark', action='store_true',
                        help="Block oneTBB initialization until all the threads are created before continue the script. "
                        "This is necessary for performance benchmarks that want to exclude lazy scheduler initialization effects from the measurements")
    parser.add_argument('-v', '--verbose', action='store_true',
                        help="Request verbose and version information")
    parser.add_argument('-m', action='store_true', dest='module',
                        help="Executes following as a module")
    parser.add_argument('name', help="Script or module name")
    parser.add_argument('args', nargs=argparse.REMAINDER,
                        help="Command line arguments")
    args = parser.parse_args()

    if args.verbose:
        os.environ["TBB_VERSION"] = "1"
    if platform.system() == "Linux":
        if args.allocator_huge_pages:
            args.allocator = True
        if args.allocator and not os.environ.get("_TBB_MALLOC_PRELOAD"):
            libtbbmalloc_lib = 'libtbbmalloc_proxy.so.2'
            ld_preload = 'LD_PRELOAD'
            os.environ["_TBB_MALLOC_PRELOAD"] = "1"
            # BUGFIX: materialize the filter into a list. The previous code
            # kept the lazy filter object, so the 'in' membership test below
            # consumed it, and list(preload_list) in the else-branch came
            # out empty — silently dropping existing LD_PRELOAD entries.
            preload_list = list(filter(None, os.environ.get(ld_preload, "").split(':')))
            if libtbbmalloc_lib in preload_list:
                print('Info:', ld_preload, "contains", libtbbmalloc_lib, "already\n")
            else:
                os.environ[ld_preload] = ':'.join([libtbbmalloc_lib] + preload_list)

            if args.allocator_huge_pages:
                assert platform.system() == "Linux"
                try:
                    with open('/proc/sys/vm/nr_hugepages', 'r') as f:
                        pages = int(f.read())
                    if pages == 0:
                        print("oneTBB: Pre-allocated huge pages are not currently reserved in the system. To reserve, run e.g.:\n"
                              "\tsudo sh -c 'echo 2000 > /proc/sys/vm/nr_hugepages'")
                    os.environ["TBB_MALLOC_USE_HUGE_PAGES"] = "1"
                # Narrowed from a bare 'except:': open() raises OSError,
                # int() raises ValueError; anything else should propagate.
                except (OSError, ValueError):
                    print("oneTBB: Failed to read number of pages from /proc/sys/vm/nr_hugepages\n"
                          "\tIs the Linux kernel configured with the huge pages feature?")
                    sys.exit(1)

            # Re-exec with the allocator preloaded; _TBB_MALLOC_PRELOAD set
            # above prevents infinite re-execution.
            os.execl(sys.executable, sys.executable, '-m', 'tbb', *sys.argv[1:])
            assert False, "Re-execution failed"

    # Present the target script/module with its own argv.
    sys.argv = [args.name] + args.args
    ipc_enabled = platform.system() == "Linux" and args.ipc
    os.environ["IPC_ENABLE"] = "1" if ipc_enabled else "0"
    if ipc_enabled:
        atexit.register(tbb_atexit)
        init_sem_name()
    if not os.environ.get("KMP_BLOCKTIME"): # TODO move
        os.environ["KMP_BLOCKTIME"] = "0"
    # 'python3 -m tbb test' dispatches to the private _test() helper.
    if '_' + args.name in globals():
        return globals()['_' + args.name](*args.args)
    else:
        import runpy
        runf = runpy.run_module if args.module else runpy.run_path
        with Monkey(max_num_threads=args.max_num_threads, benchmark=args.benchmark):
            runf(args.name, run_name='__main__')
323