xref: /oneTBB/python/tbb/pool.py (revision 9acde482)
1*9acde482SJonathan Wakely# Copyright (c) 2016-2023 Intel Corporation
251c0b2f7Stbbdev#
351c0b2f7Stbbdev# Licensed under the Apache License, Version 2.0 (the "License");
451c0b2f7Stbbdev# you may not use this file except in compliance with the License.
551c0b2f7Stbbdev# You may obtain a copy of the License at
651c0b2f7Stbbdev#
751c0b2f7Stbbdev#     http://www.apache.org/licenses/LICENSE-2.0
851c0b2f7Stbbdev#
951c0b2f7Stbbdev# Unless required by applicable law or agreed to in writing, software
1051c0b2f7Stbbdev# distributed under the License is distributed on an "AS IS" BASIS,
1151c0b2f7Stbbdev# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1251c0b2f7Stbbdev# See the License for the specific language governing permissions and
1351c0b2f7Stbbdev# limitations under the License.
1451c0b2f7Stbbdev
1551c0b2f7Stbbdev# Based on the software developed by:
1651c0b2f7Stbbdev# Copyright (c) 2008,2016 david decotigny (Pool of threads)
1751c0b2f7Stbbdev# Copyright (c) 2006-2008, R Oudkerk (multiprocessing.Pool)
1851c0b2f7Stbbdev# All rights reserved.
1951c0b2f7Stbbdev#
2051c0b2f7Stbbdev# Redistribution and use in source and binary forms, with or without
2151c0b2f7Stbbdev# modification, are permitted provided that the following conditions
2251c0b2f7Stbbdev# are met:
2351c0b2f7Stbbdev#
2451c0b2f7Stbbdev# 1. Redistributions of source code must retain the above copyright
2551c0b2f7Stbbdev#    notice, this list of conditions and the following disclaimer.
2651c0b2f7Stbbdev# 2. Redistributions in binary form must reproduce the above copyright
2751c0b2f7Stbbdev#    notice, this list of conditions and the following disclaimer in the
2851c0b2f7Stbbdev#    documentation and/or other materials provided with the distribution.
2951c0b2f7Stbbdev# 3. Neither the name of author nor the names of any contributors may be
3051c0b2f7Stbbdev#    used to endorse or promote products derived from this software
3151c0b2f7Stbbdev#    without specific prior written permission.
3251c0b2f7Stbbdev#
3351c0b2f7Stbbdev# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
3451c0b2f7Stbbdev# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
3551c0b2f7Stbbdev# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
3651c0b2f7Stbbdev# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
3751c0b2f7Stbbdev# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
3851c0b2f7Stbbdev# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
3951c0b2f7Stbbdev# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
4051c0b2f7Stbbdev# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
4151c0b2f7Stbbdev# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
4251c0b2f7Stbbdev# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
4351c0b2f7Stbbdev# SUCH DAMAGE.
4451c0b2f7Stbbdev#
4551c0b2f7Stbbdev
4651c0b2f7Stbbdev# @brief Python Pool implementation based on TBB with monkey-patching
4751c0b2f7Stbbdev#
4851c0b2f7Stbbdev# See http://docs.python.org/dev/library/multiprocessing.html
4951c0b2f7Stbbdev# Differences: added imap_async and imap_unordered_async, and terminate()
5051c0b2f7Stbbdev# has to be called explicitly (it's not registered by atexit).
5151c0b2f7Stbbdev#
# The general idea is that we submit work to a work queue, either as
# single Jobs (one function to call), or JobSequences (batches of
# Jobs). Each Job is associated with an ApplyResult object which has 2
# states: waiting for the Job to complete, or Ready. Instead of
# waiting for the jobs to finish, we wait for their ApplyResult object
# to become ready: an event mechanism is used for that.
# When we apply a function to several arguments in "parallel", we need
# a way to wait for all/part of the Jobs to be processed: that's what
# "collectors" are for; they group and wait for a set of ApplyResult
# objects. Once a collector is ready to be used, we can use a
# CollectorIterator to iterate over the result values it's collecting.
6351c0b2f7Stbbdev#
6451c0b2f7Stbbdev# The methods of a Pool object use all these concepts and expose
6551c0b2f7Stbbdev# them to their caller in a very simple way.
6651c0b2f7Stbbdev
6751c0b2f7Stbbdevimport sys
6851c0b2f7Stbbdevimport threading
6951c0b2f7Stbbdevimport traceback
7051c0b2f7Stbbdevfrom .api import *
7151c0b2f7Stbbdev
# Names exported by a star-import of this module.
__all__ = [
    "Pool",
    "TimeoutError",
]
# Module docstring, assigned explicitly.
__doc__ = (
    "\n"
    "Standard Python Pool implementation based on Python API\n"
    "for Intel(R) oneAPI Threading Building Blocks (oneTBB)\n"
)
7751c0b2f7Stbbdev
7851c0b2f7Stbbdev
class TimeoutError(Exception):
    """Signal that a result did not become available within the timeout.

    This is a direct Exception subclass defined by this module; within
    the module the name refers to this class.
    """
