mirror of
https://github.com/frappe/gunicorn.git
synced 2026-01-14 11:09:11 +08:00
Merge pull request #1270 from benoitc/improve-arbiter-promotion
remove file locking
This commit is contained in:
commit
c62cf2f500
@ -14,7 +14,6 @@ import time
|
|||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
from gunicorn.errors import HaltServer, AppImportError
|
from gunicorn.errors import HaltServer, AppImportError
|
||||||
from gunicorn.lockfile import LockFile
|
|
||||||
from gunicorn.pidfile import Pidfile
|
from gunicorn.pidfile import Pidfile
|
||||||
from gunicorn.sock import create_sockets
|
from gunicorn.sock import create_sockets
|
||||||
from gunicorn import util
|
from gunicorn import util
|
||||||
@ -40,7 +39,6 @@ class Arbiter(object):
|
|||||||
START_CTX = {}
|
START_CTX = {}
|
||||||
|
|
||||||
LISTENERS = []
|
LISTENERS = []
|
||||||
LOCK_FILE = None
|
|
||||||
WORKERS = {}
|
WORKERS = {}
|
||||||
PIPE = []
|
PIPE = []
|
||||||
|
|
||||||
@ -65,6 +63,7 @@ class Arbiter(object):
|
|||||||
self.pidfile = None
|
self.pidfile = None
|
||||||
self.worker_age = 0
|
self.worker_age = 0
|
||||||
self.reexec_pid = 0
|
self.reexec_pid = 0
|
||||||
|
self.master_pid = 0
|
||||||
self.master_name = "Master"
|
self.master_name = "Master"
|
||||||
|
|
||||||
cwd = util.getcwd()
|
cwd = util.getcwd()
|
||||||
@ -126,28 +125,23 @@ class Arbiter(object):
|
|||||||
"""
|
"""
|
||||||
self.log.info("Starting gunicorn %s", __version__)
|
self.log.info("Starting gunicorn %s", __version__)
|
||||||
|
|
||||||
|
if 'GUNICORN_PID' in os.environ:
|
||||||
|
self.master_pid = int(os.environ.get('GUNICORN_PID'))
|
||||||
|
self.proc_name = self.proc_name + ".2"
|
||||||
|
self.master_name = "Master.2"
|
||||||
|
|
||||||
self.pid = os.getpid()
|
self.pid = os.getpid()
|
||||||
if self.cfg.pidfile is not None:
|
if self.cfg.pidfile is not None:
|
||||||
self.pidfile = Pidfile(self.cfg.pidfile)
|
pidname = self.cfg.pidfile
|
||||||
|
if self.master_pid != 0:
|
||||||
|
pidname += ".2"
|
||||||
|
self.pidfile = Pidfile(pidname)
|
||||||
self.pidfile.create(self.pid)
|
self.pidfile.create(self.pid)
|
||||||
self.cfg.on_starting(self)
|
self.cfg.on_starting(self)
|
||||||
|
|
||||||
self.init_signals()
|
self.init_signals()
|
||||||
need_lock = False
|
|
||||||
if not self.LISTENERS:
|
if not self.LISTENERS:
|
||||||
self.LISTENERS, need_lock = create_sockets(self.cfg, self.log)
|
self.LISTENERS = create_sockets(self.cfg, self.log)
|
||||||
|
|
||||||
if need_lock:
|
|
||||||
if not self.LOCK_FILE:
|
|
||||||
# reuse the lockfile if already set
|
|
||||||
if 'GUNICORN_LOCK' in os.environ:
|
|
||||||
lock_path = os.environ.get('GUNICORN_LOCK')
|
|
||||||
else:
|
|
||||||
lock_path = self.cfg.lockfile
|
|
||||||
|
|
||||||
self.LOCK_FILE = LockFile(lock_path)
|
|
||||||
# add us to the shared lock
|
|
||||||
self.LOCK_FILE.lock()
|
|
||||||
|
|
||||||
listeners_str = ",".join([str(l) for l in self.LISTENERS])
|
listeners_str = ",".join([str(l) for l in self.LISTENERS])
|
||||||
self.log.debug("Arbiter booted")
|
self.log.debug("Arbiter booted")
|
||||||
@ -193,7 +187,10 @@ class Arbiter(object):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
self.manage_workers()
|
self.manage_workers()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
self.maybe_promote_master()
|
||||||
|
|
||||||
sig = self.SIG_QUEUE.pop(0) if len(self.SIG_QUEUE) else None
|
sig = self.SIG_QUEUE.pop(0) if len(self.SIG_QUEUE) else None
|
||||||
if sig is None:
|
if sig is None:
|
||||||
self.sleep()
|
self.sleep()
|
||||||
@ -302,6 +299,23 @@ class Arbiter(object):
|
|||||||
else:
|
else:
|
||||||
self.log.debug("SIGWINCH ignored. Not daemonized")
|
self.log.debug("SIGWINCH ignored. Not daemonized")
|
||||||
|
|
||||||
|
def maybe_promote_master(self):
|
||||||
|
if self.master_pid == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
if self.master_pid != os.getppid():
|
||||||
|
self.log.info("Master has been promoted.")
|
||||||
|
# reset master infos
|
||||||
|
self.master_name = "Master"
|
||||||
|
self.master_pid = 0
|
||||||
|
self.proc_name = self.cfg.proc_name
|
||||||
|
del os.environ['GUNICORN_PID']
|
||||||
|
# rename the pidfile
|
||||||
|
if self.pidfile is not None:
|
||||||
|
self.pidfile.rename(self.cfg.pidfile)
|
||||||
|
# reset proctitle
|
||||||
|
util._setproctitle("master [%s]" % self.proc_name)
|
||||||
|
|
||||||
def wakeup(self):
|
def wakeup(self):
|
||||||
"""\
|
"""\
|
||||||
Wake up the arbiter by writing to the PIPE
|
Wake up the arbiter by writing to the PIPE
|
||||||
@ -350,17 +364,11 @@ class Arbiter(object):
|
|||||||
:attr graceful: boolean, If True (the default) workers will be
|
:attr graceful: boolean, If True (the default) workers will be
|
||||||
killed gracefully (ie. trying to wait for the current connection)
|
killed gracefully (ie. trying to wait for the current connection)
|
||||||
"""
|
"""
|
||||||
locked = False
|
|
||||||
if self.LOCK_FILE:
|
|
||||||
self.LOCK_FILE.unlock()
|
|
||||||
locked = self.LOCK_FILE.locked()
|
|
||||||
|
|
||||||
# delete the lock file if needed
|
if self.reexec_pid == 0 and self.master_pid == 0:
|
||||||
if not locked and 'GUNICORN_LOCK' in os.environ:
|
for l in self.LISTENERS:
|
||||||
del os.environ['GUNICORN_LOCK']
|
l.close()
|
||||||
|
|
||||||
for l in self.LISTENERS:
|
|
||||||
l.close(locked)
|
|
||||||
self.LISTENERS = []
|
self.LISTENERS = []
|
||||||
sig = signal.SIGTERM
|
sig = signal.SIGTERM
|
||||||
if not graceful:
|
if not graceful:
|
||||||
@ -378,20 +386,23 @@ class Arbiter(object):
|
|||||||
"""\
|
"""\
|
||||||
Relaunch the master and workers.
|
Relaunch the master and workers.
|
||||||
"""
|
"""
|
||||||
if self.pidfile is not None:
|
if self.reexec_pid != 0:
|
||||||
self.pidfile.rename("%s.oldbin" % self.pidfile.fname)
|
self.log.warning("USR2 signal ignored. Child exists.")
|
||||||
|
return
|
||||||
|
|
||||||
|
if self.master_pid != 0:
|
||||||
|
self.log.warning("USR2 signal ignored. Parent exists")
|
||||||
|
return
|
||||||
|
|
||||||
|
master_pid = os.getpid()
|
||||||
self.reexec_pid = os.fork()
|
self.reexec_pid = os.fork()
|
||||||
if self.reexec_pid != 0:
|
if self.reexec_pid != 0:
|
||||||
self.master_name = "Old Master"
|
|
||||||
return
|
return
|
||||||
|
|
||||||
environ = self.cfg.env_orig.copy()
|
environ = self.cfg.env_orig.copy()
|
||||||
fds = [l.fileno() for l in self.LISTENERS]
|
fds = [l.fileno() for l in self.LISTENERS]
|
||||||
environ['GUNICORN_FD'] = ",".join([str(fd) for fd in fds])
|
environ['GUNICORN_FD'] = ",".join([str(fd) for fd in fds])
|
||||||
|
environ['GUNICORN_PID'] = str(master_pid)
|
||||||
if self.LOCK_FILE:
|
|
||||||
environ['GUNICORN_LOCK'] = self.LOCK_FILE.name()
|
|
||||||
|
|
||||||
os.chdir(self.START_CTX['cwd'])
|
os.chdir(self.START_CTX['cwd'])
|
||||||
self.cfg.pre_exec(self)
|
self.cfg.pre_exec(self)
|
||||||
|
|||||||
@ -904,20 +904,6 @@ class Pidfile(Setting):
|
|||||||
If not set, no PID file will be written.
|
If not set, no PID file will be written.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
class LockFile(Setting):
|
|
||||||
name = "lockfile"
|
|
||||||
section = "Server Mechanics"
|
|
||||||
cli = ["--lock-file"]
|
|
||||||
meta = "FILE"
|
|
||||||
validator = validate_string
|
|
||||||
default = util.tmpfile(suffix=".lock", prefix="gunicorn-")
|
|
||||||
|
|
||||||
desc = """\
|
|
||||||
A filename to use for the lock file. A lock file is created when using unix sockets.
|
|
||||||
If not set, the default file 'gunicorn-<RANDOMSTRING>.lock' will be created in the
|
|
||||||
temporary directory.
|
|
||||||
"""
|
|
||||||
|
|
||||||
class WorkerTmpDir(Setting):
|
class WorkerTmpDir(Setting):
|
||||||
name = "worker_tmp_dir"
|
name = "worker_tmp_dir"
|
||||||
section = "Server Mechanics"
|
section = "Server Mechanics"
|
||||||
|
|||||||
@ -1,72 +0,0 @@
|
|||||||
# -*- coding: utf-8 -
|
|
||||||
#
|
|
||||||
# This file is part of gunicorn released under the MIT license.
|
|
||||||
# See the NOTICE for more information.
|
|
||||||
|
|
||||||
import os
|
|
||||||
|
|
||||||
from gunicorn import util
|
|
||||||
|
|
||||||
if os.name == 'nt':
|
|
||||||
import msvcrt
|
|
||||||
|
|
||||||
def _lock(fd):
|
|
||||||
msvcrt.locking(fd, msvcrt.LK_NBLCK, 1)
|
|
||||||
|
|
||||||
|
|
||||||
def _unlock(fd):
|
|
||||||
try:
|
|
||||||
msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)
|
|
||||||
except OSError:
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
else:
|
|
||||||
import fcntl
|
|
||||||
|
|
||||||
def _lock(fd):
|
|
||||||
fcntl.lockf(fd, fcntl.LOCK_SH | fcntl.LOCK_NB)
|
|
||||||
|
|
||||||
|
|
||||||
def _unlock(fd):
|
|
||||||
try:
|
|
||||||
fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
||||||
except:
|
|
||||||
print("no unlock")
|
|
||||||
return False
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
class LockFile(object):
|
|
||||||
"""Manage a LOCK file"""
|
|
||||||
|
|
||||||
def __init__(self, fname):
|
|
||||||
self.fname = fname
|
|
||||||
fdir = os.path.dirname(self.fname)
|
|
||||||
if fdir and not os.path.isdir(fdir):
|
|
||||||
raise RuntimeError("%s doesn't exist. Can't create lock file." % fdir)
|
|
||||||
self._lockfile = open(self.fname, 'w+b')
|
|
||||||
# set permissions to -rw-r--r--
|
|
||||||
os.chmod(self.fname, 420)
|
|
||||||
self._locked = False
|
|
||||||
|
|
||||||
def lock(self):
|
|
||||||
_lock(self._lockfile.fileno())
|
|
||||||
self._locked = True
|
|
||||||
|
|
||||||
def unlock(self):
|
|
||||||
if not self.locked():
|
|
||||||
return
|
|
||||||
|
|
||||||
if _unlock(self._lockfile.fileno()):
|
|
||||||
self._lockfile.close()
|
|
||||||
util.unlink(self.fname)
|
|
||||||
self._lockfile = None
|
|
||||||
self._locked = False
|
|
||||||
|
|
||||||
def locked(self):
|
|
||||||
return self._lockfile is not None and self._locked
|
|
||||||
|
|
||||||
def name(self):
|
|
||||||
return self.fname
|
|
||||||
@ -53,7 +53,7 @@ class BaseSocket(object):
|
|||||||
def bind(self, sock):
|
def bind(self, sock):
|
||||||
sock.bind(self.cfg_addr)
|
sock.bind(self.cfg_addr)
|
||||||
|
|
||||||
def close(self, locked=False):
|
def close(self):
|
||||||
if self.sock is None:
|
if self.sock is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -108,7 +108,6 @@ class UnixSocket(BaseSocket):
|
|||||||
os.remove(addr)
|
os.remove(addr)
|
||||||
else:
|
else:
|
||||||
raise ValueError("%r is not a socket" % addr)
|
raise ValueError("%r is not a socket" % addr)
|
||||||
self.parent = os.getpid()
|
|
||||||
super(UnixSocket, self).__init__(addr, conf, log, fd=fd)
|
super(UnixSocket, self).__init__(addr, conf, log, fd=fd)
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
@ -120,9 +119,8 @@ class UnixSocket(BaseSocket):
|
|||||||
util.chown(self.cfg_addr, self.conf.uid, self.conf.gid)
|
util.chown(self.cfg_addr, self.conf.uid, self.conf.gid)
|
||||||
os.umask(old_umask)
|
os.umask(old_umask)
|
||||||
|
|
||||||
def close(self, locked=False):
|
def close(self):
|
||||||
if self.parent == os.getpid() and not locked:
|
os.unlink(self.cfg_addr)
|
||||||
os.unlink(self.cfg_addr)
|
|
||||||
super(UnixSocket, self).close()
|
super(UnixSocket, self).close()
|
||||||
|
|
||||||
|
|
||||||
@ -151,7 +149,6 @@ def create_sockets(conf, log):
|
|||||||
# gunicorn.
|
# gunicorn.
|
||||||
# http://www.freedesktop.org/software/systemd/man/systemd.socket.html
|
# http://www.freedesktop.org/software/systemd/man/systemd.socket.html
|
||||||
listeners = []
|
listeners = []
|
||||||
need_lock = False
|
|
||||||
if ('LISTEN_PID' in os.environ
|
if ('LISTEN_PID' in os.environ
|
||||||
and int(os.environ.get('LISTEN_PID')) == os.getpid()):
|
and int(os.environ.get('LISTEN_PID')) == os.getpid()):
|
||||||
for i in range(int(os.environ.get('LISTEN_FDS', 0))):
|
for i in range(int(os.environ.get('LISTEN_FDS', 0))):
|
||||||
@ -160,7 +157,6 @@ def create_sockets(conf, log):
|
|||||||
sock = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM)
|
sock = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
sockname = sock.getsockname()
|
sockname = sock.getsockname()
|
||||||
if isinstance(sockname, str) and sockname.startswith('/'):
|
if isinstance(sockname, str) and sockname.startswith('/'):
|
||||||
need_lock = True
|
|
||||||
listeners.append(UnixSocket(sockname, conf, log, fd=fd))
|
listeners.append(UnixSocket(sockname, conf, log, fd=fd))
|
||||||
elif len(sockname) == 2 and '.' in sockname[0]:
|
elif len(sockname) == 2 and '.' in sockname[0]:
|
||||||
listeners.append(TCPSocket("%s:%s" % sockname, conf, log,
|
listeners.append(TCPSocket("%s:%s" % sockname, conf, log,
|
||||||
@ -175,7 +171,7 @@ def create_sockets(conf, log):
|
|||||||
if listeners:
|
if listeners:
|
||||||
log.debug('Socket activation sockets: %s',
|
log.debug('Socket activation sockets: %s',
|
||||||
",".join([str(l) for l in listeners]))
|
",".join([str(l) for l in listeners]))
|
||||||
return listeners, need_lock
|
return listeners
|
||||||
|
|
||||||
# get it only once
|
# get it only once
|
||||||
laddr = conf.address
|
laddr = conf.address
|
||||||
@ -196,9 +192,6 @@ def create_sockets(conf, log):
|
|||||||
addr = laddr[i]
|
addr = laddr[i]
|
||||||
sock_type = _sock_type(addr)
|
sock_type = _sock_type(addr)
|
||||||
|
|
||||||
if sock_type == UnixSocket:
|
|
||||||
need_lock = True
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
listeners.append(sock_type(addr, conf, log, fd=fd))
|
listeners.append(sock_type(addr, conf, log, fd=fd))
|
||||||
except socket.error as e:
|
except socket.error as e:
|
||||||
@ -206,14 +199,11 @@ def create_sockets(conf, log):
|
|||||||
log.error("GUNICORN_FD should refer to an open socket.")
|
log.error("GUNICORN_FD should refer to an open socket.")
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
return listeners, need_lock
|
return listeners
|
||||||
|
|
||||||
# no sockets is bound, first initialization of gunicorn in this env.
|
# no sockets is bound, first initialization of gunicorn in this env.
|
||||||
for addr in laddr:
|
for addr in laddr:
|
||||||
sock_type = _sock_type(addr)
|
sock_type = _sock_type(addr)
|
||||||
if sock_type == UnixSocket:
|
|
||||||
need_lock = True
|
|
||||||
|
|
||||||
# If we fail to create a socket from GUNICORN_FD
|
# If we fail to create a socket from GUNICORN_FD
|
||||||
# we fall through and try and open the socket
|
# we fall through and try and open the socket
|
||||||
# normally.
|
# normally.
|
||||||
@ -240,4 +230,4 @@ def create_sockets(conf, log):
|
|||||||
|
|
||||||
listeners.append(sock)
|
listeners.append(sock)
|
||||||
|
|
||||||
return listeners, need_lock
|
return listeners
|
||||||
|
|||||||
@ -17,7 +17,6 @@ import resource
|
|||||||
import socket
|
import socket
|
||||||
import stat
|
import stat
|
||||||
import sys
|
import sys
|
||||||
import tempfile
|
|
||||||
import textwrap
|
import textwrap
|
||||||
import time
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
@ -39,8 +38,6 @@ CHUNK_SIZE = (16 * 1024)
|
|||||||
|
|
||||||
MAX_BODY = 1024 * 132
|
MAX_BODY = 1024 * 132
|
||||||
|
|
||||||
normcase = os.path.normcase
|
|
||||||
|
|
||||||
# Server and Date aren't technically hop-by-hop
|
# Server and Date aren't technically hop-by-hop
|
||||||
# headers, but they are in the purview of the
|
# headers, but they are in the purview of the
|
||||||
# origin server which the WSGI spec says we should
|
# origin server which the WSGI spec says we should
|
||||||
@ -549,14 +546,3 @@ def make_fail_app(msg):
|
|||||||
return [msg]
|
return [msg]
|
||||||
|
|
||||||
return app
|
return app
|
||||||
|
|
||||||
|
|
||||||
characters = ("abcdefghijklmnopqrstuvwxyz" +
|
|
||||||
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" +
|
|
||||||
"0123456789_")
|
|
||||||
|
|
||||||
def tmpfile(suffix="", prefix="tmp"):
|
|
||||||
c = characters
|
|
||||||
rand_str = normcase("".join([random.choice(c) for _ in "123456"]))
|
|
||||||
fname = "".join([prefix, rand_str, suffix])
|
|
||||||
return os.path.join(tempfile.gettempdir(), fname)
|
|
||||||
|
|||||||
@ -35,8 +35,8 @@ def test_arbiter_shutdown_closes_listeners():
|
|||||||
listener2 = mock.Mock()
|
listener2 = mock.Mock()
|
||||||
arbiter.LISTENERS = [listener1, listener2]
|
arbiter.LISTENERS = [listener1, listener2]
|
||||||
arbiter.stop()
|
arbiter.stop()
|
||||||
listener1.close.assert_called_with(False)
|
listener1.close.assert_called_with()
|
||||||
listener2.close.assert_called_with(False)
|
listener2.close.assert_called_with()
|
||||||
|
|
||||||
|
|
||||||
class PreloadedAppWithEnvSettings(DummyApplication):
|
class PreloadedAppWithEnvSettings(DummyApplication):
|
||||||
|
|||||||
@ -1,15 +0,0 @@
|
|||||||
import os
|
|
||||||
|
|
||||||
from gunicorn.lockfile import LockFile
|
|
||||||
from gunicorn.util import tmpfile
|
|
||||||
|
|
||||||
def test_lockfile():
|
|
||||||
lockname = tmpfile(prefix="gunicorn-tests", suffix=".lock")
|
|
||||||
lock_file = LockFile(lockname)
|
|
||||||
assert lock_file.locked() == False
|
|
||||||
assert os.path.exists(lockname)
|
|
||||||
lock_file.lock()
|
|
||||||
assert lock_file.locked() == True
|
|
||||||
lock_file.unlock()
|
|
||||||
assert lock_file.locked() == False
|
|
||||||
assert os.path.exists(lockname) == False
|
|
||||||
@ -9,28 +9,7 @@ from gunicorn import sock
|
|||||||
@mock.patch('os.getpid')
|
@mock.patch('os.getpid')
|
||||||
@mock.patch('os.unlink')
|
@mock.patch('os.unlink')
|
||||||
@mock.patch('socket.fromfd')
|
@mock.patch('socket.fromfd')
|
||||||
def test_unix_socket_close_delete_if_exlock(fromfd, unlink, getpid):
|
def test_unix_socket_close_unlink(fromfd, unlink, getpid):
|
||||||
gsock = sock.UnixSocket('test.sock', mock.Mock(), mock.Mock(), mock.Mock())
|
gsock = sock.UnixSocket('test.sock', mock.Mock(), mock.Mock(), mock.Mock())
|
||||||
gsock.close(False)
|
gsock.close()
|
||||||
unlink.assert_called_with('test.sock')
|
unlink.assert_called_with("test.sock")
|
||||||
|
|
||||||
|
|
||||||
@mock.patch('os.getpid')
|
|
||||||
@mock.patch('os.unlink')
|
|
||||||
@mock.patch('socket.fromfd')
|
|
||||||
def test_unix_socket_close_keep_if_no_exlock(fromfd, unlink, getpid):
|
|
||||||
gsock = sock.UnixSocket('test.sock', mock.Mock(), mock.Mock(), mock.Mock())
|
|
||||||
gsock.close(True)
|
|
||||||
unlink.assert_not_called()
|
|
||||||
|
|
||||||
|
|
||||||
@mock.patch('os.getpid')
|
|
||||||
@mock.patch('os.unlink')
|
|
||||||
@mock.patch('socket.fromfd')
|
|
||||||
def test_unix_socket_not_deleted_by_worker(fromfd, unlink, getpid):
|
|
||||||
fd = mock.Mock()
|
|
||||||
gsock = sock.UnixSocket('test.sock', mock.Mock(), mock.Mock(), fd)
|
|
||||||
getpid.reset_mock()
|
|
||||||
getpid.return_value = "fake" # fake a pid change
|
|
||||||
gsock.close(False)
|
|
||||||
unlink.assert_not_called()
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user