# Copyright (c) 2016-2023 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Based on the software developed by:
# Copyright (c) 2008,2016 david decotigny (Pool of threads)
# Copyright (c) 2006-2008, R Oudkerk (multiprocessing.Pool)
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

# @brief Python Pool implementation based on TBB with monkey-patching
#
# See http://docs.python.org/dev/library/multiprocessing.html
# Differences: added imap_async and imap_unordered_async, and terminate()
# has to be called explicitly (it's not registered by atexit).
#
# The general idea is that we submit works to a workqueue, either as
# single Jobs (one function to call), or JobSequences (batch of
# Jobs). Each Job is associated with an ApplyResult object which has 2
# states: waiting for the Job to complete, or Ready. Instead of
# waiting for the jobs to finish, we wait for their ApplyResult object
# to become ready: an event mechanism is used for that.
# When we apply a function to several arguments in "parallel", we need
# a way to wait for all/part of the Jobs to be processed: that's what
# "collectors" are for; they group and wait for a set of ApplyResult
# objects. Once a collector is ready to be used, we can use a
# CollectorIterator to iterate over the result values it's collecting.
#
# The methods of a Pool object use all these concepts and expose
# them to their caller in a very simple way.

import sys
import threading
import traceback
from .api import *

__all__ = ["Pool", "TimeoutError"]
__doc__ = """
Standard Python Pool implementation based on Python API
for Intel(R) oneAPI Threading Building Blocks (oneTBB)
"""


class TimeoutError(Exception):
    """Raised when a result is not available within the given timeout"""
    pass


