Mirror of https://github.com/frappe/gunicorn.git
Synced 2026-01-14 11:09:11 +08:00
This change optimizes the sync worker when we only have to listen on one interface. While I was here, I also fixed a long-standing, unnoticed issue with accepting on multiple interfaces (I wonder if anyone really uses that): at some point some interfaces were skipped.
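For context, here is a minimal gunicorn config sketch showing the two cases this split covers; the addresses below are placeholders and not part of the change itself.

# gunicorn.conf.py -- illustrative sketch only; addresses are placeholders.
worker_class = "sync"

# One bind address: the worker takes the run_for_one() path and accepts
# directly on the single listener.
bind = ["127.0.0.1:8000"]

# Several bind addresses: the worker takes the run_for_multiple() path and
# select()s across all listeners so none of them is skipped.
# bind = ["127.0.0.1:8000", "[::1]:8000"]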
199 lines
6.6 KiB
Python
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
#

from datetime import datetime
import errno
import os
import select
import socket
import ssl
import sys

import gunicorn.http as http
import gunicorn.http.wsgi as wsgi
import gunicorn.util as util
import gunicorn.workers.base as base
from gunicorn import six


class SyncWorker(base.Worker):

    def accept(self, listener):
        client, addr = listener.accept()
        client.setblocking(1)
        util.close_on_exec(client)
        self.handle(listener, client, addr)

    def wait(self, timeout):
        try:
            self.notify()
            ret = select.select(self.sockets, [], self.PIPE, timeout)
            if ret[0]:
                return ret[0]

        except select.error as e:
            if e.args[0] == errno.EINTR:
                return self.sockets
            if e.args[0] == errno.EBADF:
                if self.nr < 0:
                    return self.sockets
                else:
                    return False
            raise

    def is_parent_alive(self):
        # If our parent changed then we shut down.
        if self.ppid != os.getppid():
            self.log.info("Parent changed, shutting down: %s", self)
            return False
        return True

    def run_for_one(self, timeout):
        listener = self.sockets[0]
        while self.alive:
            self.notify()

            # Accept a connection. If we get an error telling us
            # that no connection is waiting we fall down to the
            # select which is where we'll wait for a bit for new
            # clients to come give us some love.
            try:
                self.accept(listener)
                # Keep processing clients until no one is waiting. This
                # prevents the need to select() for every client that we
                # process.
                continue

            except socket.error as e:
                if e.args[0] not in (errno.EAGAIN, errno.ECONNABORTED,
                        errno.EWOULDBLOCK):
                    raise

            if not self.is_parent_alive():
                return

            if not self.wait(timeout):
                return

    def run_for_multiple(self, timeout):
        while self.alive:
            self.notify()

            ready = self.wait(timeout)
            if not ready:
                return

            # Accept one connection on each listener that select() reported
            # ready, so no interface gets skipped.
            for listener in ready:
                try:
                    self.accept(listener)
                except socket.error as e:
                    if e.args[0] not in (errno.EAGAIN, errno.ECONNABORTED,
                            errno.EWOULDBLOCK):
                        raise

            if not self.is_parent_alive():
                return

    def run(self):
        # If no timeout is given the worker will never wait and will
        # use the CPU for nothing. This minimal timeout prevents it.
        timeout = self.timeout or 0.5

        # self.socket appears to lose its blocking status after
        # we fork in the arbiter. Reset it here.
        for s in self.sockets:
            s.setblocking(0)

        if len(self.sockets) > 1:
            self.run_for_multiple(timeout)
        else:
            self.run_for_one(timeout)

    def handle(self, listener, client, addr):
        req = None
        try:
            if self.cfg.is_ssl:
                client = ssl.wrap_socket(client, server_side=True,
                        **self.cfg.ssl_options)

            parser = http.RequestParser(self.cfg, client)
            req = six.next(parser)
            self.handle_request(listener, req, client, addr)
        except http.errors.NoMoreData as e:
            self.log.debug("Ignored premature client disconnection. %s", e)
        except StopIteration as e:
            self.log.debug("Closing connection. %s", e)
        except ssl.SSLError as e:
            if e.args[0] == ssl.SSL_ERROR_EOF:
                self.log.debug("ssl connection closed")
                client.close()
            else:
                self.log.debug("Error processing SSL request.")
                self.handle_error(req, client, addr, e)
        except socket.error as e:
            if e.args[0] not in (errno.EPIPE, errno.ECONNRESET):
                self.log.exception("Socket error processing request.")
            else:
                if e.args[0] == errno.ECONNRESET:
                    self.log.debug("Ignoring connection reset")
                else:
                    self.log.debug("Ignoring EPIPE")
        except Exception as e:
            self.handle_error(req, client, addr, e)
        finally:
            util.close(client)

    def handle_request(self, listener, req, client, addr):
        environ = {}
        resp = None
        try:
            self.cfg.pre_request(self, req)
            request_start = datetime.now()
            resp, environ = wsgi.create(req, client, addr,
                    listener.getsockname(), self.cfg)
            # Force the connection closed until someone shows
            # a buffering proxy that supports Keep-Alive to
            # the backend.
            resp.force_close()
            self.nr += 1
            if self.nr >= self.max_requests:
                self.log.info("Autorestarting worker after current request.")
                self.alive = False
            respiter = self.wsgi(environ, resp.start_response)
            try:
                if isinstance(respiter, environ['wsgi.file_wrapper']):
                    resp.write_file(respiter)
                else:
                    for item in respiter:
                        resp.write(item)
                resp.close()
                request_time = datetime.now() - request_start
                self.log.access(resp, req, environ, request_time)
            finally:
                if hasattr(respiter, "close"):
                    respiter.close()
        except socket.error:
            exc_info = sys.exc_info()
            # pass to next try-except level
            six.reraise(exc_info[0], exc_info[1], exc_info[2])
        except Exception:
            if resp and resp.headers_sent:
                # If the response headers have already been sent, we should
                # close the connection to indicate the error.
                self.log.exception("Error handling request")
                try:
                    client.shutdown(socket.SHUT_RDWR)
                    client.close()
                except socket.error:
                    pass
                raise StopIteration()
            raise
        finally:
            try:
                self.cfg.post_request(self, req, environ, resp)
            except Exception:
                self.log.exception("Exception in post_request hook")
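For reference, handle_request() above calls the application as a plain WSGI callable via self.wsgi(environ, resp.start_response) and iterates the result. A minimal sketch of such an application, assuming nothing beyond the WSGI spec (PEP 3333); it is illustrative only and not part of this file:

# Minimal WSGI application sketch (illustrative; not part of sync.py).
# The sync worker builds `environ`, passes `resp.start_response` as the
# start_response callable, and writes each returned byte chunk to the client.
def app(environ, start_response):
    body = b"Hello from the sync worker\n"
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(body))),
    ])
    return [body]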