From 61b31994d1b13c58fd70919e9e699a6035675858 Mon Sep 17 00:00:00 2001 From: "Paul J. Davis" Date: Thu, 11 Feb 2010 19:22:55 -0500 Subject: [PATCH] Refactored the socket creation code. --- gunicorn/arbiter.py | 80 ++------------------------------ gunicorn/sock.py | 108 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+), 75 deletions(-) create mode 100644 gunicorn/sock.py diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py index c4a8d759..9e6c2871 100644 --- a/gunicorn/arbiter.py +++ b/gunicorn/arbiter.py @@ -18,6 +18,7 @@ import tempfile import time import traceback +from gunicorn.sock import create_socket from gunicorn.worker import Worker from gunicorn import util @@ -40,8 +41,7 @@ class Arbiter(object): if name[:3] == "SIG" and name[3] != "_" ) - def __init__(self, address, num_workers, modname, - **kwargs): + def __init__(self, address, num_workers, modname, **kwargs): self.address = address self.num_workers = num_workers self.modname = modname @@ -70,14 +70,14 @@ class Arbiter(object): "cwd": cwd, 0: copy.copy(sys.argv[0]) } - - + def start(self): self.pid = os.getpid() self.init_signals() - self.listen(self.address) + self.LISTENER = create_socket(self.address) self.pidfile = self.opts.get("pidfile") self.log.info("Booted Arbiter: %s" % os.getpid()) + self.log.info("Listening on socket: %s" % self.LISTENER) def _del_pidfile(self): self._pidfile = None @@ -106,7 +106,6 @@ class Arbiter(object): self._pidfile = path pidfile = property(_get_pidfile, _set_pidfile, _del_pidfile) - def unlink_pidfile(self, path): try: with open(path, "r") as f: @@ -150,75 +149,6 @@ class Arbiter(object): else: self.log.warn("Ignoring rapid signaling: %s" % sig) - def listen(self, addr): - if 'GUNICORN_FD' in os.environ: - fd = int(os.environ['GUNICORN_FD']) - del os.environ['GUNICORN_FD'] - try: - sock = self.init_socket_fromfd(fd, addr) - self.LISTENER = sock - return - except socket.error, e: - if e[0] == errno.ENOTCONN: - self.log.error("should be a non GUNICORN environment") - else: - raise - else: - for i in range(5): - try: - sock = self.init_socket(addr) - self.LISTENER = sock - break - except socket.error, e: - if e[0] == errno.EADDRINUSE: - self.log.error("Connection in use: %s" % str(addr)) - if i < 5: - self.log.error("Retrying in 1 second.") - time.sleep(1) - - if self.LISTENER: - try: - self.log.info("Listen on %s:%s" % self.LISTENER.getsockname()) - except TypeError: - self.log.info("Listen on %s" % self.LISTENER.getsockname()) - else: - self.log.error("Can't connect to %s" % str(addr)) - sys.exit(1) - - def init_socket_fromfd(self, fd, address): - if isinstance(address, basestring): - sock = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM) - else: - sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) - self.set_tcp_sockopts(sock) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.setblocking(0) - sock.listen(2048) - return sock - - def init_socket(self, address): - if isinstance(address, basestring): - try: - os.remove(address) - except OSError, e: - pass - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - else: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.set_tcp_sockopts(sock) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind(address) - sock.setblocking(0) - sock.listen(2048) - return sock - - def set_tcp_sockopts(self, sock): - if hasattr(socket, "TCP_CORK"): - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 1) - elif hasattr(socket, "TCP_NOPUSH"): - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NOPUSH, 1) - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - def run(self): self.start() self.manage_workers() diff --git a/gunicorn/sock.py b/gunicorn/sock.py new file mode 100644 index 00000000..77054193 --- /dev/null +++ b/gunicorn/sock.py @@ -0,0 +1,108 @@ + +import errno +import logging +import os +import socket +import sys +import time + +log = logging.getLogger(__name__) + +class BaseSocket(object): + + def __init__(self, addr, fd=None): + self.address = addr + if fd is None: + sock = socket.socket(self.FAMILY, socket.SOCK_STREAM) + else: + print "%r" % fd + sock = socket.fromfd(fd, self.FAMILY, socket.SOCK_STREAM) + self.sock = self.set_options(sock, bound=(fd is not None)) + + def __str__(self, name): + return "" % self.sock.fileno() + + def __getattr__(self, name): + return getattr(self.sock, name) + + def set_options(self, sock, bound=False): + if not bound: + sock.bind(self.address) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.setblocking(0) + sock.listen(2048) + return sock + +class TCPSocket(BaseSocket): + + FAMILY = socket.AF_INET + + def __str__(self): + return "http://%s:%d" % self.address + + def set_options(self, sock, bound=False): + if hasattr(socket, "TCP_CORK"): + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 1) + elif hasattr(socket, "TCP_NOPUSH"): + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NOPUSH, 1) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + return super(TCPSocket, self).set_options(sock, bound=bound) + +class UnixSocket(BaseSocket): + + FAMILY = socket.AF_UNIX + + def __init__(self, addr, fd=None): + if fd is None: + try: + os.remove(addr) + except OSError: + pass + super(UnixSocket, self).__init__(addr, fd=fd) + + def __str__(self): + return "unix://%s" % self.address + +def create_socket(addr): + """\ + Create a new socket for the given address. If the + address is a tuple, a TCP socket is created. If it + is a string, a Unix socket is created. Otherwise + a TypeError is raised. + """ + if isinstance(addr, tuple): + sock_type = TCPSocket + elif isinstance(addr, basestring): + sock_type = UnixSocket + else: + raise TypeError("Unable to create socket from: %r" % addr) + + if 'GUNICORN_FD' in os.environ: + fd = int(os.environ.pop('GUNICORN_FD')) + try: + return sock_type(addr, fd=fd) + except socket.error, e: + if e[0] == errno.ENOTCONN: + log.error("GUNICORN_FD should refer to an open socket.") + else: + raise + + # If we fail to create a socket from GUNICORN_FD + # we fall through and try and open the socket + # normally. + + for i in range(5): + try: + return sock_type(addr) + except socket.error, e: + if e[0] == errno.EADDRINUSE: + log.error("Connection in use: %s" % str(addr)) + if e[0] == errno.EADDRNOTAVAIL: + log.error("Invalid address: %s" % str(addr)) + sys.exit(1) + if i < 5: + log.error("Retrying in 1 second.") + time.sleep(1) + + log.error("Can't connect to %s" % str(addr)) + sys.exit(1)