class Pool(object):
    """
    The Pool class provides standard multiprocessing.Pool interface
    which is mapped onto oneTBB tasks executing in its thread pool
    """

    def __init__(self, nworkers=0, name="Pool"):
        r"""
        \param nworkers (integer) number of worker threads to start
        \param name (string) prefix for the worker threads' name

        Note: neither argument is read by this constructor; they are
        accepted so the signature matches other Pool implementations.
        """
        self._closed = False
        self._tasks = task_group()
        self._pool = [None,]*default_num_threads()  # Dask asks for len(_pool)

    def apply(self, func, args=(), kwds=None):
        """Equivalent of the apply() builtin function. It blocks till
        the result is ready."""
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        """A parallel equivalent of the map() builtin function. It
        blocks till the result is ready.

        This method chops the iterable into a number of chunks which
        it submits to the process pool as separate tasks. The
        (approximate) size of these chunks can be specified by setting
        chunksize to a positive integer."""
        return self.map_async(func, iterable, chunksize).get()

    def imap(self, func, iterable, chunksize=1):
        """
        An equivalent of itertools.imap().

        The chunksize argument is the same as the one used by the
        map() method. For very long iterables using a large value for
        chunksize can make the job complete much faster than
        using the default value of 1.

        Also if chunksize is 1 then the next() method of the iterator
        returned by the imap() method has an optional timeout
        parameter: next(timeout) will raise processing.TimeoutError if
        the result cannot be returned within timeout seconds.
        """
        collector = OrderedResultCollector(as_iterator=True)
        self._create_sequences(func, iterable, chunksize, collector)
        return iter(collector)

    def imap_unordered(self, func, iterable, chunksize=1):
        """The same as imap() except that the ordering of the results
        from the returned iterator should be considered
        arbitrary. (Only when there is only one worker process is the
        order guaranteed to be "correct".)"""
        collector = UnorderedResultCollector()
        self._create_sequences(func, iterable, chunksize, collector)
        return iter(collector)

    def apply_async(self, func, args=(), kwds=None, callback=None):
        """A variant of the apply() method which returns an
        ApplyResult object.

        If callback is specified then it should be a callable which
        accepts a single argument. When the result becomes ready,
        callback is applied to it (unless the call failed). callback
        should complete immediately since otherwise the thread which
        handles the results will get blocked."""
        assert not self._closed  # No lock here. We assume it's atomic...
        if kwds is None:
            # A fresh dict per call instead of one shared mutable default
            kwds = {}
        apply_result = ApplyResult(callback=callback)
        job = Job(func, args, kwds, apply_result)
        self._tasks.run(job)
        return apply_result

    def map_async(self, func, iterable, chunksize=None, callback=None):
        """A variant of the map() method which returns a ApplyResult
        object.

        If callback is specified then it should be a callable which
        accepts a single argument. When the result becomes ready
        callback is applied to it (unless the call failed). callback
        should complete immediately since otherwise the thread which
        handles the results will get blocked."""
        apply_result = ApplyResult(callback=callback)
        collector = OrderedResultCollector(apply_result, as_iterator=False)
        if not self._create_sequences(func, iterable, chunksize, collector):
            # Nothing was submitted, so no job will ever complete the result
            apply_result._set_value([])
        return apply_result

    def imap_async(self, func, iterable, chunksize=None, callback=None):
        """A variant of the imap() method which returns an ApplyResult
        object that provides an iterator (next method(timeout)
        available).

        If callback is specified then it should be a callable which
        accepts a single argument. When the resulting iterator becomes
        ready, callback is applied to it (unless the call
        failed). callback should complete immediately since otherwise
        the thread which handles the results will get blocked."""
        apply_result = ApplyResult(callback=callback)
        collector = OrderedResultCollector(apply_result, as_iterator=True)
        if not self._create_sequences(func, iterable, chunksize, collector):
            # Nothing was submitted, so no job will ever complete the result
            apply_result._set_value(iter([]))
        return apply_result

    def imap_unordered_async(self, func, iterable, chunksize=None,
                             callback=None):
        """A variant of the imap_unordered() method which returns an
        ApplyResult object that provides an iterator (next
        method(timeout) available).

        If callback is specified then it should be a callable which
        accepts a single argument. When the resulting iterator becomes
        ready, callback is applied to it (unless the call
        failed). callback should complete immediately since otherwise
        the thread which handles the results will get blocked."""
        apply_result = ApplyResult(callback=callback)
        collector = UnorderedResultCollector(apply_result)
        if not self._create_sequences(func, iterable, chunksize, collector):
            # Nothing was submitted, so no job will ever complete the result
            apply_result._set_value(iter([]))
        return apply_result

    def close(self):
        """Prevents any more tasks from being submitted to the
        pool. Once all the tasks have been completed the worker
        processes will exit."""
        # No lock here. We assume it's sufficiently atomic...
        self._closed = True

    def terminate(self):
        """Stops the worker processes immediately without completing
        outstanding work. When the pool object is garbage collected
        terminate() will be called immediately."""
        self.close()
        self._tasks.cancel()

    def join(self):
        """Wait for the worker processes to exit. One must call
        close() or terminate() before using join()."""
        self._tasks.wait()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.join()

    def __del__(self):
        self.terminate()
        self.join()

    def _create_sequences(self, func, iterable, chunksize, collector):
        r"""
        Create callable objects to process and pushes them on the
        work queue. Each work unit is meant to process a slice of
        iterable of size chunksize. If collector is specified, then
        the ApplyResult objects associated with the jobs will notify
        collector when their result becomes ready.

        \param func callable applied to each item of iterable
        \param iterable source of the single positional argument of func
        \param chunksize number of items per work unit; None or 0 means 1
        \param collector collector the created ApplyResult objects register with

        \return the list callable objects (basically: JobSequences)
        pushed onto the work queue

        Raises ValueError when chunksize is negative.
        """
        assert not self._closed  # No lock here. We assume it's atomic...
        if chunksize is not None and chunksize < 0:
            # range(negative) is empty, so the chunking loop below would
            # never reach the end of the iterable and would spin forever.
            raise ValueError("chunksize must not be negative")
        it_ = iter(iterable)
        exit_loop = False
        sequences = []
        while not exit_loop:
            seq = []
            for _ in range(chunksize or 1):
                try:
                    arg = next(it_)
                except StopIteration:
                    exit_loop = True
                    break
                apply_result = ApplyResult(collector)
                job = Job(func, (arg,), {}, apply_result)
                seq.append(job)
            if seq:
                sequences.append(JobSequence(seq))
        # Submit only after every ApplyResult has been registered with
        # the collector, so the collector knows the full set up front.
        for t in sequences:
            self._tasks.run(t)
        return sequences


