Allows gunicorn to bind to multiple addresses

Allows gunicorn to listen on different interfaces. Sockets can be
IPv6, IPv4 or Unix sockets.

Ex:

    gunicorn -w3 -b 127.0.0.1:8001 -b 127.0.0.1:8000 -b [::1]:8000 test:app

fix #444
This commit is contained in:
benoitc 2012-12-11 08:42:12 +01:00
parent 41a6999fa3
commit b7b51adf13
12 changed files with 191 additions and 140 deletions

View File

@ -12,8 +12,7 @@ error_email_from = paste@localhost
[server:main]
use = egg:gunicorn#main
host = 127.0.0.1
port = 5000
host = 127.0.0.1:8000, 127.0.0.1:8001
workers = 2
[app:main]

View File

@ -29,7 +29,7 @@ class PasterBaseApplication(Application):
if host and port:
cfg['bind'] = '%s:%s' % (host, port)
elif host:
cfg['bind'] = host
cfg['bind'] = host.split(',')
cfg['workers'] = int(lc.get('workers', 1))
cfg['umask'] = int(lc.get('umask', 0))
@ -55,19 +55,11 @@ class PasterBaseApplication(Application):
parser = ConfigParser.ConfigParser()
parser.read([self.cfgfname])
if parser.has_section('loggers'):
if sys.version_info >= (2, 6):
from logging.config import fileConfig
else:
# Use our custom fileConfig -- 2.5.1's with a custom Formatter class
# and less strict whitespace (which were incorporated into 2.6's)
from gunicorn.logging_config import fileConfig
from logging.config import fileConfig
config_file = os.path.abspath(self.cfgfname)
fileConfig(config_file, dict(__file__=config_file,
here=os.path.dirname(config_file)))
class PasterApplication(PasterBaseApplication):
def init(self, parser, opts, args):
@ -111,7 +103,7 @@ class PasterServerApplication(PasterBaseApplication):
bind = "%s:%s" % (host, port)
else:
bind = host
cfg["bind"] = bind
cfg["bind"] = bind.split(',')
if gcfg:
for k, v in gcfg.items():

View File

