fix unix socket locking

This change add proper file locking to gunicorn. By default "gunicorn.lock" is created in the temporary directory when a unix socket is bound.  In case someone want to fix the lock file path or use multiple gunicorn instance the "--lock-file" setting can be used to set the path of this file.

fix #1259
This commit is contained in:
benoitc 2016-05-14 10:49:36 +02:00
parent fbe865f37a
commit 8a6748ee65
8 changed files with 170 additions and 52 deletions

View File

@ -14,6 +14,7 @@ import time
import traceback import traceback
from gunicorn.errors import HaltServer, AppImportError from gunicorn.errors import HaltServer, AppImportError
from gunicorn.lockfile import LockFile
from gunicorn.pidfile import Pidfile from gunicorn.pidfile import Pidfile
from gunicorn.sock import create_sockets from gunicorn.sock import create_sockets
from gunicorn import util from gunicorn import util
@ -39,6 +40,7 @@ class Arbiter(object):
START_CTX = {} START_CTX = {}
LISTENERS = [] LISTENERS = []
LOCK_FILE = None
WORKERS = {} WORKERS = {}
PIPE = [] PIPE = []
@ -131,8 +133,21 @@ class Arbiter(object):
self.cfg.on_starting(self) self.cfg.on_starting(self)
self.init_signals() self.init_signals()
need_lock = False
if not self.LISTENERS: if not self.LISTENERS:
self.LISTENERS = create_sockets(self.cfg, self.log) self.LISTENERS, need_lock = create_sockets(self.cfg, self.log)
if need_lock:
if not self.LOCK_FILE:
# reuse the lockfile if already set
if 'GUNICORN_LOCK' in os.environ:
lock_path = os.environ.get('GUNICORN_LOCK')
else:
lock_path = self.cfg.lockfile
self.LOCK_FILE = LockFile(lock_path)
# add us to the shared lock
self.LOCK_FILE.lock()
listeners_str = ",".join([str(l) for l in self.LISTENERS]) listeners_str = ",".join([str(l) for l in self.LISTENERS])
self.log.debug("Arbiter booted") self.log.debug("Arbiter booted")
@ -335,8 +350,17 @@ class Arbiter(object):
:attr graceful: boolean, If True (the default) workers will be :attr graceful: boolean, If True (the default) workers will be
killed gracefully (ie. trying to wait for the current connection) killed gracefully (ie. trying to wait for the current connection)
""" """
locked = False
if self.LOCK_FILE:
self.LOCK_FILE.unlock()
locked = self.LOCK_FILE.locked()
# delete the lock file if needed
if not locked and 'GUNICORN_LOCK' in os.environ:
del os.environ['GUNICORN_LOCK']
for l in self.LISTENERS: for l in self.LISTENERS:
l.close() l.close(locked)
self.LISTENERS = [] self.LISTENERS = []
sig = signal.SIGTERM sig = signal.SIGTERM
if not graceful: if not graceful:
@ -366,6 +390,9 @@ class Arbiter(object):
fds = [l.fileno() for l in self.LISTENERS] fds = [l.fileno() for l in self.LISTENERS]
environ['GUNICORN_FD'] = ",".join([str(fd) for fd in fds]) environ['GUNICORN_FD'] = ",".join([str(fd) for fd in fds])
if self.LOCK_FILE:
environ['GUNICORN_LOCK'] = self.LOCK_FILE.name()
os.chdir(self.START_CTX['cwd']) os.chdir(self.START_CTX['cwd'])
self.cfg.pre_exec(self) self.cfg.pre_exec(self)

View File

@ -904,6 +904,20 @@ class Pidfile(Setting):
If not set, no PID file will be written. If not set, no PID file will be written.
""" """
class LockFile(Setting):
name = "lockfile"
section = "Server Mechanics"
cli = ["--lock-file"]
meta = "FILE"
validator = validate_string
default = util.tmpfile(suffix=".lock", prefix="gunicorn-")
desc = """\
A filename to use for the lock file. A lock file is created when using unix sockets.
If not set, the default file 'gunicorn-<RANDOMSTRING>.lock' will be created in the
temporary directory.
"""
class WorkerTmpDir(Setting): class WorkerTmpDir(Setting):
name = "worker_tmp_dir" name = "worker_tmp_dir"
section = "Server Mechanics" section = "Server Mechanics"