8251c0b2f7Stbbdev
8351c0b2f7Stbbdev
class Pool(object):
    """
    The Pool class provides standard multiprocessing.Pool interface
    which is mapped onto oneTBB tasks executing in its thread pool
    """

    def __init__(self, nworkers=0, name="Pool"):
        r"""
        \param nworkers (integer) number of worker threads to start;
        accepted for interface compatibility, not read by this constructor
        \param name (string) prefix for the worker threads' name;
        accepted for interface compatibility, not read by this constructor
        """
        self._closed = False
        self._tasks = task_group()
        self._pool = [None,]*default_num_threads()  # Dask asks for len(_pool)

    def apply(self, func, args=(), kwds=None):
        r"""Equivalent of the apply() builtin function. It blocks till
        the result is ready.

        \param func callable to run
        \param args positional arguments for func
        \param kwds keyword arguments for func; None means no keywords
        \return whatever func returns; an exception raised by func is
        re-raised here
        """
        # A fresh dict per call replaces the former shared default object.
        if kwds is None:
            kwds = {}
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        """A parallel equivalent of the map() builtin function. It
        blocks till the result is ready.

        This method chops the iterable into a number of chunks which
        it submits to the pool as separate tasks. The (approximate)
        size of these chunks can be specified by setting chunksize to
        a positive integer."""
        return self.map_async(func, iterable, chunksize).get()

    def imap(self, func, iterable, chunksize=1):
        """
        An equivalent of itertools.imap().

        The chunksize argument is the same as the one used by the
        map() method. For very long iterables using a large value for
        chunksize can make the job complete much faster than
        using the default value of 1.

        The returned iterator has a next(timeout) method in addition
        to the regular iterator protocol; the timeout is forwarded to
        the collector when waiting for the next result.
        """
        collector = OrderedResultCollector(as_iterator=True)
        self._create_sequences(func, iterable, chunksize, collector)
        return iter(collector)

    def imap_unordered(self, func, iterable, chunksize=1):
        """The same as imap() except that the ordering of the results
        from the returned iterator should be considered arbitrary:
        results are yielded in the order they become ready."""
        collector = UnorderedResultCollector()
        self._create_sequences(func, iterable, chunksize, collector)
        return iter(collector)

    def apply_async(self, func, args=(), kwds=None, callback=None):
        r"""A variant of the apply() method which returns an
        ApplyResult object.

        If callback is specified then it should be a callable which
        accepts a single argument. When the result becomes ready,
        callback is applied to it (unless the call failed). callback
        should complete immediately since otherwise the thread which
        handles the results will get blocked.

        \param kwds keyword arguments for func; None means no keywords
        """
        assert not self._closed  # No lock here. We assume it's atomic...
        if kwds is None:
            kwds = {}
        apply_result = ApplyResult(callback=callback)
        job = Job(func, args, kwds, apply_result)
        self._tasks.run(job)
        return apply_result

    def map_async(self, func, iterable, chunksize=None, callback=None):
        """A variant of the map() method which returns a ApplyResult
        object.

        If callback is specified then it should be a callable which
        accepts a single argument. When the result becomes ready
        callback is applied to it (unless the call failed). callback
        should complete immediately since otherwise the thread which
        handles the results will get blocked."""
        apply_result = ApplyResult(callback=callback)
        collector = OrderedResultCollector(apply_result, as_iterator=False)
        if not self._create_sequences(func, iterable, chunksize, collector):
            # Nothing was submitted, so no job will ever complete the
            # result: publish the empty list right away.
            apply_result._set_value([])
        return apply_result

    def imap_async(self, func, iterable, chunksize=None, callback=None):
        """A variant of the imap() method which returns an ApplyResult
        object that provides an iterator (next method(timeout)
        available).

        If callback is specified then it should be a callable which
        accepts a single argument. When the resulting iterator becomes
        ready, callback is applied to it (unless the call
        failed). callback should complete immediately since otherwise
        the thread which handles the results will get blocked."""
        apply_result = ApplyResult(callback=callback)
        collector = OrderedResultCollector(apply_result, as_iterator=True)
        if not self._create_sequences(func, iterable, chunksize, collector):
            # Empty input: hand out an empty iterator immediately.
            apply_result._set_value(iter([]))
        return apply_result

    def imap_unordered_async(self, func, iterable, chunksize=None,
                             callback=None):
        """A variant of the imap_unordered() method which returns an
        ApplyResult object that provides an iterator (next
        method(timeout) available).

        If callback is specified then it should be a callable which
        accepts a single argument. When the resulting iterator becomes
        ready, callback is applied to it (unless the call
        failed). callback should complete immediately since otherwise
        the thread which handles the results will get blocked."""
        apply_result = ApplyResult(callback=callback)
        collector = UnorderedResultCollector(apply_result)
        if not self._create_sequences(func, iterable, chunksize, collector):
            # Empty input: hand out an empty iterator immediately.
            apply_result._set_value(iter([]))
        return apply_result

    def close(self):
        """Prevents any more tasks from being submitted to the
        pool: later submissions trip the "not closed" assertion."""
        # No lock here. We assume it's sufficiently atomic...
        self._closed = True

    def terminate(self):
        """Closes the pool and cancels the task group. When the pool
        object is garbage collected terminate() is called by
        __del__()."""
        self.close()
        self._tasks.cancel()

    def join(self):
        """Wait for the task group to finish its work. One must call
        close() or terminate() before using join()."""
        self._tasks.wait()

    def __enter__(self):
        """Return the pool itself for use in a with statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Wait for the submitted work on leaving the with block. The
        exception arguments are ignored and the exception, if any, is
        not suppressed."""
        self.join()

    def __del__(self):
        """Terminate and join the pool when it is garbage collected."""
        # __init__ may have raised before _tasks was bound; there is then
        # nothing to cancel or wait for, and touching it would raise.
        if getattr(self, "_tasks", None) is None:
            return
        self.terminate()
        self.join()

    def _create_sequences(self, func, iterable, chunksize, collector):
        r"""
        Create callable objects to process and pushes them on the
        work queue. Each work unit is meant to process a slice of
        iterable of size chunksize. If collector is specified, then
        the ApplyResult objects associated with the jobs will notify
        collector when their result becomes ready.

        \param func callable applied to each item of iterable
        \param iterable source of the arguments, consumed entirely
        before anything is submitted
        \param chunksize number of items per work unit; None or 0
        means 1
        \param collector result collector, or None
        \raise ValueError when chunksize is negative
        \return the list of callable objects (basically: JobSequences)
        pushed onto the work queue; empty when iterable yields nothing
        """
        assert not self._closed  # No lock here. We assume it's atomic...
        if chunksize is not None and chunksize < 0:
            # range() of a negative count is empty, so the batching loop
            # below would never consume the iterable and never finish.
            raise ValueError(
                "chunksize must be a non-negative integer or None, not %r"
                % (chunksize,))
        per_sequence = chunksize or 1
        it_ = iter(iterable)
        exhausted = False
        sequences = []
        while not exhausted:
            seq = []
            for _ in range(per_sequence):
                try:
                    arg = next(it_)
                except StopIteration:
                    exhausted = True
                    break
                # Constructing the ApplyResult registers it with the
                # collector (when one is given), in iteration order.
                apply_result = ApplyResult(collector)
                seq.append(Job(func, (arg,), {}, apply_result))
            if seq:
                sequences.append(JobSequence(seq))
        # Submission starts only once every ApplyResult has been created.
        for t in sequences:
            self._tasks.run(t)
        return sequences
