# Copyright (c) 2016-2023 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import multiprocessing.pool
import ctypes
import atexit
import sys
import os

import platform

# On Windows the oneTBB DLLs live in the environment's Library\bin directory,
# which is not on the default DLL search path.  Python 3.8+ additionally needs
# an explicit os.add_dll_directory() call because PATH is no longer consulted
# when loading extension-module dependencies.
if "Windows" in platform.system():
    import site
    path_to_env = site.getsitepackages()[0]
    path_to_libs = os.path.join(path_to_env, "Library", "bin")
    if sys.version_info.minor >= 8:
        os.add_dll_directory(path_to_libs)
    os.environ['PATH'] += os.pathsep + path_to_libs


from .api import *
from .api import __all__ as api__all
from .pool import *
from .pool import __all__ as pool__all

__all__ = ["Monkey", "is_active"] + api__all + pool__all

__doc__ = """
Python API for Intel(R) oneAPI Threading Building Blocks (oneTBB)
extended with standard Python's pools implementation and monkey-patching.

Command-line interface example:
$ python3 -m tbb $your_script.py
Runs your_script.py in context of tbb.Monkey
"""

is_active = False
""" Indicates whether oneTBB context is activated """

ipc_enabled = False
""" Indicates whether IPC mode is enabled """

# Shared library providing inter-process (IPC) coordination of oneTBB
# schedulers; Linux-only.
libirml = "libirml.so.1"


def _test(arg=None):
    """Run the package self-tests (dispatched by `python3 -m tbb test`)."""
    if platform.system() == "Linux":
        ctypes.CDLL(libirml)
        # Dependencies of the native module must not include Intel compiler
        # runtime libraries; grep finding nothing exits with status 1, which
        # os.system reports as 256.
        assert 256 == os.system("ldd "+_api.__file__+"| grep -E 'libimf|libsvml|libintlc'")  # nosec
    from .test import test
    test(arg)
    print("done")


def tbb_process_pool_worker27(inqueue, outqueue, initializer=None, initargs=(),
                              maxtasks=None):
    """Pool worker for Python 2.7 that releases IPC resources on exit.

    Wraps the standard multiprocessing worker loop; after it returns, the
    process's oneTBB IPC resources are released (best effort) so other
    processes can reuse them.
    """
    from multiprocessing.pool import worker
    worker(inqueue, outqueue, initializer, initargs, maxtasks)
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_resources()
        except Exception:  # best-effort cleanup; a missing library is not fatal
            print("Warning: Can not load ", libirml, file=sys.stderr)


class TBBProcessPool27(multiprocessing.pool.Pool):
    """multiprocessing.pool.Pool variant (Python 2.7 worker signature) whose
    workers release oneTBB IPC resources when they terminate."""

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        from multiprocessing.util import debug

        for _ in range(self._processes - len(self._pool)):
            # Same wiring as the base class, but with our IPC-aware worker.
            w = self.Process(target=tbb_process_pool_worker27,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild)
                             )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

    def __del__(self):
        # Make sure workers run their cleanup path before the pool goes away.
        self.close()
        for p in self._pool:
            p.join()

    def __exit__(self, *args):
        self.close()
        for p in self._pool:
            p.join()


def tbb_process_pool_worker3(inqueue, outqueue, initializer=None, initargs=(),
                             maxtasks=None, wrap_exception=False):
    """Pool worker for Python 3 that releases IPC resources on exit.

    Same contract as tbb_process_pool_worker27 plus the `wrap_exception`
    flag required by the Python 3 worker signature.
    """
    from multiprocessing.pool import worker
    worker(inqueue, outqueue, initializer, initargs, maxtasks, wrap_exception)
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_resources()
        except Exception:  # best-effort cleanup; a missing library is not fatal
            print("Warning: Can not load ", libirml, file=sys.stderr)


class TBBProcessPool3(multiprocessing.pool.Pool):
    """multiprocessing.pool.Pool variant (Python 3 worker signature) whose
    workers release oneTBB IPC resources when they terminate."""

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        from multiprocessing.util import debug

        for _ in range(self._processes - len(self._pool)):
            # Same wiring as the base class, but with our IPC-aware worker.
            w = self.Process(target=tbb_process_pool_worker3,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild,
                                   self._wrap_exception)
                             )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

    def __del__(self):
        # Make sure workers run their cleanup path before the pool goes away.
        self.close()
        for p in self._pool:
            p.join()

    def __exit__(self, *args):
        self.close()
        for p in self._pool:
            p.join()