@ -16,7 +16,7 @@ import traceback
from gunicorn.errors import HaltServer
from gunicorn.pidfile import Pidfile
from gunicorn.sock import create_socket
from gunicorn.sock import create_sockets
from gunicorn import util
from gunicorn import __version__, SERVER_SOFTWARE
@ -35,7 +35,7 @@ class Arbiter(object):
START_CTX = {}
LISTENER = None
LISTENERS = []
WORKERS = {}
PIPE = []
@ -120,12 +120,12 @@ class Arbiter(object):
self.pidfile.create(self.pid)
self.cfg.on_starting(self)
self.init_signals()
if not self.LISTENER:
self.LISTENER = create_socket(self.cfg, self.log)
if not self.LISTENERS:
self.LISTENERS = create_sockets(self.cfg, self.log)
listeners_str = ",".join([str(l) for l in self.LISTENERS])
self.log.debug("Arbiter booted")
self.log.info("Listening at: %s (%s)", self.LISTENER,
self.pid)
self.log.info("Listening at: %s (%s)", listeners_str, self.pid)
self.log.info("Using worker: %s",
self.cfg.settings['worker_class'].get())
@ -321,11 +321,13 @@ class Arbiter(object):
:attr graceful: boolean, If True (the default) workers will be
killed gracefully (ie. trying to wait for the current connection)
"""
try:
self.LISTENER.close()
except Exception:
pass
self.LISTENER = None
for l in self.LISTENERS:
try:
l.close()
except Exception:
pass
self.LISTENERS = []
sig = signal.SIGQUIT
if not graceful:
sig = signal.SIGTERM
@ -348,11 +350,16 @@ class Arbiter(object):
self.master_name = "Old Master"
return
os.environ['GUNICORN_FD'] = str(self.LISTENER.fileno())
fds = [l.fileno() for l in self.LISTENERS]
os.environ['GUNICORN_FD'] = ",".join([str(fd) for fd in fds])
os.chdir(self.START_CTX['cwd'])
self.cfg.pre_exec(self)
util.closerange(3, self.LISTENER.fileno())
util.closerange(self.LISTENER.fileno()+1, util.get_maxfd())
# close all file descriptors except bound sockets
# NOTE(review): this assumes the listener fds are contiguous; any
# unrelated fd lying between them would survive the exec — confirm.
util.closerange(3, fds[0])
util.closerange(fds[-1]+1, util.get_maxfd())
os.execvpe(self.START_CTX[0], self.START_CTX['args'], os.environ)
def reload(self):
@ -367,9 +374,11 @@ class Arbiter(object):
# do we need to change listener ?
if old_address != self.cfg.address:
self.LISTENER.close()
self.LISTENER = create_socket(self.cfg, self.log)
self.log.info("Listening at: %s", self.LISTENER)
# close all listeners
[l.close() for l in self.LISTENERS]
# init new listeners
self.LISTENERS = create_sockets(self.cfg, self.log)
self.log.info("Listening at: %s", ",".join([str(l) for l in self.LISTENERS]))
# do some actions on reload
self.cfg.on_reload(self)
@ -451,7 +460,7 @@ class Arbiter(object):
def spawn_worker(self):
self.worker_age += 1
worker = self.worker_class(self.worker_age, self.pid, self.LISTENER,
worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
self.app, self.timeout/2.0,
self.cfg, self.log)
self.cfg.pre_fork(self, worker)

View File

@ -86,8 +86,8 @@ class Config(object):
@property
def address(self):
bind = self.settings['bind'].get()
return util.parse_address(bytes_to_str(bind))
s = self.settings['bind'].get()
return [util.parse_address(bytes_to_str(bind)) for bind in s]
@property
def uid(self):
@ -220,6 +220,16 @@ def validate_string(val):
raise TypeError("Not a string: %s" % val)
return val.strip()
def validate_list_string(val):
    """Validate a value that may be a single string or a list of strings.

    Falsy input yields an empty list. A bare string (legacy syntax) is
    treated as a one-element list. Each element is run through
    ``validate_string`` before being returned.
    """
    if not val:
        return []
    # legacy syntax: a lone string stands for a one-element list
    items = [val] if isinstance(val, string_types) else val
    return [validate_string(item) for item in items]
def validate_string_to_list(val):
val = validate_string(val)
@ -315,15 +325,16 @@ class ConfigFile(Setting):
class Bind(Setting):
name = "bind"
action = "append"
section = "Server Socket"
cli = ["-b", "--bind"]
meta = "ADDRESS"
validator = validate_string
validator = validate_list_string
if 'PORT' in os.environ:
default = '0.0.0.0:{0}'.format(os.environ.get('PORT'))
default = ['0.0.0.0:{0}'.format(os.environ.get('PORT'))]
else:
default = '127.0.0.1:8000'
default = ['127.0.0.1:8000']
desc = """\
The socket to bind.

View File

