#!/usr/bin/env python3
#
# Copyright (c) 2016-2022 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import multiprocessing.pool
import ctypes
import atexit
import sys
import os

import platform

# On Windows the oneTBB runtime DLLs live in the environment's Library/bin
# directory; register it with the DLL loader (os.add_dll_directory is
# required since Python 3.8) and keep PATH updated for older tooling.
if "Windows" in platform.system():
    import site
    path_to_env = site.getsitepackages()[0]
    path_to_libs = os.path.join(path_to_env, "Library", "bin")
    if sys.version_info.minor >= 8:
        os.add_dll_directory(path_to_libs)
    os.environ['PATH'] += os.pathsep + path_to_libs


from .api import *
from .api import __all__ as api__all
from .pool import *
from .pool import __all__ as pool__all

__all__ = ["Monkey", "is_active"] + api__all + pool__all

__doc__ = """
Python API for Intel(R) oneAPI Threading Building Blocks (oneTBB)
extended with standard Python's pools implementation and monkey-patching.

Command-line interface example:
$ python3 -m tbb $your_script.py
Runs your_script.py in context of tbb.Monkey
"""

is_active = False
""" Indicates whether oneTBB context is activated """

ipc_enabled = False
""" Indicates whether IPC mode is enabled """

# Name of the IPC resource-manager library (Linux only).
libirml = "libirml.so.1"


def _test(arg=None):
    """Some tests"""
    import platform
    if platform.system() == "Linux":
        ctypes.CDLL(libirml)
        assert 256 == os.system("ldd "+_api.__file__+"| grep -E 'libimf|libsvml|libintlc'")  # nosec
    from .test import test
    test(arg)
    print("done")


def tbb_process_pool_worker27(inqueue, outqueue, initializer=None, initargs=(),
                              maxtasks=None):
    """Pool worker (Python 2.7 signature) that additionally releases IPC
    resources held by libirml when IPC mode is enabled.
    """
    from multiprocessing.pool import worker
    worker(inqueue, outqueue, initializer, initargs, maxtasks)
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_resources()
        except Exception:
            print("Warning: Can not load ", libirml, file=sys.stderr)


class TBBProcessPool27(multiprocessing.pool.Pool):
    """Process pool (Python 2.7) that spawns IPC-aware workers."""

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        from multiprocessing.util import debug

        for i in range(self._processes - len(self._pool)):
            w = self.Process(target=tbb_process_pool_worker27,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild)
                             )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

    def __del__(self):
        self.close()
        for p in self._pool:
            p.join()

    def __exit__(self, *args):
        self.close()
        for p in self._pool:
            p.join()


def tbb_process_pool_worker3(inqueue, outqueue, initializer=None, initargs=(),
                             maxtasks=None, wrap_exception=False):
    """Pool worker (Python 3 signature) that additionally releases IPC
    resources held by libirml when IPC mode is enabled.
    """
    from multiprocessing.pool import worker
    worker(inqueue, outqueue, initializer, initargs, maxtasks, wrap_exception)
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_resources()
        except Exception:
            print("Warning: Can not load ", libirml, file=sys.stderr)


class TBBProcessPool3(multiprocessing.pool.Pool):
    """Process pool (Python 3) that spawns IPC-aware workers."""

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        from multiprocessing.util import debug

        for i in range(self._processes - len(self._pool)):
            w = self.Process(target=tbb_process_pool_worker3,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild,
                                   self._wrap_exception)
                             )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

    def __del__(self):
        self.close()
        for p in self._pool:
            p.join()

    def __exit__(self, *args):
        self.close()
        for p in self._pool:
            p.join()


