# Copyright (c) 2016-2023 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Based on the software developed by:
# Copyright (c) 2008,2016 david decotigny (Pool of threads)
# Copyright (c) 2006-2008, R Oudkerk (multiprocessing.Pool)
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
# used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.
# IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

import time
import threading

from .api import *
from .pool import *


def test(arg=None):
    """Exercise the ``Pool`` API end to end and assert on the results.

    The checks cover the synchronous calls (``apply``, ``map``, ``imap``,
    ``imap_unordered``) and their ``*_async`` counterparts, including the
    timeout path of ``get(timeout=...)``.

    ``Pool``, ``TimeoutError`` and the version/concurrency query functions
    are not defined in this file; they are expected to be provided by the
    star imports from ``.api`` and ``.pool``.

    Args:
        arg: Pass ``"-v"`` to print progress messages. With any other value
            only the two version lines are printed.

    Raises:
        AssertionError: If any check fails. The pool is still terminated
            and joined before the error propagates.
    """
    if arg == "-v":
        def say(*x):
            print(*x)
    else:
        def say(*x):
            pass
    say("Start Pool testing")
    print("oneTBB version is %s" % runtime_version())
    print("oneTBB interface version is %s" % runtime_interface_version())

    def get_tid():
        return threading.current_thread().ident

    assert default_num_threads() == this_task_arena_max_concurrency()

    def return42():
        return 42

    def f(x):
        return x * x

    def work(mseconds):
        # The result is the string form of the *original* argument, so a
        # negative input still yields e.g. "-3" even though the delay used
        # for sleeping is made positive below.
        res = str(mseconds)
        if mseconds < 0:
            mseconds = -mseconds
        say("[%d] Start to work for %fms..." % (get_tid(), mseconds*10))
        time.sleep(mseconds/100.)
        say("[%d] Work done (%fms)." % (get_tid(), mseconds*10))
        return res

    # Special flag set by the thread that submits the async work; while it
    # is truthy every timeout_work() call keeps spinning.
    spin_flag = None

    def timeout_work(param):
        say("[%d] Spin wait work start..." % get_tid())
        while spin_flag:
            time.sleep(0.0001)  # yield equivalent
        say("[%d] Work done." % get_tid())
        return str(param) if param is not None else None

    def prepare_timeout_exception():
        nonlocal spin_flag
        spin_flag = True  # lock threads in timeout_work

    def check_timeout_exception(pool_object, func):
        """Call ``func(pool_object)`` and require it to raise TimeoutError."""
        nonlocal spin_flag
        try:
            func(pool_object)
        except TimeoutError:
            say("Good. Got expected timeout exception.")
        else:
            # Raised explicitly (not via ``assert``) so the check is not
            # stripped when Python runs with -O.
            raise AssertionError("Expected exception !")
        finally:
            # Always unlock the threads in timeout_work, even when the
            # check itself fails; otherwise they would spin forever.
            spin_flag = False

    def cb(s):
        say("Result ready: %s" % s)

    ### Test copy/pasted from multiprocessing
    pool = Pool(4)  # start worker threads
    try:
        # edge cases
        assert pool.map(return42, []) == []
        assert pool.apply_async(return42, []).get() == 42
        assert pool.apply(return42, []) == 42
        assert list(pool.imap(return42, iter([]))) == []
        assert list(pool.imap_unordered(return42, iter([]))) == []
        assert pool.map_async(return42, []).get() == []
        assert list(pool.imap_async(return42, iter([])).get()) == []
        assert list(pool.imap_unordered_async(return42, iter([])).get()) == []

        # basic tests
        result = pool.apply_async(f, (10,))  # evaluate "f(10)" asynchronously
        assert result.get(timeout=1) == 100  # ... unless slow computer
        assert list(pool.map(f, range(10))) == list(map(f, range(10)))
        it = pool.imap(f, range(10))
        assert next(it) == 0
        assert next(it) == 1
        assert next(it) == 4

        # Test apply_async() timeout handling
        prepare_timeout_exception()
        result = pool.apply_async(timeout_work, (None,))
        check_timeout_exception(result, lambda res: say(res.get(timeout=1)))
        assert result.get() is None  # timeout_work(None) returns None

        # Test imap()
        assert list(pool.imap(work, range(10, 3, -1), chunksize=4)) == list(map(
            str, range(10, 3, -1)))

        # Test imap_unordered()
        assert sorted(pool.imap_unordered(work, range(10, 3, -1))) == sorted(map(
            str, range(10, 3, -1)))

        # Test map_async()
        prepare_timeout_exception()
        result = pool.map_async(timeout_work, range(10), callback=cb)
        check_timeout_exception(result, lambda res: res.get(timeout=0.01))
        say(result.get())

        # Test imap_async()
        prepare_timeout_exception()
        result = pool.imap_async(timeout_work, range(3, 10), callback=cb)
        check_timeout_exception(result, lambda res: res.get(timeout=0.01))
        for i in result.get():
            say("Item:", i)
        say("### Loop again:")
        for i in result.get():
            say("Item2:", i)

        # Test imap_unordered_async()
        prepare_timeout_exception()
        result = pool.imap_unordered_async(timeout_work, range(10, 3, -1), callback=cb)
        check_timeout_exception(result, lambda res: res.get(timeout=0.01))
        for i in result.get():
            say("Item1:", i)
        for i in result.get():
            say("Item2:", i)
        r = result.get()
        for i in r:
            say("Item3:", i)
        for i in r:
            say("Item4:", i)
        for i in r:
            say("Item5:", i)

        #
        # The case for the exceptions
        #
        # NOTE(review): work() makes negative delays positive before
        # sleeping, so these inputs may never raise; the handlers below
        # tolerate IOError/ValueError if they do - confirm the intent.

        # Exceptions in imap_unordered_async()
        result = pool.imap_unordered_async(work, range(2, -10, -1), callback=cb)
        time.sleep(3)
        try:
            for i in result.get():
                say("Got item:", i)
        except (IOError, ValueError):
            say("Good. Got expected exception")

        # Exceptions in imap_async()
        result = pool.imap_async(work, range(2, -10, -1), callback=cb)
        time.sleep(3)
        try:
            for i in result.get():
                say("Got item:", i)
        except (IOError, ValueError):
            say("Good. Got expected exception")
    finally:
        # Stop the test: need to stop the pool !!!
        # Release any worker still spinning in timeout_work first, so the
        # shutdown cannot wait on a task that never finishes.
        spin_flag = False
        pool.terminate()
        pool.join()


if __name__ == "__main__":
    test()