#!/usr/bin/env python3
#
# Copyright (c) 2016-2021 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import multiprocessing.pool
import ctypes
import atexit
import sys
import os

from .api import *
from .api import __all__ as api__all
from .pool import *
from .pool import __all__ as pool__all

__all__ = ["Monkey", "is_active"] + api__all + pool__all

__doc__ = """
Python API for Intel(R) oneAPI Threading Building Blocks (oneTBB)
extended with standard Python's pools implementation and monkey-patching.

Command-line interface example:
$ python3 -m tbb $your_script.py
Runs your_script.py in context of tbb.Monkey
"""

is_active = False
""" Indicates whether oneTBB context is activated """

ipc_enabled = False
""" Indicates whether IPC mode is enabled """

# Soname of the IPC resource-management library, loaded on demand via ctypes.
libirml = "libirml.so.1"


def _test(arg=None):
    """Run the module's self-tests.

    On Linux additionally checks that libirml is loadable and that the
    extension module is not linked against Intel compiler runtime libraries
    (grep finding no match makes os.system return 256).
    """
    import platform
    if platform.system() == "Linux":
        ctypes.CDLL(libirml)
        # NOTE(review): `_api` is not defined in this file; presumably it is
        # exported by `.api` -- confirm before relying on this test path.
        assert 256 == os.system("ldd "+_api.__file__+"| grep -E 'libimf|libsvml|libintlc'")  # nosec
    from .test import test
    test(arg)
    print("done")


def tbb_process_pool_worker27(inqueue, outqueue, initializer=None, initargs=(),
                              maxtasks=None):
    """Pool worker (Python 2.7 signature) that releases IPC resources on exit.

    Delegates to the standard multiprocessing worker loop, then, in IPC mode,
    asks libirml to release this process's coordination resources.
    """
    from multiprocessing.pool import worker
    worker(inqueue, outqueue, initializer, initargs, maxtasks)
    if ipc_enabled:
        # Best-effort cleanup: a missing/om-broken libirml must not fail the
        # worker shutdown, but do not swallow KeyboardInterrupt/SystemExit.
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_resources()
        except Exception:
            print("Warning: Can not load ", libirml, file=sys.stderr)


class TBBProcessPool27(multiprocessing.pool.Pool):
    """Process pool (Python 2.7 flavor) whose workers release IPC resources."""

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        from multiprocessing.util import debug

        for _ in range(self._processes - len(self._pool)):
            w = self.Process(target=tbb_process_pool_worker27,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild)
                             )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

    def __del__(self):
        # Ensure workers run their IPC cleanup before the pool goes away.
        self.close()
        for p in self._pool:
            p.join()

    def __exit__(self, *args):
        self.close()
        for p in self._pool:
            p.join()


def tbb_process_pool_worker3(inqueue, outqueue, initializer=None, initargs=(),
                             maxtasks=None, wrap_exception=False):
    """Pool worker (Python 3 signature) that releases IPC resources on exit.

    Same as tbb_process_pool_worker27 plus the `wrap_exception` argument
    required by the Python 3 multiprocessing worker protocol.
    """
    from multiprocessing.pool import worker
    worker(inqueue, outqueue, initializer, initargs, maxtasks, wrap_exception)
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_resources()
        except Exception:
            print("Warning: Can not load ", libirml, file=sys.stderr)


class TBBProcessPool3(multiprocessing.pool.Pool):
    """Process pool (Python 3 flavor) whose workers release IPC resources."""

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        from multiprocessing.util import debug

        for _ in range(self._processes - len(self._pool)):
            w = self.Process(target=tbb_process_pool_worker3,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild,
                                   self._wrap_exception)
                             )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

    def __del__(self):
        # Ensure workers run their IPC cleanup before the pool goes away.
        self.close()
        for p in self._pool:
            p.join()

    def __exit__(self, *args):
        self.close()
        for p in self._pool:
            p.join()