@ -14,10 +14,11 @@ from gunicorn.six import string_types
class BaseSocket(object):
def __init__(self, conf, log, fd=None):
def __init__(self, address, conf, log, fd=None):
self.log = log
self.conf = conf
self.address = conf.address
self.cfg_addr = address
if fd is None:
sock = socket.socket(self.FAMILY, socket.SOCK_STREAM)
else:
@ -39,7 +40,7 @@ class BaseSocket(object):
return sock
def bind(self, sock):
sock.bind(self.address)
sock.bind(self.cfg_addr)
def close(self):
try:
@ -72,37 +73,28 @@ class UnixSocket(BaseSocket):
FAMILY = socket.AF_UNIX
def __init__(self, conf, log, fd=None):
def __init__(self, address, conf, log, fd=None):
if fd is None:
try:
os.remove(conf.address)
os.remove(address)
except OSError:
pass
super(UnixSocket, self).__init__(conf, log, fd=fd)
super(UnixSocket, self).__init__(address, conf, log, fd=fd)
def __str__(self):
return "unix:%s" % self.address
return "unix:%s" % self.cfg_addr
def bind(self, sock):
old_umask = os.umask(self.conf.umask)
sock.bind(self.address)
util.chown(self.address, self.conf.uid, self.conf.gid)
sock.bind(self.cfg_addr)
util.chown(self.cfg_addr, self.conf.uid, self.conf.gid)
os.umask(old_umask)
def close(self):
super(UnixSocket, self).close()
os.unlink(self.address)
def create_socket(conf, log):
"""
Create a new socket for the given address. If the
address is a tuple, a TCP socket is created. If it
is a string, a Unix socket is created. Otherwise
a TypeError is raised.
"""
# get it only once
addr = conf.address
os.unlink(self.cfg_addr)
def _sock_type(addr):
if isinstance(addr, tuple):
if util.is_ipv6(addr[0]):
sock_type = TCP6Socket
@ -112,33 +104,63 @@ def create_socket(conf, log):
sock_type = UnixSocket
else:
raise TypeError("Unable to create socket from: %r" % addr)
return sock_type
def create_sockets(conf, log):
"""
Create a new socket for the given address. If the
address is a tuple, a TCP socket is created. If it
is a string, a Unix socket is created. Otherwise
a TypeError is raised.
"""
# get it only once
laddr = conf.address
listeners = []
# sockets are already bound
if 'GUNICORN_FD' in os.environ:
fd = int(os.environ.pop('GUNICORN_FD'))
try:
return sock_type(conf, log, fd=fd)
except socket.error as e:
if e.args[0] == errno.ENOTCONN:
log.error("GUNICORN_FD should refer to an open socket.")
fds = os.environ.pop('GUNICORN_FD').split(',')
for i, fd in enumerate(fds):
fd = int(fd)
addr = laddr[i]
sock_type = _sock_type(addr)
try:
listeners.append(sock_type(addr, conf, log, fd=fd))
except socket.error as e:
if e.args[0] == errno.ENOTCONN:
log.error("GUNICORN_FD should refer to an open socket.")
else:
raise
return listeners
# no sockets are bound yet, first initialization of gunicorn in this env.
for addr in laddr:
sock_type = _sock_type(addr)
# If we fail to create a socket from GUNICORN_FD
# we fall through and try and open the socket
# normally.
sock = None
for i in range(5):
try:
sock = sock_type(addr, conf, log)
except socket.error as e:
if e.args[0] == errno.EADDRINUSE:
log.error("Connection in use: %s", str(addr))
if e.args[0] == errno.EADDRNOTAVAIL:
log.error("Invalid address: %s", str(addr))
sys.exit(1)
if i < 5:
log.error("Retrying in 1 second.")
time.sleep(1)
else:
raise
break
# If we fail to create a socket from GUNICORN_FD
# we fall through and try and open the socket
# normally.
if sock is None:
log.error("Can't connect to %s", str(addr))
sys.exit(1)
for i in range(5):
try:
return sock_type(conf, log)
except socket.error as e:
if e.args[0] == errno.EADDRINUSE:
log.error("Connection in use: %s", str(addr))
if e.args[0] == errno.EADDRNOTAVAIL:
log.error("Invalid address: %s", str(addr))
sys.exit(1)
if i < 5:
log.error("Retrying in 1 second.")
time.sleep(1)
listeners.append(sock)
log.error("Can't connect to %s", str(addr))
sys.exit(1)
return listeners

View File

@ -26,14 +26,14 @@ class AsyncWorker(base.Worker):
def timeout_ctx(self):
raise NotImplementedError()
def handle(self, client, addr):
def handle(self, listener, client, addr):
req = None
try:
parser = http.RequestParser(self.cfg, client)
try:
if not self.cfg.keepalive:
req = six.next(parser)
self.handle_request(req, client, addr)
self.handle_request(listener, req, client, addr)
else:
# keepalive loop
while True:
@ -42,7 +42,7 @@ class AsyncWorker(base.Worker):
req = six.next(parser)
if not req:
break
self.handle_request(req, client, addr)
self.handle_request(listener, req, client, addr)
except http.errors.NoMoreData as e:
self.log.debug("Ignored premature client disconnection. %s", e)
except StopIteration as e:
@ -64,11 +64,12 @@ class AsyncWorker(base.Worker):
finally:
util.close(client)
def handle_request(self, req, sock, addr):
def handle_request(self, listener, req, sock, addr):
request_start = datetime.now()
try:
self.cfg.pre_request(self, req)
resp, environ = wsgi.create(req, sock, addr, self.address, self.cfg)
resp, environ = wsgi.create(req, sock, addr,
listener.getsockname(), self.cfg)
self.nr += 1
if self.alive and self.nr >= self.max_requests:
self.log.info("Autorestarting worker after current request.")

View File