26451c0b2f7Stbbdev
26551c0b2f7Stbbdev
class Job:
    """Callable work unit wrapping the invocation of one function"""

    def __init__(self, func, args, kwds, apply_result):
        r"""
        \param func callable invoked when the job runs
        \param args positional arguments, unpacked into the call
        \param kwds keyword arguments, unpacked into the call
        \param apply_result object that receives the outcome through
        its _set_value() or _set_exception() method
        """
        self._result = apply_result
        self._func, self._args, self._kwds = func, args, kwds

    def __call__(self):
        """
        Run the function and report the outcome to the result object:
        _set_value(return value) on success, _set_exception() when the
        call raised. Nothing is returned and the function's exception
        does not propagate to the caller of the job.
        """
        target, positional, named = self._func, self._args, self._kwds
        try:
            outcome = target(*positional, **named)
        except BaseException:
            # Catch-all on purpose: whatever the function raised must be
            # recorded so the result object still becomes ready.
            self._result._set_exception()
            return
        self._result._set_value(outcome)
29251c0b2f7Stbbdev
29351c0b2f7Stbbdev
class JobSequence:
    """Callable work unit that runs a batch of Job objects one after
    another"""

    def __init__(self, jobs):
        r"""\param jobs iterable of callables, run in iteration order"""
        self._jobs = jobs

    def __call__(self):
        """
        Invoke each stored job once, in order. An exception escaping a
        job stops the sequence and propagates to the caller.
        """
        for run_job in self._jobs:
            run_job()
