Got the Sync and Eventlet workers running.

This commit is contained in:
Paul J. Davis 2010-04-15 21:20:17 -04:00
parent 4fe8608196
commit 0c935d06c7
11 changed files with 92 additions and 89 deletions

View File

@ -51,31 +51,25 @@ class Arbiter(object):
self.cfg = cfg self.cfg = cfg
self.app = app self.app = app
self.log = logging.getLogger(__name__)
self.address = cfg.address self.address = cfg.address
self.num_workers = cfg.num_workers self.num_workers = cfg.workers
self.debug = cfg.debug self.debug = cfg.debug
self.timeout = cfg.timeout self.timeout = cfg.timeout
self.proc_name = cfg.proc_name self.proc_name = cfg.proc_name
self.worker_class = cfg.worker_class
self.address = address try:
self.num_workers = num_workers self.worker_class = cfg.worker_class
self.app = app except ImportError, e:
self.log.error("%s" % e)
sys.exit(1)
self._pidfile = None self._pidfile = None
self.worker_age = 0 self.worker_age = 0
self.reexec_pid = 0 self.reexec_pid = 0
self.master_name = "Master" self.master_name = "Master"
self.opts = kwargs
self.debug = kwargs.get("debug", False)
self.conf = kwargs.get("config", {})
self.timeout = self.conf['timeout']
self.proc_name = self.conf['proc_name']
self.worker_class = kwargs.get("workerclass", SyncWorker)
self.log = logging.getLogger(__name__)
# get current path, try to use PWD env first # get current path, try to use PWD env first
try: try:
a = os.stat(os.environ('PWD')) a = os.stat(os.environ('PWD'))
@ -100,8 +94,8 @@ class Arbiter(object):
""" """
self.pid = os.getpid() self.pid = os.getpid()
self.init_signals() self.init_signals()
self.LISTENER = create_socket(self.conf) self.LISTENER = create_socket(self.cfg)
self.pidfile = self.opts.get("pidfile") self.pidfile = self.cfg.pidfile
self.log.info("Arbiter booted") self.log.info("Arbiter booted")
self.log.info("Listening at: %s" % self.LISTENER) self.log.info("Listening at: %s" % self.LISTENER)
@ -305,7 +299,7 @@ class Arbiter(object):
os.environ['GUNICORN_FD'] = str(self.LISTENER.fileno()) os.environ['GUNICORN_FD'] = str(self.LISTENER.fileno())
os.chdir(self.START_CTX['cwd']) os.chdir(self.START_CTX['cwd'])
self.conf.before_exec(self) self.cfg.before_exec(self)
os.execlp(self.START_CTX[0], *self.START_CTX['argv']) os.execlp(self.START_CTX[0], *self.START_CTX['argv'])
def murder_workers(self): def murder_workers(self):
@ -369,8 +363,8 @@ class Arbiter(object):
for i in range(self.num_workers - len(self.WORKERS.keys())): for i in range(self.num_workers - len(self.WORKERS.keys())):
self.worker_age += 1 self.worker_age += 1
worker = self.worker_class(self.worker_age, self.pid, self.LISTENER, worker = self.worker_class(self.worker_age, self.pid, self.LISTENER,
self.app, self.timeout/2.0, self.conf) self.app, self.timeout/2.0, self.cfg)
self.conf.before_fork(self, worker) self.cfg.before_fork(self, worker)
pid = os.fork() pid = os.fork()
if pid != 0: if pid != 0:
self.WORKERS[pid] = worker self.WORKERS[pid] = worker
@ -382,8 +376,8 @@ class Arbiter(object):
util._setproctitle("worker [%s]" % self.proc_name) util._setproctitle("worker [%s]" % self.proc_name)
self.log.debug("Booting worker: %s (age: %s)" % ( self.log.debug("Booting worker: %s (age: %s)" % (
worker_pid, self.worker_age)) worker_pid, self.worker_age))
self.conf.after_fork(self, worker) self.cfg.after_fork(self, worker)
worker.run() worker.init_process()
sys.exit(0) sys.exit(0)
except SystemExit: except SystemExit:
raise raise

View File

@ -4,7 +4,15 @@
# See the NOTICE for more information. # See the NOTICE for more information.
from gunicorn.http.parser import Parser from gunicorn.http.parser import Parser
from gunicorn.http.request import Request, RequestError from gunicorn.http.request import Request, KeepAliveRequest, RequestError
from gunicorn.http.response import Response from gunicorn.http.response import Response, KeepAliveResponse
__all__ = [
Parser,
Request,
KeepAliveRequest,
RequestError,
Response,
KeepAliveResponse
]
__all__ = [Parser, Request, RequestError, Response]

View File

