Skip to content

Commit f24a012

Browse files
graingertblurb-it[bot]gpshead
authored
gh-131788: make resource_tracker re-entrant safe (GH-131787)
* make resource_tracker re-entrant safe * Update Lib/multiprocessing/resource_tracker.py * trim trailing whitespace * use f-string and args = [x, *y, z] * raise self._reentrant_call_error --------- Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com> Co-authored-by: Gregory P. Smith <greg@krypto.org>
1 parent a10152f commit f24a012

File tree

2 files changed

+94
-70
lines changed

2 files changed

+94
-70
lines changed

Lib/multiprocessing/resource_tracker.py

Lines changed: 93 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import sys
2121
import threading
2222
import warnings
23+
from collections import deque
2324

2425
from . import spawn
2526
from . import util
@@ -62,6 +63,7 @@ def __init__(self):
6263
self._fd = None
6364
self._pid = None
6465
self._exitcode = None
66+
self._reentrant_messages = deque()
6567

6668
def _reentrant_call_error(self):
6769
# gh-109629: this happens if an explicit call to the ResourceTracker
@@ -98,7 +100,7 @@ def _stop_locked(
98100
# This shouldn't happen (it might when called by a finalizer)
99101
# so we check for it anyway.
100102
if self._lock._recursion_count() > 1:
101-
return self._reentrant_call_error()
103+
raise self._reentrant_call_error()
102104
if self._fd is None:
103105
# not running
104106
return
@@ -128,69 +130,99 @@ def ensure_running(self):
128130
129131
This can be run from any process. Usually a child process will use
130132
the resource created by its parent.'''
133+
return self._ensure_running_and_write()
134+
135+
def _teardown_dead_process(self):
136+
os.close(self._fd)
137+
138+
# Clean-up to avoid dangling processes.
139+
try:
140+
# _pid can be None if this process is a child from another
141+
# python process, which has started the resource_tracker.
142+
if self._pid is not None:
143+
os.waitpid(self._pid, 0)
144+
except ChildProcessError:
145+
# The resource_tracker has already been terminated.
146+
pass
147+
self._fd = None
148+
self._pid = None
149+
self._exitcode = None
150+
151+
warnings.warn('resource_tracker: process died unexpectedly, '
152+
'relaunching. Some resources might leak.')
153+
154+
def _launch(self):
155+
fds_to_pass = []
156+
try:
157+
fds_to_pass.append(sys.stderr.fileno())
158+
except Exception:
159+
pass
160+
r, w = os.pipe()
161+
try:
162+
fds_to_pass.append(r)
163+
# process will out live us, so no need to wait on pid
164+
exe = spawn.get_executable()
165+
args = [
166+
exe,
167+
*util._args_from_interpreter_flags(),
168+
'-c',
169+
f'from multiprocessing.resource_tracker import main;main({r})',
170+
]
171+
# bpo-33613: Register a signal mask that will block the signals.
172+
# This signal mask will be inherited by the child that is going
173+
# to be spawned and will protect the child from a race condition
174+
# that can make the child die before it registers signal handlers
175+
# for SIGINT and SIGTERM. The mask is unregistered after spawning
176+
# the child.
177+
prev_sigmask = None
178+
try:
179+
if _HAVE_SIGMASK:
180+
prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
181+
pid = util.spawnv_passfds(exe, args, fds_to_pass)
182+
finally:
183+
if prev_sigmask is not None:
184+
signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
185+
except:
186+
os.close(w)
187+
raise
188+
else:
189+
self._fd = w
190+
self._pid = pid
191+
finally:
192+
os.close(r)
193+
194+
def _ensure_running_and_write(self, msg=None):
131195
with self._lock:
132196
if self._lock._recursion_count() > 1:
133197
# The code below is certainly not reentrant-safe, so bail out
134-
return self._reentrant_call_error()
198+
if msg is None:
199+
raise self._reentrant_call_error()
200+
return self._reentrant_messages.append(msg)
201+
135202
if self._fd is not None:
136203
# resource tracker was launched before, is it still running?
137-
if self._check_alive():
138-
# => still alive
139-
return
140-
# => dead, launch it again
141-
os.close(self._fd)
142-
143-
# Clean-up to avoid dangling processes.
204+
if msg is None:
205+
to_send = b'PROBE:0:noop\n'
206+
else:
207+
to_send = msg
144208
try:
145-
# _pid can be None if this process is a child from another
146-
# python process, which has started the resource_tracker.
147-
if self._pid is not None:
148-
os.waitpid(self._pid, 0)
149-
except ChildProcessError:
150-
# The resource_tracker has already been terminated.
151-
pass
152-
self._fd = None
153-
self._pid = None
154-
self._exitcode = None
209+
self._write(to_send)
210+
except OSError:
211+
self._teardown_dead_process()
212+
self._launch()
155213

156-
warnings.warn('resource_tracker: process died unexpectedly, '
157-
'relaunching. Some resources might leak.')
214+
msg = None # message was sent in probe
215+
else:
216+
self._launch()
158217

159-
fds_to_pass = []
218+
while True:
160219
try:
161-
fds_to_pass.append(sys.stderr.fileno())
162-
except Exception:
163-
pass
164-
cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
165-
r, w = os.pipe()
166-
try:
167-
fds_to_pass.append(r)
168-
# process will out live us, so no need to wait on pid
169-
exe = spawn.get_executable()
170-
args = [exe] + util._args_from_interpreter_flags()
171-
args += ['-c', cmd % r]
172-
# bpo-33613: Register a signal mask that will block the signals.
173-
# This signal mask will be inherited by the child that is going
174-
# to be spawned and will protect the child from a race condition
175-
# that can make the child die before it registers signal handlers
176-
# for SIGINT and SIGTERM. The mask is unregistered after spawning
177-
# the child.
178-
prev_sigmask = None
179-
try:
180-
if _HAVE_SIGMASK:
181-
prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
182-
pid = util.spawnv_passfds(exe, args, fds_to_pass)
183-
finally:
184-
if prev_sigmask is not None:
185-
signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
186-
except:
187-
os.close(w)
188-
raise
189-
else:
190-
self._fd = w
191-
self._pid = pid
192-
finally:
193-
os.close(r)
220+
reentrant_msg = self._reentrant_messages.popleft()
221+
except IndexError:
222+
break
223+
self._write(reentrant_msg)
224+
if msg is not None:
225+
self._write(msg)
194226

195227
def _check_alive(self):
196228
'''Check that the pipe has not been closed by sending a probe.'''
@@ -211,27 +243,18 @@ def unregister(self, name, rtype):
211243
'''Unregister name of resource with resource tracker.'''
212244
self._send('UNREGISTER', name, rtype)
213245

246+
def _write(self, msg):
247+
nbytes = os.write(self._fd, msg)
248+
assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}"
249+
214250
def _send(self, cmd, name, rtype):
215-
try:
216-
self.ensure_running()
217-
except ReentrantCallError:
218-
# The code below might or might not work, depending on whether
219-
# the resource tracker was already running and still alive.
220-
# Better warn the user.
221-
# (XXX is warnings.warn itself reentrant-safe? :-)
222-
warnings.warn(
223-
f"ResourceTracker called reentrantly for resource cleanup, "
224-
f"which is unsupported. "
225-
f"The {rtype} object {name!r} might leak.")
226-
msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
251+
msg = f"{cmd}:{name}:{rtype}\n".encode("ascii")
227252
if len(msg) > 512:
228253
# posix guarantees that writes to a pipe of less than PIPE_BUF
229254
# bytes are atomic, and that PIPE_BUF >= 512
230255
raise ValueError('msg too long')
231-
nbytes = os.write(self._fd, msg)
232-
assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
233-
nbytes, len(msg))
234256

257+
self._ensure_running_and_write(msg)
235258

236259
_resource_tracker = ResourceTracker()
237260
ensure_running = _resource_tracker.ensure_running
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Make ``ResourceTracker.send`` from :mod:`multiprocessing` re-entrant safe

0 commit comments

Comments
 (0)