class Monkey:
    """
    Context manager which replaces standard multiprocessing.pool
    implementations with tbb.pool using monkey-patching. It also enables oneTBB
    threading for Intel(R) oneAPI Math Kernel Library (oneMKL). For example:

        with tbb.Monkey():
            run_my_numpy_code()

    It allows multiple parallel tasks to be executed on the same thread pool
    and coordinate number of threads across multiple processes thus avoiding
    overheads from oversubscription.
    """
    # Saved original attributes, keyed by patched class name (for restore).
    _items = {}
    # Modules that were patched, keyed by patched class name.
    _modules = {}

    def __init__(self, max_num_threads=None, benchmark=False):
        """
        Create context manager for running under TBB scheduler.
        :param max_num_threads: if specified, limits maximal number of threads
        :param benchmark: if specified, blocks in initialization until
               requested number of threads are ready
        """
        if max_num_threads:
            # Keep a reference: the limit stays in force while self.ctl lives.
            self.ctl = global_control(global_control.max_allowed_parallelism,
                                      int(max_num_threads))
        if benchmark:
            if not max_num_threads:
                max_num_threads = default_num_threads()
            from .api import _concurrency_barrier
            _concurrency_barrier(int(max_num_threads))

    def _patch(self, class_name, module_name, obj):
        """Replace `module_name.class_name` with `obj`, remembering the
        original attribute so __exit__ can restore it."""
        m = self._modules[class_name] = __import__(module_name, globals(),
                                                   locals(), [class_name])
        if m is None:
            return
        oldattr = getattr(m, class_name, None)
        if oldattr is None:
            self._modules[class_name] = None
            return
        self._items[class_name] = oldattr
        setattr(m, class_name, obj)

    def __enter__(self):
        global is_active
        assert is_active == False, "tbb.Monkey does not support nesting yet"
        is_active = True
        # Remember prior env values so __exit__ can restore them exactly.
        self.env_mkl = os.getenv('MKL_THREADING_LAYER')
        os.environ['MKL_THREADING_LAYER'] = 'TBB'
        self.env_numba = os.getenv('NUMBA_THREADING_LAYER')
        os.environ['NUMBA_THREADING_LAYER'] = 'TBB'

        if ipc_enabled:
            # IPC mode needs process-pool workers that release IPC resources;
            # pick the implementation matching the interpreter version.
            if sys.version_info.major == 2 and sys.version_info.minor >= 7:
                self._patch("Pool", "multiprocessing.pool", TBBProcessPool27)
            elif sys.version_info.major == 3 and sys.version_info.minor >= 5:
                self._patch("Pool", "multiprocessing.pool", TBBProcessPool3)
        self._patch("ThreadPool", "multiprocessing.pool", Pool)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        global is_active
        assert is_active == True, "modified?"
        is_active = False
        # Restore environment variables (delete the ones we introduced).
        if self.env_mkl is None:
            del os.environ['MKL_THREADING_LAYER']
        else:
            os.environ['MKL_THREADING_LAYER'] = self.env_mkl
        if self.env_numba is None:
            del os.environ['NUMBA_THREADING_LAYER']
        else:
            os.environ['NUMBA_THREADING_LAYER'] = self.env_numba
        # Undo the monkey-patching.
        for name, oldattr in self._items.items():
            setattr(self._modules[name], name, oldattr)


def init_sem_name():
    """Initialize the names of the shared semaphores used for IPC (best effort)."""
    try:
        librml = ctypes.CDLL(libirml)
        librml.set_active_sem_name()
        librml.set_stop_sem_name()
    except Exception as e:
        print("Warning: Can not initialize name of shared semaphores:", e,
              file=sys.stderr)


def tbb_atexit():
    """atexit hook: release shared IPC semaphores (best effort)."""
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_semaphores()
        except Exception:
            print("Warning: Can not release shared semaphores",
                  file=sys.stderr)