class Monkey:
    """
    Context manager which replaces standard multiprocessing.pool
    implementations with tbb.pool using monkey-patching. It also enables oneTBB
    threading for Intel(R) oneAPI Math Kernel Library (oneMKL). For example:

        with tbb.Monkey():
            run_my_numpy_code()

    It allows multiple parallel tasks to be executed on the same thread pool
    and coordinate number of threads across multiple processes thus avoiding
    overheads from oversubscription.
    """
    # class_name -> original attribute, saved by _patch for restore in __exit__
    _items = {}
    # class_name -> module object it was patched in (None if patch was skipped)
    _modules = {}

    def __init__(self, max_num_threads=None, benchmark=False):
        """
        Create context manager for running under TBB scheduler.
        :param max_num_threads: if specified, limits maximal number of threads
        :param benchmark: if specified, blocks in initialization until
            requested number of threads are ready
        """
        if max_num_threads:
            # Keep the global_control object alive on self so the concurrency
            # limit stays in effect for the lifetime of this Monkey.
            self.ctl = global_control(global_control.max_allowed_parallelism,
                                      int(max_num_threads))
        if benchmark:
            if not max_num_threads:
                max_num_threads = default_num_threads()
            from .api import _concurrency_barrier
            _concurrency_barrier(int(max_num_threads))

    def _patch(self, class_name, module_name, obj):
        """Replace `module_name.class_name` with `obj`, remembering the
        original attribute so __exit__ can restore it."""
        m = self._modules[class_name] = __import__(module_name, globals(),
                                                   locals(), [class_name])
        if m is None:
            return
        oldattr = getattr(m, class_name, None)
        if oldattr is None:
            self._modules[class_name] = None
            return
        self._items[class_name] = oldattr
        setattr(m, class_name, obj)

    def __enter__(self):
        global is_active
        assert not is_active, "tbb.Monkey does not support nesting yet"
        is_active = True
        # Remember previous env settings so __exit__ restores them exactly.
        self.env_mkl = os.getenv('MKL_THREADING_LAYER')
        os.environ['MKL_THREADING_LAYER'] = 'TBB'
        self.env_numba = os.getenv('NUMBA_THREADING_LAYER')
        os.environ['NUMBA_THREADING_LAYER'] = 'TBB'

        if ipc_enabled:
            # IPC-aware process pools exist for 2.7 and 3.5+ worker protocols.
            if sys.version_info.major == 2 and sys.version_info.minor >= 7:
                self._patch("Pool", "multiprocessing.pool", TBBProcessPool27)
            elif sys.version_info.major == 3 and sys.version_info.minor >= 5:
                self._patch("Pool", "multiprocessing.pool", TBBProcessPool3)
        self._patch("ThreadPool", "multiprocessing.pool", Pool)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        global is_active
        assert is_active, "modified?"
        is_active = False
        # Restore the environment variables to their pre-__enter__ state.
        if self.env_mkl is None:
            del os.environ['MKL_THREADING_LAYER']
        else:
            os.environ['MKL_THREADING_LAYER'] = self.env_mkl
        if self.env_numba is None:
            del os.environ['NUMBA_THREADING_LAYER']
        else:
            os.environ['NUMBA_THREADING_LAYER'] = self.env_numba
        # Undo every monkey-patch applied by _patch.
        for name, oldattr in self._items.items():
            setattr(self._modules[name], name, oldattr)


def init_sem_name():
    """Initialize the names of libirml's shared IPC semaphores (best-effort)."""
    try:
        librml = ctypes.CDLL(libirml)
        librml.set_active_sem_name()
        librml.set_stop_sem_name()
    except Exception as e:
        print("Warning: Can not initialize name of shared semaphores:", e,
              file=sys.stderr)


def tbb_atexit():
    """atexit hook: release libirml's shared semaphores in IPC mode."""
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_semaphores()
        except Exception:
            print("Warning: Can not release shared semaphores",
                  file=sys.stderr)