class Monkey:
    """
    Context manager which replaces standard multiprocessing.pool
    implementations with tbb.pool using monkey-patching. It also enables oneTBB
    threading for Intel(R) oneAPI Math Kernel Library (oneMKL). For example:

        with tbb.Monkey():
            run_my_numpy_code()

    It allows multiple parallel tasks to be executed on the same thread pool
    and coordinate number of threads across multiple processes thus avoiding
    overheads from oversubscription.
    """
    # NOTE(review): these are class-level (shared) dicts of patched
    # attributes; nesting is not supported (asserted in __enter__).
    _items = {}
    _modules = {}

    def __init__(self, max_num_threads=None, benchmark=False):
        """
        Create context manager for running under TBB scheduler.
        :param max_num_threads: if specified, limits maximal number of threads
        :param benchmark: if specified, blocks in initialization until requested number of threads are ready
        """
        if max_num_threads:
            # Keep the global_control object alive on self for the duration
            # of the context.
            self.ctl = global_control(global_control.max_allowed_parallelism,
                                      int(max_num_threads))
        if benchmark:
            if not max_num_threads:
                max_num_threads = default_num_threads()
            from .api import _concurrency_barrier
            _concurrency_barrier(int(max_num_threads))

    def _patch(self, class_name, module_name, obj):
        """Replace module_name.class_name with obj, remembering the original
        attribute for restoration in __exit__."""
        m = self._modules[class_name] = __import__(module_name, globals(),
                                                   locals(), [class_name])
        if m is None:
            return
        oldattr = getattr(m, class_name, None)
        if oldattr is None:
            self._modules[class_name] = None
            return
        self._items[class_name] = oldattr
        setattr(m, class_name, obj)

    def __enter__(self):
        global is_active
        assert not is_active, "tbb.Monkey does not support nesting yet"
        is_active = True
        # Switch MKL and Numba threading layers to TBB, remembering the
        # previous settings so __exit__ can restore them.
        self.env_mkl = os.getenv('MKL_THREADING_LAYER')
        os.environ['MKL_THREADING_LAYER'] = 'TBB'
        self.env_numba = os.getenv('NUMBA_THREADING_LAYER')
        os.environ['NUMBA_THREADING_LAYER'] = 'TBB'

        if ipc_enabled:
            if sys.version_info.major == 2 and sys.version_info.minor >= 7:
                self._patch("Pool", "multiprocessing.pool", TBBProcessPool27)
            elif sys.version_info.major == 3 and sys.version_info.minor >= 5:
                self._patch("Pool", "multiprocessing.pool", TBBProcessPool3)
        self._patch("ThreadPool", "multiprocessing.pool", Pool)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        global is_active
        assert is_active, "modified?"
        is_active = False
        if self.env_mkl is None:
            del os.environ['MKL_THREADING_LAYER']
        else:
            os.environ['MKL_THREADING_LAYER'] = self.env_mkl
        if self.env_numba is None:
            del os.environ['NUMBA_THREADING_LAYER']
        else:
            os.environ['NUMBA_THREADING_LAYER'] = self.env_numba
        # Restore every attribute recorded by _patch.
        for name in self._items.keys():
            setattr(self._modules[name], name, self._items[name])


def init_sem_name():
    """Initialize names of the shared semaphores used by libirml (IPC mode)."""
    try:
        librml = ctypes.CDLL(libirml)
        librml.set_active_sem_name()
        librml.set_stop_sem_name()
    except Exception as e:
        print("Warning: Can not initialize name of shared semaphores:", e,
              file=sys.stderr)


def tbb_atexit():
    """atexit hook: release shared IPC semaphores if IPC mode was enabled."""
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_semaphores()
        except Exception:
            print("Warning: Can not release shared semaphores",
                  file=sys.stderr)


