# Copyright (c) 2016-2023 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import multiprocessing.pool
import ctypes
import atexit
import sys
import os

import platform

if "Windows" in platform.system():
    import site
    path_to_env = site.getsitepackages()[0]
    path_to_libs = os.path.join(path_to_env, "Library", "bin")
    if sys.version_info.minor >= 8:
        # Python 3.8+ no longer uses PATH to resolve dependent DLLs.
        os.add_dll_directory(path_to_libs)
    os.environ['PATH'] += os.pathsep + path_to_libs


from .api import *
from .api import __all__ as api__all
from .pool import *
from .pool import __all__ as pool__all

__all__ = ["Monkey", "is_active"] + api__all + pool__all

__doc__ = """
Python API for Intel(R) oneAPI Threading Building Blocks (oneTBB),
extended with an implementation of Python's standard pools and with
monkey-patching.

Command-line interface example:
$ python3 -m tbb $your_script.py
Runs your_script.py in the context of tbb.Monkey.
"""
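
# A commented usage sketch (not executed on import). It assumes a NumPy/oneMKL
# workload; `run_my_numpy_code` below is a placeholder, not part of this API:
#
#     import tbb
#     with tbb.Monkey(max_num_threads=4):  # cap oneTBB at 4 threads
#         run_my_numpy_code()
#
# which is roughly what `python3 -m tbb -p 4 your_script.py` sets up for a
# whole script.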

is_active = False
""" Indicates whether the oneTBB context is activated """

ipc_enabled = False
""" Indicates whether IPC mode is enabled """

libirml = "libirml.so.1"


def _test(arg=None):
    """Some tests"""
    import platform
    if platform.system() == "Linux":
        ctypes.CDLL(libirml)
        assert 256 == os.system("ldd "+_api.__file__+"| grep -E 'libimf|libsvml|libintlc'")  # nosec
    from .test import test
    test(arg)
    print("done")


def tbb_process_pool_worker27(inqueue, outqueue, initializer=None, initargs=(),
                              maxtasks=None):
    from multiprocessing.pool import worker
    worker(inqueue, outqueue, initializer, initargs, maxtasks)
    if ipc_enabled:
        # In IPC mode, explicitly release the scheduler resources acquired
        # through libirml before the worker process exits.
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_resources()
        except Exception:
            print("Warning: Cannot load", libirml, file=sys.stderr)


class TBBProcessPool27(multiprocessing.pool.Pool):
    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        from multiprocessing.util import debug

        for i in range(self._processes - len(self._pool)):
            w = self.Process(target=tbb_process_pool_worker27,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild)
                             )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

    def __del__(self):
        self.close()
        for p in self._pool:
            p.join()

    def __exit__(self, *args):
        self.close()
        for p in self._pool:
            p.join()


def tbb_process_pool_worker3(inqueue, outqueue, initializer=None, initargs=(),
                             maxtasks=None, wrap_exception=False):
    from multiprocessing.pool import worker
    worker(inqueue, outqueue, initializer, initargs, maxtasks, wrap_exception)
    if ipc_enabled:
        # In IPC mode, explicitly release the scheduler resources acquired
        # through libirml before the worker process exits.
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_resources()
        except Exception:
            print("Warning: Cannot load", libirml, file=sys.stderr)


class TBBProcessPool3(multiprocessing.pool.Pool):
    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        from multiprocessing.util import debug

        for i in range(self._processes - len(self._pool)):
            w = self.Process(target=tbb_process_pool_worker3,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild,
                                   self._wrap_exception)
                             )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

    def __del__(self):
        self.close()
        for p in self._pool:
            p.join()

    def __exit__(self, *args):
        self.close()
        for p in self._pool:
            p.join()


class Monkey:
    """
    Context manager which replaces the standard multiprocessing.pool
    implementations with tbb.pool using monkey-patching. It also enables oneTBB
    threading for Intel(R) oneAPI Math Kernel Library (oneMKL). For example:

        with tbb.Monkey():
            run_my_numpy_code()

    It allows multiple parallel tasks to be executed on the same thread pool
    and coordinates the number of threads across multiple processes, thus
    avoiding oversubscription overheads. See also the commented sketch after
    this class definition.
    """
    _items = {}
    _modules = {}

    def __init__(self, max_num_threads=None, benchmark=False):
        """
        Create a context manager for running under the oneTBB scheduler.
        :param max_num_threads: if specified, limits the maximum number of threads
        :param benchmark: if True, block in initialization until the requested number of threads is ready
        """
        if max_num_threads:
            self.ctl = global_control(global_control.max_allowed_parallelism, int(max_num_threads))
        if benchmark:
            if not max_num_threads:
                max_num_threads = default_num_threads()
            from .api import _concurrency_barrier
            _concurrency_barrier(int(max_num_threads))

    def _patch(self, class_name, module_name, obj):
        m = self._modules[class_name] = __import__(module_name, globals(),
                                                   locals(), [class_name])
        if m is None:
            return
        oldattr = getattr(m, class_name, None)
        if oldattr is None:
            self._modules[class_name] = None
            return
        # Remember the original attribute so that __exit__ can restore it.
        self._items[class_name] = oldattr
        setattr(m, class_name, obj)

    def __enter__(self):
        global is_active
        assert not is_active, "tbb.Monkey does not support nesting yet"
        is_active = True
        # Switch oneMKL and Numba to their oneTBB threading layers.
        self.env_mkl = os.getenv('MKL_THREADING_LAYER')
        os.environ['MKL_THREADING_LAYER'] = 'TBB'
        self.env_numba = os.getenv('NUMBA_THREADING_LAYER')
        os.environ['NUMBA_THREADING_LAYER'] = 'TBB'

        if ipc_enabled:
            if sys.version_info.major == 2 and sys.version_info.minor >= 7:
                self._patch("Pool", "multiprocessing.pool", TBBProcessPool27)
            elif sys.version_info.major == 3 and sys.version_info.minor >= 5:
                self._patch("Pool", "multiprocessing.pool", TBBProcessPool3)
        self._patch("ThreadPool", "multiprocessing.pool", Pool)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        global is_active
        assert is_active, "modified?"
        is_active = False
        # Restore the original environment variables and patched pool classes.
        if self.env_mkl is None:
            del os.environ['MKL_THREADING_LAYER']
        else:
            os.environ['MKL_THREADING_LAYER'] = self.env_mkl
        if self.env_numba is None:
            del os.environ['NUMBA_THREADING_LAYER']
        else:
            os.environ['NUMBA_THREADING_LAYER'] = self.env_numba
        for name in self._items.keys():
            setattr(self._modules[name], name, self._items[name])
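
# A minimal sketch (comments only, not executed) of what entering Monkey above
# amounts to at runtime. It assumes this package is importable as `tbb` and
# that tbb.pool.Pool follows the multiprocessing ThreadPool API, which is what
# the patching relies on:
#
#     import os
#     import multiprocessing.pool
#     import tbb
#
#     with tbb.Monkey():
#         # oneMKL and Numba are redirected to their oneTBB threading layers
#         assert os.environ['MKL_THREADING_LAYER'] == 'TBB'
#         # ThreadPool now resolves to the oneTBB-based tbb.pool.Pool
#         pool = multiprocessing.pool.ThreadPool(4)
#         print(pool.map(abs, range(-4, 0)))
#         pool.close()
#         pool.join()
#     # on exit the original classes and environment variables are restored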


def init_sem_name():
    try:
        librml = ctypes.CDLL(libirml)
        librml.set_active_sem_name()
        librml.set_stop_sem_name()
    except Exception as e:
        print("Warning: Cannot initialize names of shared semaphores:", e,
              file=sys.stderr)


def tbb_atexit():
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_semaphores()
        except Exception:
            print("Warning: Cannot release shared semaphores",
                  file=sys.stderr)


def _main():
    # Run the script or module specified as the next command-line argument:
    # python3 -m tbb user_app.py
    global ipc_enabled

    import platform
    import argparse
    parser = argparse.ArgumentParser(prog="python3 -m tbb", description="""
            Run your Python script in the context of tbb.Monkey, which
            replaces the standard Python pools and the threading layer of
            Intel(R) oneAPI Math Kernel Library (oneMKL) with an implementation
            based on Intel(R) oneAPI Threading Building Blocks (oneTBB).
            It enables multiple parallel tasks to be executed on the same
            thread pool and coordinates the number of threads across multiple
            processes, thus avoiding oversubscription overheads.
            """, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    if platform.system() == "Linux":
        parser.add_argument('--ipc', action='store_true',
                            help="Enable inter-process (IPC) coordination between oneTBB schedulers")
    parser.add_argument('-a', '--allocator', action='store_true',
                        help="Enable the oneTBB scalable allocator as a replacement for the standard memory allocator")
    parser.add_argument('--allocator-huge-pages', action='store_true',
                        help="Enable huge pages for the oneTBB allocator (implies: -a)")
    parser.add_argument('-p', '--max-num-threads', default=default_num_threads(), type=int,
                        help="Initialize oneTBB with at most P threads per process", metavar='P')
    parser.add_argument('-b', '--benchmark', action='store_true',
                        help="Block oneTBB initialization until all the threads are created before continuing with the script. "
                             "This is necessary for performance benchmarks that want to exclude lazy scheduler initialization effects from the measurements")
    parser.add_argument('-v', '--verbose', action='store_true',
                        help="Request verbose and version information")
    parser.add_argument('-m', action='store_true', dest='module',
                        help="Execute the following argument as a module")
    parser.add_argument('name', help="Script or module name")
    parser.add_argument('args', nargs=argparse.REMAINDER,
                        help="Command line arguments")
    args = parser.parse_args()
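
    # Illustrative invocations (examples only; script and module names below
    # are placeholders):
    #   python3 -m tbb my_script.py                # run my_script.py under tbb.Monkey
    #   python3 -m tbb -p 8 -a my_script.py        # cap oneTBB at 8 threads, enable the scalable allocator
    #   python3 -m tbb --ipc -m my_pkg.my_module   # Linux only: IPC coordination, run a module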
" 27551c0b2f7Stbbdev "This is necessary for performance benchmarks that want to exclude lazy scheduler initialization effects from the measurements") 27651c0b2f7Stbbdev parser.add_argument('-v', '--verbose', action='store_true', 27751c0b2f7Stbbdev help="Request verbose and version information") 27851c0b2f7Stbbdev parser.add_argument('-m', action='store_true', dest='module', 27951c0b2f7Stbbdev help="Executes following as a module") 28051c0b2f7Stbbdev parser.add_argument('name', help="Script or module name") 28151c0b2f7Stbbdev parser.add_argument('args', nargs=argparse.REMAINDER, 28251c0b2f7Stbbdev help="Command line arguments") 28351c0b2f7Stbbdev args = parser.parse_args() 28451c0b2f7Stbbdev 28551c0b2f7Stbbdev if args.verbose: 28651c0b2f7Stbbdev os.environ["TBB_VERSION"] = "1" 28751c0b2f7Stbbdev if platform.system() == "Linux": 28851c0b2f7Stbbdev if args.allocator_huge_pages: 28951c0b2f7Stbbdev args.allocator = True 29051c0b2f7Stbbdev if args.allocator and not os.environ.get("_TBB_MALLOC_PRELOAD"): 29151c0b2f7Stbbdev libtbbmalloc_lib = 'libtbbmalloc_proxy.so.2' 29251c0b2f7Stbbdev ld_preload = 'LD_PRELOAD' 29351c0b2f7Stbbdev os.environ["_TBB_MALLOC_PRELOAD"] = "1" 29451c0b2f7Stbbdev preload_list = filter(None, os.environ.get(ld_preload, "").split(':')) 29551c0b2f7Stbbdev if libtbbmalloc_lib in preload_list: 29651c0b2f7Stbbdev print('Info:', ld_preload, "contains", libtbbmalloc_lib, "already\n") 29751c0b2f7Stbbdev else: 29851c0b2f7Stbbdev os.environ[ld_preload] = ':'.join([libtbbmalloc_lib] + list(preload_list)) 29951c0b2f7Stbbdev 30051c0b2f7Stbbdev if args.allocator_huge_pages: 30151c0b2f7Stbbdev assert platform.system() == "Linux" 30251c0b2f7Stbbdev try: 30351c0b2f7Stbbdev with open('/proc/sys/vm/nr_hugepages', 'r') as f: 30451c0b2f7Stbbdev pages = int(f.read()) 30551c0b2f7Stbbdev if pages == 0: 30651c0b2f7Stbbdev print("oneTBB: Pre-allocated huge pages are not currently reserved in the system. To reserve, run e.g.:\n" 30751c0b2f7Stbbdev "\tsudo sh -c 'echo 2000 > /proc/sys/vm/nr_hugepages'") 30851c0b2f7Stbbdev os.environ["TBB_MALLOC_USE_HUGE_PAGES"] = "1" 30951c0b2f7Stbbdev except: 31051c0b2f7Stbbdev print("oneTBB: Failed to read number of pages from /proc/sys/vm/nr_hugepages\n" 31151c0b2f7Stbbdev "\tIs the Linux kernel configured with the huge pages feature?") 31251c0b2f7Stbbdev sys.exit(1) 31351c0b2f7Stbbdev 31451c0b2f7Stbbdev os.execl(sys.executable, sys.executable, '-m', 'tbb', *sys.argv[1:]) 31551c0b2f7Stbbdev assert False, "Re-execution failed" 31651c0b2f7Stbbdev 31751c0b2f7Stbbdev sys.argv = [args.name] + args.args 31851c0b2f7Stbbdev ipc_enabled = platform.system() == "Linux" and args.ipc 31951c0b2f7Stbbdev os.environ["IPC_ENABLE"] = "1" if ipc_enabled else "0" 32051c0b2f7Stbbdev if ipc_enabled: 32151c0b2f7Stbbdev atexit.register(tbb_atexit) 32251c0b2f7Stbbdev init_sem_name() 32351c0b2f7Stbbdev if not os.environ.get("KMP_BLOCKTIME"): # TODO move 32451c0b2f7Stbbdev os.environ["KMP_BLOCKTIME"] = "0" 32551c0b2f7Stbbdev if '_' + args.name in globals(): 32651c0b2f7Stbbdev return globals()['_' + args.name](*args.args) 32751c0b2f7Stbbdev else: 32851c0b2f7Stbbdev import runpy 32951c0b2f7Stbbdev runf = runpy.run_module if args.module else runpy.run_path 33051c0b2f7Stbbdev with Monkey(max_num_threads=args.max_num_threads, benchmark=args.benchmark): 33151c0b2f7Stbbdev runf(args.name, run_name='__main__') 332