def _main():
    # Run the module specified as the next command line argument:
    #   python3 -m tbb user_app.py
    global ipc_enabled

    import platform
    import argparse
    parser = argparse.ArgumentParser(prog="python3 -m tbb", description="""
             Run your Python script in context of tbb.Monkey, which
             replaces standard Python pools and threading layer of
             Intel(R) oneAPI Math Kernel Library (oneMKL) by implementation
             based on Intel(R) oneAPI Threading Building Blocks (oneTBB).
             It enables multiple parallel tasks to be executed on the same
             thread pool and coordinate number of threads across multiple
             processes thus avoiding overheads from oversubscription.
             """, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    # IPC coordination and the scalable-allocator preload are Linux-only,
    # so these flags exist only there (args.ipc is guarded below by `and`).
    if platform.system() == "Linux":
        parser.add_argument('--ipc', action='store_true',
                            help="Enable inter-process (IPC) coordination between oneTBB schedulers")
        parser.add_argument('-a', '--allocator', action='store_true',
                            help="Enable oneTBB scalable allocator as a replacement for standard memory allocator")
        parser.add_argument('--allocator-huge-pages', action='store_true',
                            help="Enable huge pages for oneTBB allocator (implies: -a)")
    parser.add_argument('-p', '--max-num-threads', default=default_num_threads(), type=int,
                        help="Initialize oneTBB with P max number of threads per process", metavar='P')
    parser.add_argument('-b', '--benchmark', action='store_true',
                        help="Block oneTBB initialization until all the threads are created before continue the script. "
                             "This is necessary for performance benchmarks that want to exclude lazy scheduler initialization effects from the measurements")
    parser.add_argument('-v', '--verbose', action='store_true',
                        help="Request verbose and version information")
    parser.add_argument('-m', action='store_true', dest='module',
                        help="Executes following as a module")
    parser.add_argument('name', help="Script or module name")
    parser.add_argument('args', nargs=argparse.REMAINDER,
                        help="Command line arguments")
    args = parser.parse_args()

    if args.verbose:
        os.environ["TBB_VERSION"] = "1"
    if platform.system() == "Linux":
        if args.allocator_huge_pages:
            args.allocator = True
        # _TBB_MALLOC_PRELOAD marks that we already set LD_PRELOAD and
        # re-executed, preventing infinite re-exec recursion.
        if args.allocator and not os.environ.get("_TBB_MALLOC_PRELOAD"):
            libtbbmalloc_lib = 'libtbbmalloc_proxy.so.2'
            ld_preload = 'LD_PRELOAD'
            os.environ["_TBB_MALLOC_PRELOAD"] = "1"
            # BUGFIX: materialize the list up front. The previous filter()
            # iterator was exhausted by the membership test, so the else
            # branch re-exported LD_PRELOAD without its pre-existing entries.
            preload_list = [v for v in os.environ.get(ld_preload, "").split(':') if v]
            if libtbbmalloc_lib in preload_list:
                print('Info:', ld_preload, "contains", libtbbmalloc_lib, "already\n")
            else:
                os.environ[ld_preload] = ':'.join([libtbbmalloc_lib] + preload_list)

            if args.allocator_huge_pages:
                assert platform.system() == "Linux"
                try:
                    with open('/proc/sys/vm/nr_hugepages', 'r') as f:
                        pages = int(f.read())
                    if pages == 0:
                        print("oneTBB: Pre-allocated huge pages are not currently reserved in the system. To reserve, run e.g.:\n"
                              "\tsudo sh -c 'echo 2000 > /proc/sys/vm/nr_hugepages'")
                    os.environ["TBB_MALLOC_USE_HUGE_PAGES"] = "1"
                except (OSError, ValueError):
                    print("oneTBB: Failed to read number of pages from /proc/sys/vm/nr_hugepages\n"
                          "\tIs the Linux kernel configured with the huge pages feature?")
                    sys.exit(1)

            # Re-execute ourselves so the allocator preload takes effect;
            # the guard env var above makes the second pass skip this branch.
            os.execl(sys.executable, sys.executable, '-m', 'tbb', *sys.argv[1:])
            assert False, "Re-execution failed"

    # Make the target script see only its own name and arguments.
    sys.argv = [args.name] + args.args
    ipc_enabled = platform.system() == "Linux" and args.ipc
    os.environ["IPC_ENABLE"] = "1" if ipc_enabled else "0"
    if ipc_enabled:
        atexit.register(tbb_atexit)
        init_sem_name()
        if not os.environ.get("KMP_BLOCKTIME"):  # TODO move
            os.environ["KMP_BLOCKTIME"] = "0"
    if '_' + args.name in globals():
        # Internal entry points, e.g. `python3 -m tbb test` -> _test().
        return globals()['_' + args.name](*args.args)
    else:
        import runpy
        runf = runpy.run_module if args.module else runpy.run_path
        with Monkey(max_num_threads=args.max_num_threads, benchmark=args.benchmark):
            runf(args.name, run_name='__main__')