xref: /oneTBB/python/tbb/__init__.py (revision 9acde482)
1*9acde482SJonathan Wakely# Copyright (c) 2016-2023 Intel Corporation
251c0b2f7Stbbdev#
351c0b2f7Stbbdev# Licensed under the Apache License, Version 2.0 (the "License");
451c0b2f7Stbbdev# you may not use this file except in compliance with the License.
551c0b2f7Stbbdev# You may obtain a copy of the License at
651c0b2f7Stbbdev#
751c0b2f7Stbbdev#     http://www.apache.org/licenses/LICENSE-2.0
851c0b2f7Stbbdev#
951c0b2f7Stbbdev# Unless required by applicable law or agreed to in writing, software
1051c0b2f7Stbbdev# distributed under the License is distributed on an "AS IS" BASIS,
1151c0b2f7Stbbdev# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1251c0b2f7Stbbdev# See the License for the specific language governing permissions and
1351c0b2f7Stbbdev# limitations under the License.
1451c0b2f7Stbbdev
1551c0b2f7Stbbdevimport multiprocessing.pool
1651c0b2f7Stbbdevimport ctypes
1751c0b2f7Stbbdevimport atexit
1851c0b2f7Stbbdevimport sys
1951c0b2f7Stbbdevimport os
2051c0b2f7Stbbdev
2135e0f552SSergey Zheltovimport platform
2235e0f552SSergey Zheltov
# On Windows, make the native libraries under "<first site-packages root>/Library/bin"
# discoverable before the extension module is imported further below.
# NOTE(review): presumably this targets a conda-style environment layout - confirm.
if "Windows" in platform.system():
    import site
    path_to_env = site.getsitepackages()[0]
    path_to_libs = os.path.join(path_to_env, "Library", "bin")
    # os.add_dll_directory() only exists on Python 3.8+ and raises if the
    # directory is missing, so probe for both instead of comparing
    # sys.version_info.minor alone (which ignores the major version).
    if hasattr(os, "add_dll_directory") and os.path.isdir(path_to_libs):
        os.add_dll_directory(path_to_libs)
    # PATH may be absent in a stripped-down environment; "+=" would raise
    # KeyError there, so rebuild the value from whatever is present.
    os.environ['PATH'] = os.pathsep.join(
        filter(None, [os.environ.get('PATH'), path_to_libs]))
3035e0f552SSergey Zheltov
3135e0f552SSergey Zheltov
3251c0b2f7Stbbdevfrom .api import  *
3351c0b2f7Stbbdevfrom .api import __all__ as api__all
3451c0b2f7Stbbdevfrom .pool import *
3551c0b2f7Stbbdevfrom .pool import __all__ as pool__all
3651c0b2f7Stbbdev
# Public names: the two entries defined in this module plus everything the
# .api and .pool submodules export.
__all__ = (
    ["Monkey", "is_active"]
    + api__all
    + pool__all
)

# Module documentation, assigned explicitly because it follows the imports.
__doc__ = """
Python API for Intel(R) oneAPI Threading Building Blocks (oneTBB)
extended with standard Python's pools implementation and monkey-patching.

Command-line interface example:
$  python3 -m tbb $your_script.py
Runs your_script.py in context of tbb.Monkey
"""

# Toggled by Monkey.__enter__ / Monkey.__exit__.
is_active = False
""" Indicates whether oneTBB context is activated """

# Set by _main() from the --ipc command-line switch.
ipc_enabled = False
""" Indicates whether IPC mode is enabled """