@ -26,7 +26,7 @@ class Worker(object):
PIPE = []
def __init__(self, age, ppid, socket, app, timeout, cfg, log):
def __init__(self, age, ppid, sockets, app, timeout, cfg, log):
"""\
This is called pre-fork so it shouldn't do anything to the
current process. If there's a need to make process wide
@ -34,7 +34,7 @@ class Worker(object):
"""
self.age = age
self.ppid = ppid
self.socket = socket
self.sockets = sockets
self.app = app
self.timeout = timeout
self.cfg = cfg
@ -45,7 +45,6 @@ class Worker(object):
self.alive = True
self.log = log
self.debug = cfg.debug
self.address = self.socket.getsockname()
self.tmp = WorkerTmp(cfg)
def __str__(self):
@ -90,7 +89,7 @@ class Worker(object):
util.close_on_exec(p)
# Prevent fd inheritance
util.close_on_exec(self.socket)
[util.close_on_exec(s) for s in self.sockets]
util.close_on_exec(self.tmp.fileno())
self.log.close_on_exec()

View File

@ -3,10 +3,9 @@
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
from __future__ import with_statement
from functools import partial
import os
try:
import eventlet
except ImportError:
@ -33,10 +32,13 @@ class EventletWorker(AsyncWorker):
return eventlet.Timeout(self.cfg.keepalive or None, False)
def run(self):
self.socket = GreenSocket(family_or_realsock=self.socket.sock)
self.socket.setblocking(1)
self.acceptor = eventlet.spawn(eventlet.serve, self.socket,
self.handle, self.worker_connections)
acceptors = []
for sock in self.sockets:
s = GreenSocket(family_or_realsock=sock)
s.setblocking(1)
hfun = partial(self.handle, s)
acceptor = eventlet.spawn(eventlet.serve, s, hfun,
self.worker_connections)
while self.alive:
self.notify()
@ -48,4 +50,4 @@ class EventletWorker(AsyncWorker):
self.notify()
with eventlet.Timeout(self.cfg.graceful_timeout, False):
eventlet.kill(self.acceptor, eventlet.StopServe)
[eventlet.kill(a, eventlet.StopServe) for a in acceptors]

View File

@ -8,6 +8,7 @@ from __future__ import with_statement
import os
import sys
from datetime import datetime
from functools import partial
import time
# workaround on osx, disable kqueue
@ -51,18 +52,23 @@ class GeventWorker(AsyncWorker):
def timeout_ctx(self):
return gevent.Timeout(self.cfg.keepalive, False)
def run(self):
self.socket.setblocking(1)
servers = []
for s in self.sockets:
s.setblocking(1)
pool = Pool(self.worker_connections)
if self.server_class is not None:
server = self.server_class(
s, application=self.wsgi, spawn=pool, log=self.log,
handler_class=self.wsgi_handler)
else:
hfun = partial(self.handle, s)
server = StreamServer(s, handle=hfun, spawn=pool)
pool = Pool(self.worker_connections)
if self.server_class is not None:
server = self.server_class(
self.socket, application=self.wsgi, spawn=pool, log=self.log,
handler_class=self.wsgi_handler)
else:
server = StreamServer(self.socket, handle=self.handle, spawn=pool)
server.start()
servers.append(server)
server.start()
pid = os.getpid()
try:
while self.alive:
@ -79,13 +85,19 @@ class GeventWorker(AsyncWorker):
try:
# Stop accepting requests
server.kill()
[server.stop_accepting() for server in servers]
# Handle current requests until graceful_timeout
ts = time.time()
while time.time() - ts <= self.cfg.graceful_timeout:
if server.pool.free_count() == server.pool.size:
return # all requests was handled
accepting = 0
for server in servers:
if server.pool.free_count() != server.pool.size:
accepting += 1
# if no server is accepting a connection, we can exit
if not accepting:
return
self.notify()
gevent.sleep(1.0)

View File

@ -50,7 +50,6 @@ class TornadoWorker(Worker):
self.ioloop.stop()
def run(self):
self.socket.setblocking(0)
self.ioloop = IOLoop.instance()
self.alive = True
PeriodicCallback(self.watchdog, 1000, io_loop=self.ioloop).start()
@ -77,16 +76,15 @@ class TornadoWorker(Worker):
sys.modules["tornado.httpserver"] = httpserver
server = tornado.httpserver.HTTPServer(app, io_loop=self.ioloop)
if hasattr(server, "add_socket"): # tornado > 2.0
server.add_socket(self.socket)
elif hasattr(server, "_sockets"): # tornado 2.0
server._sockets[self.socket.fileno()] = self.socket
else: # tornado 1.2 or earlier
server._socket = self.socket
for s in self.sockets:
s.setblocking(0)
if hasattr(server, "add_socket"): # tornado > 2.0
server.add_socket(s)
elif hasattr(server, "_sockets"): # tornado 2.0
server._sockets[s.fileno()] = s
server.no_keep_alive = self.cfg.keepalive <= 0
server.xheaders = bool(self.cfg.x_forwarded_for_header)
server.start(num_processes=1)
self.ioloop.start()

