Gunicorn + Uvicorn Workers
Running uvicorn main:app directly starts a single process with a single event loop — fine for development, not for production. In production, Gunicorn acts as a process manager that spawns multiple Uvicorn workers, each with its own event loop and Python interpreter. This uses all CPU cores and provides fault isolation (a crashed worker is restarted automatically). The key: use Gunicorn with the UvicornWorker class, not the default sync worker.
# ── Direct command ─────────────────────────────────────────────────────────
# -w = number of worker processes
# -k = worker class (UvicornWorker = async ASGI worker)
# --bind = address:port
gunicorn main:app \
-w 4 \
-k uvicorn.workers.UvicornWorker \
--bind 0.0.0.0:8000 \
--timeout 60 \
--graceful-timeout 30 \
--keep-alive 5
# Worker count rule of thumb:
# CPU-bound: workers = CPU_cores + 1
# I/O-bound (typical FastAPI): workers = CPU_cores * 2 + 1
# With 4 cores → 4*2+1 = 9 workers for an I/O-heavy API

# gunicorn.conf.py — configuration file (cleaner than command-line flags)
import multiprocessing
# Worker settings
worker_class = "uvicorn.workers.UvicornWorker"
workers = multiprocessing.cpu_count() * 2 + 1
threads = 1 # Uvicorn workers are async — threads add no benefit
# Networking
bind = "0.0.0.0:8000"
backlog = 2048 # max pending connections before refusing
# Timeouts (seconds)
timeout = 60 # kill worker if it doesn't respond in 60s
graceful_timeout = 30 # wait 30s for in-flight requests on shutdown
keepalive = 5 # seconds to keep idle connections open
# Process management
max_requests = 1000 # restart worker after 1000 requests (prevents leaks)
max_requests_jitter = 100 # +/- 100 random jitter (avoids all workers restarting simultaneously)
preload_app = True # load app code once in master, copy-on-write to workers
# Logging
accesslog = "-" # log to stdout (captured by Docker/K8s)
errorlog = "-"
loglevel = "info"
def on_starting(server):
print("Gunicorn starting")
def worker_init(worker):
print(f"Worker {worker.pid} initialized")
def worker_exit(worker, server):
    print(f"Worker {worker.pid} exiting")

CPU * 2 + 1 for I/O-heavy apps. Each worker has its own event loop, so this isn't the same as threads — each worker handles many concurrent requests.

💬 Interview Tip
“Why use Gunicorn in front of Uvicorn?” — Gunicorn adds process management (auto-restart crashed workers, zero-downtime reload with kill -HUP, worker count control). Uvicorn alone can’t restart itself if it crashes. Together they give production resilience.
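As an illustration of the kill -HUP reload, here is a minimal sketch. It assumes Gunicorn was started with a --pid file; the path is hypothetical.
# ── Zero-downtime reload: signal the Gunicorn master (sketch) ──────────────
import os, signal

PID_FILE = "/var/run/gunicorn.pid"  # assumes gunicorn was started with --pid /var/run/gunicorn.pid

with open(PID_FILE) as f:
    master_pid = int(f.read().strip())

# SIGHUP makes the master re-read its config and gracefully replace workers
os.kill(master_pid, signal.SIGHUP)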
⚠ Gotcha
Never put shared mutable state (counters, caches) in Python global variables when using multiple workers — each worker is a separate process with its own memory. They don’t share globals. Use Redis or a database for shared state.
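A minimal sketch of the difference, assuming a redis.asyncio client; the counter name is illustrative.
# ── Per-process vs shared state (sketch) ───────────────────────────────────
import redis.asyncio as aioredis

redis_client = aioredis.from_url("redis://localhost:6379")

request_count = 0  # ❌ each worker process sees its own copy

async def count_request() -> int:
    # ✅ Redis INCR is atomic and visible to every worker on every pod
    return await redis_client.incr("request_count")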
Docker & Docker Compose
Docker packages your FastAPI app and all its dependencies into an immutable image that runs identically in development, staging, and production. A multi-stage Dockerfile uses one stage to install dependencies and a second stage that copies only what's needed into the final image — keeping the image small and secure. Docker Compose orchestrates multiple containers (app + database + Redis) locally.
# ── Stage 1: build dependencies ───────────────────────────────────────────
FROM python:3.12-slim AS builder
WORKDIR /build
RUN pip install --upgrade pip
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt
# ── Stage 2: production image ──────────────────────────────────────────────
FROM python:3.12-slim AS production
# Create non-root user — never run as root in production
RUN addgroup --system app && adduser --system --group app
WORKDIR /app
COPY --from=builder /install /usr/local
COPY --chown=app:app . .
USER app
EXPOSE 8000
# Use exec form — PID 1 receives signals correctly
CMD ["gunicorn", "main:app", \
     "-k", "uvicorn.workers.UvicornWorker", \
     "-w", "4", \
     "--bind", "0.0.0.0:8000", \
     "--timeout", "60"]

# docker-compose.yml — local development stack
version: "3.9"
services:
api:
build:
context: .
target: production
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql+asyncpg://user:pass@db:5432/mydb
- REDIS_URL=redis://redis:6379
- SECRET_KEY=dev-secret-key
depends_on:
db:
condition: service_healthy
redis:
condition: service_started
volumes:
- .:/app
command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
db:
image: postgres:16-alpine
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: mydb
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U user -d mydb"]
interval: 5s
timeout: 5s
retries: 5
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
  postgres_data:

Run the final image as a non-root user (addgroup --system / adduser --system).
CMD ["gunicorn", …] (JSON array) vs CMD gunicorn … (string): exec form makes the process PID 1, so it receives SIGTERM directly; in shell form the shell is PID 1 and signals may not propagate.
condition: service_healthy waits until the DB passes its health check before starting the API — prevents "connection refused" on startup.
Copy requirements.txt before application code — only if requirements change does the pip install layer rebuild. Saves minutes on every build.

⚠ Gotcha
Never copy .env files into Docker images — they end up in image history and can be extracted. Use --env-file at runtime (docker run --env-file .env) or mount secrets as files. Add .env to .dockerignore.
Environment Management
Production apps need different configuration for different environments (dev has a local DB; prod has a managed cloud DB). The FastAPI-idiomatic approach uses Pydantic Settings (from pydantic-settings) to read configuration from environment variables with type validation and secret handling. A single cached settings instance is injected via Depends(get_settings), making it testable and overridable.
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import PostgresDsn, RedisDsn, SecretStr, AnyHttpUrl
from functools import lru_cache
from typing import Literal
class Settings(BaseSettings):
# ── App ────────────────────────────────────────────────────────────────
app_name: str = "My FastAPI App"
environment: Literal["development", "staging", "production"] = "development"
debug: bool = False
log_level: str = "INFO"
# ── Security ───────────────────────────────────────────────────────────
# SecretStr prevents the value from appearing in repr/logs
secret_key: SecretStr
algorithm: str = "HS256"
access_token_expire_minutes: int = 30
# ── Database ───────────────────────────────────────────────────────────
# PostgresDsn validates the URL format
database_url: PostgresDsn
db_pool_size: int = 10
db_max_overflow: int = 20
# ── Redis (optional) ──────────────────────────────────────────────────
redis_url: RedisDsn | None = None
# ── CORS ───────────────────────────────────────────────────────────────
allowed_origins: list[AnyHttpUrl] = ["http://localhost:3000"]
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False,
)
@property
def is_production(self) -> bool:
return self.environment == "production"
@property
def db_url_str(self) -> str:
return str(self.database_url)
# lru_cache: settings are read ONCE and cached for the app lifetime.
@lru_cache
def get_settings() -> Settings:
return Settings()
from fastapi import Depends
from typing import Annotated
SettingsDep = Annotated[Settings, Depends(get_settings)]
# Override in tests:
# def get_test_settings():
# return Settings(database_url="postgresql+asyncpg://user:pass@localhost/test", ...)
# app.dependency_overrides[get_settings] = get_test_settings

database_url maps to the env var DATABASE_URL. Types are validated — wrong format raises on startup, not mid-request.
str(settings.secret_key) returns **********. Access the real value with settings.secret_key.get_secret_value(). Prevents accidental log exposure.
PostgresDsn raises ValidationError on startup if the format is wrong — fail fast, not at first query.
Without lru_cache, each call would create a new Settings() instance and re-read env vars. lru_cache ensures a singleton.

★ Key Info
Put .env in .gitignore. Check in an .env.example with placeholder values so teammates know which variables are needed. Never commit real secrets — they persist in git history even after deletion.
Structured JSON Logging
Plain text logs are unqueryable in log aggregators (Datadog, CloudWatch, Loki). Structured logging emits JSON per line: {"level":"info","event":"user_created","user_id":5,"request_id":"abc"} — each field is searchable and filterable. structlog is the de facto standard Python library for this, with context binding that carries request-scoped data (request_id, user_id) through all log calls without manually passing them.
import structlog, logging, sys
from contextvars import ContextVar
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
# ── Configure structlog ───────────────────────────────────────────────────
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars, # inject bound context vars
structlog.stdlib.add_log_level, # add "level" field
structlog.stdlib.add_logger_name, # add "logger" field
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.JSONRenderer(), # final output as JSON
],
wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
logger_factory=structlog.PrintLoggerFactory(file=sys.stdout),
)
log = structlog.get_logger()
# ── Middleware: bind request context to all logs ──────────────────────────
class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
import uuid
req_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
structlog.contextvars.bind_contextvars(
request_id=req_id,
method=request.method,
path=str(request.url.path),
)
response = await call_next(request)
log.info("request_complete", status_code=response.status_code)
structlog.contextvars.clear_contextvars()
response.headers["X-Request-ID"] = req_id
return response
app = FastAPI()
app.add_middleware(LoggingMiddleware)
# ── Usage in route handlers ───────────────────────────────────────────────
@app.post("/users")
async def create_user(name: str, email: str):
log.info("creating_user", name=name, email=email)
user_id = 42
structlog.contextvars.bind_contextvars(user_id=user_id)
log.info("user_created", user_id=user_id)
return {"id": user_id}
# Output (one JSON line per log — queryable in Datadog/CloudWatch):
# {"event":"creating_user","name":"Alice","email":"a@b.com",
# "request_id":"abc-123","method":"POST","path":"/users",
#  "level":"info","timestamp":"2024-01-15T10:30:00Z"}

Each log.info("event", key=value) call emits one JSON line. Fields are machine-readable and searchable in log aggregators.
The merge_contextvars processor automatically injects values bound with bind_contextvars() into every log call — no need to pass request_id to every function.
For development, swap JSONRenderer for ConsoleRenderer (human-readable output) or KeyValueRenderer.

💬 Interview Tip
“How do you trace a request through your logs?” — Assign a UUID to each request (from X-Request-ID header or generated), bind it to the logging context, and return it in the response header. Log aggregators can then filter all logs for a specific request_id value.
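One practical variation on the configuration above, sketched under the assumption of an is_production flag like the one in the Settings class earlier: keep JSON in production, but use structlog's console renderer in development.
# ── Dev/prod renderer switch (sketch) ──────────────────────────────────────
import structlog

is_production = False  # assumption: would come from your Settings

renderer = (
    structlog.processors.JSONRenderer()   # machine-readable lines in prod
    if is_production
    else structlog.dev.ConsoleRenderer()  # colorized, human-readable in dev
)
# ...then pass `renderer` as the LAST processor in structlog.configure()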
Distributed Tracing
In a microservices architecture, a single user request may touch 5 different services. Logs tell you what happened inside each service, but not how they connect. Distributed tracing follows a request across service boundaries using a shared trace_id propagated in HTTP headers. OpenTelemetry is the open standard — FastAPIInstrumentor auto-instruments every route, creating spans with timing, HTTP method, status code, and error details, all sent to a tracing backend (Jaeger, Tempo, Datadog APM).
# pip install opentelemetry-sdk opentelemetry-instrumentation-fastapi
# opentelemetry-exporter-otlp opentelemetry-instrumentation-sqlalchemy
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.resources import Resource
from fastapi import FastAPI
# ── 1. Configure the tracer ───────────────────────────────────────────────
resource = Resource.create({"service.name": "my-fastapi-service", "deployment.environment": "production"})
exporter = OTLPSpanExporter(endpoint="http://otel-collector:4317", insecure=True)
provider = TracerProvider(resource=resource)
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)
app = FastAPI()
# ── 2. Auto-instrument FastAPI ────────────────────────────────────────────
# Automatically creates spans for every route with HTTP attributes.
FastAPIInstrumentor.instrument_app(app)
# ── 3. Manual spans for custom operations ─────────────────────────────────
@app.get("/users/{uid}")
async def get_user(uid: int):
with tracer.start_as_current_span("fetch_user_from_db") as span:
span.set_attribute("user.id", uid)
span.set_attribute("db.operation", "SELECT")
user = {"id": uid, "name": "Alice"}
if not user:
span.set_status(trace.StatusCode.ERROR, "User not found")
span.record_exception(ValueError(f"User {uid} not found"))
return user
# ── 4. Propagate trace context in outgoing HTTP calls ─────────────────────
# HTTPXClientInstrumentor().instrument()  # auto-propagates W3C traceparent header

Trace context crosses service boundaries in the traceparent HTTP header (W3C Trace Context standard).
Call .instrument_app(app) once at startup.
BatchSpanProcessor exports spans in the background; SimpleSpanProcessor sends synchronously (only for debugging — it blocks the event loop).

★ Rarely Known
OpenTelemetry supports baggage — key-value pairs propagated alongside traces across service boundaries. Useful for propagating tenant ID, feature flags, or user context without modifying every function signature.
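A minimal sketch of the baggage API; the key and value are illustrative. Cross-service propagation also relies on the W3C baggage propagator, which the default OpenTelemetry propagator configuration includes.
# ── Baggage: propagate key-value context alongside the trace (sketch) ─────
from opentelemetry import baggage, context

ctx = baggage.set_baggage("tenant.id", "acme-corp")  # returns a new Context
token = context.attach(ctx)
try:
    # instrumented outgoing calls made here carry the baggage header
    tenant = baggage.get_baggage("tenant.id")  # readable in any downstream service
finally:
    context.detach(token)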
Health Check & Readiness Endpoints
Kubernetes (and load balancers) use liveness and readiness probes to manage your containers. Liveness: "is the process alive?" — if this fails, K8s restarts the container. Readiness: "is the app ready to serve traffic?" — if this fails, K8s stops routing to the pod (but doesn't restart it). A readiness probe should check that all dependencies (DB, Redis) are reachable, not just that the process is running. Return actual dependency health, not just HTTP 200.
from fastapi import FastAPI, Response
from pydantic import BaseModel
from typing import Literal
import asyncio, time
app = FastAPI()
# ── Liveness probe ────────────────────────────────────────────────────────
# Just proves the process is alive. K8s restarts pod if this fails.
# Keep it FAST — no DB calls.
@app.get("/health/live", include_in_schema=False)
async def liveness():
return {"status": "alive"}
# ── Readiness probe ───────────────────────────────────────────────────────
class DependencyStatus(BaseModel):
status: Literal["ok", "error"]
latency_ms: float | None = None
error: str | None = None
class ReadinessResponse(BaseModel):
status: Literal["ready", "degraded", "not_ready"]
dependencies: dict[str, DependencyStatus]
@app.get("/health/ready", response_model=ReadinessResponse, include_in_schema=False)
async def readiness(response: Response):
checks: dict[str, DependencyStatus] = {}
# Check database
    try:
        start = time.perf_counter()
        await asyncio.sleep(0.001)  # replace with: pool.fetchval("SELECT 1")
checks["database"] = DependencyStatus(
status="ok",
latency_ms=round((time.perf_counter() - start) * 1000, 2),
)
except Exception as e:
checks["database"] = DependencyStatus(status="error", error=str(e))
# Check Redis
try:
start = time.perf_counter()
# await redis.ping()
checks["redis"] = DependencyStatus(
status="ok",
latency_ms=round((time.perf_counter() - start) * 1000, 2),
)
except Exception as e:
checks["redis"] = DependencyStatus(status="error", error=str(e))
all_ok = all(v.status == "ok" for v in checks.values())
critical_ok = checks.get("database", DependencyStatus(status="error")).status == "ok"
if all_ok:
overall, response.status_code = "ready", 200
elif critical_ok:
overall, response.status_code = "degraded", 200 # still serve traffic
else:
overall, response.status_code = "not_ready", 503 # remove from load balancer
return ReadinessResponse(status=overall, dependencies=checks)
# ── Startup probe (K8s 1.16+) ─────────────────────────────────────────────
@app.get("/health/startup", include_in_schema=False)
async def startup_probe():
    return {"initialized": True}

⚠ Gotcha
Health endpoints should be excluded from authentication middleware — Kubernetes probes don’t send auth tokens. Check that your JWT middleware has an allowlist for /health/* paths, or they’ll always return 401.
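A sketch of such an allowlist; the middleware class is hypothetical and stands in for whatever auth middleware you use.
# ── Auth middleware with a health-check allowlist (sketch) ─────────────────
from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

class AuthMiddleware(BaseHTTPMiddleware):  # hypothetical JWT middleware
    async def dispatch(self, request: Request, call_next):
        if request.url.path.startswith("/health/"):
            return await call_next(request)  # K8s probes skip auth entirely
        if not request.headers.get("Authorization"):
            return JSONResponse({"detail": "Not authenticated"}, status_code=401)
        return await call_next(request)  # real token verification omitted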
Horizontal Scaling & Stateless Design
Horizontal scaling means running multiple identical copies of your API (pods/containers) behind a load balancer. This only works if each instance is stateless — it doesn't store any request-specific data in local memory between requests. In-memory sessions, local file storage, or in-process caches all break horizontal scaling. Everything shared between requests must live in an external service: session state in Redis, files in S3, rate limit counters in Redis, cache in Redis.
# ── WRONG: stateful designs that break horizontal scaling ─────────────────
# ❌ In-memory session store — only visible on THIS pod
in_memory_sessions: dict[str, dict] = {}
@app.post("/login-bad")
async def login_bad(username: str):
import uuid
session_id = str(uuid.uuid4())
in_memory_sessions[session_id] = {"username": username} # lost on restart!
return {"session_id": session_id}
# ❌ Local file storage — only on THIS pod's filesystem
import uuid

@app.post("/upload-bad")
async def upload_bad(file: bytes):
    with open(f"/tmp/{uuid.uuid4()}.bin", "wb") as f:
        f.write(file)  # other pods can't access /tmp on this pod
    return {"saved": True}
# ── CORRECT: stateless designs ────────────────────────────────────────────
# ✅ JWT — self-contained token, no server-side state needed
from jose import jwt as jose_jwt
@app.post("/login")
async def login(username: str, secret: str = "secret-key"):
token = jose_jwt.encode(
{"sub": username, "exp": 9999999999},
secret, algorithm="HS256"
)
return {"access_token": token}
# ✅ Redis-backed sessions (shared across pods)
# async def get_session(session_id: str, redis) -> dict | None:
# data = await redis.get(f"session:{session_id}")
# return json.loads(data) if data else None
# ✅ S3/object storage for files (shared across pods)
# async def upload_file(filename: str, data: bytes, s3_client) -> str:
# await s3_client.put_object(Bucket="my-bucket", Key=filename, Body=data)
# return f"s3://my-bucket/{filename}"
# ── Stateless checklist ───────────────────────────────────────────────────
# ✅ Auth: JWT (stateless) or Redis sessions
# ✅ Rate limits: Redis counters (not in-memory)
# ✅ Caches: Redis (not dict)
# ✅ Files: S3 / GCS / Azure Blob (not local disk)
# ✅ Background jobs: task queue (Celery/ARQ) — not asyncio tasks on one pod
# ✅ WebSocket connections: Redis Pub/Sub for cross-pod messaging (see topic 52)
# ✅ App config: env vars (not files baked into image with local state)

💬 Interview Tip
“How do you scale a FastAPI app?” — Run multiple instances behind a load balancer. The key prerequisite is stateless design: JWT for auth, Redis for sessions/caches/rate limits, S3 for files. If someone asks why, the answer is: any instance can handle any request without sharing in-process memory.
Background Job Queues
Some work shouldn't block the HTTP response: sending emails, resizing images, generating reports. FastAPI's built-in BackgroundTasks runs work in the same process after the response is sent — simple but not durable (crashes lose queued work). For production, use a persistent job queue: ARQ (async, Redis-backed), Celery (battle-tested, multi-language), or TaskIQ (modern async). Jobs survive restarts, can be distributed across worker machines, and provide retry logic.
# ── Option 1: FastAPI BackgroundTasks (simple, in-process) ────────────────
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def send_email_sync(email: str, message: str):
"""Runs after response is sent — in the same process."""
print(f"Sending email to {email}: {message}")
@app.post("/register-simple")
async def register_simple(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(send_email_sync, email, "Welcome!")
return {"status": "registered"}
# ── Option 2: ARQ (async, Redis-backed, production-grade) ─────────────────
# pip install arq redis
import arq
async def send_welcome_email(ctx: dict, email: str, name: str):
"""ARQ task — ctx contains the Redis connection and settings."""
print(f"Sending welcome email to {email}")
return {"sent_to": email}
async def generate_report(ctx: dict, report_id: int):
print(f"Generating report {report_id}")
return {"report_id": report_id, "status": "complete"}
class WorkerSettings:
functions = [send_welcome_email, generate_report]
redis_settings = arq.connections.RedisSettings(host="localhost", port=6379)
max_jobs = 10
job_timeout = 300
keep_result = 3600
@app.post("/register")
async def register(email: str, name: str):
    # Pool created per request for brevity — in production, create it once
    # at startup and reuse it.
    redis = await arq.create_pool(arq.connections.RedisSettings())
    await redis.enqueue_job("send_welcome_email", email, name)
    return {"status": "registered", "email_queued": True}
# Run worker: arq app.jobs.WorkerSettings
# ── Option 3: Celery (mature, multi-language workers) ────────────────────
# from celery import Celery
# celery_app = Celery("tasks", broker="redis://localhost:6379", backend="redis://localhost:6379")
#
# @celery_app.task(bind=True, max_retries=3)
# def send_email_task(self, email: str, message: str):
# try:
# pass # ... send email ...
# except Exception as exc:
#             raise self.retry(exc=exc, countdown=60)

★ Key Info
Use BackgroundTasks only for truly non-critical, fast work. For anything you must complete (email on signup, payment confirmation, report generation), use a durable queue. Ask: “what happens if the server crashes 1ms after the response is sent?” — if the answer is “data is lost,” use a queue.
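As a sketch of the retry logic a durable queue gives you: in ARQ, raising Retry re-enqueues the job with a delay (ctx["job_try"] is the current attempt number; the email send itself is elided).
# ── ARQ retry sketch ───────────────────────────────────────────────────────
from arq import Retry

async def send_email_with_retry(ctx: dict, email: str):
    try:
        ...  # send the email (illustrative)
    except ConnectionError:
        # re-enqueue with a growing backoff: 5s, 10s, 15s, ...
        raise Retry(defer=ctx["job_try"] * 5)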
WebSocket Scaling with Redis Pub/Sub
WebSocket connections are stateful — they're held open to a specific server instance. With multiple instances, a message published to Pod A won't reach clients connected to Pod B. The solution: use Redis Pub/Sub as a message bus. When any pod receives a message to broadcast, it publishes to a Redis channel. All pods subscribe to that channel and forward the message to their local WebSocket connections.
import asyncio, json
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import redis.asyncio as aioredis
app = FastAPI()
# ── Local connection manager (per-pod) ───────────────────────────────────
class LocalConnectionManager:
def __init__(self):
# room_id -> set of WebSocket connections ON THIS POD
self.rooms: dict[str, set[WebSocket]] = {}
async def connect(self, ws: WebSocket, room: str):
await ws.accept()
self.rooms.setdefault(room, set()).add(ws)
def disconnect(self, ws: WebSocket, room: str):
if room in self.rooms:
self.rooms[room].discard(ws)
async def broadcast_local(self, room: str, message: str):
"""Send to all WebSocket connections on THIS pod."""
for ws in list(self.rooms.get(room, set())):
try:
await ws.send_text(message)
except Exception:
self.disconnect(ws, room)
manager = LocalConnectionManager()
redis_client = aioredis.from_url("redis://localhost:6379")
async def redis_listener():
"""Subscribe to Redis and forward messages to local connections."""
pubsub = redis_client.pubsub()
await pubsub.psubscribe("room:*") # subscribe to all room channels
async for message in pubsub.listen():
if message["type"] == "pmessage":
channel = message["channel"].decode()
room_id = channel.split(":", 1)[1]
data = message["data"].decode()
await manager.broadcast_local(room_id, data)
@app.on_event("startup")
async def start_redis_listener():
asyncio.create_task(redis_listener())
@app.websocket("/ws/{room_id}")
async def websocket_endpoint(ws: WebSocket, room_id: str):
await manager.connect(ws, room_id)
try:
while True:
data = await ws.receive_text()
# Publish to Redis — ALL pods subscribed to this room receive it
payload = json.dumps({"room": room_id, "message": data})
await redis_client.publish(f"room:{room_id}", payload)
except WebSocketDisconnect:
        manager.disconnect(ws, room_id)

⚠ Gotcha
Redis Pub/Sub is not durable — if a pod is down when a message is published, it misses it. For guaranteed delivery, use Redis Streams (XADD/XREAD) instead, which allow catching up on missed messages after reconnect.
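A sketch of the Streams variant, reusing redis_client and manager from above; the stream naming is illustrative.
# ── Durable fan-out with Redis Streams (sketch) ────────────────────────────
async def publish_durable(room_id: str, data: str):
    # XADD appends to a stream; entries persist until trimmed
    await redis_client.xadd(f"stream:room:{room_id}", {"message": data})

async def consume_durable(room_id: str, last_id: str = "$"):
    while True:
        # XREAD returns entries after last_id, so missed messages get replayed
        entries = await redis_client.xread(
            {f"stream:room:{room_id}": last_id}, block=5000, count=100
        )
        for _stream, messages in entries:
            for msg_id, fields in messages:
                last_id = msg_id
                await manager.broadcast_local(room_id, fields[b"message"].decode())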
Database Connection Pooling
Every database operation needs a connection. Opening a new connection takes 10–100ms and consumes server resources. A connection pool maintains a set of open connections, lending them to requests and returning them when done. With multiple Gunicorn workers, each worker has its own pool — the total number of connections to the database is pool_size × workers. PostgreSQL typically allows 100 connections by default; exceeding this causes "too many connections" errors.
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from typing import AsyncGenerator
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/mydb"
engine = create_async_engine(
DATABASE_URL,
# ── Pool size math ────────────────────────────────────────────────────
# Total max per worker = pool_size + max_overflow
# With 4 Gunicorn workers: total DB connections = 4 × (5 + 10) = 60
pool_size=5,
max_overflow=10,
pool_timeout=30, # seconds to wait for a free connection before error
pool_recycle=1800, # recycle connections after 30min (prevents stale state)
pool_pre_ping=True, # test connection health before lending from pool
echo=False, # set True to log all SQL (dev only — noisy in prod)
)
AsyncSessionLocal = async_sessionmaker(
bind=engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=False,
autocommit=False,
)
async def get_session() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
# ── Pool sizing formula ────────────────────────────────────────────────────
# Max DB connections = pool_size × gunicorn_workers
# PostgreSQL default max_connections = 100
# With 4 workers:
# pool_size=20 → 4 × 20 = 80 connections (safe)
# pool_size=30 → 4 × 30 = 120 → ERROR: too many connections
def pool_status() -> dict:
pool = engine.pool
return {
"size": pool.size(),
"checked_in": pool.checkedin(),
"checked_out": pool.checkedout(),
"overflow": pool.overflow(),
    }

If no free connection is available, a request waits up to pool_timeout seconds before erroring.
max_overflow allows temporary connections beyond pool_size. These connections are created on demand and closed when returned — not kept warm.
pool_pre_ping runs a lightweight check (SELECT 1) before lending a connection. Detects dead connections (DB restart, firewall timeout) before they cause request errors.

⚠ Gotcha
pool_size × gunicorn_workers must be less than PostgreSQL’s max_connections (usually 100). With 10 workers and pool_size=15, you’d need 150 DB connections — causing connection errors under load. Always calculate this and set pool_size accordingly or use PgBouncer.
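A back-of-the-envelope helper for that check; the headroom value is an assumption (reserve connections for migrations, cron jobs, and admin sessions).
# ── Pool sizing helper (sketch) ────────────────────────────────────────────
def safe_pool_size(workers: int, max_connections: int = 100, headroom: int = 20) -> int:
    """Largest pool_size keeping workers × pool_size under max_connections."""
    return max(1, (max_connections - headroom) // workers)

# 10 workers against PostgreSQL's default 100: safe_pool_size(10) == 8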
Async Task Patterns
Python's asyncio provides powerful primitives for concurrent async work. gather() runs coroutines concurrently and returns when all finish. wait_for() adds a timeout. Semaphore limits how many coroutines run at once (prevents overwhelming a downstream service). Python 3.11's TaskGroup is the modern, safer alternative to gather() — it cancels all sibling tasks when one fails.
import asyncio
from fastapi import FastAPI
app = FastAPI()
# ── 1. asyncio.gather — run multiple tasks concurrently ───────────────────
@app.get("/dashboard")
async def dashboard(user_id: int):
# Without gather: sequential, ~300ms total
# With gather: concurrent, ~100ms total (slowest wins)
user, orders, notifications = await asyncio.gather(
fetch_user(user_id),
fetch_orders(user_id),
fetch_notifications(user_id),
)
return {"user": user, "orders": orders, "notifications": notifications}
# ── 2. gather with error handling ─────────────────────────────────────────
@app.get("/dashboard-safe")
async def dashboard_safe(user_id: int):
# return_exceptions=True: failed tasks return the exception, not raise it
results = await asyncio.gather(
fetch_user(user_id),
fetch_orders(user_id),
fetch_notifications(user_id),
return_exceptions=True,
)
return {
"user": results[0] if not isinstance(results[0], Exception) else None,
"orders": results[1] if not isinstance(results[1], Exception) else [],
"notifications": results[2] if not isinstance(results[2], Exception) else [],
}
# ── 3. asyncio.wait_for — timeout on slow operations ─────────────────────
@app.get("/slow-service")
async def call_slow_service():
try:
result = await asyncio.wait_for(slow_external_api(), timeout=2.0)
return result
except asyncio.TimeoutError:
from fastapi import HTTPException
raise HTTPException(504, "Upstream service timed out")
# ── 4. Semaphore — limit concurrency ──────────────────────────────────────
_db_semaphore = asyncio.Semaphore(10)
async def fetch_with_limit(item_id: int):
async with _db_semaphore: # blocks here if 10 are already running
return await fetch_item(item_id)
@app.get("/bulk")
async def bulk_fetch(ids: list[int]):
return await asyncio.gather(*[fetch_with_limit(i) for i in ids])
# ── 5. TaskGroup (Python 3.11+) — structured concurrency ────────────────
@app.get("/taskgroup/{user_id}")
async def with_task_group(user_id: int):
async with asyncio.TaskGroup() as tg:
user_task = tg.create_task(fetch_user(user_id))
orders_task = tg.create_task(fetch_orders(user_id))
# If ANY task raises, all others are CANCELLED immediately
return {"user": user_task.result(), "orders": orders_task.result()}
async def fetch_user(uid): await asyncio.sleep(0.1); return {"id": uid}
async def fetch_orders(uid): await asyncio.sleep(0.05); return []
async def fetch_notifications(uid): await asyncio.sleep(0.08); return []
async def slow_external_api(): await asyncio.sleep(5); return {}
async def fetch_item(item_id): await asyncio.sleep(0.01); return {"id": item_id}

With plain gather (no return_exceptions), the first exception propagates to the caller, but sibling tasks keep running; gather does not cancel them (TaskGroup does).
wait_for raises asyncio.TimeoutError if not done in time. Critical for calls to external services that may hang.
Semaphore: async with sem acquires a slot and waits if none are free. Limits concurrency to prevent overwhelming downstream resources.
TaskGroup (3.11+) is the structured-concurrency successor to gather().

💬 Interview Tip
“How would you call 3 APIs in parallel?” — asyncio.gather(). “What if one might fail?” — return_exceptions=True or TaskGroup. “What if they might be slow?” — wrap each in asyncio.wait_for(coro, timeout=X). This triad of answers shows production async awareness.
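A sketch combining all three answers, using the fetch stubs above: concurrent calls, a per-call timeout, and per-result fallbacks.
# ── gather + wait_for + return_exceptions combined (sketch) ────────────────
@app.get("/dashboard-resilient")
async def dashboard_resilient(user_id: int):
    user, orders, notifications = await asyncio.gather(
        asyncio.wait_for(fetch_user(user_id), timeout=2.0),
        asyncio.wait_for(fetch_orders(user_id), timeout=2.0),
        asyncio.wait_for(fetch_notifications(user_id), timeout=2.0),
        return_exceptions=True,  # timeouts come back as TimeoutError objects
    )
    return {
        "user": user if not isinstance(user, Exception) else None,
        "orders": orders if not isinstance(orders, Exception) else [],
        "notifications": notifications if not isinstance(notifications, Exception) else [],
    }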
Kubernetes Deployment Basics
Kubernetes (K8s) orchestrates containers at scale: it schedules pods on nodes, restarts crashed pods, scales based on CPU/memory, and does rolling deployments with zero downtime. A FastAPI app in K8s needs: a Deployment (manages replicas), a Service (stable internal DNS), an HPA (auto-scaling), and properly configured liveness/readiness probes and resource limits.
apiVersion: apps/v1
kind: Deployment
metadata:
name: fastapi-app
spec:
replicas: 3
selector:
matchLabels:
app: fastapi-app
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1 # allow 1 extra pod during rollout
maxUnavailable: 0 # never have fewer than desired count
template:
metadata:
labels:
app: fastapi-app
spec:
containers:
- name: fastapi
image: myrepo/fastapi-app:v1.2.3 # always use specific tags, never "latest"
ports:
- containerPort: 8000
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "1000m"
livenessProbe:
httpGet:
path: /health/live
port: 8000
initialDelaySeconds: 15
periodSeconds: 10
failureThreshold: 3
readinessProbe:
httpGet:
path: /health/ready
port: 8000
initialDelaySeconds: 10
periodSeconds: 5
failureThreshold: 3
env:
- name: SECRET_KEY
valueFrom:
secretKeyRef:
name: fastapi-secrets
key: secret-key
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: fastapi-secrets
key: database-url
---
apiVersion: v1
kind: Service
metadata:
name: fastapi-service
spec:
selector:
app: fastapi-app
ports:
- port: 80
targetPort: 8000
type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: fastapi-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: fastapi-app
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
          averageUtilization: 70

With maxUnavailable: 0 and maxSurge: 1, the rollout always keeps full capacity during a deploy.
Resource requests guide pod scheduling; the container is throttled (CPU) or OOM-killed (memory) at its limits.
The HPA adjusts replicas automatically between minReplicas and maxReplicas.

⚠ Gotcha
Never use image: myapp:latest in K8s — pods may pull different versions if the tag updates. Always use immutable tags (:v1.2.3 or a git SHA). :latest in production is a deployment reliability anti-pattern.
CI/CD Pipeline
A CI/CD pipeline automates: run tests → build Docker image → push to registry → deploy. For FastAPI, CI needs a running PostgreSQL service for integration tests (no mocked DB). GitHub Actions is the most common platform. The pipeline ensures every commit is tested before it can be merged, and every merge to main is deployed automatically.
name: CI/CD
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:16-alpine
env:
POSTGRES_USER: test
POSTGRES_PASSWORD: test
POSTGRES_DB: testdb
ports:
- 5432:5432
options: >-
--health-cmd pg_isready
--health-interval 5s
--health-timeout 5s
--health-retries 5
redis:
image: redis:7-alpine
ports:
- 6379:6379
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.12"
cache: pip
- name: Install dependencies
run: pip install -r requirements.txt -r requirements-dev.txt
- name: Run Alembic migrations
env:
DATABASE_URL: postgresql+asyncpg://test:test@localhost:5432/testdb
run: alembic upgrade head
- name: Run tests with coverage
env:
DATABASE_URL: postgresql+asyncpg://test:test@localhost:5432/testdb
REDIS_URL: redis://localhost:6379
SECRET_KEY: test-secret-key-ci
run: pytest --cov=app --cov-report=xml -v
- name: Upload coverage
uses: codecov/codecov-action@v4
build-and-push:
needs: test
if: github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Login to Container Registry
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push
uses: docker/build-push-action@v5
with:
context: .
push: true
tags: |
ghcr.io/${{ github.repository }}:${{ github.sha }}
ghcr.io/${{ github.repository }}:latest
cache-from: type=gha
          cache-to: type=gha,mode=max

Service containers are reachable on localhost:5432 — same as running locally. No mocking needed.
needs: test gates the build job on the test job succeeding — broken code never makes it into a deployable image.

💬 Interview Tip
“How do you test with a real database in CI?” — GitHub Actions services: block. Runs a sidecar container (postgres, redis) accessible on localhost. This catches schema migration issues and query bugs that mocks would miss.
API Versioning Strategies
When you need to change an API in a breaking way (remove a field, change a type), existing clients break. API versioning lets you run old and new versions simultaneously. Three main approaches: URL path versioning (/v1/users, /v2/users) — most common, explicit, easy to route; header versioning (API-Version: 2) — cleaner URLs but harder to test in browsers; mount-based versioning (separate FastAPI apps mounted under a prefix) — most isolation, recommended for major version differences.
from fastapi import FastAPI, APIRouter, Header, HTTPException
from typing import Annotated
# ── Approach 1: URL path versioning ───────────────────────────────────────
app = FastAPI()
v1_router = APIRouter(prefix="/v1")
v2_router = APIRouter(prefix="/v2")
@v1_router.get("/users/{uid}")
async def get_user_v1(uid: int):
return {"id": uid, "name": "Alice"}
@v2_router.get("/users/{uid}")
async def get_user_v2(uid: int):
return {"id": uid, "name": "Alice", "created_at": "2024-01-15", "role": "user"}
app.include_router(v1_router)
app.include_router(v2_router)
# ── Approach 2: Header versioning ─────────────────────────────────────────
@app.get("/users/{uid}")
async def get_user_versioned(
uid: int,
api_version: Annotated[str, Header(alias="API-Version")] = "1",
):
if api_version == "2":
return {"id": uid, "name": "Alice", "created_at": "2024-01-15"}
elif api_version == "1":
return {"id": uid, "name": "Alice"}
raise HTTPException(400, f"Unsupported API version: {api_version}")
# ── Approach 3: Mount separate apps (maximum isolation) ──────────────────
v1_app = FastAPI(title="API v1", docs_url="/docs")
v2_app = FastAPI(title="API v2", docs_url="/docs")
@v1_app.get("/users/{uid}")
async def v1_users(uid: int):
return {"id": uid, "format": "v1"}
@v2_app.get("/users/{uid}")
async def v2_users(uid: int):
return {"id": uid, "format": "v2", "extra": True}
main_app = FastAPI(docs_url=None)
main_app.mount("/v1", v1_app)
main_app.mount("/v2", v2_app)
# ── Version deprecation strategy ─────────────────────────────────────────
from starlette.middleware.base import BaseHTTPMiddleware
class DeprecationWarningMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
response = await call_next(request)
if request.url.path.startswith("/v1/"):
response.headers["Deprecation"] = "true"
response.headers["Sunset"] = "Sat, 31 Dec 2025 23:59:59 GMT"
response.headers["Link"] = '</v2/>; rel="successor-version"'
        return response

Path versioning lets you route and monitor /v1/* and /v2/* independently. The standard choice for public APIs.
Deprecation headers announce a Sunset date so clients can plan migration before removal.

★ Rarely Known
The Sunset response header is an IETF standard (RFC 8594); the companion Deprecation header was standardized later by the IETF HTTPAPI working group. Well-behaved API clients and developer tooling can read these and warn developers that they're using an API version scheduled for removal.
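A sketch of a client honoring those headers; the URL is hypothetical.
# ── Client-side deprecation check (sketch) ─────────────────────────────────
import httpx

resp = httpx.get("https://api.example.com/v1/users/1")  # hypothetical endpoint
if resp.headers.get("Deprecation"):
    sunset = resp.headers.get("Sunset", "unknown date")
    print(f"WARNING: deprecated API version; removal scheduled for {sunset}")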
Performance Profiling
Before optimizing, measure. Two major FastAPI performance problems: N+1 queries (fetching 100 users then making a separate DB query per user — 101 total queries instead of 1 join) and synchronous code in async routes (blocking the event loop). Tools: pyinstrument (call-tree profiler, minimal overhead), ORJSONResponse (5–10x faster JSON serialization), and SQLAlchemy query logging.
from fastapi import FastAPI, Request, Response
from fastapi.responses import ORJSONResponse
import pyinstrument
# ── Use ORJSONResponse globally (5-10x faster JSON serialization) ─────────
app = FastAPI(default_response_class=ORJSONResponse)
# ── 1. Profiling middleware (enable in dev/staging only) ───────────────────
class ProfilingMiddleware:
    """Profile any request carrying the `X-Profile: true` header and return
    the pyinstrument HTML report in place of the normal response."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        # ASGI headers are a list of (name, value) byte pairs, not a dict
        headers = dict(scope.get("headers") or [])
        if scope.get("type") == "http" and headers.get(b"x-profile") == b"true":
            profiler = pyinstrument.Profiler(async_mode="enabled")
            profiler.start()

            async def discard(message):
                pass  # swallow the app's own response; we reply with the report

            await self.app(scope, receive, discard)
            profiler.stop()
            response = Response(content=profiler.output_html(), media_type="text/html")
            await response(scope, receive, send)
        else:
            await self.app(scope, receive, send)
# app.add_middleware(ProfilingMiddleware) # enable for profiling sessions
# ── 2. N+1 query problem and fix ──────────────────────────────────────────
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import selectinload
# ❌ N+1: 1 query for users + N queries for each user's items
async def get_users_bad(session: AsyncSession):
users = (await session.execute(select(User))).scalars().all()
for user in users:
items = user.items # triggers lazy load — 100 users = 101 queries!
# ✅ Eager loading: 1 query with JOIN (or 2 queries total)
async def get_users_good(session: AsyncSession):
users = (await session.execute(
select(User).options(
selectinload(User.items) # loads all users' items in ONE extra query
)
)).scalars().all()
return users
# ── 3. Enable SQL query logging (find slow queries) ───────────────────────
import logging
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
# ── 4. Route-level profiling ──────────────────────────────────────────────
@app.get("/profile-me")
async def profile_me():
profiler = pyinstrument.Profiler(async_mode="enabled")
with profiler:
result = await some_expensive_operation()
print(profiler.output_text(unicode=True, color=True))
return result
async def some_expensive_operation():
    import asyncio; await asyncio.sleep(0.01); return {"done": True}

Fix N+1 with selectinload() or joinedload() to fetch everything in 1–2 queries.
selectinload issues a second SELECT ... WHERE id IN (...) for all related items at once. Avoids N+1 without a cartesian-product JOIN.
ORJSONResponse uses orjson (Rust-based) instead of Python's json. 5–10x faster serialization. Handles datetime, UUID, bytes natively without custom encoders.
FastAPI(default_response_class=ORJSONResponse) applies globally. All routes use ORJSON without having to specify it per route.

💬 Interview Tip
“How would you investigate a slow FastAPI endpoint?” — (1) Check for N+1 queries by enabling SQLAlchemy logging. (2) Profile with pyinstrument to find CPU hotspots. (3) Check if async routes are calling blocking code. (4) Look at DB query explain plans. (5) Consider caching frequently-read data.
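For step (4), a sketch of pulling a plan through SQLAlchemy; it assumes an AsyncSession and a PostgreSQL backend, and the query is illustrative.
# ── EXPLAIN ANALYZE via SQLAlchemy (sketch) ────────────────────────────────
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

async def explain_query(session: AsyncSession) -> None:
    result = await session.execute(
        text("EXPLAIN ANALYZE SELECT * FROM users WHERE email = :email"),
        {"email": "a@b.com"},
    )
    for row in result:
        print(row[0])  # one line of the PostgreSQL execution plan per row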
★ Key Info
N+1 is the #1 real-world FastAPI performance issue. It’s invisible until you look at query logs — the app “works” but makes 100 queries per page load instead of 1. Always check SQLAlchemy logs in development and add selectinload or joinedload wherever you access relationships in a loop.