From 53c89fbec578aa4265c68ca29a7f56a3037e936c Mon Sep 17 00:00:00 2001 From: Julien Palard Date: Sat, 10 Feb 2018 23:38:01 +0100 Subject: [PATCH 1/3] bpo-19675: Terminate processes if construction of a pool is failing. --- Lib/multiprocessing/pool.py | 12 ++++++++++-- .../Library/2018-02-10-23-41-05.bpo-19675.-dj35-.rst | 2 ++ 2 files changed, 12 insertions(+), 2 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2018-02-10-23-41-05.bpo-19675.-dj35-.rst diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 3e9a0d6b48679f..2b3cc59a9ff8b4 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -173,7 +173,15 @@ def __init__(self, processes=None, initializer=None, initargs=(), self._processes = processes self._pool = [] - self._repopulate_pool() + try: + self._repopulate_pool() + except Exception: + for p in self._pool: + if p.exitcode is None: + p.terminate() + for p in self._pool: + p.join() + raise self._worker_handler = threading.Thread( target=Pool._handle_workers, @@ -235,10 +243,10 @@ def _repopulate_pool(self): self._initargs, self._maxtasksperchild, self._wrap_exception) ) - self._pool.append(w) w.name = w.name.replace('Process', 'PoolWorker') w.daemon = True w.start() + self._pool.append(w) util.debug('added worker') def _maintain_pool(self): diff --git a/Misc/NEWS.d/next/Library/2018-02-10-23-41-05.bpo-19675.-dj35-.rst b/Misc/NEWS.d/next/Library/2018-02-10-23-41-05.bpo-19675.-dj35-.rst new file mode 100644 index 00000000000000..31430475e243e6 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-02-10-23-41-05.bpo-19675.-dj35-.rst @@ -0,0 +1,2 @@ +multiprocessing.Pool does no longer leak processes if the construction of +the pool fails. From a8b39244e1de7aec55d969a0035aeb46078b2b77 Mon Sep 17 00:00:00 2001 From: Julien Palard Date: Sat, 7 Apr 2018 12:15:30 +0200 Subject: [PATCH 2/3] bpo-19675: Add a test. --- Lib/test/_test_multiprocessing.py | 45 +++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 1e497a572a760f..5003b170ab1620 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3,6 +3,7 @@ # import unittest +import unittest.mock import queue as pyqueue import contextlib import time @@ -4493,6 +4494,50 @@ def test_empty(self): proc.join() + +class TestPoolNotLeakOnFailure(unittest.TestCase): + + def test_release_unused_processes(self): + # Issue #19675: During pool creation, if we can't create a process, + # don't leak already created ones. + will_fail_in = 3 + forked_processes = [] + + class FailingForkProcess(): + def __init__(self, **kwargs): + self.name = 'Fake Process' + self.exitcode = None + self.state = None + forked_processes.append(self) + + def start(self): + nonlocal will_fail_in + if will_fail_in <= 0: + raise OSError("Manually inducted OSError") + will_fail_in -= 1 + self.state = 'started' + + def terminate(self): + self.state = 'stopped' + + def join(self): + pass + + def is_alive(self): + return self.state == 'started' + + try: + p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock( + Process=FailingForkProcess)) + p.close() + p.join() + except OSError as err: + if str(err) != 'Manually inducted OSError': + raise + self.assertFalse( + any(process.is_alive() for process in forked_processes)) + + # # Mixins # From f569cae4de8bba4cbf945d969150e061d7e6aeb2 Mon Sep 17 00:00:00 2001 From: Tal Einat Date: Mon, 10 Sep 2018 22:03:02 +0300 Subject: [PATCH 3/3] bpo-19675: clean up and fix typos in test and NEWS --- Lib/test/_test_multiprocessing.py | 16 +++++++--------- .../2018-02-10-23-41-05.bpo-19675.-dj35-.rst | 3 +-- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 14baa64ae66007..239e7d4bb84248 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4608,7 +4608,7 @@ def test_release_unused_processes(self): will_fail_in = 3 forked_processes = [] - class FailingForkProcess(): + class FailingForkProcess: def __init__(self, **kwargs): self.name = 'Fake Process' self.exitcode = None @@ -4618,27 +4618,25 @@ def __init__(self, **kwargs): def start(self): nonlocal will_fail_in if will_fail_in <= 0: - raise OSError("Manually inducted OSError") + raise OSError("Manually induced OSError") will_fail_in -= 1 self.state = 'started' def terminate(self): - self.state = 'stopped' + self.state = 'stopping' def join(self): - pass + if self.state == 'stopping': + self.state = 'stopped' def is_alive(self): - return self.state == 'started' + return self.state == 'started' or self.state == 'stopping' - try: + with self.assertRaisesRegex(OSError, 'Manually induced OSError'): p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock( Process=FailingForkProcess)) p.close() p.join() - except OSError as err: - if str(err) != 'Manually inducted OSError': - raise self.assertFalse( any(process.is_alive() for process in forked_processes)) diff --git a/Misc/NEWS.d/next/Library/2018-02-10-23-41-05.bpo-19675.-dj35-.rst b/Misc/NEWS.d/next/Library/2018-02-10-23-41-05.bpo-19675.-dj35-.rst index 31430475e243e6..958550d3e8d74a 100644 --- a/Misc/NEWS.d/next/Library/2018-02-10-23-41-05.bpo-19675.-dj35-.rst +++ b/Misc/NEWS.d/next/Library/2018-02-10-23-41-05.bpo-19675.-dj35-.rst @@ -1,2 +1 @@ -multiprocessing.Pool does no longer leak processes if the construction of -the pool fails. +``multiprocessing.Pool`` no longer leaks processes if its initialization fails.