def _main():
    # Run the module specified as the next command line argument
    # python3 -m TBB user_app.py
    global ipc_enabled

    import platform
    import argparse
    parser = argparse.ArgumentParser(prog="python3 -m tbb", description="""
                Run your Python script in context of tbb.Monkey, which
                replaces standard Python pools and threading layer of
                Intel(R) oneAPI Math Kernel Library (oneMKL) by implementation based on
                Intel(R) oneAPI Threading Building Blocks (oneTBB). It enables multiple parallel
                tasks to be executed on the same thread pool and coordinate
                number of threads across multiple processes thus avoiding
                overheads from oversubscription.
                """, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    if platform.system() == "Linux":
        parser.add_argument('--ipc', action='store_true',
                            help="Enable inter-process (IPC) coordination between oneTBB schedulers")
        parser.add_argument('-a', '--allocator', action='store_true',
                            help="Enable oneTBB scalable allocator as a replacement for standard memory allocator")
        parser.add_argument('--allocator-huge-pages', action='store_true',
                            help="Enable huge pages for oneTBB allocator (implies: -a)")
    parser.add_argument('-p', '--max-num-threads', default=default_num_threads(), type=int,
                        help="Initialize oneTBB with P max number of threads per process", metavar='P')
    parser.add_argument('-b', '--benchmark', action='store_true',
                        help="Block oneTBB initialization until all the threads are created before continue the script. "
                             "This is necessary for performance benchmarks that want to exclude lazy scheduler initialization effects from the measurements")
    parser.add_argument('-v', '--verbose', action='store_true',
                        help="Request verbose and version information")
    parser.add_argument('-m', action='store_true', dest='module',
                        help="Executes following as a module")
    parser.add_argument('name', help="Script or module name")
    parser.add_argument('args', nargs=argparse.REMAINDER,
                        help="Command line arguments")
    args = parser.parse_args()

    if args.verbose:
        os.environ["TBB_VERSION"] = "1"
    if platform.system() == "Linux":
        if args.allocator_huge_pages:
            args.allocator = True
        if args.allocator and not os.environ.get("_TBB_MALLOC_PRELOAD"):
            libtbbmalloc_lib = 'libtbbmalloc_proxy.so.2'
            ld_preload = 'LD_PRELOAD'
            os.environ["_TBB_MALLOC_PRELOAD"] = "1"
            # Materialize the list once: a bare filter() iterator would be
            # exhausted by the membership test below, silently dropping the
            # existing LD_PRELOAD entries from the join.
            preload_list = list(filter(None, os.environ.get(ld_preload, "").split(':')))
            if libtbbmalloc_lib in preload_list:
                print('Info:', ld_preload, "contains", libtbbmalloc_lib, "already\n")
            else:
                os.environ[ld_preload] = ':'.join([libtbbmalloc_lib] + preload_list)

            if args.allocator_huge_pages:
                assert platform.system() == "Linux"
                try:
                    with open('/proc/sys/vm/nr_hugepages', 'r') as f:
                        pages = int(f.read())
                    if pages == 0:
                        print("oneTBB: Pre-allocated huge pages are not currently reserved in the system. To reserve, run e.g.:\n"
                              "\tsudo sh -c 'echo 2000 > /proc/sys/vm/nr_hugepages'")
                    os.environ["TBB_MALLOC_USE_HUGE_PAGES"] = "1"
                except Exception:
                    print("oneTBB: Failed to read number of pages from /proc/sys/vm/nr_hugepages\n"
                          "\tIs the Linux kernel configured with the huge pages feature?")
                    sys.exit(1)

            # Re-exec the interpreter so LD_PRELOAD takes effect;
            # _TBB_MALLOC_PRELOAD guards against infinite re-execution.
            os.execl(sys.executable, sys.executable, '-m', 'tbb', *sys.argv[1:])
            assert False, "Re-execution failed"

    sys.argv = [args.name] + args.args
    ipc_enabled = platform.system() == "Linux" and args.ipc
    os.environ["IPC_ENABLE"] = "1" if ipc_enabled else "0"
    if ipc_enabled:
        atexit.register(tbb_atexit)
        init_sem_name()
    if not os.environ.get("KMP_BLOCKTIME"):  # TODO move
        os.environ["KMP_BLOCKTIME"] = "0"
    # `python3 -m tbb test` dispatches to the module-private _test() helper.
    if '_' + args.name in globals():
        return globals()['_' + args.name](*args.args)
    else:
        import runpy
        runf = runpy.run_module if args.module else runpy.run_path
        with Monkey(max_num_threads=args.max_num_threads, benchmark=args.benchmark):
            runf(args.name, run_name='__main__')