From 719e61bf18ca3347e58c03dbc04156c859659a1d Mon Sep 17 00:00:00 2001 From: Berker Peksag Date: Mon, 11 Aug 2014 07:17:16 +0300 Subject: [PATCH 1/2] Raise a RuntimeError if asyncio is not available. Fixes #830. --- gunicorn/workers/_gaiohttp.py | 128 +++++++++++++++++++++++++++++++ gunicorn/workers/gaiohttp.py | 137 +++------------------------------- setup.py | 22 +----- 3 files changed, 143 insertions(+), 144 deletions(-) create mode 100644 gunicorn/workers/_gaiohttp.py diff --git a/gunicorn/workers/_gaiohttp.py b/gunicorn/workers/_gaiohttp.py new file mode 100644 index 00000000..4457a3f0 --- /dev/null +++ b/gunicorn/workers/_gaiohttp.py @@ -0,0 +1,128 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +import asyncio +import functools +import os +import gunicorn.workers.base as base + +from aiohttp.wsgi import WSGIServerHttpProtocol + + +class AiohttpWorker(base.Worker): + + def __init__(self, *args, **kw): # pragma: no cover + super().__init__(*args, **kw) + + self.servers = [] + self.connections = {} + + def init_process(self): + # create new event_loop after fork + asyncio.get_event_loop().close() + + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + super().init_process() + + def run(self): + self._runner = asyncio.async(self._run(), loop=self.loop) + + try: + self.loop.run_until_complete(self._runner) + finally: + self.loop.close() + + def wrap_protocol(self, proto): + proto.connection_made = _wrp( + proto, proto.connection_made, self.connections) + proto.connection_lost = _wrp( + proto, proto.connection_lost, self.connections, False) + return proto + + def factory(self, wsgi, addr): + proto = WSGIServerHttpProtocol( + wsgi, readpayload=True, + loop=self.loop, + log=self.log, + debug=self.cfg.debug, + keep_alive=self.cfg.keepalive, + access_log=self.log.access_log, + access_log_format=self.cfg.access_log_format) + return self.wrap_protocol(proto) + + def get_factory(self, sock, addr): + return functools.partial(self.factory, self.wsgi, addr) + + @asyncio.coroutine + def close(self): + try: + if hasattr(self.wsgi, 'close'): + yield from self.wsgi.close() + except: + self.log.exception('Process shutdown exception') + + @asyncio.coroutine + def _run(self): + for sock in self.sockets: + factory = self.get_factory(sock.sock, sock.cfg_addr) + self.servers.append( + (yield from self.loop.create_server(factory, sock=sock.sock))) + + # If our parent changed then we shut down. + pid = os.getpid() + try: + while self.alive or self.connections: + self.notify() + + if (self.alive and + pid == os.getpid() and self.ppid != os.getppid()): + self.log.info("Parent changed, shutting down: %s", self) + self.alive = False + + # stop accepting requests + if not self.alive: + if self.servers: + self.log.info( + "Stopping server: %s, connections: %s", + pid, len(self.connections)) + for server in self.servers: + server.close() + self.servers.clear() + + # prepare connections for closing + for conn in self.connections.values(): + if hasattr(conn, 'closing'): + conn.closing() + + yield from asyncio.sleep(1.0, loop=self.loop) + except KeyboardInterrupt: + pass + + if self.servers: + for server in self.servers: + server.close() + + yield from self.close() + + +class _wrp: + + def __init__(self, proto, meth, tracking, add=True): + self._proto = proto + self._id = id(proto) + self._meth = meth + self._tracking = tracking + self._add = add + + def __call__(self, *args): + if self._add: + self._tracking[self._id] = self._proto + elif self._id in self._tracking: + del self._tracking[self._id] + + conn = self._meth(*args) + return conn diff --git a/gunicorn/workers/gaiohttp.py b/gunicorn/workers/gaiohttp.py index 199a8025..8260bb50 100644 --- a/gunicorn/workers/gaiohttp.py +++ b/gunicorn/workers/gaiohttp.py @@ -2,131 +2,16 @@ # # This file is part of gunicorn released under the MIT license. # See the NOTICE for more information. -__all__ = ['AiohttpWorker'] -import asyncio -import functools -import os -import gunicorn.workers.base as base +import sys -try: - from aiohttp.wsgi import WSGIServerHttpProtocol -except ImportError: - raise RuntimeError("You need aiohttp installed to use this worker.") - - -class AiohttpWorker(base.Worker): - - def __init__(self, *args, **kw): # pragma: no cover - super().__init__(*args, **kw) - - self.servers = [] - self.connections = {} - - def init_process(self): - # create new event_loop after fork - asyncio.get_event_loop().close() - - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(self.loop) - - super().init_process() - - def run(self): - self._runner = asyncio.async(self._run(), loop=self.loop) - - try: - self.loop.run_until_complete(self._runner) - finally: - self.loop.close() - - def wrap_protocol(self, proto): - proto.connection_made = _wrp( - proto, proto.connection_made, self.connections) - proto.connection_lost = _wrp( - proto, proto.connection_lost, self.connections, False) - return proto - - def factory(self, wsgi, addr): - proto = WSGIServerHttpProtocol( - wsgi, readpayload=True, - loop=self.loop, - log=self.log, - debug=self.cfg.debug, - keep_alive=self.cfg.keepalive, - access_log=self.log.access_log, - access_log_format=self.cfg.access_log_format) - return self.wrap_protocol(proto) - - def get_factory(self, sock, addr): - return functools.partial(self.factory, self.wsgi, addr) - - @asyncio.coroutine - def close(self): - try: - if hasattr(self.wsgi, 'close'): - yield from self.wsgi.close() - except: - self.log.exception('Process shutdown exception') - - @asyncio.coroutine - def _run(self): - for sock in self.sockets: - factory = self.get_factory(sock.sock, sock.cfg_addr) - self.servers.append( - (yield from self.loop.create_server(factory, sock=sock.sock))) - - # If our parent changed then we shut down. - pid = os.getpid() - try: - while self.alive or self.connections: - self.notify() - - if (self.alive and - pid == os.getpid() and self.ppid != os.getppid()): - self.log.info("Parent changed, shutting down: %s", self) - self.alive = False - - # stop accepting requests - if not self.alive: - if self.servers: - self.log.info( - "Stopping server: %s, connections: %s", - pid, len(self.connections)) - for server in self.servers: - server.close() - self.servers.clear() - - # prepare connections for closing - for conn in self.connections.values(): - if hasattr(conn, 'closing'): - conn.closing() - - yield from asyncio.sleep(1.0, loop=self.loop) - except KeyboardInterrupt: - pass - - if self.servers: - for server in self.servers: - server.close() - - yield from self.close() - - -class _wrp: - - def __init__(self, proto, meth, tracking, add=True): - self._proto = proto - self._id = id(proto) - self._meth = meth - self._tracking = tracking - self._add = add - - def __call__(self, *args): - if self._add: - self._tracking[self._id] = self._proto - elif self._id in self._tracking: - del self._tracking[self._id] - - conn = self._meth(*args) - return conn +if sys.version_info >= (3, 3): + try: + import aiohttp + except ImportError: + raise RuntimeError("You need aiohttp installed to use this worker.") + else: + from gunicorn.workers._gaiohttp import AiohttpWorker + __all__ = ['AiohttpWorker'] +else: + raise RuntimeError("You need Python >= 3.3 to use the asyncio worker") diff --git a/setup.py b/setup.py index 212b157d..dfe138b8 100644 --- a/setup.py +++ b/setup.py @@ -5,17 +5,14 @@ import os -from setuptools import setup -from setuptools.command.test import test as TestCommand - import sys +from setuptools import setup, find_packages +from setuptools.command.test import test as TestCommand + from gunicorn import __version__ -ASYNCIO_COMPAT = sys.version_info >= (3, 3) - - CLASSIFIERS = [ 'Development Status :: 4 - Beta', 'Environment :: Other Environment', @@ -71,17 +68,6 @@ class PyTest(TestCommand): REQUIREMENTS = [] -py_modules = [] - -for root, folders, files in os.walk('gunicorn'): - for f in files: - if f.endswith('.py') and (ASYNCIO_COMPAT or f != 'gaiohttp.py'): - full = os.path.join(root, f[:-3]) - parts = full.split(os.path.sep) - modname = '.'.join(parts) - py_modules.append(modname) - - setup( name = 'gunicorn', version = __version__, @@ -95,7 +81,7 @@ setup( classifiers = CLASSIFIERS, zip_safe = False, - py_modules = py_modules, + packages = find_packages(exclude=['examples', 'tests']), include_package_data = True, tests_require = tests_require, From f80ddf368c5e92360b3d6b162869c34ad9d454d4 Mon Sep 17 00:00:00 2001 From: benoitc Date: Sat, 16 Aug 2014 12:26:04 +0200 Subject: [PATCH 2/2] fix tests --- tests/test_009-gaiohttp.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/tests/test_009-gaiohttp.py b/tests/test_009-gaiohttp.py index 6947fd21..caeacf03 100644 --- a/tests/test_009-gaiohttp.py +++ b/tests/test_009-gaiohttp.py @@ -12,6 +12,7 @@ from aiohttp.wsgi import WSGIServerHttpProtocol import asyncio from gunicorn.workers import gaiohttp +from gunicorn.workers._gaiohttp import _wrp from gunicorn.config import Config from unittest import mock @@ -32,7 +33,7 @@ class WorkerTests(unittest.TestCase): def tearDown(self): self.loop.close() - @mock.patch('gunicorn.workers.gaiohttp.asyncio') + @mock.patch('gunicorn.workers._gaiohttp.asyncio') def test_init_process(self, m_asyncio): try: self.worker.init_process() @@ -45,7 +46,7 @@ class WorkerTests(unittest.TestCase): self.assertTrue(m_asyncio.new_event_loop.called) self.assertTrue(m_asyncio.set_event_loop.called) - @mock.patch('gunicorn.workers.gaiohttp.asyncio') + @mock.patch('gunicorn.workers._gaiohttp.asyncio') def test_run(self, m_asyncio): self.worker.loop = mock.Mock() self.worker.run() @@ -64,7 +65,7 @@ class WorkerTests(unittest.TestCase): self.worker.wsgi, ('localhost', 8080)) self.assertIsInstance(f, WSGIServerHttpProtocol) - @mock.patch('gunicorn.workers.gaiohttp.asyncio') + @mock.patch('gunicorn.workers._gaiohttp.asyncio') def test__run(self, m_asyncio): self.worker.ppid = 1 self.worker.alive = True @@ -84,7 +85,7 @@ class WorkerTests(unittest.TestCase): self.assertTrue(self.worker.log.info.called) self.assertTrue(self.worker.notify.called) - @mock.patch('gunicorn.workers.gaiohttp.asyncio') + @mock.patch('gunicorn.workers._gaiohttp.asyncio') def test__run_unix_socket(self, m_asyncio): self.worker.ppid = 1 self.worker.alive = True @@ -128,8 +129,8 @@ class WorkerTests(unittest.TestCase): self.assertFalse(self.worker.servers) self.assertTrue(conn.closing.called) - @mock.patch('gunicorn.workers.gaiohttp.os') - @mock.patch('gunicorn.workers.gaiohttp.asyncio.sleep') + @mock.patch('gunicorn.workers._gaiohttp.os') + @mock.patch('gunicorn.workers._gaiohttp.asyncio.sleep') def test__run_exc(self, m_sleep, m_os): m_os.getpid.return_value = 1 m_os.getppid.return_value = 1 @@ -179,14 +180,14 @@ class WorkerTests(unittest.TestCase): conn = object() tracking = {} meth = mock.Mock() - wrp = gaiohttp._wrp(conn, meth, tracking) + wrp = _wrp(conn, meth, tracking) wrp() self.assertIn(id(conn), tracking) self.assertTrue(meth.called) meth = mock.Mock() - wrp = gaiohttp._wrp(conn, meth, tracking, False) + wrp = _wrp(conn, meth, tracking, False) wrp() self.assertNotIn(1, tracking)