1#!/usr/bin/env python3 2# 3# Copyright (c) 2016-2022 Intel Corporation 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17# Based on the software developed by: 18# Copyright (c) 2008,2016 david decotigny (Pool of threads) 19# Copyright (c) 2006-2008, R Oudkerk (multiprocessing.Pool) 20# All rights reserved. 21# 22# Redistribution and use in source and binary forms, with or without 23# modification, are permitted provided that the following conditions 24# are met: 25# 26# 1. Redistributions of source code must retain the above copyright 27# notice, this list of conditions and the following disclaimer. 28# 2. Redistributions in binary form must reproduce the above copyright 29# notice, this list of conditions and the following disclaimer in the 30# documentation and/or other materials provided with the distribution. 31# 3. Neither the name of author nor the names of any contributors may be 32# used to endorse or promote products derived from this software 33# without specific prior written permission. 34# 35# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 36# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 37# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 38# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 39# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 40# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 41# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 42# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 43# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 44# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 45# SUCH DAMAGE. 46# 47 48import time 49import threading 50 51from .api import * 52from .pool import * 53 54 55def test(arg=None): 56 if arg == "-v": 57 def say(*x): 58 print(*x) 59 else: 60 def say(*x): 61 pass 62 say("Start Pool testing") 63 print("oneTBB version is %s" % runtime_version()) 64 print("oneTBB interface version is %s" % runtime_interface_version()) 65 66 get_tid = lambda: threading.current_thread().ident 67 68 assert default_num_threads() == this_task_arena_max_concurrency() 69 70 def return42(): 71 return 42 72 73 def f(x): 74 return x * x 75 76 def work(mseconds): 77 res = str(mseconds) 78 if mseconds < 0: 79 mseconds = -mseconds 80 say("[%d] Start to work for %fms..." % (get_tid(), mseconds*10)) 81 time.sleep(mseconds/100.) 82 say("[%d] Work done (%fms)." % (get_tid(), mseconds*10)) 83 return res 84 85 # special flag to to be set by thread calling async work 86 spin_flag = None 87 def timeout_work(param): 88 say("[%d] Spin wait work start..." % get_tid()) 89 while spin_flag: 90 time.sleep(0.0001) # yield equivalent 91 say("[%d] Work done." % get_tid()) 92 return str(param) if param != None else None 93 94 def prepare_timeout_exception(): 95 nonlocal spin_flag 96 spin_flag = True # lock threads in timeout_work 97 98 def check_timeout_exception(pool_object, func): 99 nonlocal spin_flag 100 try: 101 func(pool_object) 102 except TimeoutError: 103 say("Good. Got expected timeout exception.") 104 else: 105 assert False, "Expected exception !" 106 spin_flag = False # unlock threads in timeout_work 107 108 ### Test copy/pasted from multiprocessing 109 pool = Pool(4) # start worker threads 110 111 # edge cases 112 assert pool.map(return42, []) == [] 113 assert pool.apply_async(return42, []).get() == 42 114 assert pool.apply(return42, []) == 42 115 assert list(pool.imap(return42, iter([]))) == [] 116 assert list(pool.imap_unordered(return42, iter([]))) == [] 117 assert pool.map_async(return42, []).get() == [] 118 assert list(pool.imap_async(return42, iter([])).get()) == [] 119 assert list(pool.imap_unordered_async(return42, iter([])).get()) == [] 120 121 # basic tests 122 result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously 123 assert result.get(timeout=1) == 100 # ... unless slow computer 124 assert list(pool.map(f, range(10))) == list(map(f, range(10))) 125 it = pool.imap(f, range(10)) 126 assert next(it) == 0 127 assert next(it) == 1 128 assert next(it) == 4 129 130 # Test apply_sync exceptions 131 prepare_timeout_exception() 132 result = pool.apply_async(timeout_work, (None,)) 133 check_timeout_exception(result, lambda result : say(result.get(timeout=1))) 134 assert result.get() is None # sleep() returns None 135 136 def cb(s): 137 say("Result ready: %s" % s) 138 139 # Test imap() 140 assert list(pool.imap(work, range(10, 3, -1), chunksize=4)) == list(map( 141 str, range(10, 3, -1))) 142 143 # Test imap_unordered() 144 assert sorted(pool.imap_unordered(work, range(10, 3, -1))) == sorted(map( 145 str, range(10, 3, -1))) 146 147 # Test map_async() 148 prepare_timeout_exception() 149 result = pool.map_async(timeout_work, range(10), callback=cb) 150 check_timeout_exception(result, lambda result : result.get(timeout=0.01)) 151 say(result.get()) 152 153 # Test imap_async() 154 prepare_timeout_exception() 155 result = pool.imap_async(timeout_work, range(3, 10), callback=cb) 156 check_timeout_exception(result, lambda result : result.get(timeout=0.01)) 157 for i in result.get(): 158 say("Item:", i) 159 say("### Loop again:") 160 for i in result.get(): 161 say("Item2:", i) 162 163 # Test imap_unordered_async() 164 prepare_timeout_exception() 165 result = pool.imap_unordered_async(timeout_work, range(10, 3, -1), callback=cb) 166 check_timeout_exception(result, lambda result : result.get(timeout=0.01)) 167 for i in result.get(): 168 say("Item1:", i) 169 for i in result.get(): 170 say("Item2:", i) 171 r = result.get() 172 for i in r: 173 say("Item3:", i) 174 for i in r: 175 say("Item4:", i) 176 for i in r: 177 say("Item5:", i) 178 179 # 180 # The case for the exceptions 181 # 182 183 # Exceptions in imap_unordered_async() 184 result = pool.imap_unordered_async(work, range(2, -10, -1), callback=cb) 185 time.sleep(3) 186 try: 187 for i in result.get(): 188 say("Got item:", i) 189 except (IOError, ValueError): 190 say("Good. Got expected exception") 191 192 # Exceptions in imap_async() 193 result = pool.imap_async(work, range(2, -10, -1), callback=cb) 194 time.sleep(3) 195 try: 196 for i in result.get(): 197 say("Got item:", i) 198 except (IOError, ValueError): 199 say("Good. Got expected exception") 200 201 # Stop the test: need to stop the pool !!! 202 pool.terminate() 203 pool.join() 204 205if __name__ == "__main__": 206 test() 207