# Name of the shared library this module loads through ctypes.CDLL.
libirml = "libirml.so.1"
5551c0b2f7Stbbdev
5651c0b2f7Stbbdev
def _test(arg=None):
    """Run the package self-tests.

    On Linux this first checks that the library named by ``libirml`` can be
    loaded and that ``ldd`` does not list libimf, libsvml or libintlc among
    the dependencies of the native ``_api`` module.  It then runs
    ``test(arg)`` from the ``.test`` submodule and prints "done".

    :param arg: forwarded unchanged to ``test``
    :raises OSError: if ``libirml`` cannot be loaded
    :raises AssertionError: if one of the libraries above shows up in the
        ``ldd`` output
    """
    import platform
    if platform.system() == "Linux":
        import re
        import subprocess
        ctypes.CDLL(libirml)
        # Pass the path as an argument vector instead of splicing it into a
        # shell pipeline: a path with spaces or shell metacharacters made
        # "ldd" fail, "grep" then saw no input and the check passed falsely.
        try:
            ldd_output = subprocess.run(["ldd", _api.__file__],
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.STDOUT,
                                        universal_newlines=True).stdout
        except OSError:
            # The shell pipeline also passed when ldd could not be started.
            ldd_output = ""
        assert not re.search(r"libimf|libsvml|libintlc", ldd_output), ldd_output
    from .test import test
    test(arg)
    print("done")
6651c0b2f7Stbbdev
6751c0b2f7Stbbdev
def tbb_process_pool_worker27(inqueue, outqueue, initializer=None, initargs=(),
                            maxtasks=None):
    """Worker-process entry point used by TBBProcessPool27.

    Runs the stock ``multiprocessing.pool.worker`` loop with the given
    arguments.  Once that loop returns and IPC mode is enabled, loads the
    library named by ``libirml`` and calls its ``release_resources`` entry
    point; a failure there is reported on stderr and otherwise ignored.
    """
    from multiprocessing.pool import worker
    worker(inqueue, outqueue, initializer, initargs, maxtasks)
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_resources()
        except Exception:
            # Best-effort cleanup.  "Exception" rather than a bare except so
            # that SystemExit / KeyboardInterrupt still propagate.
            print("Warning: Can not load ", libirml, file=sys.stderr)


class TBBProcessPool27(multiprocessing.pool.Pool):
    """Process pool whose workers run ``tbb_process_pool_worker27``.

    Leaving the ``with`` block or destroying the pool closes it and waits
    for every worker process to finish.
    """

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        from multiprocessing.util import debug

        # Pools that carry a multiprocessing context expose the process
        # factory on it.  Since Python 3.8 Pool.Process is a static method
        # expecting that context as first argument, so calling
        # self.Process(target=...) raises TypeError there.  Pools without a
        # context (Python 2.7) keep using self.Process.
        ctx = getattr(self, "_ctx", None)
        new_process = self.Process if ctx is None else ctx.Process

        for _ in range(self._processes - len(self._pool)):
            w = new_process(target=tbb_process_pool_worker27,
                            args=(self._inqueue, self._outqueue,
                                  self._initializer,
                                  self._initargs, self._maxtasksperchild)
                            )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

    def _close_and_join(self):
        """Stop accepting work and wait for every worker process to exit."""
        self.close()
        for p in self._pool:
            p.join()

    def __del__(self):
        # __init__ may have failed before the worker list was created; there
        # is nothing to shut down in that case.
        if getattr(self, "_pool", None) is not None:
            self._close_and_join()

    def __exit__(self, *args):
        self._close_and_join()
10851c0b2f7Stbbdev
10951c0b2f7Stbbdev
def tbb_process_pool_worker3(inqueue, outqueue, initializer=None, initargs=(),
                            maxtasks=None, wrap_exception=False):
    """Worker-process entry point used by TBBProcessPool3.

    Runs the stock ``multiprocessing.pool.worker`` loop with the given
    arguments.  Once that loop returns and IPC mode is enabled, loads the
    library named by ``libirml`` and calls its ``release_resources`` entry
    point; a failure there is reported on stderr and otherwise ignored.
    """
    from multiprocessing.pool import worker
    worker(inqueue, outqueue, initializer, initargs, maxtasks, wrap_exception)
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_resources()
        except Exception:
            # Best-effort cleanup.  "Exception" rather than a bare except so
            # that SystemExit / KeyboardInterrupt still propagate.
            print("Warning: Can not load ", libirml, file=sys.stderr)