30751c0b2f7Stbbdev
30851c0b2f7Stbbdev
class ApplyResult(object):
    """An object associated with a Job object that holds its result:
    it's available during the whole life of the Job and after, even
    when the Job has not been processed yet. It's possible to use this
    object to wait for the result/exception of the job to be available.

    The result objects returned by the Pool::*_async() methods are of
    this type"""

    def __init__(self, collector=None, callback=None):
        r"""
        \param collector when not None, this result registers itself
        with the collector immediately, and the collector's
        notify_ready() method is called when the result is ready
        \param callback when not None, function to call when the
        result becomes available (this is the parameter passed to the
        Pool::*_async() methods).
        """
        self._success = False
        self._event = threading.Event()
        # Return value on success, sys.exc_info() triple on failure.
        self._data = None
        self._collector = None
        self._callback = callback

        if collector is not None:
            collector.register_result(self)
            self._collector = collector

    def get(self, timeout=None):
        r"""
        Returns the result when it arrives. If timeout is not None and
        the result does not arrive within timeout seconds then
        TimeoutError is raised. If the call raised an exception then
        that same exception object is re-raised by get(), with the
        traceback recorded when it was caught.

        \param timeout seconds to wait, or None to wait forever
        """
        if not self.wait(timeout):
            raise TimeoutError("Result not available within %fs" % timeout)
        if self._success:
            return self._data
        # Re-raise the stored instance itself. Building a new one from
        # the type (exc_type(exc_value)) would wrap the original inside
        # its own args, drop its attributes, and fail outright for
        # exception classes whose constructor needs several arguments.
        raise self._data[1].with_traceback(self._data[2])

    def wait(self, timeout=None):
        r"""Waits until the result is available or until timeout
        seconds pass.

        \return True when the result is ready, False on timeout
        """
        return self._event.wait(timeout)

    def ready(self):
        """Returns whether the call has completed."""
        return self._event.is_set()

    def successful(self):
        """Returns whether the call completed without raising an
        exception. Will raise AssertionError if the result is not
        ready."""
        assert self.ready()
        return self._success

    def _set_value(self, value):
        """Called by a Job object to tell the result is ready, and
        provides the value of this result. The object will become
        ready and successful. The collector's notify_ready() method
        will be called, and the callback method too"""
        assert not self.ready()
        self._data = value
        self._success = True
        self._event.set()
        if self._collector is not None:
            self._collector.notify_ready(self)
        if self._callback is not None:
            try:
                self._callback(value)
            except BaseException:
                # Best effort: a failing callback is reported on stderr
                # and must not disturb the code that produced the value.
                traceback.print_exc()

    def _set_exception(self):
        """Called by a Job object to tell that an exception occurred
        during the processing of the function. Must run while that
        exception is being handled, since it is read from
        sys.exc_info(). The object will become ready but not
        successful. The collector's notify_ready() method will be
        called, but NOT the callback method"""
        assert not self.ready()
        self._data = sys.exc_info()
        self._success = False
        self._event.set()
        if self._collector is not None:
            self._collector.notify_ready(self)
