ProcessPoolExecutor, BrokenProcessPool Handling
In this documentation ( https://pymotw.com/3/concurrent.futures/ ) it says: 'The ProcessPoolExecutor works in the same way as ThreadPoolExecutor, but uses processes instead of thre
Solution 1:
I hadn't see it IRL, but from the code it looks like the returned file descriptors not contains the results_queue file descriptor.
from concurrent.futures.process:
reader = result_queue._reader
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
sentinels = [p.sentinel for p in processes.values()]
assert sentinels
ready = wait([reader] + sentinels)
if reader in ready: # <===================================== THIS
result_item = reader.recv()
else:
# Mark the process pool broken so that submits fail right now.
executor = executor_reference()
if executor is not None:
executor._broken = True
executor._shutdown_thread = True
executor = None
# All futures in flight must be marked failed
for work_id, work_item in pending_work_items.items():
work_item.future.set_exception(
BrokenProcessPool(
"A process in the process pool was "
"terminated abruptly while the future was "
"running or pending."
))
# Delete references to object. See issue16284
del work_item
the wait
function depends on system, but assuming linux OS (at multiprocessing.connection
, removed all timeout related code):
def wait(object_list, timeout=None):
'''
Wait till an object in object_list is ready/readable.
Returns list of those objects in object_list which are ready/readable.
'''
with _WaitSelector() as selector:
for obj in object_list:
selector.register(obj, selectors.EVENT_READ)
while True:
ready = selector.select(timeout)
if ready:
return [key.fileobj for (key, events) in ready]
else:
# some timeout code
Post a Comment for "ProcessPoolExecutor, BrokenProcessPool Handling"