def _main():
    # Run the module specified as the next command line argument
    # python3 -m TBB user_app.py
    global ipc_enabled

    import argparse
    parser = argparse.ArgumentParser(prog="python3 -m tbb", description="""
             Run your Python script in context of tbb.Monkey, which
             replaces standard Python pools and threading layer of
             Intel(R) oneAPI Math Kernel Library (oneMKL) by implementation based on
             Intel(R) oneAPI Threading Building Blocks (oneTBB). It enables multiple parallel
             tasks to be executed on the same thread pool and coordinate
             number of threads across multiple processes thus avoiding
             overheads from oversubscription.
             """, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    # IPC coordination and the scalable allocator are Linux-only features.
    if platform.system() == "Linux":
        parser.add_argument('--ipc', action='store_true',
                            help="Enable inter-process (IPC) coordination between oneTBB schedulers")
        parser.add_argument('-a', '--allocator', action='store_true',
                            help="Enable oneTBB scalable allocator as a replacement for standard memory allocator")
        parser.add_argument('--allocator-huge-pages', action='store_true',
                            help="Enable huge pages for oneTBB allocator (implies: -a)")
    parser.add_argument('-p', '--max-num-threads', default=default_num_threads(), type=int,
                        help="Initialize oneTBB with P max number of threads per process", metavar='P')
    parser.add_argument('-b', '--benchmark', action='store_true',
                        help="Block oneTBB initialization until all the threads are created before continue the script. "
                             "This is necessary for performance benchmarks that want to exclude lazy scheduler initialization effects from the measurements")
    parser.add_argument('-v', '--verbose', action='store_true',
                        help="Request verbose and version information")
    parser.add_argument('-m', action='store_true', dest='module',
                        help="Executes following as a module")
    parser.add_argument('name', help="Script or module name")
    parser.add_argument('args', nargs=argparse.REMAINDER,
                        help="Command line arguments")
    args = parser.parse_args()

    if args.verbose:
        os.environ["TBB_VERSION"] = "1"
    if platform.system() == "Linux":
        if args.allocator_huge_pages:
            args.allocator = True
        # _TBB_MALLOC_PRELOAD guards against re-running this branch after the
        # os.execl() re-execution below.
        if args.allocator and not os.environ.get("_TBB_MALLOC_PRELOAD"):
            libtbbmalloc_lib = 'libtbbmalloc_proxy.so.2'
            ld_preload = 'LD_PRELOAD'
            os.environ["_TBB_MALLOC_PRELOAD"] = "1"
            # Use a real list, not filter(): a filter iterator would be
            # consumed by the membership test below, silently dropping the
            # pre-existing LD_PRELOAD entries when rebuilding the variable.
            preload_list = [lib for lib in
                            os.environ.get(ld_preload, "").split(':') if lib]
            if libtbbmalloc_lib in preload_list:
                print('Info:', ld_preload, "contains", libtbbmalloc_lib, "already\n")
            else:
                os.environ[ld_preload] = ':'.join([libtbbmalloc_lib] + preload_list)

            if args.allocator_huge_pages:
                assert platform.system() == "Linux"
                try:
                    with open('/proc/sys/vm/nr_hugepages', 'r') as f:
                        pages = int(f.read())
                    if pages == 0:
                        print("oneTBB: Pre-allocated huge pages are not currently reserved in the system. To reserve, run e.g.:\n"
                              "\tsudo sh -c 'echo 2000 > /proc/sys/vm/nr_hugepages'")
                    os.environ["TBB_MALLOC_USE_HUGE_PAGES"] = "1"
                except (OSError, ValueError):
                    print("oneTBB: Failed to read number of pages from /proc/sys/vm/nr_hugepages\n"
                          "\tIs the Linux kernel configured with the huge pages feature?")
                    sys.exit(1)

            # Re-execute ourselves so the dynamic loader honors LD_PRELOAD.
            os.execl(sys.executable, sys.executable, '-m', 'tbb', *sys.argv[1:])
            assert False, "Re-execution failed"

    sys.argv = [args.name] + args.args
    ipc_enabled = platform.system() == "Linux" and args.ipc
    os.environ["IPC_ENABLE"] = "1" if ipc_enabled else "0"
    if ipc_enabled:
        atexit.register(tbb_atexit)
        init_sem_name()
    if not os.environ.get("KMP_BLOCKTIME"):  # TODO move
        os.environ["KMP_BLOCKTIME"] = "0"
    # Internal commands like `python3 -m tbb test` dispatch to _<name>().
    if '_' + args.name in globals():
        return globals()['_' + args.name](*args.args)
    else:
        import runpy
        runf = runpy.run_module if args.module else runpy.run_path
        with Monkey(max_num_threads=args.max_num_threads, benchmark=args.benchmark):
            runf(args.name, run_name='__main__')