39651c0b2f7Stbbdev
39751c0b2f7Stbbdev
class AbstractResultCollector(object):
    """Base class defining the interface of a ResultCollector object.
    It is basically an object which knows which results it's waiting
    for, and which can be notified when they become available. It is
    also able to provide an iterator over the results when they are
    available"""

    def __init__(self, to_notify):
        r"""
        \param to_notify ApplyResult object to notify when the results
        we're waiting for become available. Can be None.
        """
        self._to_notify = to_notify

    @staticmethod
    def _unimplemented():
        """Build the error raised by the methods subclasses must
        override"""
        return NotImplementedError("Children classes must implement it")

    def register_result(self, apply_result):
        r"""Declare one more result this collector is waiting for.
        Will always be called BEFORE the Jobs get submitted to the
        work queue, and BEFORE the __iter__ and _get_result() methods
        can be called
        \param apply_result ApplyResult object to add in our collection
        """
        raise self._unimplemented()

    def notify_ready(self, apply_result):
        r"""Called by an ApplyResult object (already registered via
        register_result()) to say that it is now ready (ie. the Job's
        result is available or an exception has been raised).
        \param apply_result ApplyResult object telling us that the job
        has been processed
        """
        raise self._unimplemented()

    def _get_result(self, idx, timeout=None):
        r"""Called by the CollectorIterator object to retrieve the
        results one after another (order defined by the
        implementation)
        \param idx The index of the result we want, wrt collector's order
        \param timeout how long to wait (in seconds) for the result at
        index idx to be available, or None (wait forever)
        """
        raise self._unimplemented()

    def __iter__(self):
        """Return a new CollectorIterator object for this collector"""
        return CollectorIterator(self)
44451c0b2f7Stbbdev
44551c0b2f7Stbbdev
class CollectorIterator(object):
    """Iterator over the result values held by a collector object. Its
    next() method additionally accepts a timeout argument. Created by
    the AbstractResultCollector::__iter__() method"""

    def __init__(self, collector):
        r"""\param collector object providing _get_result(idx, timeout)"""
        self._idx = 0
        self._collector = collector

    def __iter__(self):
        """An iterator is its own iterable."""
        return self

    def next(self, timeout=None):
        r"""Return the next result value in the sequence. Raise
        StopIteration at the end. Can raise the exception raised by
        the Job.

        \param timeout forwarded to the collector's _get_result()
        """
        try:
            ready_result = self._collector._get_result(self._idx, timeout)
        except BaseException as failure:
            # Any failure rewinds the iterator to the first result.
            self._idx = 0
            if isinstance(failure, IndexError):
                # The collector signals "no more results" with IndexError.
                raise StopIteration
            raise
        self._idx += 1
        assert ready_result.ready()
        return ready_result.get(0)

    def __next__(self):
        """Iterator protocol entry point; same as next() without timeout."""
        return self.next()
47951c0b2f7Stbbdev
48051c0b2f7Stbbdev
class UnorderedResultCollector(AbstractResultCollector):
    """An AbstractResultCollector implementation that collects the
    values of the ApplyResult objects in the order they become ready. The
    CollectorIterator object returned by __iter__() will iterate over
    them in the order they become ready"""

    def __init__(self, to_notify=None):
        r"""
        \param to_notify ApplyResult object that receives an iterator
        over this collector as soon as the first result is ready.
        Can be None.
        """
        AbstractResultCollector.__init__(self, to_notify)
        self._cond = threading.Condition()
        self._collection = []  # ready results, in completion order
        self._expected = 0     # number of registered results

    def register_result(self, apply_result):
        r"""Count one more result to wait for. Will always be called
        BEFORE the Jobs get submitted to the work queue, and BEFORE
        the __iter__ and _get_result() methods can be called
        \param apply_result ApplyResult object being registered (only
        counted, not stored)
        """
        self._expected += 1

    def _get_result(self, idx, timeout=None):
        r"""Called by the CollectorIterator object to retrieve the
        results one after another, in the order the results have
        become available.
        \param idx The index of the result we want, wrt collector's order
        \param timeout how long to wait (in seconds) for the result at
        index idx to be available, or None (wait forever)
        \raise IndexError when idx is past the registered results, or
        skips ahead of the next result to arrive
        \raise TimeoutError when the result is still missing after the
        wait
        """
        with self._cond:
            if idx >= self._expected:
                raise IndexError
            if idx < len(self._collection):
                return self._collection[idx]
            if idx != len(self._collection):
                # Violation of the sequence protocol
                raise IndexError()
            # idx is exactly the next result to arrive: wait for it.
            self._cond.wait(timeout=timeout)
            if idx >= len(self._collection):
                # Still not added !
                raise TimeoutError("Timeout while waiting for results")
            return self._collection[idx]

    def notify_ready(self, apply_result=None):
        r"""Called by an ApplyResult object (already registered via
        register_result()) to say that it is now ready (ie. the Job's
        result is available or an exception has been raised).
        \param apply_result ApplyResult object telling us that the job
        has been processed
        """
        with self._cond:
            self._collection.append(apply_result)
            first_item = (len(self._collection) == 1)
            self._cond.notify_all()

        # Done outside the lock: publishing the iterator runs the
        # to_notify object's own callback and collector hooks.
        if first_item and self._to_notify is not None:
            self._to_notify._set_value(iter(self))