72
gunicorn/lockfile.py Normal file
View File

@ -0,0 +1,72 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import os
from gunicorn import util
if os.name == 'nt':
import msvcrt
def _lock(fd):
msvcrt.locking(fd, msvcrt.LK_NBLCK, 1)
def _unlock(fd):
try:
msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)
except OSError:
return False
return True
else:
import fcntl
def _lock(fd):
fcntl.lockf(fd, fcntl.LOCK_SH | fcntl.LOCK_NB)
def _unlock(fd):
try:
fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except:
print("no unlock")
return False
return True
class LockFile(object):
"""Manage a LOCK file"""
def __init__(self, fname):
self.fname = fname
fdir = os.path.dirname(self.fname)
if fdir and not os.path.isdir(fdir):
raise RuntimeError("%s doesn't exist. Can't create lock file." % fdir)
self._lockfile = open(self.fname, 'w+b')
# set permissions to -rw-r--r--
os.chmod(self.fname, 420)
self._locked = False
def lock(self):
_lock(self._lockfile.fileno())
self._locked = True
def unlock(self):
if not self.locked():
return
if _unlock(self._lockfile.fileno()):
self._lockfile.close()
util.unlink(self.fname)
self._lockfile = None
self._locked = False
def locked(self):
return self._lockfile is not None and self._locked
def name(self):
return self.fname

View File