class Job:
    """A work unit that corresponds to the execution of a single function"""

    def __init__(self, func, args, kwds, apply_result):
        r"""
        \param func/args/kwds used to call the function
        \param apply_result ApplyResult object that holds the result
        of the function call
        """
        self._func = func
        self._args = args
        self._kwds = kwds
        self._result = apply_result

    def __call__(self):
        """
        Call the function with the args/kwds and tell the ApplyResult
        that its result is ready. Correctly handles the exceptions
        happening during the execution of the function
        """
        try:
            result = self._func(*self._args, **self._kwds)
        except BaseException:
            # Capture everything: the ApplyResult must always become ready
            # or a thread blocked in get() would wait forever.
            self._result._set_exception()
        else:
            self._result._set_value(result)


class JobSequence:
    """A work unit that corresponds to the processing of a continuous
    sequence of Job objects"""

    def __init__(self, jobs):
        self._jobs = jobs

    def __call__(self):
        """
        Call all the Job objects that have been specified
        """
        for job in self._jobs:
            job()


class ApplyResult(object):
    """An object associated with a Job object that holds its result:
    it's available during the whole life the Job and after, even when
    the Job didn't process yet. It's possible to use this object to
    wait for the result/exception of the job to be available.

    The result objects returned by the Pool::*_async() methods are of
    this type"""

    def __init__(self, collector=None, callback=None):
        r"""
        \param collector when not None, the notify_ready() method of
        the collector will be called when the result from the Job is
        ready
        \param callback when not None, function to call when the
        result becomes available (this is the parameter passed to the
        Pool::*_async() methods.
        """
        self._success = False
        self._event = threading.Event()
        self._data = None
        self._collector = None
        self._callback = callback

        if collector is not None:
            collector.register_result(self)
            self._collector = collector

    def get(self, timeout=None):
        """
        Returns the result when it arrives. If timeout is not None and
        the result does not arrive within timeout seconds then
        TimeoutError is raised. If the remote call raised an exception
        then that exception will be reraised by get().
        """
        if not self.wait(timeout):
            raise TimeoutError("Result not available within %fs" % timeout)
        if self._success:
            return self._data
        # _data is the sys.exc_info() triple stored by _set_exception().
        # Re-raise the captured instance itself: rebuilding it from its
        # type would nest the arguments and fails for exception classes
        # whose constructor needs more than one argument.
        raise self._data[1].with_traceback(self._data[2])

    def wait(self, timeout=None):
        """Waits until the result is available or until timeout
        seconds pass. Returns whether the result is ready."""
        self._event.wait(timeout)
        return self._event.is_set()

    def ready(self):
        """Returns whether the call has completed."""
        return self._event.is_set()

    def successful(self):
        """Returns whether the call completed without raising an
        exception. Will raise AssertionError if the result is not
        ready."""
        assert self.ready()
        return self._success

    def _set_value(self, value):
        """Called by a Job object to tell the result is ready, and
        provides the value of this result. The object will become
        ready and successful. The collector's notify_ready() method
        will be called, and the callback method too"""
        assert not self.ready()
        self._data = value
        self._success = True
        self._event.set()
        if self._collector is not None:
            self._collector.notify_ready(self)
        if self._callback is not None:
            try:
                self._callback(value)
            except BaseException:
                # Best effort: a failing callback is reported, not propagated
                traceback.print_exc()

    def _set_exception(self):
        """Called by a Job object to tell that an exception occurred
        during the processing of the function. The object will become
        ready but not successful. The collector's notify_ready()
        method will be called, but NOT the callback method.

        Must be called from inside an except block, since the exception
        is read from sys.exc_info()."""
        assert not self.ready()
        self._data = sys.exc_info()
        self._success = False
        self._event.set()
        if self._collector is not None:
            self._collector.notify_ready(self)