View File

@ -21,8 +21,9 @@ class SyncWorker(base.Worker):
def run(self):
# self.sockets appear to lose their blocking status after
# we fork in the arbiter. Reset them here.
self.socket.setblocking(0)
[s.setblocking(0) for s in self.sockets]
ready = self.sockets
while self.alive:
self.notify()
@ -30,20 +31,23 @@ class SyncWorker(base.Worker):
# that no connection is waiting we fall down to the
# select which is where we'll wait for a bit for new
# workers to come give us some love.
try:
client, addr = self.socket.accept()
client.setblocking(1)
util.close_on_exec(client)
self.handle(client, addr)
# Keep processing clients until no one is waiting. This
# prevents the need to select() for every client that we
# process.
continue
for sock in ready:
try:
client, addr = sock.accept()
client.setblocking(1)
util.close_on_exec(client)
self.handle(sock, client, addr)
except socket.error as e:
if e.args[0] not in (errno.EAGAIN, errno.ECONNABORTED):
raise
# Keep processing clients until no one is waiting. This
# prevents the need to select() for every client that we
# process.
continue
except socket.error as e:
if e.args[0] not in (errno.EAGAIN, errno.ECONNABORTED,
errno.EWOULDBLOCK):
raise
# If our parent changed then we shut down.
if self.ppid != os.getppid():
@ -52,25 +56,28 @@ class SyncWorker(base.Worker):
try:
self.notify()
ret = select.select([self.socket], [], self.PIPE, self.timeout)
ret = select.select(self.sockets, [], self.PIPE, self.timeout)
if ret[0]:
ready = ret[0]
continue
except select.error as e:
if e.args[0] == errno.EINTR:
ready = self.sockets
continue
if e.args[0] == errno.EBADF:
if self.nr < 0:
ready = self.sockets
continue
else:
return
raise
def handle(self, client, addr):
def handle(self, listener, client, addr):
req = None
try:
parser = http.RequestParser(self.cfg, client)
req = six.next(parser)
self.handle_request(req, client, addr)
self.handle_request(listener, req, client, addr)
except http.errors.NoMoreData as e:
self.log.debug("Ignored premature client disconnection. %s", e)
except StopIteration as e:
@ -85,13 +92,13 @@ class SyncWorker(base.Worker):
finally:
util.close(client)
def handle_request(self, req, client, addr):
def handle_request(self, listener, req, client, addr):
environ = {}
try:
self.cfg.pre_request(self, req)
request_start = datetime.now()
resp, environ = wsgi.create(req, client, addr,
self.address, self.cfg)
listener.getsockname(), self.cfg)
# Force the connection closed until someone shows
# a buffering proxy that supports Keep-Alive to
# the backend.
@ -124,4 +131,3 @@ class SyncWorker(base.Worker):
self.cfg.post_request(self, req, environ)
except:
pass

View File

@ -63,7 +63,7 @@ def test_property_access():
t.eq(c.workers, 3)
# Address is parsed
t.eq(c.address, ("127.0.0.1", 8000))
t.eq(c.address, [("127.0.0.1", 8000)])
# User and group defaults
t.eq(os.geteuid(), c.uid)
@ -165,7 +165,7 @@ def test_callable_validation_for_string():
def test_cmd_line():
with AltArgs(["prog_name", "-b", "blargh"]):
app = NoConfigApp()
t.eq(app.cfg.bind, "blargh")
t.eq(app.cfg.bind, ["blargh"])
with AltArgs(["prog_name", "-w", "3"]):
app = NoConfigApp()
t.eq(app.cfg.workers, 3)
@ -183,12 +183,12 @@ def test_app_config():
def test_load_config():
with AltArgs(["prog_name", "-c", cfg_file()]):
app = NoConfigApp()
t.eq(app.cfg.bind, "unix:/tmp/bar/baz")
t.eq(app.cfg.bind, ["unix:/tmp/bar/baz"])
t.eq(app.cfg.workers, 3)
t.eq(app.cfg.proc_name, "fooey")
def test_cli_overrides_config():
with AltArgs(["prog_name", "-c", cfg_file(), "-b", "blarney"]):
app = NoConfigApp()
t.eq(app.cfg.bind, "blarney")
t.eq(app.cfg.bind, ["blarney"])
t.eq(app.cfg.proc_name, "fooey")