it's better to test when you use the right code to do it. We had a

blocking operation django example (we read a file already on the fs and
recreate another which blocked async schedulers).

While I'm here ease the code of eventlet worker. Just use the convenient
eventlet.serve function which already manage what we do and revert sopme
useless changes in body and header parsing.
This commit is contained in:
benoitc 2010-09-02 14:55:56 +02:00
parent 7e4ca4b809
commit 7715199b48
7 changed files with 39 additions and 65 deletions

View File

@ -66,6 +66,11 @@ MIDDLEWARE_CLASSES = (
'django.contrib.auth.middleware.AuthenticationMiddleware', 'django.contrib.auth.middleware.AuthenticationMiddleware',
) )
FILE_UPLOAD_HANDLERS = (
"django.core.files.uploadhandler.TemporaryFileUploadHandler",
)
ROOT_URLCONF = 'djangotest.urls' ROOT_URLCONF = 'djangotest.urls'
TEMPLATE_DIRS = ( TEMPLATE_DIRS = (

View File

@ -18,6 +18,7 @@ def home(request):
subject = None subject = None
message = None message = None
size = 0 size = 0
print request.META
if request.POST: if request.POST:
form = MsgForm(request.POST, request.FILES) form = MsgForm(request.POST, request.FILES)
print request.FILES print request.FILES
@ -25,11 +26,7 @@ def home(request):
subject = form.cleaned_data['subject'] subject = form.cleaned_data['subject']
message = form.cleaned_data['message'] message = form.cleaned_data['message']
f = request.FILES['f'] f = request.FILES['f']
tmp = tempfile.TemporaryFile() size = int(os.fstat(f.fileno())[6])
for chunk in f.chunks():
tmp.write(chunk)
tmp.flush()
size = int(os.fstat(tmp.fileno())[6])
else: else:
form = MsgForm() form = MsgForm()

View File

@ -223,12 +223,11 @@ class Body(object):
idx = self.buf.getvalue().find("\n") idx = self.buf.getvalue().find("\n")
while idx < 0: while idx < 0:
pos = self.buf.tell() - 1
data = self.reader.read(1024) data = self.reader.read(1024)
if not len(data): if not len(data):
break break
self.buf.write(data) self.buf.write(data)
idx = self.buf.getvalue()[pos:].find("\n") idx = self.buf.getvalue().find("\n")
if size < self.buf.tell(): if size < self.buf.tell():
break break

View File

@ -137,25 +137,21 @@ class Request(Message):
# Headers # Headers
pos = 0
idx = buf.getvalue().find("\r\n\r\n") idx = buf.getvalue().find("\r\n\r\n")
done = buf.getvalue()[:2] == "\r\n" done = buf.getvalue()[:2] == "\r\n"
while idx < 0 and not done: while idx < 0 and not done:
pos = buf.tell() - 4
self.get_data(unreader, buf) self.get_data(unreader, buf)
idx = buf.getvalue()[pos:].find("\r\n\r\n") idx = buf.getvalue().find("\r\n\r\n")
done = buf.getvalue()[:2] == "\r\n" done = buf.getvalue()[:2] == "\r\n"
if done: if done:
self.unreader.unread(buf.getvalue()[2:]) self.unreader.unread(buf.getvalue()[2:])
return "" return ""
end = pos + idx self.headers = self.parse_headers(buf.getvalue()[:idx])
self.headers = self.parse_headers(buf.getvalue()[:end]) ret = buf.getvalue()[idx+4:]
ret = buf.getvalue()[end+4:]
buf.truncate(0) buf.truncate(0)
return ret return ret

View File

@ -5,15 +5,13 @@
from __future__ import with_statement from __future__ import with_statement
import errno
import socket import os
import eventlet import eventlet
from eventlet.green import os
from eventlet import hubs from eventlet import hubs
from eventlet.greenio import GreenSocket from eventlet.greenio import GreenSocket
from gunicorn.workers.async import AsyncWorker from gunicorn.workers.async import AsyncWorker
class EventletWorker(AsyncWorker): class EventletWorker(AsyncWorker):
@ -32,47 +30,21 @@ class EventletWorker(AsyncWorker):
def timeout_ctx(self): def timeout_ctx(self):
return eventlet.Timeout(self.cfg.keepalive, False) return eventlet.Timeout(self.cfg.keepalive, False)
def acceptor(self, pool):
try:
while self.alive:
try:
client, addr = self.socket.accept()
pool.spawn_n(self.handle, client, addr)
except socket.error, e:
if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):
raise
if pool.running() > self.worker_connections:
continue
try:
hubs.trampoline(self.socket.fileno(), read=True,
timeout=self.timeout)
except eventlet.Timeout:
pass
except eventlet.StopServe:
pool.waitall()
def run(self): def run(self):
self.socket = GreenSocket(family_or_realsock=self.socket.sock) self.socket = GreenSocket(family_or_realsock=self.socket.sock)
self.socket.setblocking(1) self.socket.setblocking(1)
self.acceptor = eventlet.spawn(eventlet.serve, self.socket,
self.handle, self.worker_connections)
pool = eventlet.GreenPool(self.worker_connections)
acceptor = eventlet.spawn(self.acceptor, pool)
try:
while self.alive: while self.alive:
self.notify() self.notify()
if self.ppid != os.getppid(): if self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s" % self) self.log.info("Parent changed, shutting down: %s" % self)
server.stop()
break break
eventlet.sleep(0.1) eventlet.sleep(0.1)
except KeyboardInterrupt:
pass
self.notify()
with eventlet.Timeout(self.timeout, False): with eventlet.Timeout(self.timeout, False):
eventlet.kill(acceptor, eventlet.StopServe) self.log.info("are we blocking?")
eventlet.kill(self.acceptor, eventlet.StopServe)
self.log.info("no sir!")

View File

@ -78,14 +78,16 @@ class GeventWorker(AsyncWorker):
self.log.info("Parent changed, shutting down: %s" % self) self.log.info("Parent changed, shutting down: %s" % self)
break break
gevent.sleep(0.1) gevent.sleep(0.1)
except KeyboardInterrupt:
pass
try:
# Try to stop connections until timeout # Try to stop connections until timeout
self.notify() self.notify()
server.stop(timeout=self.timeout) server.stop(timeout=self.timeout)
except: except:
pass pass
def handle_request(self, *args): def handle_request(self, *args):
try: try:
super(GeventWorker, self).handle_request(*args) super(GeventWorker, self).handle_request(*args)
@ -137,11 +139,14 @@ class GeventBaseWorker(Worker):
break break
gevent.sleep(0.1) gevent.sleep(0.1)
except KeyboardInterrupt:
pass
# try to stop the connections
try:
self.notify() self.notify()
server.stop(timeout=self.timeout) server.stop(timeout=self.timeout)
except gevent.GreenletExit: except:
pass
except KeyboardInterrupt:
pass pass