Refactor socket activation and fd inheritance (#1310)

Track the use of systemd socket activation and gunicorn socket inheritance
in the arbiter. Unify the logic of creating gunicorn sockets from each of
these sources to always use the socket name to determine the type rather
than checking the configured addresses. The configured addresses are only
used when there is no inheritance from systemd or a parent arbiter.

Fix #1298
This commit is contained in:
Randall Leeds 2016-12-27 14:01:20 -08:00 committed by GitHub
parent 8dbb2963b2
commit 0be7996885
6 changed files with 266 additions and 76 deletions

View File

@ -15,8 +15,7 @@ import traceback
from gunicorn.errors import HaltServer, AppImportError
from gunicorn.pidfile import Pidfile
from gunicorn.sock import create_sockets
from gunicorn import util
from gunicorn import sock, systemd, util
from gunicorn import __version__, SERVER_SOFTWARE
@ -61,6 +60,7 @@ class Arbiter(object):
self.setup(app)
self.pidfile = None
self.systemd = False
self.worker_age = 0
self.reexec_pid = 0
self.master_pid = 0
@ -140,8 +140,21 @@ class Arbiter(object):
self.cfg.on_starting(self)
self.init_signals()
if not self.LISTENERS:
self.LISTENERS = create_sockets(self.cfg, self.log)
fds = []
listen_fds = systemd.listen_fds()
if listen_fds:
self.systemd = True
fds = range(systemd.SD_LISTEN_FDS_START,
systemd.SD_LISTEN_FDS_START + listen_fds)
elif self.master_pid:
for fd in os.environ.pop('GUNICORN_FD').split(','):
fds.append(int(fd))
self.LISTENERS = sock.create_sockets(self.cfg, self.log, fds)
listeners_str = ",".join([str(l) for l in self.LISTENERS])
self.log.debug("Arbiter booted")
@ -365,9 +378,8 @@ class Arbiter(object):
killed gracefully (ie. trying to wait for the current connection)
"""
if self.reexec_pid == 0 and self.master_pid == 0:
for l in self.LISTENERS:
l.close()
unlink = self.reexec_pid == self.master_pid == 0 and not self.systemd
sock.close_sockets(self.LISTENERS, unlink)
self.LISTENERS = []
sig = signal.SIGTERM
@ -402,10 +414,15 @@ class Arbiter(object):
self.cfg.pre_exec(self)
environ = self.cfg.env_orig.copy()
fds = [l.fileno() for l in self.LISTENERS]
environ['GUNICORN_FD'] = ",".join([str(fd) for fd in fds])
environ['GUNICORN_PID'] = str(master_pid)
if self.systemd:
environ['LISTEN_PID'] = str(os.getpid())
environ['LISTEN_FDS'] = str(len(self.LISTENERS))
else:
environ['GUNICORN_FD'] = ','.join(
str(l.fileno()) for l in self.LISTENERS)
os.chdir(self.START_CTX['cwd'])
# exec the process using the original environment
@ -439,7 +456,7 @@ class Arbiter(object):
# close all listeners
[l.close() for l in self.LISTENERS]
# init new listeners
self.LISTENERS = create_sockets(self.cfg, self.log)
self.LISTENERS = sock.create_sockets(self.cfg, self.log)
listeners_str = ",".join([str(l) for l in self.LISTENERS])
self.log.info("Listening at: %s", listeners_str)

View File

@ -13,8 +13,6 @@ import time
from gunicorn import util
from gunicorn.six import string_types
SD_LISTEN_FDS_START = 3
class BaseSocket(object):
@ -123,10 +121,6 @@ class UnixSocket(BaseSocket):
util.chown(self.cfg_addr, self.conf.uid, self.conf.gid)
os.umask(old_umask)
def close(self):
os.unlink(self.cfg_addr)
super(UnixSocket, self).close()
def _sock_type(addr):
if isinstance(addr, tuple):
@ -141,41 +135,15 @@ def _sock_type(addr):
return sock_type
def create_sockets(conf, log):
"""
Create a new socket for the given address. If the
address is a tuple, a TCP socket is created. If it
is a string, a Unix socket is created. Otherwise
a TypeError is raised.
def create_sockets(conf, log, fds=None):
"""
Create a new socket for the configured addresses or file descriptors.
# Systemd support, use the sockets managed by systemd and passed to
# gunicorn.
# http://www.freedesktop.org/software/systemd/man/systemd.socket.html
If a configured address is a tuple then a TCP socket is created.
If it is a string, a Unix socket is created. Otherwise, a TypeError is
raised.
"""
listeners = []
if ('LISTEN_PID' in os.environ
and int(os.environ.get('LISTEN_PID')) == os.getpid()):
for i in range(int(os.environ.get('LISTEN_FDS', 0))):
fd = i + SD_LISTEN_FDS_START
try:
sock = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM)
sockname = sock.getsockname()
if isinstance(sockname, str) and sockname.startswith('/'):
listeners.append(UnixSocket(sockname, conf, log, fd=fd))
elif len(sockname) == 2 and '.' in sockname[0]:
listeners.append(TCPSocket("%s:%s" % sockname, conf, log,
fd=fd))
elif len(sockname) == 4 and ':' in sockname[0]:
listeners.append(TCP6Socket("[%s]:%s" % sockname[:2], conf,
log, fd=fd))
except socket.error:
pass
del os.environ['LISTEN_PID'], os.environ['LISTEN_FDS']
if listeners:
log.debug('Socket activation sockets: %s',
",".join([str(l) for l in listeners]))
return listeners
# get it only once
laddr = conf.address
@ -189,28 +157,19 @@ def create_sockets(conf, log):
raise ValueError('keyfile "%s" does not exist' % conf.keyfile)
# sockets are already bound
if 'GUNICORN_FD' in os.environ:
fds = os.environ.pop('GUNICORN_FD').split(',')
for i, fd in enumerate(fds):
fd = int(fd)
addr = laddr[i]
sock_type = _sock_type(addr)
if fds is not None:
for fd in fds:
sock = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM)
sock_name = sock.getsockname()
sock_type = _sock_type(sock_name)
listener = sock_type(sock_name, conf, log, fd=fd)
listeners.append(listener)
try:
listeners.append(sock_type(addr, conf, log, fd=fd))
except socket.error as e:
if e.args[0] == errno.ENOTCONN:
log.error("GUNICORN_FD should refer to an open socket.")
else:
raise
return listeners
# no sockets is bound, first initialization of gunicorn in this env.
for addr in laddr:
sock_type = _sock_type(addr)
# If we fail to create a socket from GUNICORN_FD
# we fall through and try and open the socket
# normally.
sock = None
for i in range(5):
try:
@ -235,3 +194,11 @@ def create_sockets(conf, log):
listeners.append(sock)
return listeners
def close_sockets(listeners, unlink=True):
for sock in listeners:
sock_name = sock.getsockname()
sock.close()
if unlink and _sock_type(sock_name) is UnixSocket:
os.unlink(sock_name)