@ -53,7 +53,7 @@ class BaseSocket(object):
def bind(self, sock): def bind(self, sock):
sock.bind(self.cfg_addr) sock.bind(self.cfg_addr)
def close(self): def close(self, locked=False):
if self.sock is None: if self.sock is None:
return return
@ -110,8 +110,6 @@ class UnixSocket(BaseSocket):
raise ValueError("%r is not a socket" % addr) raise ValueError("%r is not a socket" % addr)
self.parent = os.getpid() self.parent = os.getpid()
super(UnixSocket, self).__init__(addr, conf, log, fd=fd) super(UnixSocket, self).__init__(addr, conf, log, fd=fd)
# each arbiter grabs a shared lock on the unix socket.
fcntl.lockf(self.sock, fcntl.LOCK_SH | fcntl.LOCK_NB)
def __str__(self): def __str__(self):
return "unix:%s" % self.cfg_addr return "unix:%s" % self.cfg_addr
@ -122,18 +120,9 @@ class UnixSocket(BaseSocket):
util.chown(self.cfg_addr, self.conf.uid, self.conf.gid) util.chown(self.cfg_addr, self.conf.uid, self.conf.gid)
os.umask(old_umask) os.umask(old_umask)
def close(self, locked=False):
def close(self): if self.parent == os.getpid() and not locked:
if self.parent == os.getpid(): os.unlink(self.cfg_addr)
# attempt to acquire an exclusive lock on the unix socket.
# if we're the only arbiter running, the lock will succeed, and
# we can safely rm the socket.
try:
fcntl.lockf(self.sock, fcntl.LOCK_EX | fcntl.LOCK_NB)
except:
pass
else:
os.unlink(self.cfg_addr)
super(UnixSocket, self).close() super(UnixSocket, self).close()
@ -162,6 +151,7 @@ def create_sockets(conf, log):
# gunicorn. # gunicorn.
# http://www.freedesktop.org/software/systemd/man/systemd.socket.html # http://www.freedesktop.org/software/systemd/man/systemd.socket.html
listeners = [] listeners = []
need_lock = False
if ('LISTEN_PID' in os.environ if ('LISTEN_PID' in os.environ
and int(os.environ.get('LISTEN_PID')) == os.getpid()): and int(os.environ.get('LISTEN_PID')) == os.getpid()):
for i in range(int(os.environ.get('LISTEN_FDS', 0))): for i in range(int(os.environ.get('LISTEN_FDS', 0))):
@ -170,6 +160,7 @@ def create_sockets(conf, log):
sock = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM) sock = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM)
sockname = sock.getsockname() sockname = sock.getsockname()
if isinstance(sockname, str) and sockname.startswith('/'): if isinstance(sockname, str) and sockname.startswith('/'):
need_lock = True
listeners.append(UnixSocket(sockname, conf, log, fd=fd)) listeners.append(UnixSocket(sockname, conf, log, fd=fd))
elif len(sockname) == 2 and '.' in sockname[0]: elif len(sockname) == 2 and '.' in sockname[0]:
listeners.append(TCPSocket("%s:%s" % sockname, conf, log, listeners.append(TCPSocket("%s:%s" % sockname, conf, log,
@ -184,7 +175,7 @@ def create_sockets(conf, log):
if listeners: if listeners:
log.debug('Socket activation sockets: %s', log.debug('Socket activation sockets: %s',
",".join([str(l) for l in listeners])) ",".join([str(l) for l in listeners]))
return listeners return listeners, need_lock
# get it only once # get it only once
laddr = conf.address laddr = conf.address
@ -205,6 +196,9 @@ def create_sockets(conf, log):
addr = laddr[i] addr = laddr[i]
sock_type = _sock_type(addr) sock_type = _sock_type(addr)
if sock_type == UnixSocket:
need_lock = True
try: try:
listeners.append(sock_type(addr, conf, log, fd=fd)) listeners.append(sock_type(addr, conf, log, fd=fd))
except socket.error as e: except socket.error as e:
@ -212,11 +206,13 @@ def create_sockets(conf, log):
log.error("GUNICORN_FD should refer to an open socket.") log.error("GUNICORN_FD should refer to an open socket.")
else: else:
raise raise
return listeners return listeners, need_lock
# no sockets is bound, first initialization of gunicorn in this env. # no sockets is bound, first initialization of gunicorn in this env.
for addr in laddr: for addr in laddr:
sock_type = _sock_type(addr) sock_type = _sock_type(addr)
if sock_type == UnixSocket:
need_lock = True
# If we fail to create a socket from GUNICORN_FD # If we fail to create a socket from GUNICORN_FD
# we fall through and try and open the socket # we fall through and try and open the socket
@ -244,4 +240,4 @@ def create_sockets(conf, log):
listeners.append(sock) listeners.append(sock)
return listeners return listeners, need_lock

View File

@ -17,6 +17,7 @@ import resource
import socket import socket
import stat import stat
import sys import sys
import tempfile
import textwrap import textwrap
import time import time
import traceback import traceback
@ -38,6 +39,8 @@ CHUNK_SIZE = (16 * 1024)
MAX_BODY = 1024 * 132 MAX_BODY = 1024 * 132
normcase = os.path.normcase
# Server and Date aren't technically hop-by-hop # Server and Date aren't technically hop-by-hop
# headers, but they are in the purview of the # headers, but they are in the purview of the
# origin server which the WSGI spec says we should # origin server which the WSGI spec says we should
@ -546,3 +549,14 @@ def make_fail_app(msg):
return [msg] return [msg]
return app return app
characters = ("abcdefghijklmnopqrstuvwxyz" +
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" +
"0123456789_")
def tmpfile(suffix="", prefix="tmp"):
c = characters
rand_str = normcase("".join([random.choice(c) for _ in "123456"]))
fname = "".join([prefix, rand_str, suffix])
return os.path.join(tempfile.gettempdir(), fname)

View File

@ -35,8 +35,8 @@ def test_arbiter_shutdown_closes_listeners():
listener2 = mock.Mock() listener2 = mock.Mock()
arbiter.LISTENERS = [listener1, listener2] arbiter.LISTENERS = [listener1, listener2]
arbiter.stop() arbiter.stop()
listener1.close.assert_called_with() listener1.close.assert_called_with(False)
listener2.close.assert_called_with() listener2.close.assert_called_with(False)
class PreloadedAppWithEnvSettings(DummyApplication): class PreloadedAppWithEnvSettings(DummyApplication):

15
tests/test_lockfile.py Normal file
View File

@ -0,0 +1,15 @@
import os
from gunicorn.lockfile import LockFile
from gunicorn.util import tmpfile
def test_lockfile():
lockname = tmpfile(prefix="gunicorn-tests", suffix=".lock")
lock_file = LockFile(lockname)
assert lock_file.locked() == False
assert os.path.exists(lockname)
lock_file.lock()
assert lock_file.locked() == True
lock_file.unlock()
assert lock_file.locked() == False
assert os.path.exists(lockname) == False

View File

@ -1,5 +1,3 @@
import fcntl
try: try:
import unittest.mock as mock import unittest.mock as mock
except ImportError: except ImportError:
@ -8,49 +6,31 @@ except ImportError:
from gunicorn import sock from gunicorn import sock
@mock.patch('fcntl.lockf')
@mock.patch('socket.fromfd')
def test_unix_socket_init_lock(fromfd, lockf):
s = fromfd.return_value
sock.UnixSocket('test.sock', mock.Mock(), mock.Mock(), mock.Mock())
lockf.assert_called_with(s, fcntl.LOCK_SH | fcntl.LOCK_NB)
@mock.patch('fcntl.lockf')
@mock.patch('os.getpid') @mock.patch('os.getpid')
@mock.patch('os.unlink') @mock.patch('os.unlink')
@mock.patch('socket.fromfd') @mock.patch('socket.fromfd')
def test_unix_socket_close_delete_if_exlock(fromfd, unlink, getpid, lockf): def test_unix_socket_close_delete_if_exlock(fromfd, unlink, getpid):
s = fromfd.return_value
gsock = sock.UnixSocket('test.sock', mock.Mock(), mock.Mock(), mock.Mock()) gsock = sock.UnixSocket('test.sock', mock.Mock(), mock.Mock(), mock.Mock())
lockf.reset_mock() gsock.close(False)
gsock.close()
lockf.assert_called_with(s, fcntl.LOCK_EX | fcntl.LOCK_NB)
unlink.assert_called_with('test.sock') unlink.assert_called_with('test.sock')
@mock.patch('fcntl.lockf')
@mock.patch('os.getpid') @mock.patch('os.getpid')
@mock.patch('os.unlink') @mock.patch('os.unlink')
@mock.patch('socket.fromfd') @mock.patch('socket.fromfd')
def test_unix_socket_close_keep_if_no_exlock(fromfd, unlink, getpid, lockf): def test_unix_socket_close_keep_if_no_exlock(fromfd, unlink, getpid):
s = fromfd.return_value
gsock = sock.UnixSocket('test.sock', mock.Mock(), mock.Mock(), mock.Mock()) gsock = sock.UnixSocket('test.sock', mock.Mock(), mock.Mock(), mock.Mock())
lockf.reset_mock() gsock.close(True)
lockf.side_effect = IOError('locked')
gsock.close()
lockf.assert_called_with(s, fcntl.LOCK_EX | fcntl.LOCK_NB)
unlink.assert_not_called() unlink.assert_not_called()
@mock.patch('fcntl.lockf')
@mock.patch('os.getpid') @mock.patch('os.getpid')
@mock.patch('os.unlink')
@mock.patch('socket.fromfd') @mock.patch('socket.fromfd')
def test_unix_socket_not_deleted_by_worker(fromfd, getpid, lockf): def test_unix_socket_not_deleted_by_worker(fromfd, unlink, getpid):
fd = mock.Mock() fd = mock.Mock()
gsock = sock.UnixSocket('name', mock.Mock(), mock.Mock(), fd) gsock = sock.UnixSocket('test.sock', mock.Mock(), mock.Mock(), fd)
lockf.reset_mock()
getpid.reset_mock() getpid.reset_mock()
getpid.return_value = mock.Mock() # fake a pid change getpid.return_value = "fake" # fake a pid change
gsock.close() gsock.close(False)
lockf.assert_not_called() unlink.assert_not_called()