mirror of
https://github.com/frappe/gunicorn.git
synced 2026-01-14 11:09:11 +08:00
Merge pull request #854 from benoitc/fix/830-2
fix asyncio worker handling
This commit is contained in:
commit
f25ea06c74
128
gunicorn/workers/_gaiohttp.py
Normal file
128
gunicorn/workers/_gaiohttp.py
Normal file
@ -0,0 +1,128 @@
|
||||
# -*- coding: utf-8 -
|
||||
#
|
||||
# This file is part of gunicorn released under the MIT license.
|
||||
# See the NOTICE for more information.
|
||||
|
||||
import asyncio
|
||||
import functools
|
||||
import os
|
||||
import gunicorn.workers.base as base
|
||||
|
||||
from aiohttp.wsgi import WSGIServerHttpProtocol
|
||||
|
||||
|
||||
class AiohttpWorker(base.Worker):
|
||||
|
||||
def __init__(self, *args, **kw): # pragma: no cover
|
||||
super().__init__(*args, **kw)
|
||||
|
||||
self.servers = []
|
||||
self.connections = {}
|
||||
|
||||
def init_process(self):
|
||||
# create new event_loop after fork
|
||||
asyncio.get_event_loop().close()
|
||||
|
||||
self.loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(self.loop)
|
||||
|
||||
super().init_process()
|
||||
|
||||
def run(self):
|
||||
self._runner = asyncio.async(self._run(), loop=self.loop)
|
||||
|
||||
try:
|
||||
self.loop.run_until_complete(self._runner)
|
||||
finally:
|
||||
self.loop.close()
|
||||
|
||||
def wrap_protocol(self, proto):
|
||||
proto.connection_made = _wrp(
|
||||
proto, proto.connection_made, self.connections)
|
||||
proto.connection_lost = _wrp(
|
||||
proto, proto.connection_lost, self.connections, False)
|
||||
return proto
|
||||
|
||||
def factory(self, wsgi, addr):
|
||||
proto = WSGIServerHttpProtocol(
|
||||
wsgi, readpayload=True,
|
||||
loop=self.loop,
|
||||
log=self.log,
|
||||
debug=self.cfg.debug,
|
||||
keep_alive=self.cfg.keepalive,
|
||||
access_log=self.log.access_log,
|
||||
access_log_format=self.cfg.access_log_format)
|
||||
return self.wrap_protocol(proto)
|
||||
|
||||
def get_factory(self, sock, addr):
|
||||
return functools.partial(self.factory, self.wsgi, addr)
|
||||
|
||||
@asyncio.coroutine
|
||||
def close(self):
|
||||
try:
|
||||
if hasattr(self.wsgi, 'close'):
|
||||
yield from self.wsgi.close()
|
||||
except:
|
||||
self.log.exception('Process shutdown exception')
|
||||
|
||||
@asyncio.coroutine
|
||||
def _run(self):
|
||||
for sock in self.sockets:
|
||||
factory = self.get_factory(sock.sock, sock.cfg_addr)
|
||||
self.servers.append(
|
||||
(yield from self.loop.create_server(factory, sock=sock.sock)))
|
||||
|
||||
# If our parent changed then we shut down.
|
||||
pid = os.getpid()
|
||||
try:
|
||||
while self.alive or self.connections:
|
||||
self.notify()
|
||||
|
||||
if (self.alive and
|
||||
pid == os.getpid() and self.ppid != os.getppid()):
|
||||
self.log.info("Parent changed, shutting down: %s", self)
|
||||
self.alive = False
|
||||
|
||||
# stop accepting requests
|
||||
if not self.alive:
|
||||
if self.servers:
|
||||
self.log.info(
|
||||
"Stopping server: %s, connections: %s",
|
||||
pid, len(self.connections))
|
||||
for server in self.servers:
|
||||
server.close()
|
||||
self.servers.clear()
|
||||
|
||||
# prepare connections for closing
|
||||
for conn in self.connections.values():
|
||||
if hasattr(conn, 'closing'):
|
||||
conn.closing()
|
||||
|
||||
yield from asyncio.sleep(1.0, loop=self.loop)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
if self.servers:
|
||||
for server in self.servers:
|
||||
server.close()
|
||||
|
||||
yield from self.close()
|
||||
|
||||
|
||||
class _wrp:
|
||||
|
||||
def __init__(self, proto, meth, tracking, add=True):
|
||||
self._proto = proto
|
||||
self._id = id(proto)
|
||||
self._meth = meth
|
||||
self._tracking = tracking
|
||||
self._add = add
|
||||
|
||||
def __call__(self, *args):
|
||||
if self._add:
|
||||
self._tracking[self._id] = self._proto
|
||||
elif self._id in self._tracking:
|
||||
del self._tracking[self._id]
|
||||
|
||||
conn = self._meth(*args)
|
||||
return conn
|
||||
@ -2,131 +2,16 @@
|
||||
#
|
||||
# This file is part of gunicorn released under the MIT license.
|
||||
# See the NOTICE for more information.
|
||||
__all__ = ['AiohttpWorker']
|
||||
|
||||
import asyncio
|
||||
import functools
|
||||
import os
|
||||
import gunicorn.workers.base as base
|
||||
import sys
|
||||
|
||||
try:
|
||||
from aiohttp.wsgi import WSGIServerHttpProtocol
|
||||
except ImportError:
|
||||
raise RuntimeError("You need aiohttp installed to use this worker.")
|
||||
|
||||
|
||||
class AiohttpWorker(base.Worker):
|
||||
|
||||
def __init__(self, *args, **kw): # pragma: no cover
|
||||
super().__init__(*args, **kw)
|
||||
|
||||
self.servers = []
|
||||
self.connections = {}
|
||||
|
||||
def init_process(self):
|
||||
# create new event_loop after fork
|
||||
asyncio.get_event_loop().close()
|
||||
|
||||
self.loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(self.loop)
|
||||
|
||||
super().init_process()
|
||||
|
||||
def run(self):
|
||||
self._runner = asyncio.async(self._run(), loop=self.loop)
|
||||
|
||||
try:
|
||||
self.loop.run_until_complete(self._runner)
|
||||
finally:
|
||||
self.loop.close()
|
||||
|
||||
def wrap_protocol(self, proto):
|
||||
proto.connection_made = _wrp(
|
||||
proto, proto.connection_made, self.connections)
|
||||
proto.connection_lost = _wrp(
|
||||
proto, proto.connection_lost, self.connections, False)
|
||||
return proto
|
||||
|
||||
def factory(self, wsgi, addr):
|
||||
proto = WSGIServerHttpProtocol(
|
||||
wsgi, readpayload=True,
|
||||
loop=self.loop,
|
||||
log=self.log,
|
||||
debug=self.cfg.debug,
|
||||
keep_alive=self.cfg.keepalive,
|
||||
access_log=self.log.access_log,
|
||||
access_log_format=self.cfg.access_log_format)
|
||||
return self.wrap_protocol(proto)
|
||||
|
||||
def get_factory(self, sock, addr):
|
||||
return functools.partial(self.factory, self.wsgi, addr)
|
||||
|
||||
@asyncio.coroutine
|
||||
def close(self):
|
||||
try:
|
||||
if hasattr(self.wsgi, 'close'):
|
||||
yield from self.wsgi.close()
|
||||
except:
|
||||
self.log.exception('Process shutdown exception')
|
||||
|
||||
@asyncio.coroutine
|
||||
def _run(self):
|
||||
for sock in self.sockets:
|
||||
factory = self.get_factory(sock.sock, sock.cfg_addr)
|
||||
self.servers.append(
|
||||
(yield from self.loop.create_server(factory, sock=sock.sock)))
|
||||
|
||||
# If our parent changed then we shut down.
|
||||
pid = os.getpid()
|
||||
try:
|
||||
while self.alive or self.connections:
|
||||
self.notify()
|
||||
|
||||
if (self.alive and
|
||||
pid == os.getpid() and self.ppid != os.getppid()):
|
||||
self.log.info("Parent changed, shutting down: %s", self)
|
||||
self.alive = False
|
||||
|
||||
# stop accepting requests
|
||||
if not self.alive:
|
||||
if self.servers:
|
||||
self.log.info(
|
||||
"Stopping server: %s, connections: %s",
|
||||
pid, len(self.connections))
|
||||
for server in self.servers:
|
||||
server.close()
|
||||
self.servers.clear()
|
||||
|
||||
# prepare connections for closing
|
||||
for conn in self.connections.values():
|
||||
if hasattr(conn, 'closing'):
|
||||
conn.closing()
|
||||
|
||||
yield from asyncio.sleep(1.0, loop=self.loop)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
if self.servers:
|
||||
for server in self.servers:
|
||||
server.close()
|
||||
|
||||
yield from self.close()
|
||||
|
||||
|
||||
class _wrp:
|
||||
|
||||
def __init__(self, proto, meth, tracking, add=True):
|
||||
self._proto = proto
|
||||
self._id = id(proto)
|
||||
self._meth = meth
|
||||
self._tracking = tracking
|
||||
self._add = add
|
||||
|
||||
def __call__(self, *args):
|
||||
if self._add:
|
||||
self._tracking[self._id] = self._proto
|
||||
elif self._id in self._tracking:
|
||||
del self._tracking[self._id]
|
||||
|
||||
conn = self._meth(*args)
|
||||
return conn
|
||||
if sys.version_info >= (3, 3):
|
||||
try:
|
||||
import aiohttp
|
||||
except ImportError:
|
||||
raise RuntimeError("You need aiohttp installed to use this worker.")
|
||||
else:
|
||||
from gunicorn.workers._gaiohttp import AiohttpWorker
|
||||
__all__ = ['AiohttpWorker']
|
||||
else:
|
||||
raise RuntimeError("You need Python >= 3.3 to use the asyncio worker")
|
||||
|
||||
22
setup.py
22
setup.py
@ -5,17 +5,14 @@
|
||||
|
||||
|
||||
import os
|
||||
from setuptools import setup
|
||||
from setuptools.command.test import test as TestCommand
|
||||
|
||||
import sys
|
||||
|
||||
from setuptools import setup, find_packages
|
||||
from setuptools.command.test import test as TestCommand
|
||||
|
||||
from gunicorn import __version__
|
||||
|
||||
|
||||
ASYNCIO_COMPAT = sys.version_info >= (3, 3)
|
||||
|
||||
|
||||
CLASSIFIERS = [
|
||||
'Development Status :: 4 - Beta',
|
||||
'Environment :: Other Environment',
|
||||
@ -71,17 +68,6 @@ class PyTest(TestCommand):
|
||||
|
||||
REQUIREMENTS = []
|
||||
|
||||
py_modules = []
|
||||
|
||||
for root, folders, files in os.walk('gunicorn'):
|
||||
for f in files:
|
||||
if f.endswith('.py') and (ASYNCIO_COMPAT or f != 'gaiohttp.py'):
|
||||
full = os.path.join(root, f[:-3])
|
||||
parts = full.split(os.path.sep)
|
||||
modname = '.'.join(parts)
|
||||
py_modules.append(modname)
|
||||
|
||||
|
||||
setup(
|
||||
name = 'gunicorn',
|
||||
version = __version__,
|
||||
@ -95,7 +81,7 @@ setup(
|
||||
|
||||
classifiers = CLASSIFIERS,
|
||||
zip_safe = False,
|
||||
py_modules = py_modules,
|
||||
packages = find_packages(exclude=['examples', 'tests']),
|
||||
include_package_data = True,
|
||||
|
||||
tests_require = tests_require,
|
||||
|
||||
@ -12,6 +12,7 @@ from aiohttp.wsgi import WSGIServerHttpProtocol
|
||||
|
||||
import asyncio
|
||||
from gunicorn.workers import gaiohttp
|
||||
from gunicorn.workers._gaiohttp import _wrp
|
||||
from gunicorn.config import Config
|
||||
from unittest import mock
|
||||
|
||||
@ -32,7 +33,7 @@ class WorkerTests(unittest.TestCase):
|
||||
def tearDown(self):
|
||||
self.loop.close()
|
||||
|
||||
@mock.patch('gunicorn.workers.gaiohttp.asyncio')
|
||||
@mock.patch('gunicorn.workers._gaiohttp.asyncio')
|
||||
def test_init_process(self, m_asyncio):
|
||||
try:
|
||||
self.worker.init_process()
|
||||
@ -45,7 +46,7 @@ class WorkerTests(unittest.TestCase):
|
||||
self.assertTrue(m_asyncio.new_event_loop.called)
|
||||
self.assertTrue(m_asyncio.set_event_loop.called)
|
||||
|
||||
@mock.patch('gunicorn.workers.gaiohttp.asyncio')
|
||||
@mock.patch('gunicorn.workers._gaiohttp.asyncio')
|
||||
def test_run(self, m_asyncio):
|
||||
self.worker.loop = mock.Mock()
|
||||
self.worker.run()
|
||||
@ -64,7 +65,7 @@ class WorkerTests(unittest.TestCase):
|
||||
self.worker.wsgi, ('localhost', 8080))
|
||||
self.assertIsInstance(f, WSGIServerHttpProtocol)
|
||||
|
||||
@mock.patch('gunicorn.workers.gaiohttp.asyncio')
|
||||
@mock.patch('gunicorn.workers._gaiohttp.asyncio')
|
||||
def test__run(self, m_asyncio):
|
||||
self.worker.ppid = 1
|
||||
self.worker.alive = True
|
||||
@ -84,7 +85,7 @@ class WorkerTests(unittest.TestCase):
|
||||
self.assertTrue(self.worker.log.info.called)
|
||||
self.assertTrue(self.worker.notify.called)
|
||||
|
||||
@mock.patch('gunicorn.workers.gaiohttp.asyncio')
|
||||
@mock.patch('gunicorn.workers._gaiohttp.asyncio')
|
||||
def test__run_unix_socket(self, m_asyncio):
|
||||
self.worker.ppid = 1
|
||||
self.worker.alive = True
|
||||
@ -128,8 +129,8 @@ class WorkerTests(unittest.TestCase):
|
||||
self.assertFalse(self.worker.servers)
|
||||
self.assertTrue(conn.closing.called)
|
||||
|
||||
@mock.patch('gunicorn.workers.gaiohttp.os')
|
||||
@mock.patch('gunicorn.workers.gaiohttp.asyncio.sleep')
|
||||
@mock.patch('gunicorn.workers._gaiohttp.os')
|
||||
@mock.patch('gunicorn.workers._gaiohttp.asyncio.sleep')
|
||||
def test__run_exc(self, m_sleep, m_os):
|
||||
m_os.getpid.return_value = 1
|
||||
m_os.getppid.return_value = 1
|
||||
@ -179,14 +180,14 @@ class WorkerTests(unittest.TestCase):
|
||||
conn = object()
|
||||
tracking = {}
|
||||
meth = mock.Mock()
|
||||
wrp = gaiohttp._wrp(conn, meth, tracking)
|
||||
wrp = _wrp(conn, meth, tracking)
|
||||
wrp()
|
||||
|
||||
self.assertIn(id(conn), tracking)
|
||||
self.assertTrue(meth.called)
|
||||
|
||||
meth = mock.Mock()
|
||||
wrp = gaiohttp._wrp(conn, meth, tracking, False)
|
||||
wrp = _wrp(conn, meth, tracking, False)
|
||||
wrp()
|
||||
|
||||
self.assertNotIn(1, tracking)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user