55351c0b2f7Stbbdev
55451c0b2f7Stbbdev
class OrderedResultCollector(AbstractResultCollector):
    """An AbstractResultCollector implementation that collects the
    values of the ApplyResult objects in the order they have been
    submitted. The CollectorIterator object returned by __iter__()
    will iterate over them in the order they have been submitted"""

    def __init__(self, to_notify=None, as_iterator=True):
        r"""
        \param to_notify ApplyResult object to notify when all the
        results we're waiting for become available. Can be None.
        \param as_iterator boolean telling whether the result value
        set on to_notify should be an iterator (available as soon as 1
        result arrived) or a list (available only after the last
        result arrived)
        """
        AbstractResultCollector.__init__(self, to_notify)
        # ApplyResult objects, in the order register_result() saw them.
        self._results = []
        # Guards the _remaining bookkeeping done in notify_ready().
        self._lock = threading.Lock()
        # Number of registered results not yet reported via notify_ready().
        self._remaining = 0
        self._as_iterator = as_iterator

    def register_result(self, apply_result):
        r"""Used to identify which results we're waiting for. Will
        always be called BEFORE the Jobs get submitted to the work
        queue, and BEFORE the __iter__ and _get_result() methods can
        be called
        \param apply_result ApplyResult object to add in our collection
        """
        # No locking here: per the contract above, registration is over
        # before any job can report back through notify_ready().
        self._results.append(apply_result)
        self._remaining += 1

    def _get_result(self, idx, timeout=None):
        r"""Called by the CollectorIterator object to retrieve the
        results one after another, in registration order.

        Waits on the ApplyResult registered at position idx, then
        returns that ApplyResult object itself (not its value). An
        IndexError is raised when idx is out of range of the
        registered results.

        \param idx The index of the result we want, wrt collector's order
        \param timeout integer telling how long to wait (in seconds)
        for the result at index idx to be available, or None (wait
        forever)
        """
        res = self._results[idx]
        res.wait(timeout)
        return res

    def notify_ready(self, apply_result):
        r"""Called by the ApplyResult object (already registered via
        register_result()) that it is now ready (ie. the Job's result
        is available or an exception has been raised).

        Depending on as_iterator, to_notify (when not None) is given
        either an iterator over this collector as soon as the first
        result is reported, or the list of all values once the last
        result is reported.

        \param apply_result ApplyResult object telling us that the job
        has been processed
        """
        with self._lock:
            assert self._remaining > 0
            # Nothing has been reported yet iff every registered result
            # is still counted as remaining.
            got_first = (len(self._results) == self._remaining)
            self._remaining -= 1
            got_last = (self._remaining == 0)

        # The owner is notified outside the lock.
        if self._to_notify is None:
            return

        if self._as_iterator:
            if got_first:
                self._to_notify._set_value(iter(self))
        elif got_last:
            try:
                # NOTE(review): 0 is presumably a zero timeout; every
                # result should already be ready at this point - confirm
                # against ApplyResult.get().
                lst = [r.get(0) for r in self._results]
            except BaseException:
                # Deliberately catches everything (same as a bare
                # except): whatever get() raises must reach the owner
                # instead of escaping from here and leaving it
                # un-notified. _set_exception() takes no argument, so it
                # presumably picks up the exception being handled -
                # verify against ApplyResult.
                self._to_notify._set_exception()
            else:
                self._to_notify._set_value(lst)
627