45
gunicorn/systemd.py Normal file
View File

@ -0,0 +1,45 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import os
SD_LISTEN_FDS_START = 3
def listen_fds(unset_environment=True):
"""
Get the number of sockets inherited from systemd socket activation.
:param unset_environment: clear systemd environment variables unless False
:type unset_environment: bool
:return: the number of sockets to inherit from systemd socket activation
:rtype: int
Returns zero immediately if $LISTEN_PID is not set to the current pid.
Otherwise, returns the number of systemd activation sockets specified by
$LISTEN_FDS.
When $LISTEN_PID matches the current pid, unsets the environment variables
unless the ``unset_environment`` flag is ``False``.
.. note::
Unlike the sd_listen_fds C function, this implementation does not set
the FD_CLOEXEC flag because the gunicorn arbiter never needs to do this.
.. seealso::
`<https://www.freedesktop.org/software/systemd/man/sd_listen_fds.html>`_
"""
fds = int(os.environ.get('LISTEN_FDS', 0))
listen_pid = int(os.environ.get('LISTEN_PID', 0))
if listen_pid != os.getpid():
return 0
if unset_environment:
os.environ.pop('LISTEN_PID', None)
os.environ.pop('LISTEN_FDS', None)
return fds

View File

@ -29,14 +29,89 @@ class DummyApplication(gunicorn.app.base.BaseApplication):
"""No-op"""
def test_arbiter_shutdown_closes_listeners():
@mock.patch('gunicorn.sock.close_sockets')
def test_arbiter_stop_closes_listeners(close_sockets):
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
listener1 = mock.Mock()
listener2 = mock.Mock()
arbiter.LISTENERS = [listener1, listener2]
listeners = [listener1, listener2]
arbiter.LISTENERS = listeners
arbiter.stop()
listener1.close.assert_called_with()
listener2.close.assert_called_with()
close_sockets.assert_called_with(listeners, True)
@mock.patch('gunicorn.sock.close_sockets')
def test_arbiter_stop_child_does_not_unlink_listeners(close_sockets):
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.reexec_pid = os.getpid()
arbiter.stop()
close_sockets.assert_called_with([], False)
@mock.patch('gunicorn.sock.close_sockets')
def test_arbiter_stop_parent_does_not_unlink_listeners(close_sockets):
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.master_pid = os.getppid()
arbiter.stop()
close_sockets.assert_called_with([], False)
@mock.patch('gunicorn.sock.close_sockets')
def test_arbiter_stop_does_not_unlink_systemd_listeners(close_sockets):
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.systemd = True
arbiter.stop()
close_sockets.assert_called_with([], False)
@mock.patch('os.getpid')
@mock.patch('os.fork')
@mock.patch('os.execvpe')
def test_arbiter_reexec_passing_systemd_sockets(execvpe, fork, getpid):
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.LISTENERS = [mock.Mock(), mock.Mock()]
arbiter.systemd = True
fork.return_value = 0
getpid.side_effect = [2, 3]
arbiter.reexec()
environ = execvpe.call_args[0][2]
assert environ['GUNICORN_PID'] == '2'
assert environ['LISTEN_FDS'] == '2'
assert environ['LISTEN_PID'] == '3'
@mock.patch('os.getpid')
@mock.patch('os.fork')
@mock.patch('os.execvpe')
def test_arbiter_reexec_passing_gunicorn_sockets(execvpe, fork, getpid):
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
listener1 = mock.Mock()
listener2 = mock.Mock()
listener1.fileno.return_value = 4
listener2.fileno.return_value = 5
arbiter.LISTENERS = [listener1, listener2]
fork.return_value = 0
getpid.side_effect = [2, 3]
arbiter.reexec()
environ = execvpe.call_args[0][2]
assert environ['GUNICORN_FD'] == '4,5'
assert environ['GUNICORN_PID'] == '2'
@mock.patch('os.fork')
def test_arbiter_reexec_limit_parent(fork):
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.reexec_pid = ~os.getpid()
arbiter.reexec()
assert fork.called is False, "should not fork when there is already a child"
@mock.patch('os.fork')
def test_arbiter_reexec_limit_child(fork):
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.master_pid = ~os.getpid()
arbiter.reexec()
assert fork.called is False, "should not fork when arbiter is a child"
class PreloadedAppWithEnvSettings(DummyApplication):

