mirror of
https://github.com/frappe/gunicorn.git
synced 2026-07-03 11:11:30 +08:00
fix(http2): ALPN negotiation for gevent/eventlet workers
- Add explicit do_handshake() in base_async.py before ALPN check when do_handshake_on_connect is False - Mark eventlet worker as deprecated (removal in 26.0) - Add HTTP/2 gevent example with Docker and tests - Update documentation to reflect eventlet deprecation - Remove eventlet websocket example (gevent version exists) The ALPN fix ensures HTTP/2 works correctly with gevent and eventlet workers when do_handshake_on_connect config is False (the default). Without explicit handshake, selected_alpn_protocol() returns None.
This commit is contained in:
parent
200977bc0b
commit
315e7bde80
@ -95,7 +95,12 @@ Choose a worker type based on your application's needs.
|
||||
gunicorn myapp:app -k gevent --worker-connections 1000
|
||||
```
|
||||
|
||||
=== "Eventlet"
|
||||
=== "Eventlet (Deprecated)"
|
||||
|
||||
!!! warning "Deprecated"
|
||||
The eventlet worker is **deprecated** and will be removed in Gunicorn 26.0.
|
||||
Eventlet itself is [no longer actively maintained](https://eventlet.readthedocs.io/en/latest/asyncio/migration.html).
|
||||
Please migrate to `gevent`, `gthread`, or another supported worker type.
|
||||
|
||||
**Greenlet-based** async worker using [Eventlet](http://eventlet.net/).
|
||||
|
||||
@ -127,14 +132,14 @@ Choose a worker type based on your application's needs.
|
||||
| `gthread` | Thread pool | ✅ | Mixed workloads, moderate concurrency |
|
||||
| ASGI workers | AsyncIO | ✅ | Modern async frameworks (FastAPI, etc.) |
|
||||
| `gevent` | Greenlets | ✅ | I/O-bound, WebSockets, streaming |
|
||||
| `eventlet` | Greenlets | ✅ | I/O-bound, long-polling |
|
||||
| `eventlet` | Greenlets | ✅ | **Deprecated** - use `gevent` instead |
|
||||
| `tornado` | Tornado IOLoop | ✅ | Native Tornado applications |
|
||||
|
||||
!!! tip "Quick Decision Guide"
|
||||
|
||||
- **Simple app behind nginx?** → `sync` (default)
|
||||
- **Need keep-alive or moderate concurrency?** → `gthread`
|
||||
- **WebSockets, streaming, long-polling?** → `gevent` or `eventlet`
|
||||
- **WebSockets, streaming, long-polling?** → `gevent` or ASGI worker
|
||||
- **FastAPI, Starlette, or async framework?** → ASGI worker
|
||||
|
||||
## When to Use Async Workers
|
||||
@ -200,9 +205,6 @@ gunicorn myapp:app -k gthread --workers 4 --threads 4
|
||||
# Gevent - high concurrency for I/O-bound apps
|
||||
gunicorn myapp:app -k gevent --workers 4 --worker-connections 1000
|
||||
|
||||
# Eventlet - alternative async worker
|
||||
gunicorn myapp:app -k eventlet --workers 4 --worker-connections 1000
|
||||
|
||||
# ASGI - FastAPI/Starlette with Uvicorn worker
|
||||
gunicorn myapp:app -k uvicorn.workers.UvicornWorker --workers 4
|
||||
```
|
||||
|
||||
@ -31,7 +31,7 @@ HTTP/2 support requires:
|
||||
- **SSL/TLS**: HTTP/2 uses ALPN (Application-Layer Protocol Negotiation) which
|
||||
requires an encrypted connection
|
||||
- **h2 library**: Install with `pip install gunicorn[http2]` or `pip install h2`
|
||||
- **Compatible worker**: gthread, gevent, eventlet, or ASGI workers
|
||||
- **Compatible worker**: gthread, gevent, or ASGI workers
|
||||
|
||||
## Configuration
|
||||
|
||||
@ -109,7 +109,7 @@ Not all workers support HTTP/2:
|
||||
| `sync` | No | Single-threaded, cannot multiplex streams |
|
||||
| `gthread` | Yes | Recommended for HTTP/2 |
|
||||
| `gevent` | Yes | Requires gevent |
|
||||
| `eventlet` | Yes | Requires eventlet |
|
||||
| `eventlet` | Yes | **Deprecated** - will be removed in 26.0 |
|
||||
| `asgi` | Yes | For async frameworks |
|
||||
| `tornado` | No | Tornado handles its own protocol |
|
||||
|
||||
|
||||
@ -91,10 +91,10 @@ pip install gunicorn[gevent,setproctitle]
|
||||
|
||||
| Extra | Description |
|
||||
|-------|-------------|
|
||||
| `gunicorn[eventlet]` | Eventlet-based greenlet workers |
|
||||
| `gunicorn[gevent]` | Gevent-based greenlet workers |
|
||||
| `gunicorn[gthread]` | Threaded workers |
|
||||
| `gunicorn[tornado]` | Tornado-based workers (not recommended) |
|
||||
| `gunicorn[eventlet]` | **Deprecated** - will be removed in 26.0 |
|
||||
|
||||
See the [design docs](design.md) for guidance on choosing worker types.
|
||||
|
||||
@ -124,17 +124,6 @@ library:
|
||||
gunicorn app:app --worker-class gevent
|
||||
```
|
||||
|
||||
=== "Eventlet"
|
||||
|
||||
```bash
|
||||
pip install gunicorn[eventlet]
|
||||
```
|
||||
|
||||
Run with:
|
||||
```bash
|
||||
gunicorn app:app --worker-class eventlet
|
||||
```
|
||||
|
||||
=== "ASGI (asyncio)"
|
||||
|
||||
No extra installation required:
|
||||
|
||||
@ -1747,8 +1747,7 @@ libraries may be installed using setuptools' ``extras_require`` feature.
|
||||
A string referring to one of the following bundled classes:
|
||||
|
||||
* ``sync``
|
||||
* ``eventlet`` - Requires eventlet >= 0.40.3 (or install it via
|
||||
``pip install gunicorn[eventlet]``)
|
||||
* ``eventlet`` - **DEPRECATED: will be removed in 26.0**. Requires eventlet >= 0.40.3
|
||||
* ``gevent`` - Requires gevent >= 24.10.1 (or install it via
|
||||
``pip install gunicorn[gevent]``)
|
||||
* ``tornado`` - Requires tornado >= 6.5.0 (or install it via
|
||||
|
||||
@ -72,7 +72,7 @@ configuration files or environment variables for anything beyond quick tests.
|
||||
- `-w WORKERS`, `--workers WORKERS` — number of worker processes, typically
|
||||
two to four per CPU core. See the [FAQ](faq.md) for tuning tips.
|
||||
- `-k WORKERCLASS`, `--worker-class WORKERCLASS` — worker type (`sync`,
|
||||
`eventlet`, `gevent`, `tornado`, `gthread`). Read the
|
||||
`gevent`, `tornado`, `gthread`). Read the
|
||||
[settings entry](reference/settings.md#worker_class) before switching classes.
|
||||
- `-n APP_NAME`, `--name APP_NAME` — set the process name (requires
|
||||
[`setproctitle`](https://pypi.python.org/pypi/setproctitle)).
|
||||
|
||||
@ -43,7 +43,7 @@ backlog = 2048
|
||||
# can be seen at
|
||||
# https://gunicorn.org/reference/settings/#worker_class
|
||||
#
|
||||
# worker_connections - For the eventlet and gevent worker classes
|
||||
# worker_connections - For the gevent and gthread worker classes
|
||||
# this limits the maximum number of simultaneous clients that
|
||||
# a single process can handle.
|
||||
#
|
||||
|
||||
2
examples/http2_gevent/.gitignore
vendored
Normal file
2
examples/http2_gevent/.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
||||
# Generated certificates - run ./generate_certs.sh to create
|
||||
certs/
|
||||
38
examples/http2_gevent/Dockerfile
Normal file
38
examples/http2_gevent/Dockerfile
Normal file
@ -0,0 +1,38 @@
|
||||
# HTTP/2 with Gevent Example
|
||||
#
|
||||
# Build: docker build -t gunicorn-http2-gevent .
|
||||
# Run: docker run -p 8443:8443 -v $(pwd)/certs:/certs:ro gunicorn-http2-gevent
|
||||
|
||||
FROM python:3.12-slim
|
||||
|
||||
# Install build dependencies for gevent and h2
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
gcc \
|
||||
libc-dev \
|
||||
libffi-dev \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Copy gunicorn source and install with gevent and http2 support
|
||||
# For production, use: pip install gunicorn[gevent,http2]
|
||||
COPY --chown=root:root . /gunicorn-src/
|
||||
RUN pip install --no-cache-dir /gunicorn-src/[gevent,http2]
|
||||
|
||||
# Copy application files
|
||||
COPY examples/http2_gevent/app.py /app/
|
||||
COPY examples/http2_gevent/gunicorn_conf.py /app/
|
||||
|
||||
# Create non-root user for security
|
||||
RUN useradd -m -u 1000 gunicorn && \
|
||||
chown -R gunicorn:gunicorn /app
|
||||
USER gunicorn
|
||||
|
||||
EXPOSE 8443
|
||||
|
||||
# Health check
|
||||
HEALTHCHECK --interval=10s --timeout=5s --start-period=5s --retries=3 \
|
||||
CMD python -c "import ssl,socket; s=socket.socket(); s.settimeout(2); ctx=ssl.create_default_context(); ctx.check_hostname=False; ctx.verify_mode=ssl.CERT_NONE; ss=ctx.wrap_socket(s,server_hostname='localhost'); ss.connect(('localhost',8443)); ss.close()" || exit 1
|
||||
|
||||
# Run gunicorn with the config file
|
||||
CMD ["gunicorn", "--config", "gunicorn_conf.py", "app:app"]
|
||||
163
examples/http2_gevent/README.md
Normal file
163
examples/http2_gevent/README.md
Normal file
@ -0,0 +1,163 @@
|
||||
# HTTP/2 with Gevent Worker Example
|
||||
|
||||
This example demonstrates how to run Gunicorn with HTTP/2 support using the gevent async worker.
|
||||
|
||||
## Features
|
||||
|
||||
- HTTP/2 protocol with ALPN negotiation
|
||||
- Gevent-based async worker for high concurrency
|
||||
- Connection multiplexing (multiple streams per connection)
|
||||
- Flow control for large transfers
|
||||
- SSL/TLS encryption (required for HTTP/2)
|
||||
|
||||
## Quick Start
|
||||
|
||||
### 1. Generate SSL Certificates
|
||||
|
||||
HTTP/2 requires TLS. Generate self-signed certificates for testing:
|
||||
|
||||
```bash
|
||||
chmod +x generate_certs.sh
|
||||
./generate_certs.sh
|
||||
```
|
||||
|
||||
### 2. Start with Docker Compose
|
||||
|
||||
```bash
|
||||
docker compose up -d
|
||||
```
|
||||
|
||||
### 3. Test the Server
|
||||
|
||||
Using curl with HTTP/2:
|
||||
|
||||
```bash
|
||||
# Basic request
|
||||
curl -k --http2 https://localhost:8443/
|
||||
|
||||
# Check HTTP version
|
||||
curl -k --http2 -w "HTTP Version: %{http_version}\n" https://localhost:8443/
|
||||
|
||||
# Test echo endpoint
|
||||
curl -k --http2 -X POST -d "Hello HTTP/2" https://localhost:8443/echo
|
||||
|
||||
# Get server info
|
||||
curl -k --http2 https://localhost:8443/info | jq
|
||||
```
|
||||
|
||||
### 4. Run Tests
|
||||
|
||||
```bash
|
||||
# Install test dependencies
|
||||
pip install httpx[http2] pytest pytest-asyncio
|
||||
|
||||
# Run tests
|
||||
python test_http2_gevent.py
|
||||
|
||||
# Or with pytest for more detail
|
||||
pytest test_http2_gevent.py -v
|
||||
```
|
||||
|
||||
## Running Locally (Without Docker)
|
||||
|
||||
### Prerequisites
|
||||
|
||||
```bash
|
||||
pip install gunicorn[gevent,http2]
|
||||
```
|
||||
|
||||
### Generate Certificates
|
||||
|
||||
```bash
|
||||
./generate_certs.sh
|
||||
```
|
||||
|
||||
### Start Server
|
||||
|
||||
```bash
|
||||
gunicorn --config gunicorn_conf.py app:app
|
||||
```
|
||||
|
||||
Or with command-line options:
|
||||
|
||||
```bash
|
||||
gunicorn app:app \
|
||||
--bind 0.0.0.0:8443 \
|
||||
--worker-class gevent \
|
||||
--workers 4 \
|
||||
--worker-connections 1000 \
|
||||
--http-protocols h2,h1 \
|
||||
--certfile certs/server.crt \
|
||||
--keyfile certs/server.key
|
||||
```
|
||||
|
||||
## Configuration Options
|
||||
|
||||
### HTTP/2 Settings
|
||||
|
||||
| Setting | Default | Description |
|
||||
|---------|---------|-------------|
|
||||
| `http_protocols` | `['h1']` | Enable protocols: `['h2', 'h1']` for HTTP/2 |
|
||||
| `http2_max_concurrent_streams` | 100 | Max streams per connection |
|
||||
| `http2_initial_window_size` | 65535 | Flow control window size (bytes) |
|
||||
| `http2_max_frame_size` | 16384 | Max frame size (bytes) |
|
||||
| `http2_max_header_list_size` | 65536 | Max header list size (bytes) |
|
||||
|
||||
### Gevent Worker Settings
|
||||
|
||||
| Setting | Default | Description |
|
||||
|---------|---------|-------------|
|
||||
| `worker_class` | `sync` | Set to `gevent` for async |
|
||||
| `workers` | 1 | Number of worker processes |
|
||||
| `worker_connections` | 1000 | Max clients per worker |
|
||||
|
||||
## Endpoints
|
||||
|
||||
| Path | Method | Description |
|
||||
|------|--------|-------------|
|
||||
| `/` | GET | Hello message |
|
||||
| `/health` | GET | Health check |
|
||||
| `/echo` | POST | Echo request body |
|
||||
| `/info` | GET | Server/request info as JSON |
|
||||
| `/large` | GET | 1MB response (test streaming) |
|
||||
| `/stream` | GET | Server-sent events stream |
|
||||
| `/delay?seconds=N` | GET | Delayed response |
|
||||
| `/priority` | GET | HTTP/2 priority info |
|
||||
|
||||
## Performance Tips
|
||||
|
||||
1. **Worker Count**: Use `2 * CPU cores + 1` workers for I/O-bound apps
|
||||
2. **Connections**: Increase `worker_connections` for high concurrency
|
||||
3. **Window Size**: Larger `http2_initial_window_size` improves throughput for large transfers
|
||||
4. **Streams**: Increase `http2_max_concurrent_streams` for many parallel requests
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### Certificate Issues
|
||||
|
||||
```bash
|
||||
# Regenerate certificates
|
||||
rm -rf certs/
|
||||
./generate_certs.sh
|
||||
```
|
||||
|
||||
### Connection Refused
|
||||
|
||||
```bash
|
||||
# Check if server is running
|
||||
docker compose ps
|
||||
|
||||
# View logs
|
||||
docker compose logs -f
|
||||
```
|
||||
|
||||
### HTTP/2 Not Negotiated
|
||||
|
||||
Ensure:
|
||||
- SSL/TLS is configured (certfile and keyfile)
|
||||
- `http_protocols` includes `'h2'`
|
||||
- Client supports HTTP/2 over TLS (curl with `--http2`, not `--http2-prior-knowledge`)
|
||||
|
||||
## License
|
||||
|
||||
MIT License - See the main Gunicorn repository for details.
|
||||
130
examples/http2_gevent/app.py
Normal file
130
examples/http2_gevent/app.py
Normal file
@ -0,0 +1,130 @@
|
||||
"""
|
||||
Example WSGI application demonstrating HTTP/2 with gevent worker.
|
||||
|
||||
This application showcases various HTTP/2 features including:
|
||||
- Basic request/response handling
|
||||
- Large file transfers (streaming)
|
||||
- Concurrent requests (multiplexing)
|
||||
- Server push simulation
|
||||
"""
|
||||
|
||||
import json
|
||||
import time
|
||||
|
||||
|
||||
def app(environ, start_response):
|
||||
"""WSGI application for HTTP/2 demonstration."""
|
||||
path = environ.get('PATH_INFO', '/')
|
||||
method = environ.get('REQUEST_METHOD', 'GET')
|
||||
|
||||
# Root endpoint
|
||||
if path == '/':
|
||||
body = b'Hello from HTTP/2 with Gevent!'
|
||||
status = '200 OK'
|
||||
content_type = 'text/plain; charset=utf-8'
|
||||
|
||||
# Health check
|
||||
elif path == '/health':
|
||||
body = b'OK'
|
||||
status = '200 OK'
|
||||
content_type = 'text/plain'
|
||||
|
||||
# Echo endpoint - returns the request body
|
||||
elif path == '/echo':
|
||||
content_length = int(environ.get('CONTENT_LENGTH', 0) or 0)
|
||||
body = environ['wsgi.input'].read(content_length)
|
||||
status = '200 OK'
|
||||
content_type = 'application/octet-stream'
|
||||
|
||||
# JSON endpoint - returns request info as JSON
|
||||
elif path == '/info':
|
||||
info = {
|
||||
'method': method,
|
||||
'path': path,
|
||||
'protocol': environ.get('SERVER_PROTOCOL', 'unknown'),
|
||||
'http_version': environ.get('HTTP_VERSION', '1.1'),
|
||||
'server': 'gunicorn with gevent + HTTP/2',
|
||||
'headers': {
|
||||
k: v for k, v in environ.items()
|
||||
if k.startswith('HTTP_')
|
||||
}
|
||||
}
|
||||
body = json.dumps(info, indent=2).encode('utf-8')
|
||||
status = '200 OK'
|
||||
content_type = 'application/json'
|
||||
|
||||
# Large response for testing streaming/flow control
|
||||
elif path == '/large':
|
||||
# Return 1MB of data
|
||||
size = 1024 * 1024
|
||||
body = b'X' * size
|
||||
status = '200 OK'
|
||||
content_type = 'application/octet-stream'
|
||||
|
||||
# Streaming response using generator
|
||||
elif path == '/stream':
|
||||
def generate():
|
||||
for i in range(10):
|
||||
yield f'data: chunk {i}\n\n'.encode('utf-8')
|
||||
# Small delay to simulate streaming
|
||||
time.sleep(0.1)
|
||||
|
||||
start_response('200 OK', [
|
||||
('Content-Type', 'text/event-stream'),
|
||||
('Cache-Control', 'no-cache'),
|
||||
])
|
||||
return generate()
|
||||
|
||||
# Concurrent test endpoint with configurable delay
|
||||
elif path.startswith('/delay'):
|
||||
query = environ.get('QUERY_STRING', '')
|
||||
try:
|
||||
delay = float(query.split('=')[1]) if '=' in query else 0.5
|
||||
delay = min(delay, 5.0) # Cap at 5 seconds
|
||||
except (ValueError, IndexError):
|
||||
delay = 0.5
|
||||
|
||||
# Use gevent sleep for cooperative yielding
|
||||
try:
|
||||
import gevent
|
||||
gevent.sleep(delay)
|
||||
except ImportError:
|
||||
time.sleep(delay)
|
||||
|
||||
body = f'Delayed response after {delay}s'.encode('utf-8')
|
||||
status = '200 OK'
|
||||
content_type = 'text/plain'
|
||||
|
||||
# HTTP/2 priority information (if available)
|
||||
elif path == '/priority':
|
||||
priority_info = {
|
||||
'weight': environ.get('HTTP2_PRIORITY_WEIGHT', 'N/A'),
|
||||
'depends_on': environ.get('HTTP2_PRIORITY_DEPENDS_ON', 'N/A'),
|
||||
'exclusive': environ.get('HTTP2_PRIORITY_EXCLUSIVE', 'N/A'),
|
||||
}
|
||||
body = json.dumps(priority_info, indent=2).encode('utf-8')
|
||||
status = '200 OK'
|
||||
content_type = 'application/json'
|
||||
|
||||
# 404 for unknown paths
|
||||
else:
|
||||
body = b'Not Found'
|
||||
status = '404 Not Found'
|
||||
content_type = 'text/plain'
|
||||
|
||||
response_headers = [
|
||||
('Content-Type', content_type),
|
||||
('Content-Length', str(len(body))),
|
||||
('X-Worker-Type', 'gevent'),
|
||||
]
|
||||
|
||||
start_response(status, response_headers)
|
||||
return [body]
|
||||
|
||||
|
||||
# Allow running directly for testing
|
||||
if __name__ == '__main__':
|
||||
from wsgiref.simple_server import make_server
|
||||
server = make_server('localhost', 8000, app)
|
||||
print('Test server running on http://localhost:8000')
|
||||
server.serve_forever()
|
||||
46
examples/http2_gevent/docker-compose.yml
Normal file
46
examples/http2_gevent/docker-compose.yml
Normal file
@ -0,0 +1,46 @@
|
||||
# HTTP/2 with Gevent Docker Compose
|
||||
#
|
||||
# Usage:
|
||||
# # Generate certificates first (or use your own)
|
||||
# ./generate_certs.sh
|
||||
#
|
||||
# # Start services
|
||||
# docker compose up -d
|
||||
#
|
||||
# # Test with curl (requires curl with HTTP/2 support)
|
||||
# curl -k --http2 https://localhost:8443/
|
||||
#
|
||||
# # View logs
|
||||
# docker compose logs -f
|
||||
#
|
||||
# # Stop services
|
||||
# docker compose down
|
||||
|
||||
services:
|
||||
gunicorn:
|
||||
build:
|
||||
context: ../..
|
||||
dockerfile: examples/http2_gevent/Dockerfile
|
||||
ports:
|
||||
- "8443:8443"
|
||||
volumes:
|
||||
- ./certs:/certs:ro
|
||||
environment:
|
||||
- GUNICORN_WORKERS=4
|
||||
- GUNICORN_LOG_LEVEL=info
|
||||
healthcheck:
|
||||
test: ["CMD", "python", "-c", "import ssl,socket; s=socket.socket(); s.settimeout(2); ctx=ssl.create_default_context(); ctx.check_hostname=False; ctx.verify_mode=ssl.CERT_NONE; ss=ctx.wrap_socket(s,server_hostname='localhost'); ss.connect(('localhost',8443)); ss.close()"]
|
||||
interval: 5s
|
||||
timeout: 5s
|
||||
retries: 10
|
||||
start_period: 10s
|
||||
restart: unless-stopped
|
||||
deploy:
|
||||
resources:
|
||||
limits:
|
||||
cpus: '2'
|
||||
memory: 512M
|
||||
|
||||
networks:
|
||||
default:
|
||||
driver: bridge
|
||||
46
examples/http2_gevent/generate_certs.sh
Executable file
46
examples/http2_gevent/generate_certs.sh
Executable file
@ -0,0 +1,46 @@
|
||||
#!/bin/bash
|
||||
#
|
||||
# Generate self-signed certificates for HTTP/2 testing.
|
||||
#
|
||||
# Usage: ./generate_certs.sh
|
||||
#
|
||||
|
||||
set -e
|
||||
|
||||
CERTS_DIR="./certs"
|
||||
CERT_FILE="$CERTS_DIR/server.crt"
|
||||
KEY_FILE="$CERTS_DIR/server.key"
|
||||
|
||||
# Create certs directory if it doesn't exist
|
||||
mkdir -p "$CERTS_DIR"
|
||||
|
||||
# Check if certificates already exist
|
||||
if [ -f "$CERT_FILE" ] && [ -f "$KEY_FILE" ]; then
|
||||
echo "Certificates already exist in $CERTS_DIR"
|
||||
echo "Delete them first if you want to regenerate."
|
||||
exit 0
|
||||
fi
|
||||
|
||||
echo "Generating self-signed certificate..."
|
||||
|
||||
openssl req -x509 -newkey rsa:2048 \
|
||||
-keyout "$KEY_FILE" \
|
||||
-out "$CERT_FILE" \
|
||||
-days 365 \
|
||||
-nodes \
|
||||
-subj "/CN=localhost/O=Gunicorn HTTP2 Example/C=US" \
|
||||
-addext "subjectAltName=DNS:localhost,DNS:gunicorn,IP:127.0.0.1"
|
||||
|
||||
# Set appropriate permissions
|
||||
chmod 644 "$CERT_FILE"
|
||||
chmod 600 "$KEY_FILE"
|
||||
|
||||
echo "Certificates generated successfully:"
|
||||
echo " Certificate: $CERT_FILE"
|
||||
echo " Private Key: $KEY_FILE"
|
||||
echo ""
|
||||
echo "You can now start the server with:"
|
||||
echo " docker compose up -d"
|
||||
echo ""
|
||||
echo "Or run locally with:"
|
||||
echo " gunicorn --config gunicorn_conf.py app:app"
|
||||
80
examples/http2_gevent/gunicorn_conf.py
Normal file
80
examples/http2_gevent/gunicorn_conf.py
Normal file
@ -0,0 +1,80 @@
|
||||
"""
|
||||
Gunicorn configuration for HTTP/2 with gevent worker.
|
||||
|
||||
This configuration demonstrates:
|
||||
- HTTP/2 protocol support with ALPN
|
||||
- Gevent async worker for high concurrency
|
||||
- SSL/TLS configuration
|
||||
- HTTP/2 specific tuning options
|
||||
"""
|
||||
|
||||
import os
|
||||
import multiprocessing
|
||||
|
||||
# Server socket
|
||||
bind = os.environ.get('GUNICORN_BIND', '0.0.0.0:8443')
|
||||
|
||||
# Worker configuration
|
||||
worker_class = 'gevent'
|
||||
workers = int(os.environ.get('GUNICORN_WORKERS', multiprocessing.cpu_count() * 2 + 1))
|
||||
worker_connections = 1000 # Max simultaneous clients per worker
|
||||
|
||||
# HTTP protocols - enable HTTP/2 with HTTP/1.1 fallback
|
||||
http_protocols = "h2,h1"
|
||||
|
||||
# SSL/TLS configuration (required for HTTP/2)
|
||||
# Default paths work in Docker; override with env vars for local testing
|
||||
_default_cert = '/certs/server.crt' if os.path.exists('/certs/server.crt') else 'certs/server.crt'
|
||||
_default_key = '/certs/server.key' if os.path.exists('/certs/server.key') else 'certs/server.key'
|
||||
certfile = os.environ.get('GUNICORN_CERTFILE', _default_cert)
|
||||
keyfile = os.environ.get('GUNICORN_KEYFILE', _default_key)
|
||||
|
||||
# HTTP/2 specific settings
|
||||
http2_max_concurrent_streams = 128 # Max streams per connection
|
||||
http2_initial_window_size = 262144 # 256KB initial flow control window
|
||||
http2_max_frame_size = 16384 # Default frame size (16KB)
|
||||
http2_max_header_list_size = 65536 # Max header size
|
||||
|
||||
# Timeouts
|
||||
timeout = 30 # Worker timeout
|
||||
graceful_timeout = 30 # Graceful shutdown timeout
|
||||
keepalive = 5 # Keep-alive connections
|
||||
|
||||
# Logging
|
||||
loglevel = os.environ.get('GUNICORN_LOG_LEVEL', 'info')
|
||||
accesslog = '-' # Log to stdout
|
||||
errorlog = '-' # Log to stderr
|
||||
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(L)s'
|
||||
|
||||
# Process naming
|
||||
proc_name = 'gunicorn-http2-gevent'
|
||||
|
||||
# Server mechanics
|
||||
daemon = False
|
||||
pidfile = None
|
||||
umask = 0
|
||||
user = None
|
||||
group = None
|
||||
tmp_upload_dir = None
|
||||
|
||||
|
||||
def on_starting(server):
|
||||
"""Called just before the master process is initialized."""
|
||||
server.log.info("Starting HTTP/2 server with gevent worker...")
|
||||
server.log.info(f"Workers: {workers}, Connections per worker: {worker_connections}")
|
||||
server.log.info(f"HTTP/2 max streams: {http2_max_concurrent_streams}")
|
||||
|
||||
|
||||
def when_ready(server):
|
||||
"""Called just after the server is started."""
|
||||
server.log.info("HTTP/2 server is ready to accept connections")
|
||||
|
||||
|
||||
def worker_int(worker):
|
||||
"""Called when a worker receives SIGINT or SIGQUIT."""
|
||||
worker.log.info("Worker received interrupt signal")
|
||||
|
||||
|
||||
def worker_abort(worker):
|
||||
"""Called when a worker receives SIGABRT."""
|
||||
worker.log.warning("Worker aborted")
|
||||
301
examples/http2_gevent/test_http2_gevent.py
Normal file
301
examples/http2_gevent/test_http2_gevent.py
Normal file
@ -0,0 +1,301 @@
|
||||
#!/usr/bin/env python
|
||||
"""
|
||||
Tests for HTTP/2 with gevent example.
|
||||
|
||||
Run with:
|
||||
# Start the server first
|
||||
docker compose up -d
|
||||
|
||||
# Run tests
|
||||
python test_http2_gevent.py
|
||||
|
||||
# Or with pytest
|
||||
pytest test_http2_gevent.py -v
|
||||
|
||||
Requirements:
|
||||
pip install httpx[http2] pytest pytest-asyncio
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
import ssl
|
||||
import socket
|
||||
import time
|
||||
|
||||
|
||||
def check_server_available(host='localhost', port=8443, timeout=30):
|
||||
"""Wait for server to become available."""
|
||||
start = time.time()
|
||||
while time.time() - start < timeout:
|
||||
try:
|
||||
ctx = ssl.create_default_context()
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
with socket.create_connection((host, port), timeout=2) as sock:
|
||||
with ctx.wrap_socket(sock, server_hostname=host):
|
||||
return True
|
||||
except (socket.error, ssl.SSLError, OSError):
|
||||
time.sleep(1)
|
||||
return False
|
||||
|
||||
|
||||
class TestHTTP2Gevent:
|
||||
"""Test HTTP/2 functionality with gevent worker."""
|
||||
|
||||
BASE_URL = "https://localhost:8443"
|
||||
|
||||
@classmethod
|
||||
def setup_class(cls):
|
||||
"""Check server is available before running tests."""
|
||||
if not check_server_available():
|
||||
raise RuntimeError(
|
||||
"Server not available. Start it with: docker compose up -d"
|
||||
)
|
||||
|
||||
def get_client(self):
|
||||
"""Create HTTP/2 client."""
|
||||
import httpx
|
||||
return httpx.Client(http2=True, verify=False, timeout=30.0)
|
||||
|
||||
def test_root_endpoint(self):
|
||||
"""Test basic GET request returns HTTP/2."""
|
||||
with self.get_client() as client:
|
||||
response = client.get(f"{self.BASE_URL}/")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.http_version == "HTTP/2"
|
||||
assert b"HTTP/2" in response.content or b"Gevent" in response.content
|
||||
|
||||
def test_health_endpoint(self):
|
||||
"""Test health check endpoint."""
|
||||
with self.get_client() as client:
|
||||
response = client.get(f"{self.BASE_URL}/health")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.text == "OK"
|
||||
|
||||
def test_echo_post(self):
|
||||
"""Test POST echo endpoint."""
|
||||
with self.get_client() as client:
|
||||
data = b"Hello HTTP/2 with Gevent!"
|
||||
response = client.post(f"{self.BASE_URL}/echo", content=data)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.content == data
|
||||
|
||||
def test_echo_large_body(self):
|
||||
"""Test POST with large body (tests flow control)."""
|
||||
with self.get_client() as client:
|
||||
# 100KB of data
|
||||
data = b"X" * (100 * 1024)
|
||||
response = client.post(f"{self.BASE_URL}/echo", content=data)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert len(response.content) == len(data)
|
||||
assert response.content == data
|
||||
|
||||
def test_info_endpoint(self):
|
||||
"""Test JSON info endpoint."""
|
||||
with self.get_client() as client:
|
||||
response = client.get(f"{self.BASE_URL}/info")
|
||||
|
||||
assert response.status_code == 200
|
||||
info = response.json()
|
||||
assert info['method'] == 'GET'
|
||||
assert info['path'] == '/info'
|
||||
assert 'gevent' in info['server'].lower()
|
||||
|
||||
def test_large_response(self):
|
||||
"""Test large response (1MB) - tests streaming and flow control."""
|
||||
with self.get_client() as client:
|
||||
response = client.get(f"{self.BASE_URL}/large")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert len(response.content) == 1024 * 1024
|
||||
assert response.content == b"X" * (1024 * 1024)
|
||||
|
||||
def test_streaming_response(self):
|
||||
"""Test server-sent events style streaming."""
|
||||
with self.get_client() as client:
|
||||
response = client.get(f"{self.BASE_URL}/stream")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert b"chunk 0" in response.content
|
||||
assert b"chunk 9" in response.content
|
||||
|
||||
def test_delay_endpoint(self):
|
||||
"""Test delayed response."""
|
||||
with self.get_client() as client:
|
||||
start = time.time()
|
||||
response = client.get(f"{self.BASE_URL}/delay?seconds=0.5")
|
||||
elapsed = time.time() - start
|
||||
|
||||
assert response.status_code == 200
|
||||
assert elapsed >= 0.4 # Allow some tolerance
|
||||
assert b"Delayed" in response.content
|
||||
|
||||
def test_not_found(self):
|
||||
"""Test 404 response."""
|
||||
with self.get_client() as client:
|
||||
response = client.get(f"{self.BASE_URL}/nonexistent")
|
||||
|
||||
assert response.status_code == 404
|
||||
|
||||
def test_gevent_worker_header(self):
|
||||
"""Test that gevent worker header is present."""
|
||||
with self.get_client() as client:
|
||||
response = client.get(f"{self.BASE_URL}/")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.headers.get('x-worker-type') == 'gevent'
|
||||
|
||||
|
||||
class TestHTTP2Concurrency:
|
||||
"""Test HTTP/2 multiplexing with concurrent requests."""
|
||||
|
||||
BASE_URL = "https://localhost:8443"
|
||||
|
||||
@classmethod
|
||||
def setup_class(cls):
|
||||
"""Check server is available."""
|
||||
if not check_server_available():
|
||||
raise RuntimeError("Server not available")
|
||||
|
||||
def test_concurrent_requests_sync(self):
|
||||
"""Test multiple concurrent requests using threads."""
|
||||
import httpx
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
def make_request(i):
|
||||
with httpx.Client(http2=True, verify=False, timeout=30.0) as client:
|
||||
response = client.get(f"{self.BASE_URL}/delay?seconds=0.2")
|
||||
return i, response.status_code
|
||||
|
||||
num_requests = 10
|
||||
with ThreadPoolExecutor(max_workers=10) as executor:
|
||||
futures = [executor.submit(make_request, i) for i in range(num_requests)]
|
||||
results = [f.result() for f in as_completed(futures)]
|
||||
|
||||
assert len(results) == num_requests
|
||||
assert all(status == 200 for _, status in results)
|
||||
|
||||
|
||||
class TestHTTP2ConcurrencyAsync:
|
||||
"""Async tests for HTTP/2 multiplexing."""
|
||||
|
||||
BASE_URL = "https://localhost:8443"
|
||||
|
||||
@classmethod
|
||||
def setup_class(cls):
|
||||
"""Check server is available."""
|
||||
if not check_server_available():
|
||||
raise RuntimeError("Server not available")
|
||||
|
||||
def test_async_concurrent_requests(self):
|
||||
"""Test concurrent requests with asyncio."""
|
||||
import httpx
|
||||
|
||||
async def run_concurrent():
|
||||
async with httpx.AsyncClient(http2=True, verify=False, timeout=30.0) as client:
|
||||
# Make 10 concurrent requests
|
||||
tasks = [
|
||||
client.get(f"{self.BASE_URL}/delay?seconds=0.2")
|
||||
for _ in range(10)
|
||||
]
|
||||
responses = await asyncio.gather(*tasks)
|
||||
return responses
|
||||
|
||||
responses = asyncio.run(run_concurrent())
|
||||
|
||||
assert len(responses) == 10
|
||||
assert all(r.status_code == 200 for r in responses)
|
||||
assert all(r.http_version == "HTTP/2" for r in responses)
|
||||
|
||||
def test_async_multiple_streams(self):
|
||||
"""Test that multiple concurrent streams work over single HTTP/2 connection.
|
||||
|
||||
This test verifies that HTTP/2 can handle multiple concurrent requests,
|
||||
which is the foundation of multiplexing. Performance benefits depend on
|
||||
client library implementation and network conditions.
|
||||
"""
|
||||
import httpx
|
||||
|
||||
async def run_test():
|
||||
async with httpx.AsyncClient(http2=True, verify=False, timeout=30.0) as client:
|
||||
# Send multiple concurrent requests
|
||||
tasks = [
|
||||
client.get(f"{self.BASE_URL}/info")
|
||||
for _ in range(10)
|
||||
]
|
||||
responses = await asyncio.gather(*tasks)
|
||||
return responses
|
||||
|
||||
responses = asyncio.run(run_test())
|
||||
|
||||
# Verify all requests succeeded with HTTP/2
|
||||
assert len(responses) == 10
|
||||
assert all(r.status_code == 200 for r in responses)
|
||||
assert all(r.http_version == "HTTP/2" for r in responses)
|
||||
|
||||
|
||||
def run_basic_test():
|
||||
"""Run a basic test without pytest."""
|
||||
print("Running basic HTTP/2 gevent test...")
|
||||
|
||||
if not check_server_available():
|
||||
print("ERROR: Server not available at https://localhost:8443")
|
||||
print("Start it with: docker compose up -d")
|
||||
return False
|
||||
|
||||
try:
|
||||
import httpx
|
||||
except ImportError:
|
||||
print("ERROR: httpx not installed. Run: pip install httpx[http2]")
|
||||
return False
|
||||
|
||||
try:
|
||||
with httpx.Client(http2=True, verify=False, timeout=30.0) as client:
|
||||
# Test basic request
|
||||
print(" Testing root endpoint...", end=" ")
|
||||
response = client.get("https://localhost:8443/")
|
||||
assert response.status_code == 200
|
||||
assert response.http_version == "HTTP/2"
|
||||
print("OK")
|
||||
|
||||
# Test echo
|
||||
print(" Testing echo endpoint...", end=" ")
|
||||
data = b"test data"
|
||||
response = client.post("https://localhost:8443/echo", content=data)
|
||||
assert response.content == data
|
||||
print("OK")
|
||||
|
||||
# Test large response
|
||||
print(" Testing large response...", end=" ")
|
||||
response = client.get("https://localhost:8443/large")
|
||||
assert len(response.content) == 1024 * 1024
|
||||
print("OK")
|
||||
|
||||
# Test worker header
|
||||
print(" Testing gevent worker...", end=" ")
|
||||
response = client.get("https://localhost:8443/")
|
||||
assert response.headers.get('x-worker-type') == 'gevent'
|
||||
print("OK")
|
||||
|
||||
print("\nAll basic tests passed!")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"\nERROR: {e}")
|
||||
return False
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Check if pytest is available
|
||||
try:
|
||||
import pytest
|
||||
# Run with pytest if available
|
||||
sys.exit(pytest.main([__file__, '-v']))
|
||||
except ImportError:
|
||||
# Run basic tests without pytest
|
||||
success = run_basic_test()
|
||||
sys.exit(0 if success else 1)
|
||||
@ -138,8 +138,8 @@ class WebSocket:
|
||||
"""
|
||||
def __init__(self, sock, environ, version=76):
|
||||
"""
|
||||
:param socket: The eventlet socket
|
||||
:type socket: :class:`eventlet.greenio.GreenSocket`
|
||||
:param socket: The gevent socket
|
||||
:type socket: :class:`gevent.socket.socket`
|
||||
:param environ: The wsgi environment
|
||||
:param version: The WebSocket spec version to follow (default is 76)
|
||||
"""
|
||||
|
||||
@ -1,449 +0,0 @@
|
||||
import collections
|
||||
import errno
|
||||
import re
|
||||
from hashlib import md5, sha1
|
||||
import base64
|
||||
from base64 import b64encode, b64decode
|
||||
import socket
|
||||
import struct
|
||||
import logging
|
||||
from socket import error as SocketError
|
||||
|
||||
import eventlet
|
||||
from gunicorn.workers.base_async import ALREADY_HANDLED
|
||||
from eventlet import pools
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
WS_KEY = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
|
||||
|
||||
class WebSocketWSGI:
|
||||
def __init__(self, handler):
|
||||
self.handler = handler
|
||||
|
||||
def verify_client(self, ws):
|
||||
pass
|
||||
|
||||
def _get_key_value(self, key_value):
|
||||
if not key_value:
|
||||
return
|
||||
key_number = int(re.sub("\\D", "", key_value))
|
||||
spaces = re.subn(" ", "", key_value)[1]
|
||||
if key_number % spaces != 0:
|
||||
return
|
||||
part = key_number / spaces
|
||||
return part
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
if not (environ.get('HTTP_CONNECTION').find('Upgrade') != -1 and
|
||||
environ['HTTP_UPGRADE'].lower() == 'websocket'):
|
||||
# need to check a few more things here for true compliance
|
||||
start_response('400 Bad Request', [('Connection','close')])
|
||||
return []
|
||||
|
||||
sock = environ['gunicorn.socket']
|
||||
|
||||
version = environ.get('HTTP_SEC_WEBSOCKET_VERSION')
|
||||
|
||||
ws = WebSocket(sock, environ, version)
|
||||
|
||||
handshake_reply = ("HTTP/1.1 101 Switching Protocols\r\n"
|
||||
"Upgrade: websocket\r\n"
|
||||
"Connection: Upgrade\r\n")
|
||||
|
||||
key = environ.get('HTTP_SEC_WEBSOCKET_KEY')
|
||||
if key:
|
||||
ws_key = base64.b64decode(key)
|
||||
if len(ws_key) != 16:
|
||||
start_response('400 Bad Request', [('Connection','close')])
|
||||
return []
|
||||
|
||||
protocols = []
|
||||
subprotocols = environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL')
|
||||
ws_protocols = []
|
||||
if subprotocols:
|
||||
for s in subprotocols.split(','):
|
||||
s = s.strip()
|
||||
if s in protocols:
|
||||
ws_protocols.append(s)
|
||||
if ws_protocols:
|
||||
handshake_reply += 'Sec-WebSocket-Protocol: %s\r\n' % ', '.join(ws_protocols)
|
||||
|
||||
exts = []
|
||||
extensions = environ.get('HTTP_SEC_WEBSOCKET_EXTENSIONS')
|
||||
ws_extensions = []
|
||||
if extensions:
|
||||
for ext in extensions.split(','):
|
||||
ext = ext.strip()
|
||||
if ext in exts:
|
||||
ws_extensions.append(ext)
|
||||
if ws_extensions:
|
||||
handshake_reply += 'Sec-WebSocket-Extensions: %s\r\n' % ', '.join(ws_extensions)
|
||||
|
||||
key_hash = sha1()
|
||||
key_hash.update(key.encode())
|
||||
key_hash.update(WS_KEY)
|
||||
|
||||
handshake_reply += (
|
||||
"Sec-WebSocket-Origin: %s\r\n"
|
||||
"Sec-WebSocket-Location: ws://%s%s\r\n"
|
||||
"Sec-WebSocket-Version: %s\r\n"
|
||||
"Sec-WebSocket-Accept: %s\r\n\r\n"
|
||||
% (
|
||||
environ.get('HTTP_ORIGIN'),
|
||||
environ.get('HTTP_HOST'),
|
||||
ws.path,
|
||||
version,
|
||||
base64.b64encode(key_hash.digest()).decode()
|
||||
))
|
||||
|
||||
else:
|
||||
|
||||
handshake_reply += (
|
||||
"WebSocket-Origin: %s\r\n"
|
||||
"WebSocket-Location: ws://%s%s\r\n\r\n" % (
|
||||
environ.get('HTTP_ORIGIN'),
|
||||
environ.get('HTTP_HOST'),
|
||||
ws.path))
|
||||
|
||||
sock.sendall(handshake_reply.encode())
|
||||
|
||||
try:
|
||||
self.handler(ws)
|
||||
except BrokenPipeError:
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
# use this undocumented feature of grainbows to ensure that it
|
||||
# doesn't barf on the fact that we didn't call start_response
|
||||
return ALREADY_HANDLED
|
||||
|
||||
class WebSocket:
|
||||
"""A websocket object that handles the details of
|
||||
serialization/deserialization to the socket.
|
||||
|
||||
The primary way to interact with a :class:`WebSocket` object is to
|
||||
call :meth:`send` and :meth:`wait` in order to pass messages back
|
||||
and forth with the browser. Also available are the following
|
||||
properties:
|
||||
|
||||
path
|
||||
The path value of the request. This is the same as the WSGI PATH_INFO variable, but more convenient.
|
||||
protocol
|
||||
The value of the Websocket-Protocol header.
|
||||
origin
|
||||
The value of the 'Origin' header.
|
||||
environ
|
||||
The full WSGI environment for this request.
|
||||
|
||||
"""
|
||||
def __init__(self, sock, environ, version=76):
|
||||
"""
|
||||
:param socket: The eventlet socket
|
||||
:type socket: :class:`eventlet.greenio.GreenSocket`
|
||||
:param environ: The wsgi environment
|
||||
:param version: The WebSocket spec version to follow (default is 76)
|
||||
"""
|
||||
self.socket = sock
|
||||
self.origin = environ.get('HTTP_ORIGIN')
|
||||
self.protocol = environ.get('HTTP_WEBSOCKET_PROTOCOL')
|
||||
self.path = environ.get('PATH_INFO')
|
||||
self.environ = environ
|
||||
self.version = version
|
||||
self.websocket_closed = False
|
||||
self._buf = ""
|
||||
self._msgs = collections.deque()
|
||||
self._sendlock = pools.TokenPool(1)
|
||||
|
||||
@staticmethod
|
||||
def encode_hybi(buf, opcode, base64=False):
|
||||
""" Encode a HyBi style WebSocket frame.
|
||||
Optional opcode:
|
||||
0x0 - continuation
|
||||
0x1 - text frame (base64 encode buf)
|
||||
0x2 - binary frame (use raw buf)
|
||||
0x8 - connection close
|
||||
0x9 - ping
|
||||
0xA - pong
|
||||
"""
|
||||
if base64:
|
||||
buf = b64encode(buf)
|
||||
else:
|
||||
buf = buf.encode()
|
||||
|
||||
b1 = 0x80 | (opcode & 0x0f) # FIN + opcode
|
||||
payload_len = len(buf)
|
||||
if payload_len <= 125:
|
||||
header = struct.pack('>BB', b1, payload_len)
|
||||
elif payload_len > 125 and payload_len < 65536:
|
||||
header = struct.pack('>BBH', b1, 126, payload_len)
|
||||
elif payload_len >= 65536:
|
||||
header = struct.pack('>BBQ', b1, 127, payload_len)
|
||||
|
||||
#print("Encoded: %s" % repr(header + buf))
|
||||
|
||||
return header + buf, len(header), 0
|
||||
|
||||
@staticmethod
|
||||
def decode_hybi(buf, base64=False):
|
||||
""" Decode HyBi style WebSocket packets.
|
||||
Returns:
|
||||
{'fin' : 0_or_1,
|
||||
'opcode' : number,
|
||||
'mask' : 32_bit_number,
|
||||
'hlen' : header_bytes_number,
|
||||
'length' : payload_bytes_number,
|
||||
'payload' : decoded_buffer,
|
||||
'left' : bytes_left_number,
|
||||
'close_code' : number,
|
||||
'close_reason' : string}
|
||||
"""
|
||||
|
||||
f = {'fin' : 0,
|
||||
'opcode' : 0,
|
||||
'mask' : 0,
|
||||
'hlen' : 2,
|
||||
'length' : 0,
|
||||
'payload' : None,
|
||||
'left' : 0,
|
||||
'close_code' : None,
|
||||
'close_reason' : None}
|
||||
|
||||
blen = len(buf)
|
||||
f['left'] = blen
|
||||
|
||||
if blen < f['hlen']:
|
||||
return f # Incomplete frame header
|
||||
|
||||
b1, b2 = struct.unpack_from(">BB", buf)
|
||||
f['opcode'] = b1 & 0x0f
|
||||
f['fin'] = (b1 & 0x80) >> 7
|
||||
has_mask = (b2 & 0x80) >> 7
|
||||
|
||||
f['length'] = b2 & 0x7f
|
||||
|
||||
if f['length'] == 126:
|
||||
f['hlen'] = 4
|
||||
if blen < f['hlen']:
|
||||
return f # Incomplete frame header
|
||||
(f['length'],) = struct.unpack_from('>xxH', buf)
|
||||
elif f['length'] == 127:
|
||||
f['hlen'] = 10
|
||||
if blen < f['hlen']:
|
||||
return f # Incomplete frame header
|
||||
(f['length'],) = struct.unpack_from('>xxQ', buf)
|
||||
|
||||
full_len = f['hlen'] + has_mask * 4 + f['length']
|
||||
|
||||
if blen < full_len: # Incomplete frame
|
||||
return f # Incomplete frame header
|
||||
|
||||
# Number of bytes that are part of the next frame(s)
|
||||
f['left'] = blen - full_len
|
||||
|
||||
# Process 1 frame
|
||||
if has_mask:
|
||||
# unmask payload
|
||||
f['mask'] = buf[f['hlen']:f['hlen']+4]
|
||||
b = c = ''
|
||||
if f['length'] >= 4:
|
||||
data = struct.unpack('<I', buf[f['hlen']:f['hlen']+4])[0]
|
||||
of1 = f['hlen']+4
|
||||
b = ''
|
||||
for i in range(0, int(f['length']/4)):
|
||||
mask = struct.unpack('<I', buf[of1+4*i:of1+4*(i+1)])[0]
|
||||
b += struct.pack('I', data ^ mask)
|
||||
|
||||
if f['length'] % 4:
|
||||
l = f['length'] % 4
|
||||
of1 = f['hlen']
|
||||
of2 = full_len - l
|
||||
c = ''
|
||||
for i in range(0, l):
|
||||
mask = struct.unpack('B', buf[of1 + i])[0]
|
||||
data = struct.unpack('B', buf[of2 + i])[0]
|
||||
c += chr(data ^ mask)
|
||||
|
||||
f['payload'] = b + c
|
||||
else:
|
||||
print("Unmasked frame: %s" % repr(buf))
|
||||
f['payload'] = buf[(f['hlen'] + has_mask * 4):full_len]
|
||||
|
||||
if base64 and f['opcode'] in [1, 2]:
|
||||
try:
|
||||
f['payload'] = b64decode(f['payload'])
|
||||
except:
|
||||
print("Exception while b64decoding buffer: %s" %
|
||||
repr(buf))
|
||||
raise
|
||||
|
||||
if f['opcode'] == 0x08:
|
||||
if f['length'] >= 2:
|
||||
f['close_code'] = struct.unpack_from(">H", f['payload'])
|
||||
if f['length'] > 3:
|
||||
f['close_reason'] = f['payload'][2:]
|
||||
|
||||
return f
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _pack_message(message):
|
||||
"""Pack the message inside ``00`` and ``FF``
|
||||
|
||||
As per the dataframing section (5.3) for the websocket spec
|
||||
"""
|
||||
if isinstance(message, str):
|
||||
message = message.encode('utf-8')
|
||||
packed = "\x00%s\xFF" % message
|
||||
return packed
|
||||
|
||||
def _parse_messages(self):
|
||||
""" Parses for messages in the buffer *buf*. It is assumed that
|
||||
the buffer contains the start character for a message, but that it
|
||||
may contain only part of the rest of the message.
|
||||
|
||||
Returns an array of messages, and the buffer remainder that
|
||||
didn't contain any full messages."""
|
||||
msgs = []
|
||||
end_idx = 0
|
||||
buf = self._buf
|
||||
while buf:
|
||||
if self.version in ['7', '8', '13']:
|
||||
frame = self.decode_hybi(buf, base64=False)
|
||||
#print("Received buf: %s, frame: %s" % (repr(buf), frame))
|
||||
|
||||
if frame['payload'] == None:
|
||||
break
|
||||
else:
|
||||
if frame['opcode'] == 0x8: # connection close
|
||||
self.websocket_closed = True
|
||||
break
|
||||
#elif frame['opcode'] == 0x1:
|
||||
else:
|
||||
msgs.append(frame['payload']);
|
||||
#msgs.append(frame['payload'].decode('utf-8', 'replace'));
|
||||
#buf = buf[-frame['left']:]
|
||||
if frame['left']:
|
||||
buf = buf[-frame['left']:]
|
||||
else:
|
||||
buf = ''
|
||||
|
||||
|
||||
else:
|
||||
frame_type = ord(buf[0])
|
||||
if frame_type == 0:
|
||||
# Normal message.
|
||||
end_idx = buf.find("\xFF")
|
||||
if end_idx == -1: #pragma NO COVER
|
||||
break
|
||||
msgs.append(buf[1:end_idx].decode('utf-8', 'replace'))
|
||||
buf = buf[end_idx+1:]
|
||||
elif frame_type == 255:
|
||||
# Closing handshake.
|
||||
assert ord(buf[1]) == 0, "Unexpected closing handshake: %r" % buf
|
||||
self.websocket_closed = True
|
||||
break
|
||||
else:
|
||||
raise ValueError("Don't understand how to parse this type of message: %r" % buf)
|
||||
self._buf = buf
|
||||
return msgs
|
||||
|
||||
def send(self, message):
|
||||
"""Send a message to the browser.
|
||||
|
||||
*message* should be convertible to a string; unicode objects should be
|
||||
encodable as utf-8. Raises socket.error with errno of 32
|
||||
(broken pipe) if the socket has already been closed by the client."""
|
||||
if self.version in ['7', '8', '13']:
|
||||
packed, lenhead, lentail = self.encode_hybi(message, opcode=0x01, base64=False)
|
||||
else:
|
||||
packed = self._pack_message(message)
|
||||
# if two greenthreads are trying to send at the same time
|
||||
# on the same socket, sendlock prevents interleaving and corruption
|
||||
#self._sendlock.acquire()
|
||||
t = self._sendlock.get()
|
||||
try:
|
||||
self.socket.sendall(packed)
|
||||
finally:
|
||||
self._sendlock.put(t)
|
||||
|
||||
def wait(self):
|
||||
"""Waits for and deserializes messages.
|
||||
|
||||
Returns a single message; the oldest not yet processed. If the client
|
||||
has already closed the connection, returns None. This is different
|
||||
from normal socket behavior because the empty string is a valid
|
||||
websocket message."""
|
||||
while not self._msgs:
|
||||
# Websocket might be closed already.
|
||||
if self.websocket_closed:
|
||||
return None
|
||||
# no parsed messages, must mean buf needs more data
|
||||
delta = self.socket.recv(8096)
|
||||
if delta == b'':
|
||||
return None
|
||||
self._buf += delta
|
||||
msgs = self._parse_messages()
|
||||
self._msgs.extend(msgs)
|
||||
return self._msgs.popleft()
|
||||
|
||||
def _send_closing_frame(self, ignore_send_errors=False):
|
||||
"""Sends the closing frame to the client, if required."""
|
||||
if self.version in ['7', '8', '13'] and not self.websocket_closed:
|
||||
msg = ''
|
||||
#if code != None:
|
||||
# msg = struct.pack(">H%ds" % (len(reason)), code)
|
||||
|
||||
buf, h, t = self.encode_hybi(msg, opcode=0x08, base64=False)
|
||||
self.socket.sendall(buf)
|
||||
self.websocket_closed = True
|
||||
|
||||
elif self.version == 76 and not self.websocket_closed:
|
||||
try:
|
||||
self.socket.sendall(b"\xff\x00")
|
||||
except SocketError:
|
||||
# Sometimes, like when the remote side cuts off the connection,
|
||||
# we don't care about this.
|
||||
if not ignore_send_errors: #pragma NO COVER
|
||||
raise
|
||||
self.websocket_closed = True
|
||||
|
||||
def close(self):
|
||||
"""Forcibly close the websocket; generally it is preferable to
|
||||
return from the handler method."""
|
||||
self._send_closing_frame()
|
||||
self.socket.shutdown(True)
|
||||
self.socket.close()
|
||||
|
||||
# demo app
|
||||
import os
|
||||
import random
|
||||
def handle(ws):
|
||||
""" This is the websocket handler function. Note that we
|
||||
can dispatch based on path in here, too."""
|
||||
if ws.path == '/echo':
|
||||
while True:
|
||||
m = ws.wait()
|
||||
if m is None:
|
||||
break
|
||||
ws.send(m)
|
||||
|
||||
elif ws.path == '/data':
|
||||
for i in range(10000):
|
||||
ws.send("0 %s %s\n" % (i, random.random()))
|
||||
eventlet.sleep(0.1)
|
||||
|
||||
wsapp = WebSocketWSGI(handle)
|
||||
def app(environ, start_response):
|
||||
""" This resolves to the web page or the websocket depending on
|
||||
the path."""
|
||||
if environ['PATH_INFO'] == '/' or environ['PATH_INFO'] == "":
|
||||
data = open(os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
'websocket.html')).read()
|
||||
data = data % environ
|
||||
start_response('200 OK', [('Content-Type', 'text/html'),
|
||||
('Content-Length', str(len(data)))])
|
||||
return [data.encode()]
|
||||
else:
|
||||
return wsapp(environ, start_response)
|
||||
@ -5,7 +5,7 @@
|
||||
# supported gunicorn workers.
|
||||
SUPPORTED_WORKERS = {
|
||||
"sync": "gunicorn.workers.sync.SyncWorker",
|
||||
"eventlet": "gunicorn.workers.geventlet.EventletWorker",
|
||||
"eventlet": "gunicorn.workers.geventlet.EventletWorker", # DEPRECATED: will be removed in 26.0
|
||||
"gevent": "gunicorn.workers.ggevent.GeventWorker",
|
||||
"gevent_wsgi": "gunicorn.workers.ggevent.GeventPyWSGIWorker",
|
||||
"gevent_pywsgi": "gunicorn.workers.ggevent.GeventPyWSGIWorker",
|
||||
|
||||
@ -33,6 +33,11 @@ class AsyncWorker(base.Worker):
|
||||
def handle(self, listener, client, addr):
|
||||
req = None
|
||||
try:
|
||||
# Complete the handshake to ensure ALPN negotiation is done
|
||||
# (needed if do_handshake_on_connect is False)
|
||||
if isinstance(client, ssl.SSLSocket) and not self.cfg.do_handshake_on_connect:
|
||||
client.do_handshake()
|
||||
|
||||
# Check if HTTP/2 was negotiated (for SSL connections)
|
||||
is_http2 = gunicorn_sock.is_http2_negotiated(client)
|
||||
|
||||
|
||||
@ -2,6 +2,21 @@
|
||||
# This file is part of gunicorn released under the MIT license.
|
||||
# See the NOTICE for more information.
|
||||
|
||||
# DEPRECATION NOTICE: The eventlet worker is deprecated and will be removed
|
||||
# in Gunicorn 26.0. Eventlet itself is deprecated and no longer maintained.
|
||||
# Please migrate to gevent, gthread, or another supported worker type.
|
||||
# See: https://eventlet.readthedocs.io/en/latest/asyncio/migration.html
|
||||
|
||||
import warnings
|
||||
|
||||
warnings.warn(
|
||||
"The eventlet worker is deprecated and will be removed in Gunicorn 26.0. "
|
||||
"Please migrate to gevent, gthread, or another supported worker type. "
|
||||
"See: https://docs.gunicorn.org/en/stable/design.html#choosing-a-worker-type",
|
||||
DeprecationWarning,
|
||||
stacklevel=2
|
||||
)
|
||||
|
||||
# NOTE: eventlet import and monkey_patch() must happen before any other imports
|
||||
# to ensure all standard library modules are properly patched.
|
||||
try:
|
||||
@ -150,6 +165,10 @@ class EventletWorker(AsyncWorker):
|
||||
return super().is_already_handled(respiter)
|
||||
|
||||
def init_process(self):
|
||||
self.log.warning(
|
||||
"The eventlet worker is DEPRECATED and will be removed in Gunicorn 26.0. "
|
||||
"Please migrate to gevent, gthread, or another supported worker type."
|
||||
)
|
||||
self.patch()
|
||||
super().init_process()
|
||||
|
||||
|
||||
@ -206,3 +206,223 @@ class TestAlpnProtocolMap:
|
||||
def test_h2_maps_to_h2(self):
|
||||
from gunicorn.config import ALPN_PROTOCOL_MAP
|
||||
assert ALPN_PROTOCOL_MAP.get("h2") == "h2"
|
||||
|
||||
|
||||
class TestAsyncWorkerAlpnHandshake:
|
||||
"""Test that AsyncWorker performs handshake before ALPN check.
|
||||
|
||||
This is critical for gevent and eventlet workers where do_handshake_on_connect
|
||||
may be False, causing ALPN negotiation to not complete until first I/O.
|
||||
"""
|
||||
|
||||
@pytest.fixture
|
||||
def async_worker(self):
|
||||
"""Create an AsyncWorker instance for testing."""
|
||||
from gunicorn.workers.base_async import AsyncWorker
|
||||
|
||||
worker = AsyncWorker.__new__(AsyncWorker)
|
||||
worker.cfg = mock.MagicMock()
|
||||
worker.cfg.keepalive = 2
|
||||
worker.cfg.do_handshake_on_connect = False
|
||||
worker.cfg.http_protocols = ["h2", "h1"]
|
||||
worker.alive = True
|
||||
worker.log = mock.MagicMock()
|
||||
worker.wsgi = mock.MagicMock()
|
||||
worker.nr = 0
|
||||
worker.max_requests = 1000
|
||||
|
||||
return worker
|
||||
|
||||
def test_handshake_called_when_do_handshake_on_connect_false(self, async_worker):
|
||||
"""Test that do_handshake() is called when do_handshake_on_connect is False."""
|
||||
mock_ssl_socket = mock.Mock(spec=ssl.SSLSocket)
|
||||
mock_ssl_socket.selected_alpn_protocol.return_value = None
|
||||
mock_listener = mock.MagicMock()
|
||||
|
||||
# Mock the rest of handle() to prevent full execution
|
||||
with mock.patch('gunicorn.sock.is_http2_negotiated', return_value=False):
|
||||
with mock.patch('gunicorn.http.get_parser') as mock_parser:
|
||||
mock_parser.return_value = iter([])
|
||||
try:
|
||||
async_worker.handle(mock_listener, mock_ssl_socket, ('127.0.0.1', 8000))
|
||||
except StopIteration:
|
||||
pass
|
||||
|
||||
# Verify handshake was called
|
||||
mock_ssl_socket.do_handshake.assert_called_once()
|
||||
|
||||
def test_no_handshake_when_do_handshake_on_connect_true(self, async_worker):
|
||||
"""Test that do_handshake() is NOT called when do_handshake_on_connect is True."""
|
||||
async_worker.cfg.do_handshake_on_connect = True
|
||||
|
||||
mock_ssl_socket = mock.Mock(spec=ssl.SSLSocket)
|
||||
mock_ssl_socket.selected_alpn_protocol.return_value = None
|
||||
mock_listener = mock.MagicMock()
|
||||
|
||||
with mock.patch('gunicorn.sock.is_http2_negotiated', return_value=False):
|
||||
with mock.patch('gunicorn.http.get_parser') as mock_parser:
|
||||
mock_parser.return_value = iter([])
|
||||
try:
|
||||
async_worker.handle(mock_listener, mock_ssl_socket, ('127.0.0.1', 8000))
|
||||
except StopIteration:
|
||||
pass
|
||||
|
||||
# Verify handshake was NOT called (already done on connect)
|
||||
mock_ssl_socket.do_handshake.assert_not_called()
|
||||
|
||||
def test_no_handshake_for_non_ssl_socket(self, async_worker):
|
||||
"""Test that no handshake is attempted for non-SSL sockets."""
|
||||
mock_socket = mock.MagicMock() # Regular socket, not ssl.SSLSocket
|
||||
mock_listener = mock.MagicMock()
|
||||
|
||||
with mock.patch('gunicorn.sock.is_http2_negotiated', return_value=False):
|
||||
with mock.patch('gunicorn.http.get_parser') as mock_parser:
|
||||
mock_parser.return_value = iter([])
|
||||
try:
|
||||
async_worker.handle(mock_listener, mock_socket, ('127.0.0.1', 8000))
|
||||
except StopIteration:
|
||||
pass
|
||||
|
||||
# Non-SSL sockets don't have do_handshake, so it shouldn't be called
|
||||
assert not hasattr(mock_socket, 'do_handshake') or \
|
||||
not mock_socket.do_handshake.called
|
||||
|
||||
def test_http2_detected_after_handshake(self, async_worker):
|
||||
"""Test that HTTP/2 is properly detected after explicit handshake."""
|
||||
mock_ssl_socket = mock.Mock(spec=ssl.SSLSocket)
|
||||
mock_ssl_socket.selected_alpn_protocol.return_value = "h2"
|
||||
mock_listener = mock.MagicMock()
|
||||
|
||||
with mock.patch.object(async_worker, 'handle_http2') as mock_h2:
|
||||
async_worker.handle(mock_listener, mock_ssl_socket, ('127.0.0.1', 8000))
|
||||
|
||||
# Verify handshake was called first
|
||||
mock_ssl_socket.do_handshake.assert_called_once()
|
||||
# Verify HTTP/2 handler was invoked
|
||||
mock_h2.assert_called_once()
|
||||
|
||||
|
||||
class TestGeventWorkerAlpn:
|
||||
"""Test ALPN handling in GeventWorker."""
|
||||
|
||||
@pytest.fixture
|
||||
def gevent_worker(self):
|
||||
"""Create a GeventWorker instance for testing."""
|
||||
try:
|
||||
import gevent
|
||||
except ImportError:
|
||||
pytest.skip("gevent not available")
|
||||
|
||||
from gunicorn.workers.ggevent import GeventWorker
|
||||
|
||||
worker = GeventWorker.__new__(GeventWorker)
|
||||
worker.cfg = mock.MagicMock()
|
||||
worker.cfg.keepalive = 2
|
||||
worker.cfg.do_handshake_on_connect = False
|
||||
worker.cfg.http_protocols = ["h2", "h1"]
|
||||
worker.cfg.is_ssl = True
|
||||
worker.alive = True
|
||||
worker.log = mock.MagicMock()
|
||||
worker.wsgi = mock.MagicMock()
|
||||
worker.nr = 0
|
||||
worker.max_requests = 1000
|
||||
worker.worker_connections = 1000
|
||||
|
||||
return worker
|
||||
|
||||
def test_gevent_inherits_async_worker(self):
|
||||
"""Test that GeventWorker inherits from AsyncWorker."""
|
||||
try:
|
||||
import gevent
|
||||
except ImportError:
|
||||
pytest.skip("gevent not available")
|
||||
|
||||
from gunicorn.workers.ggevent import GeventWorker
|
||||
from gunicorn.workers.base_async import AsyncWorker
|
||||
|
||||
assert issubclass(GeventWorker, AsyncWorker)
|
||||
|
||||
def test_gevent_handle_calls_super(self, gevent_worker):
|
||||
"""Test that GeventWorker.handle() calls super().handle()."""
|
||||
mock_client = mock.MagicMock()
|
||||
mock_listener = mock.MagicMock()
|
||||
|
||||
with mock.patch('gunicorn.workers.base_async.AsyncWorker.handle') as mock_super:
|
||||
gevent_worker.handle(mock_listener, mock_client, ('127.0.0.1', 8000))
|
||||
|
||||
mock_super.assert_called_once()
|
||||
|
||||
|
||||
class TestEventletWorkerAlpn:
|
||||
"""Test ALPN handling in EventletWorker."""
|
||||
|
||||
@pytest.fixture
|
||||
def eventlet_worker(self):
|
||||
"""Create an EventletWorker instance for testing."""
|
||||
try:
|
||||
import eventlet
|
||||
except (ImportError, AttributeError):
|
||||
pytest.skip("eventlet not available")
|
||||
|
||||
from gunicorn.workers.geventlet import EventletWorker
|
||||
|
||||
worker = EventletWorker.__new__(EventletWorker)
|
||||
worker.cfg = mock.MagicMock()
|
||||
worker.cfg.keepalive = 2
|
||||
worker.cfg.do_handshake_on_connect = False
|
||||
worker.cfg.http_protocols = ["h2", "h1"]
|
||||
worker.cfg.is_ssl = True
|
||||
worker.alive = True
|
||||
worker.log = mock.MagicMock()
|
||||
worker.wsgi = mock.MagicMock()
|
||||
worker.nr = 0
|
||||
worker.max_requests = 1000
|
||||
worker.worker_connections = 1000
|
||||
|
||||
return worker
|
||||
|
||||
def test_eventlet_inherits_async_worker(self):
|
||||
"""Test that EventletWorker inherits from AsyncWorker."""
|
||||
try:
|
||||
import eventlet
|
||||
except (ImportError, AttributeError):
|
||||
pytest.skip("eventlet not available")
|
||||
|
||||
from gunicorn.workers.geventlet import EventletWorker
|
||||
from gunicorn.workers.base_async import AsyncWorker
|
||||
|
||||
assert issubclass(EventletWorker, AsyncWorker)
|
||||
|
||||
def test_eventlet_handle_wraps_ssl_then_calls_super(self, eventlet_worker):
|
||||
"""Test that EventletWorker.handle() wraps SSL then calls super()."""
|
||||
from gunicorn.workers import geventlet
|
||||
|
||||
mock_client = mock.MagicMock()
|
||||
mock_wrapped = mock.MagicMock()
|
||||
mock_listener = mock.MagicMock()
|
||||
|
||||
with mock.patch.object(geventlet, 'ssl_wrap_socket', return_value=mock_wrapped):
|
||||
with mock.patch('gunicorn.workers.base_async.AsyncWorker.handle') as mock_super:
|
||||
eventlet_worker.handle(mock_listener, mock_client, ('127.0.0.1', 8000))
|
||||
|
||||
# Verify super().handle() was called with the wrapped socket
|
||||
mock_super.assert_called_once()
|
||||
call_args = mock_super.call_args[0]
|
||||
assert call_args[1] == mock_wrapped # Second arg is the client socket
|
||||
|
||||
def test_eventlet_alpn_works_with_handshake_fix(self, eventlet_worker):
|
||||
"""Test that ALPN detection works after handshake fix for eventlet."""
|
||||
from gunicorn.workers import geventlet
|
||||
|
||||
mock_ssl_socket = mock.Mock(spec=ssl.SSLSocket)
|
||||
mock_ssl_socket.selected_alpn_protocol.return_value = "h2"
|
||||
mock_listener = mock.MagicMock()
|
||||
|
||||
with mock.patch.object(geventlet, 'ssl_wrap_socket', return_value=mock_ssl_socket):
|
||||
with mock.patch.object(eventlet_worker, 'handle_http2') as mock_h2:
|
||||
eventlet_worker.handle(mock_listener, mock.MagicMock(), ('127.0.0.1', 8000))
|
||||
|
||||
# Verify handshake was called (by base_async.handle)
|
||||
mock_ssl_socket.do_handshake.assert_called_once()
|
||||
# Verify HTTP/2 handler was invoked
|
||||
mock_h2.assert_called_once()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user