@ -7,6 +7,7 @@ import errno
import logging import logging
import os import os
import re import re
import socket
try: try:
from cStringIO import StringIO from cStringIO import StringIO
@ -171,13 +172,13 @@ class Request(object):
self.response = self.RESPONSE_CLASS(self, status, headers) self.response = self.RESPONSE_CLASS(self, status, headers)
return self.response.write return self.response.write
class KeepaliveRequest(http.Request): class KeepAliveRequest(Request):
RESPONSE_CLASS = KeepAliveResponse RESPONSE_CLASS = KeepAliveResponse
def read(self): def read(self):
try: try:
return super(KeepaliveRequest, self).read() return super(KeepAliveRequest, self).read()
except socket.error, e: except socket.error, e:
if e[0] == errno.ECONNRESET: if e[0] == errno.ECONNRESET:
return return

View File

@ -58,7 +58,7 @@ class Response(object):
if self.chunked: if self.chunked:
write_chunk(self.socket, "") write_chunk(self.socket, "")
class KeepaliveResponse(Response): class KeepAliveResponse(Response):
def default_headers(self): def default_headers(self):
connection = "keep-alive" connection = "keep-alive"

View File

@ -33,7 +33,7 @@ def options():
help='Adress to listen on. Ex. 127.0.0.1:8000 or unix:/tmp/gunicorn.sock'), help='Adress to listen on. Ex. 127.0.0.1:8000 or unix:/tmp/gunicorn.sock'),
op.make_option('-w', '--workers', dest='workers', op.make_option('-w', '--workers', dest='workers',
help='Number of workers to spawn. [1]'), help='Number of workers to spawn. [1]'),
op.make_option('-k', '--worker-class', dest='klass', op.make_option('-k', '--worker-class', dest='workerclass',
help="The type of request processing to use "+ help="The type of request processing to use "+
"[egg:gunicorn#main]"), "[egg:gunicorn#main]"),
op.make_option('-p','--pid', dest='pidfile', op.make_option('-p','--pid', dest='pidfile',
@ -79,44 +79,6 @@ def main(usage, get_app):
Arbiter(cfg, app).run() Arbiter(cfg, app).run()
def paste_server(app, gcfg=None, host="127.0.0.1", port=None, *args, **kwargs):
"""\
A paster server.
Then entry point in your paster ini file should looks like this:
[server:main]
use = egg:gunicorn#main
host = 127.0.0.1
port = 5000
"""
opts = kwargs.copy()
if port and not host.startswith("unix:"):
bind = "%s:%s" % (host, port)
else:
bind = host
opts['bind'] = bind
if gcfg:
for key, value in list(gcfg.items()):
if value and value is not None:
if key == "debug":
value = (value == "true")
opts[key] = value
opts['default_proc_name'] = opts['__file__']
cfg = Config(opts)
if cfg.spew:
spew()
if cfg.daemon:
daemonize()
else:
os.setpgrp()
configure_logging(cfg)
Arbiter(cfg, app).run()
def run(): def run():
"""\ """\
The ``gunicorn`` command line runner for launcing Gunicorn with The ``gunicorn`` command line runner for launcing Gunicorn with
@ -239,6 +201,44 @@ def run_paster():
main("%prog [OPTIONS] pasteconfig.ini", get_app) main("%prog [OPTIONS] pasteconfig.ini", get_app)
def paste_server(app, gcfg=None, host="127.0.0.1", port=None, *args, **kwargs):
"""\
A paster server.
Then entry point in your paster ini file should looks like this:
[server:main]
use = egg:gunicorn#main
host = 127.0.0.1
port = 5000
"""
opts = kwargs.copy()
if port and not host.startswith("unix:"):
bind = "%s:%s" % (host, port)
else:
bind = host
opts['bind'] = bind
if gcfg:
for key, value in list(gcfg.items()):
if value and value is not None:
if key == "debug":
value = (value == "true")
opts[key] = value
opts['default_proc_name'] = opts['__file__']
cfg = Config(opts)
if cfg.spew:
spew()
if cfg.daemon:
daemonize()
else:
os.setpgrp()
configure_logging(cfg)
Arbiter(cfg, app).run()
def daemonize(): def daemonize():
"""\ """\
Standard daemonization of a process. Code is basd on the Standard daemonization of a process. Code is basd on the

View File

@ -51,9 +51,9 @@ def load_worker_class(uri):
dist, name = entry_str.rsplit("#",1) dist, name = entry_str.rsplit("#",1)
except ValueError: except ValueError:
dist = entry_str dist = entry_str
name = "main" name = "sync"
return pkg_resources.load_entry_point(dist, "gunicorn.arbiter", name) return pkg_resources.load_entry_point(dist, "gunicorn.workers", name)
else: else:
components = uri.split('.') components = uri.split('.')
if len(components) == 1: if len(components) == 1:

View File

@ -20,7 +20,7 @@ class AsyncWorker(Worker):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
Worker.__init__(self, *args, **kwargs) Worker.__init__(self, *args, **kwargs)
self.worker_connections = self.conf.worker_connections self.worker_connections = self.cfg.worker_connections
def handle(self, client, addr): def handle(self, client, addr):
try: try:
@ -46,7 +46,7 @@ class AsyncWorker(Worker):
util.close(client) util.close(client)
def handle_request(self, client, addr): def handle_request(self, client, addr):
req = KeepaliveRequest(client, addr, self.address, self.conf) req = http.KeepAliveRequest(client, addr, self.address, self.cfg)
try: try:
environ = req.read() environ = req.read()
if not environ or not req.parser.headers: if not environ or not req.parser.headers:

View File

@ -4,15 +4,11 @@
# See the NOTICE for more information. # See the NOTICE for more information.
import errno
import logging import logging
import os import os
import select
import signal import signal
import socket
import sys import sys
import tempfile import tempfile
import traceback
from gunicorn import util from gunicorn import util
@ -25,7 +21,7 @@ class Worker(object):
PIPE = [] PIPE = []
def __init__(self, age, ppid, socket, app, timeout, conf): def __init__(self, age, ppid, socket, app, timeout, cfg):
"""\ """\
This is called pre-fork so it shouldn't do anything to the This is called pre-fork so it shouldn't do anything to the
current process. If there's a need to make process wide current process. If there's a need to make process wide
@ -36,17 +32,17 @@ class Worker(object):
self.socket = socket self.socket = socket
self.app = app self.app = app
self.timeout = timeout self.timeout = timeout
self.conf = conf self.cfg = cfg
self.nr = 0 self.nr = 0
self.alive = True self.alive = True
self.spinner = 0 self.spinner = 0
self.log = logging.getLogger(__name__) self.log = logging.getLogger(__name__)
self.debug = conf.get('debug', False) self.debug = cfg.debug
self.address = self.socket.getsockname() self.address = self.socket.getsockname()
self.fd, self.tmpname = tempfile.mkstemp(prefix="wgunicorn-") self.fd, self.tmpname = tempfile.mkstemp(prefix="wgunicorn-")
util.chown(self.tmpname, conf.uid, conf.gid) util.chown(self.tmpname, cfg.uid, cfg.gid)
self.tmp = os.fdopen(self.fd, "r+b") self.tmp = os.fdopen(self.fd, "r+b")
def __str__(self): def __str__(self):
@ -83,7 +79,7 @@ class Worker(object):
super(MyWorkerClass, self).init_process() so that the ``run()`` super(MyWorkerClass, self).init_process() so that the ``run()``
loop is initiated. loop is initiated.
""" """
util.set_owner_process(self.conf.uid, self.conf.gid) util.set_owner_process(self.cfg.uid, self.cfg.gid)
# For waking ourselves up # For waking ourselves up
self.PIPE = os.pipe() self.PIPE = os.pipe()

View File

@ -39,7 +39,6 @@ class EventletWorker(AsyncWorker):
super(EventletWorker, self).init_process() super(EventletWorker, self).init_process()
def run(self): def run(self):
self.init_process()
self.socket.setblocking(1) self.socket.setblocking(1)
pool = greenpool.GreenPool(self.worker_connections) pool = greenpool.GreenPool(self.worker_connections)

View File

@ -1,12 +1,17 @@
from gunicorn import http import errno
import os
import select
import socket
import traceback
from gunicorn import http, util
from gunicorn.http.tee import UnexpectedEOF from gunicorn.http.tee import UnexpectedEOF
from gunicorn.workers.base import Worker from gunicorn.workers.base import Worker
class SyncWorker(Worker): class SyncWorker(Worker):
def run(self): def run(self):
self.init_process()
self.nr = 0 self.nr = 0
# self.socket appears to lose its blocking status after # self.socket appears to lose its blocking status after
@ -24,7 +29,7 @@ class SyncWorker(Worker):
try: try:
client, addr = self.socket.accept() client, addr = self.socket.accept()
client.setblocking(1) client.setblocking(1)
util.close_on_exec(sock) util.close_on_exec(client)
self.handle(client, addr) self.handle(client, addr)
self.nr += 1 self.nr += 1
except socket.error, e: except socket.error, e:
@ -79,7 +84,7 @@ class SyncWorker(Worker):
util.close(client) util.close(client)
def handle_request(self, client, addr): def handle_request(self, client, addr):
req = http.Request(client, addr, self.address, self.conf) req = http.Request(client, addr, self.address, self.cfg)
try: try:
environ = req.read() environ = req.read()
if not environ or not req.parser.status_line: if not environ or not req.parser.status_line:

View File

@ -48,7 +48,7 @@ setup(
gunicorn_django=gunicorn.main:run_django gunicorn_django=gunicorn.main:run_django
gunicorn_paster=gunicorn.main:run_paster gunicorn_paster=gunicorn.main:run_paster
[gunicorn.worker] [gunicorn.workers]
sync=gunicorn.workers.sync:SyncWorker sync=gunicorn.workers.sync:SyncWorker
eventlet=gunicorn.workers.geventlet:EventletWorker eventlet=gunicorn.workers.geventlet:EventletWorker
gevent=gunicorn.workers.ggevent:GEventWorker gevent=gunicorn.workers.ggevent:GEventWorker