class TBBProcessPool3(multiprocessing.pool.Pool):
    """Process pool whose workers run ``tbb_process_pool_worker3``.

    Leaving the ``with`` block or destroying the pool closes it and waits
    for every worker process to finish.
    """

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        from multiprocessing.util import debug

        # Create workers through the pool's context.  Since Python 3.8
        # Pool.Process is a static method expecting the context as first
        # argument, so self.Process(target=...) raises TypeError there; the
        # context's own factory works on every Python 3 version this class
        # is installed for.
        new_process = self._ctx.Process

        for _ in range(self._processes - len(self._pool)):
            w = new_process(target=tbb_process_pool_worker3,
                            args=(self._inqueue, self._outqueue,
                                  self._initializer,
                                  self._initargs, self._maxtasksperchild,
                                  self._wrap_exception)
                            )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

    def _close_and_join(self):
        """Stop accepting work and wait for every worker process to exit."""
        self.close()
        for p in self._pool:
            p.join()

    def __del__(self):
        # __init__ may have failed before the worker list was created; there
        # is nothing to shut down in that case.
        if getattr(self, "_pool", None) is not None:
            self._close_and_join()

    def __exit__(self, *args):
        self._close_and_join()
15151c0b2f7Stbbdev
15251c0b2f7Stbbdev
class Monkey:
    """
    Context manager which replaces standard multiprocessing.pool
    implementations with tbb.pool using monkey-patching. It also enables oneTBB
    threading for Intel(R) oneAPI Math Kernel Library (oneMKL). For example:

        with tbb.Monkey():
            run_my_numpy_code()

    It allows multiple parallel tasks to be executed on the same thread pool
    and coordinate number of threads across multiple processes thus avoiding
    overheads from oversubscription.
    """
    # Class-level defaults kept for backward compatibility; __init__ gives
    # every instance its own dictionaries so state is not shared.
    _items   = {}
    _modules = {}

    def __init__(self, max_num_threads=None, benchmark=False):
        """
        Create context manager for running under TBB scheduler.
        :param max_num_threads: if specified, limits maximal number of threads
        :param benchmark: if specified, blocks in initialization until requested number of threads are ready
        """
        self._items = {}     # attribute name -> original object that was replaced
        self._modules = {}   # attribute name -> module it was replaced in
        if max_num_threads:
            self.ctl = global_control(global_control.max_allowed_parallelism, int(max_num_threads))
        if benchmark:
            if not max_num_threads:
                max_num_threads = default_num_threads()
            from .api import _concurrency_barrier
            _concurrency_barrier(int(max_num_threads))

    def _patch(self, class_name, module_name, obj):
        """
        Replace attribute ``class_name`` of module ``module_name`` with ``obj``
        and remember the original so that it can be put back on exit.
        Nothing is replaced when the module has no such attribute.
        """
        m = self._modules[class_name] = __import__(module_name, globals(),
                                                   locals(), [class_name])
        if m is None:
            return
        oldattr = getattr(m, class_name, None)
        if oldattr is None:
            self._modules[class_name] = None
            return
        self._items[class_name] = oldattr
        setattr(m, class_name, obj)

    def _unpatch(self):
        """Put back every attribute replaced by _patch and forget about it."""
        for name, original in self._items.items():
            setattr(self._modules[name], name, original)
        # Forget the restored entries so a later exit does not replay them.
        self._items.clear()
        self._modules.clear()

    @staticmethod
    def _restore_env(name, saved):
        """Set environment variable ``name`` back to ``saved`` (None = unset)."""
        if saved is None:
            # pop() instead of del: the code run inside the context may
            # already have removed the variable, and a KeyError here would
            # skip the rest of the clean-up.
            os.environ.pop(name, None)
        else:
            os.environ[name] = saved

    def __enter__(self):
        """
        Activate the context: select the TBB threading layer for MKL and Numba
        through their environment variables and patch multiprocessing.pool.
        :raises AssertionError: if another tbb.Monkey context is already active
        """
        global is_active
        # Raised explicitly (same exception type as before) so that the
        # nesting guard also works when Python runs with -O.
        if is_active:
            raise AssertionError("tbb.Monkey does not support nesting yet")
        is_active = True
        self.env_mkl = os.getenv('MKL_THREADING_LAYER')
        os.environ['MKL_THREADING_LAYER'] = 'TBB'
        self.env_numba = os.getenv('NUMBA_THREADING_LAYER')
        os.environ['NUMBA_THREADING_LAYER'] = 'TBB'

        if ipc_enabled:
            if sys.version_info.major == 2 and sys.version_info.minor >= 7:
                self._patch("Pool", "multiprocessing.pool", TBBProcessPool27)
            elif sys.version_info.major == 3 and sys.version_info.minor >= 5:
                self._patch("Pool", "multiprocessing.pool", TBBProcessPool3)
        self._patch("ThreadPool", "multiprocessing.pool", Pool)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Deactivate the context: restore both environment variables to the
        values saved on entry and undo the monkey-patching.
        """
        global is_active
        assert is_active == True, "modified?"
        is_active = False
        self._restore_env('MKL_THREADING_LAYER', self.env_mkl)
        self._restore_env('NUMBA_THREADING_LAYER', self.env_numba)
        self._unpatch()
22651c0b2f7Stbbdev
22751c0b2f7Stbbdev
def init_sem_name():
    """Initialize the names of the shared semaphores through libirml.

    Loads the library named by ``libirml`` and calls its
    ``set_active_sem_name`` and ``set_stop_sem_name`` entry points, in that
    order.  Any failure is reported as a warning on stderr and not raised.
    """
    try:
        rml = ctypes.CDLL(libirml)
        for entry_point in ("set_active_sem_name", "set_stop_sem_name"):
            getattr(rml, entry_point)()
    except Exception as err:
        print("Warning: Can not initialize name of shared semaphores:", err,
              file=sys.stderr)
23651c0b2f7Stbbdev
23751c0b2f7Stbbdev
def tbb_atexit():
    """Exit hook that releases the shared semaphores when IPC mode is enabled.

    Loads the library named by ``libirml`` and calls its
    ``release_semaphores`` entry point.  A failure is reported as a warning
    on stderr and not raised.  Does nothing when ``ipc_enabled`` is false.
    """
    if not ipc_enabled:
        return
    try:
        librml = ctypes.CDLL(libirml)
        librml.release_semaphores()
    except Exception:
        # Best-effort cleanup.  "Exception" rather than a bare except so that
        # SystemExit / KeyboardInterrupt are not swallowed during shutdown.
        print("Warning: Can not release shared semaphores",
              file=sys.stderr)
24651c0b2f7Stbbdev
24751c0b2f7Stbbdev
def _main():
    """Command-line entry point for ``python3 -m tbb``.

    Parses the command line and then:

    * with ``--verbose``, sets ``TBB_VERSION=1`` in the environment;
    * on Linux, when the allocator is requested and ``_TBB_MALLOC_PRELOAD``
      is not set yet, puts ``libtbbmalloc_proxy.so.2`` in front of
      ``LD_PRELOAD`` (optionally enabling huge pages) and re-executes the
      interpreter with the same arguments;
    * otherwise rewrites ``sys.argv`` for the target, records the IPC setting
      and either calls the module-level function named ``'_' + name`` (for
      example ``test`` runs ``_test``) or runs the given script/module inside
      a ``Monkey`` context.

    :return: the result of the ``'_' + name`` function when one is
        dispatched, otherwise None
    """
    # Run the module specified as the next command line argument
    # python3 -m TBB user_app.py
    global ipc_enabled

    import platform
    import argparse
    parser = argparse.ArgumentParser(prog="python3 -m tbb", description="""
                Run your Python script in context of tbb.Monkey, which
                replaces standard Python pools and threading layer of
                Intel(R) oneAPI Math Kernel Library (oneMKL) by implementation based on
                Intel(R) oneAPI Threading Building Blocks (oneTBB). It enables multiple parallel
                tasks to be executed on the same thread pool and coordinate
                number of threads across multiple processes thus avoiding
                overheads from oversubscription.
             """, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    if platform.system() == "Linux":
        parser.add_argument('--ipc', action='store_true',
                        help="Enable inter-process (IPC) coordination between oneTBB schedulers")
        parser.add_argument('-a', '--allocator', action='store_true',
                        help="Enable oneTBB scalable allocator as a replacement for standard memory allocator")
        parser.add_argument('--allocator-huge-pages', action='store_true',
                        help="Enable huge pages for oneTBB allocator (implies: -a)")
    parser.add_argument('-p', '--max-num-threads', default=default_num_threads(), type=int,
                        help="Initialize oneTBB with P max number of threads per process", metavar='P')
    parser.add_argument('-b', '--benchmark', action='store_true',
                        help="Block oneTBB initialization until all the threads are created before continue the script. "
                        "This is necessary for performance benchmarks that want to exclude lazy scheduler initialization effects from the measurements")
    parser.add_argument('-v', '--verbose', action='store_true',
                        help="Request verbose and version information")
    parser.add_argument('-m', action='store_true', dest='module',
                        help="Executes following as a module")
    parser.add_argument('name', help="Script or module name")
    parser.add_argument('args', nargs=argparse.REMAINDER,
                        help="Command line arguments")
    args = parser.parse_args()

    if args.verbose:
        os.environ["TBB_VERSION"] = "1"
    if platform.system() == "Linux":
        if args.allocator_huge_pages:
            args.allocator = True
        # _TBB_MALLOC_PRELOAD marks the re-executed process so that the
        # preload/exec step below happens only once.
        if args.allocator and not os.environ.get("_TBB_MALLOC_PRELOAD"):
            libtbbmalloc_lib = 'libtbbmalloc_proxy.so.2'
            ld_preload = 'LD_PRELOAD'
            os.environ["_TBB_MALLOC_PRELOAD"] = "1"
            # A real list, not filter(): in Python 3 filter() is a one-shot
            # iterator, so the membership test below used to exhaust it and
            # every existing LD_PRELOAD entry was dropped from the new value.
            preload_list = [lib for lib in
                            os.environ.get(ld_preload, "").split(':') if lib]
            if libtbbmalloc_lib in preload_list:
                print('Info:', ld_preload, "contains", libtbbmalloc_lib, "already\n")
            else:
                os.environ[ld_preload] = ':'.join([libtbbmalloc_lib] + preload_list)

            if args.allocator_huge_pages:
                assert platform.system() == "Linux"
                try:
                    with open('/proc/sys/vm/nr_hugepages', 'r') as f:
                        pages = int(f.read())
                except (OSError, ValueError):
                    # Only "cannot open" / "not a number" mean the probe
                    # failed; anything else should not be masked.
                    print("oneTBB: Failed to read number of pages from /proc/sys/vm/nr_hugepages\n"
                          "\tIs the Linux kernel configured with the huge pages feature?")
                    sys.exit(1)
                if pages == 0:
                    print("oneTBB: Pre-allocated huge pages are not currently reserved in the system. To reserve, run e.g.:\n"
                          "\tsudo sh -c 'echo 2000 > /proc/sys/vm/nr_hugepages'")
                os.environ["TBB_MALLOC_USE_HUGE_PAGES"] = "1"

            # Replace the current process so that LD_PRELOAD takes effect.
            os.execl(sys.executable, sys.executable, '-m', 'tbb', *sys.argv[1:])
            assert False, "Re-execution failed"

    sys.argv = [args.name] + args.args
    # args.ipc exists only on Linux; the platform test short-circuits first.
    ipc_enabled = platform.system() == "Linux" and args.ipc
    os.environ["IPC_ENABLE"] = "1" if ipc_enabled else "0"
    if ipc_enabled:
        atexit.register(tbb_atexit)
        init_sem_name()
    if not os.environ.get("KMP_BLOCKTIME"): # TODO move
        os.environ["KMP_BLOCKTIME"] = "0"
    # A name matching a module-level "_<name>" object is dispatched to it
    # instead of being run as a script or module.
    if '_' + args.name in globals():
        return globals()['_' + args.name](*args.args)
    else:
        import runpy
        runf = runpy.run_module if args.module else runpy.run_path
        with Monkey(max_num_threads=args.max_num_threads, benchmark=args.benchmark):
            runf(args.name, run_name='__main__')
332