class AbstractResultCollector(object):
    """ABC to define the interface of a ResultCollector object. It is
    basically an object which knows which results it's waiting for,
    and which is able to get notified when they become available. It is
    also able to provide an iterator over the results when they are
    available"""

    def __init__(self, to_notify):
        r"""
        \param to_notify ApplyResult object to notify when all the
        results we're waiting for become available. Can be None.
        """
        self._to_notify = to_notify

    def register_result(self, apply_result):
        r"""Used to identify which results we're waiting for. Will
        always be called BEFORE the Jobs get submitted to the work
        queue, and BEFORE the __iter__ and _get_result() methods can
        be called
        \param apply_result ApplyResult object to add in our collection
        """
        raise NotImplementedError("Children classes must implement it")

    def notify_ready(self, apply_result):
        r"""Called by the ApplyResult object (already registered via
        register_result()) that it is now ready (ie. the Job's result
        is available or an exception has been raised).
        \param apply_result ApplyResult object telling us that the job
        has been processed
        """
        raise NotImplementedError("Children classes must implement it")

    def _get_result(self, idx, timeout=None):
        r"""Called by the CollectorIterator object to retrieve the
        result's values one after another (order defined by the
        implementation)
        \param idx The index of the result we want, wrt collector's order
        \param timeout integer telling how long to wait (in seconds)
        for the result at index idx to be available, or None (wait
        forever)
        """
        raise NotImplementedError("Children classes must implement it")

    def __iter__(self):
        """Return a new CollectorIterator object for this collector"""
        return CollectorIterator(self)


class CollectorIterator(object):
    """An iterator that allows to iterate over the result values
    available in the given collector object. Equipped with an extended
    next() method accepting a timeout argument. Created by the
    AbstractResultCollector::__iter__() method"""

    def __init__(self, collector):
        r"""\param AbstractResultCollector instance"""
        self._collector = collector
        self._idx = 0

    def __iter__(self):
        return self

    def next(self, timeout=None):
        r"""Return the next result value in the sequence. Raise
        StopIteration at the end. Can raise the exception raised by
        the Job
        \param timeout how long to wait (in seconds) for the next
        result, or None (wait forever). TimeoutError is raised when the
        result is not available in time; the position is kept, so the
        call can simply be retried.
        """
        try:
            apply_result = self._collector._get_result(self._idx, timeout)
        except IndexError:
            # Reset for next time
            self._idx = 0
            raise StopIteration
        except TimeoutError:
            # Keep the position: rewinding here would replay results that
            # were already handed out once the caller retries.
            raise
        except BaseException:
            self._idx = 0
            raise
        self._idx += 1
        assert apply_result.ready()
        return apply_result.get(0)

    def __next__(self):
        return self.next()


