%pythonbegin %{
#
# Copyright (c) 2016-2021 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


__all__ = ["task_arena",
           "task_group",
           "global_control",
           "default_num_threads",
           "this_task_arena_max_concurrency",
           "this_task_arena_current_thread_index",
           "runtime_version",
           "runtime_interface_version"]
%}
%begin %{
/* Defines Python wrappers for Intel(R) oneAPI Threading Building Blocks (oneTBB) */
%}
%module api

// NOTE(review): the guard below tests against 0x030001 while the message
// names 3.0.6 - confirm which minimum version is actually intended.
#if SWIG_VERSION < 0x030001
#error SWIG version 3.0.6 or newer is required for correct functioning
#endif

%{
#define TBB_PREVIEW_WAITING_FOR_WORKERS 1
#include "tbb/task_arena.h"
#include "tbb/task_group.h"
#include "tbb/global_control.h"
#include "tbb/version.h"

#include <condition_variable>
#include <mutex>
#include <memory>

using namespace tbb;

// Functor that wraps a Python callable so it can be handed to TBB as a task body.
// Ownership of the wrapped PyObject is delegated to the swig::SwigPtr_PyObject base.
class PyCaller : public swig::SwigPtr_PyObject {
public:
    // icpc 2013 does not support simple using SwigPtr_PyObject::SwigPtr_PyObject;
    PyCaller(const PyCaller& s) : SwigPtr_PyObject(s) {}
    PyCaller(PyObject *p, bool initial = true) : SwigPtr_PyObject(p, initial) {}

    // Calls the wrapped object with no arguments between the SWIG thread
    // BEGIN/END block macros and drops the returned reference.
    // A null result is not otherwise handled here.
    void operator()() const {
        SWIG_PYTHON_THREAD_BEGIN_BLOCK;
        PyObject* r = PyObject_CallFunctionObjArgs((PyObject*)*this, nullptr);
        if(r) Py_DECREF(r);
        SWIG_PYTHON_THREAD_END_BLOCK;
    }
};

// Functor that runs a Python callable inside a given task_arena.
struct ArenaPyCaller {
    task_arena *my_arena;   // arena the callable is executed in (not owned)
    PyObject *my_callable;  // Python callable; a reference is taken in the constructor

    ArenaPyCaller(task_arena *a, PyObject *c) : my_arena(a), my_callable(c) {
        SWIG_PYTHON_THREAD_BEGIN_BLOCK;
        Py_XINCREF(c);
        SWIG_PYTHON_THREAD_END_BLOCK;
    }

    // Executes the callable in my_arena through a temporary PyCaller built
    // with initial == false.
    // NOTE(review): presumably that temporary adopts and later releases the
    // reference taken in the constructor - confirm against SwigPtr_PyObject.
    void operator()() const {
        my_arena->execute(PyCaller(my_callable, false));
    }
};

// Shared state for the rendezvous performed by _concurrency_barrier().
struct barrier_data {
    std::condition_variable event;  // signalled when the last expected task arrives
    std::mutex m;                   // guards worker_threads
    int worker_threads;             // number of tasks that have arrived so far
    int full_threads;               // number of tasks expected to arrive
};

// Runs (threads - 1) tasks in a task_group and blocks until all of them have
// started running at the same time.
//
// threads: requested number of threads; tbb::task_arena::automatic means
//          tbb::this_task_arena::max_concurrency(). Values below 2 return
//          immediately.
//
// If the active max_allowed_parallelism is lower than `threads`, it is raised
// through a global_control object for the duration of the call.
void _concurrency_barrier(int threads = tbb::task_arena::automatic) {
    if(threads == tbb::task_arena::automatic)
        threads = tbb::this_task_arena::max_concurrency();
    if(threads < 2)
        return;

    std::unique_ptr<global_control> g;
    if(global_control::active_value(global_control::max_allowed_parallelism) < unsigned(threads))
        g.reset(new global_control(global_control::max_allowed_parallelism, threads));

    tbb::task_group tg;
    barrier_data b;
    b.worker_threads = 0;
    b.full_threads = threads-1;
    for(int i = 0; i < b.full_threads; i++)
        tg.run([&b]{
            std::unique_lock<std::mutex> lock(b.m);
            if(++b.worker_threads >= b.full_threads)
                b.event.notify_all();
            else while(b.worker_threads < b.full_threads)
                b.event.wait(lock);
        });
    {
        // Wait with a predicate: the last task may already have called
        // notify_all() before this thread gets here, and an unconditional
        // wait() would then miss the wakeup. The predicate also makes the
        // wait immune to spurious wakeups.
        std::unique_lock<std::mutex> lock(b.m);
        b.event.wait(lock, [&b]{ return b.worker_threads >= b.full_threads; });
    }
    // The mutex must be released before tg.wait(): tasks returning from
    // event.wait() have to re-acquire it before they can finish.
    tg.wait();
}

%}

void _concurrency_barrier(int threads = tbb::task_arena::automatic);

namespace tbb {

    // Subset of tbb::task_arena exposed to Python.
    class task_arena {
    public:
        static const int automatic = -1;
        task_arena(int max_concurrency = automatic, unsigned reserved_for_masters = 1);
        task_arena(const task_arena &s);
        ~task_arena();
        void initialize();
        void initialize(int max_concurrency, unsigned reserved_for_masters = 1);
        void terminate();
        bool is_active();
        %extend {
        // Overloads taking a Python callable, wrapped into a PyCaller.
        void enqueue( PyObject *c ) { $self->enqueue(PyCaller(c)); }
        void execute( PyObject *c ) { $self->execute(PyCaller(c)); }
        };
    };

    // Subset of tbb::task_group exposed to Python.
    class task_group {
    public:
        task_group();
        ~task_group();
        void wait();
        void cancel();
        %extend {
        // Run a Python callable, either directly or inside the given arena.
        void run( PyObject *c ) { $self->run(PyCaller(c)); }
        void run( PyObject *c, task_arena *a ) { $self->run(ArenaPyCaller(a, c)); }
        };
    };

    // Subset of tbb::global_control exposed to Python.
    class global_control {
    public:
        enum parameter {
            max_allowed_parallelism,
            thread_stack_size,
            parameter_max // insert new parameters above this point
        };
        global_control(parameter param, size_t value);
        ~global_control();
        static size_t active_value(parameter param);
    };

} // tbb

// Free-function helpers wrapping the TBB runtime version and this_task_arena queries.
%inline {
    inline const char* runtime_version() { return TBB_runtime_version();}
    inline int runtime_interface_version() { return TBB_runtime_interface_version();}
    inline int this_task_arena_max_concurrency() { return this_task_arena::max_concurrency();}
    inline int this_task_arena_current_thread_index() { return this_task_arena::current_thread_index();}
};

// Additional definitions for Python part of the module
%pythoncode %{
default_num_threads = this_task_arena_max_concurrency
%}