View File

@ -1,3 +1,8 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
try:
import unittest.mock as mock
except ImportError:
@ -6,13 +11,29 @@ except ImportError:
from gunicorn import sock
@mock.patch('os.close')
@mock.patch('os.getpid')
def test_socket_close():
listener1 = mock.Mock()
listener1.getsockname.return_value = ('127.0.0.1', '80')
listener2 = mock.Mock()
listener2.getsockname.return_value = ('192.168.2.5', '80')
sock.close_sockets([listener1, listener2])
listener1.close.assert_called_with()
listener2.close.assert_called_with()
@mock.patch('os.unlink')
@mock.patch('socket.fromfd')
def test_unix_socket_close_unlink(fromfd, unlink, getpid, close):
fd = 42
gsock = sock.UnixSocket('test.sock', mock.Mock(), mock.Mock(), fd=fd)
gsock.close()
unlink.assert_called_with("test.sock")
close.assert_called_with(fd)
def test_unix_socket_close_unlink(unlink):
listener = mock.Mock()
listener.getsockname.return_value = '/var/run/test.sock'
sock.close_sockets([listener])
listener.close.assert_called_with()
unlink.assert_called_once_with('/var/run/test.sock')
@mock.patch('os.unlink')
def test_unix_socket_close_without_unlink(unlink):
listener = mock.Mock()
listener.getsockname.return_value = '/var/run/test.sock'
sock.close_sockets([listener], False)
listener.close.assert_called_with()
assert not unlink.called, 'unlink should not have been called'

65
tests/test_systemd.py Normal file
View File

@ -0,0 +1,65 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
from contextlib import contextmanager
import os
try:
import unittest.mock as mock
except ImportError:
import mock
import pytest
from gunicorn import systemd
@contextmanager
def check_environ(unset=True):
"""
A context manager that asserts post-conditions of ``listen_fds`` at exit.
This helper is used to ease checking of the test post-conditions for the
systemd socket activation tests that parametrize the call argument.
"""
with mock.patch.dict(os.environ):
old_fds = os.environ.get('LISTEN_FDS', None)
old_pid = os.environ.get('LISTEN_PID', None)
yield
if unset:
assert 'LISTEN_FDS' not in os.environ, \
"LISTEN_FDS should have been unset"
assert 'LISTEN_PID' not in os.environ, \
"LISTEN_PID should have been unset"
else:
new_fds = os.environ.get('LISTEN_FDS', None)
new_pid = os.environ.get('LISTEN_PID', None)
assert new_fds == old_fds, \
"LISTEN_FDS should not have been changed"
assert new_pid == old_pid, \
"LISTEN_PID should not have been changed"
@pytest.mark.parametrize("unset", [True, False])
def test_listen_fds_ignores_wrong_pid(unset):
with mock.patch.dict(os.environ):
os.environ['LISTEN_FDS'] = str(5)
os.environ['LISTEN_PID'] = str(1)
with check_environ(False): # early exit — never changes the environment
assert systemd.listen_fds(unset) == 0, \
"should ignore listen fds not intended for this pid"
@pytest.mark.parametrize("unset", [True, False])
def test_listen_fds_returns_count(unset):
with mock.patch.dict(os.environ):
os.environ['LISTEN_FDS'] = str(5)
os.environ['LISTEN_PID'] = str(os.getpid())
with check_environ(unset):
assert systemd.listen_fds(unset) == 5, \
"should return the correct count of fds"