class UnorderedResultCollector(AbstractResultCollector):
    """An AbstractResultCollector implementation that collects the
    values of the ApplyResult objects in the order they become ready. The
    CollectorIterator object returned by __iter__() will iterate over
    them in the order they become ready"""

    def __init__(self, to_notify=None):
        r"""
        \param to_notify ApplyResult object to notify when all the
        results we're waiting for become available. Can be None.
        """
        AbstractResultCollector.__init__(self, to_notify)
        self._cond = threading.Condition()
        self._collection = []
        self._expected = 0

    def register_result(self, apply_result):
        r"""Used to identify which results we're waiting for. Will
        always be called BEFORE the Jobs get submitted to the work
        queue, and BEFORE the __iter__ and _get_result() methods can
        be called
        \param apply_result ApplyResult object to add in our collection
        """
        # Only the count is needed: results are stored as they complete
        self._expected += 1

    def _get_result(self, idx, timeout=None):
        r"""Called by the CollectorIterator object to retrieve the
        result's values one after another, in the order the results have
        become available.
        \param idx The index of the result we want, wrt collector's order
        \param timeout integer telling how long to wait (in seconds)
        for the result at index idx to be available, or None (wait
        forever)

        Raises IndexError when idx is past the last expected result (or
        skips ahead of the next pending one) and TimeoutError when the
        result did not arrive in time.
        """
        with self._cond:
            if idx >= self._expected:
                raise IndexError
            if idx < len(self._collection):
                return self._collection[idx]
            if idx != len(self._collection):
                # Violation of the sequence protocol
                raise IndexError()
            # idx is exactly the next result to arrive: wait for it
            self._cond.wait(timeout=timeout)
            if idx < len(self._collection):
                return self._collection[idx]
            # Still not added !
            raise TimeoutError("Timeout while waiting for results")

    def notify_ready(self, apply_result=None):
        r"""Called by the ApplyResult object (already registered via
        register_result()) that it is now ready (ie. the Job's result
        is available or an exception has been raised).
        \param apply_result ApplyResult object telling us that the job
        has been processed
        """
        with self._cond:
            self._collection.append(apply_result)
            first_item = (len(self._collection) == 1)
            self._cond.notify_all()

        # Done outside the lock: _set_value() runs the user callback
        if first_item and self._to_notify is not None:
            self._to_notify._set_value(iter(self))


class OrderedResultCollector(AbstractResultCollector):
    """An AbstractResultCollector implementation that collects the
    values of the ApplyResult objects in the order they have been
    submitted. The CollectorIterator object returned by __iter__()
    will iterate over them in the order they have been submitted"""

    def __init__(self, to_notify=None, as_iterator=True):
        r"""
        \param to_notify ApplyResult object to notify when all the
        results we're waiting for become available. Can be None.
        \param as_iterator boolean telling whether the result value
        set on to_notify should be an iterator (available as soon as 1
        result arrived) or a list (available only after the last
        result arrived)
        """
        AbstractResultCollector.__init__(self, to_notify)
        self._results = []
        self._lock = threading.Lock()
        self._remaining = 0
        self._as_iterator = as_iterator

    def register_result(self, apply_result):
        r"""Used to identify which results we're waiting for. Will
        always be called BEFORE the Jobs get submitted to the work
        queue, and BEFORE the __iter__ and _get_result() methods can
        be called
        \param apply_result ApplyResult object to add in our collection
        """
        self._results.append(apply_result)
        self._remaining += 1

    def _get_result(self, idx, timeout=None):
        r"""Called by the CollectorIterator object to retrieve the
        result's values one after another (order defined by the
        implementation)
        \param idx The index of the result we want, wrt collector's order
        \param timeout integer telling how long to wait (in seconds)
        for the result at index idx to be available, or None (wait
        forever)

        Raises IndexError when idx is past the last registered result
        and TimeoutError when the result did not arrive in time.
        """
        res = self._results[idx]
        if not res.wait(timeout):
            # Report the timeout here instead of handing back a result
            # that is not ready yet.
            raise TimeoutError("Timeout while waiting for results")
        return res

    def notify_ready(self, apply_result):
        r"""Called by the ApplyResult object (already registered via
        register_result()) that it is now ready (ie. the Job's result
        is available or an exception has been raised).
        \param apply_result ApplyResult object telling us that the job
        has been processed
        """
        with self._lock:
            assert self._remaining > 0
            # Nothing completed before this one iff the counter is untouched
            got_first = (len(self._results) == self._remaining)
            self._remaining -= 1
            got_last = (self._remaining == 0)

        if self._to_notify is not None:
            if self._as_iterator and got_first:
                self._to_notify._set_value(iter(self))
            elif not self._as_iterator and got_last:
                try:
                    lst = [r.get(0) for r in self._results]
                except BaseException:
                    # One of the jobs failed: forward its exception
                    self._to_notify._set_exception()
                else:
                    self._to_notify._set_value(lst)