Async vs Sync Route Handlers
FastAPI runs on a single-threaded async event loop (via Uvicorn). When you declare a route with async def, it runs directly on that loop — any blocking call inside it (a slow database query, time.sleep(), file I/O via the normal open()) will freeze all other requests until it finishes. Regular def routes are automatically offloaded to a thread pool so the event loop stays free, but they cannot use await. The rule: use async def when all I/O is async-native; use plain def when working with synchronous libraries (pandas, psycopg2, etc.).
import asyncio, time
from fastapi import FastAPI
from concurrent.futures import ThreadPoolExecutor

app = FastAPI()

# ── ASYNC route ─────────────────────────────────────────────────────────
# Runs on the event loop. Safe ONLY with async-native I/O (httpx, asyncpg,
# aiofiles). Never call time.sleep() or requests.get() here.
@app.get("/async-ok")
async def async_ok():
    await asyncio.sleep(1)  # yields control → other requests can run
    return {"source": "async"}

# ── SYNC route ───────────────────────────────────────────────────────────
# FastAPI automatically runs this in a ThreadPoolExecutor thread, so the
# event loop is NOT blocked. Use this with sync libraries.
@app.get("/sync-ok")
def sync_ok():
    time.sleep(1)  # blocking — but we're in a thread, so fine
    return {"source": "sync"}

# ── WRONG: blocking call inside async ────────────────────────────────────
# This looks async but blocks the ENTIRE server for 1 second.
@app.get("/async-bad")
async def async_bad():
    time.sleep(1)  # ← BLOCKS the event loop!
    return {"broken": True}

# ── CORRECT FIX: run_in_executor ─────────────────────────────────────────
# Offload a sync-only call to a thread while still using async def.
@app.get("/async-fixed")
async def async_fixed():
    loop = asyncio.get_event_loop()
    # run_in_executor returns an awaitable Future that resolves when
    # the thread finishes
    await loop.run_in_executor(None, time.sleep, 1)
    return {"fixed": True}

# ── Custom thread pool for CPU-heavy work ────────────────────────────────
_cpu_pool = ThreadPoolExecutor(max_workers=4)

def expensive_computation():
    # Simulate heavy CPU work (e.g., image resize, ML inference)
    total = sum(i * i for i in range(10_000_000))
    return total

@app.get("/cpu-task")
async def cpu_task():
    loop = asyncio.get_event_loop()
    # Pin CPU work to a dedicated pool so it doesn't steal I/O threads
    await loop.run_in_executor(_cpu_pool, expensive_computation)
    return {"done": True}
Runs on the event loop. All I/O must be awaited — blocking calls freeze every other request.
FastAPI wraps it in anyio.to_thread.run_sync() automatically. Blocking is safe; await is not available.
Returns the running loop. Prefer asyncio.get_running_loop() in Python 3.10+ (raises if no loop, safer).
None uses the default thread pool. Pass a custom Executor to isolate CPU-bound work.
Cap workers to available CPU cores for CPU tasks. For I/O tasks, higher counts are acceptable.
💬 Interview Tip
“When would you use def vs async def?” — Answer: async def only when every I/O call inside it is awaitable. Otherwise use def and let FastAPI handle threading. The common mistake is using async def with a synchronous ORM like SQLAlchemy’s sync engine.
⚠ Gotcha
FastAPI’s sync-route thread pool defaults to 40 workers (AnyIO’s default capacity limiter). If 100 concurrent def routes each block for 10s, the 41st request queues. Raise the limit via anyio.to_thread.current_default_thread_limiter().total_tokens, or switch to async I/O.
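The blocking-vs-offloading difference is easy to observe with plain asyncio, no server required. The sketch below (hypothetical helper names) counts how many 20 ms heartbeat ticks the event loop manages to fire while a 0.3 s sleep runs first directly on the loop, then in a worker thread via run_in_executor:

```python
import asyncio
import time

async def ticks_during(work) -> int:
    """Run `work` and count how many 20 ms heartbeats the loop fires meanwhile."""
    ticks = 0

    async def heartbeat():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.02)
            ticks += 1

    hb = asyncio.ensure_future(heartbeat())
    await asyncio.sleep(0)      # let the heartbeat task start
    await work()
    hb.cancel()
    return ticks

async def main():
    loop = asyncio.get_running_loop()

    async def blocking():       # time.sleep on the loop: starves every other task
        time.sleep(0.3)

    async def offloaded():      # same sleep in a worker thread: loop stays free
        await loop.run_in_executor(None, time.sleep, 0.3)

    return await ticks_during(blocking), await ticks_during(offloaded)

blocked, free = asyncio.run(main())
print(blocked, free)            # blocked is 0; free is roughly 14
```

The blocked variant yields zero ticks: while time.sleep holds the loop, no other coroutine runs at all, which is exactly what happens to every other request in the /async-bad route above.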
Lifespan Events
Applications need to do work on startup (open a DB connection pool, warm a cache, load an ML model) and on shutdown (flush queues, close connections). FastAPI’s modern approach uses a single @asynccontextmanager lifespan function — everything before the yield runs on startup, everything after runs on shutdown. This replaces the older @app.on_event("startup") decorator, which still works but is deprecated. The lifespan object is passed directly to FastAPI(lifespan=…).
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
import httpx, asyncpg

# Shared handles: filled in at startup and mirrored on app.state so any
# request handler can reach them (the app's shared "context bag")
db_pool: asyncpg.Pool | None = None
http_client: httpx.AsyncClient | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    # ── STARTUP ──────────────────────────────────────────────────────────
    # Code here runs ONCE before the server begins accepting requests.
    global db_pool, http_client
    db_pool = await asyncpg.create_pool(
        dsn="postgresql://user:pass@localhost/mydb",
        min_size=5,
        max_size=20,
    )
    http_client = httpx.AsyncClient(timeout=10.0)
    # Store on app.state so route handlers can access via request.app.state
    app.state.db_pool = db_pool
    app.state.http_client = http_client
    print("✅ Startup complete")
    yield  # ← server is running and accepting requests
    # ── SHUTDOWN ─────────────────────────────────────────────────────────
    # Code here runs ONCE after the last request is served.
    await db_pool.close()
    await http_client.aclose()
    print("🛑 Shutdown complete")

# Pass the lifespan manager to FastAPI
app = FastAPI(lifespan=lifespan)

@app.get("/users/{user_id}")
async def get_user(user_id: int, request: Request):
    # Access the shared pool created during startup
    pool = request.app.state.db_pool
    row = await pool.fetchrow("SELECT * FROM users WHERE id=$1", user_id)
    return dict(row) if row else {"error": "not found"}

# ── OLD STYLE (deprecated, still works) ──────────────────────────────────
# @app.on_event("startup")
# async def startup():
#     app.state.db = await connect_db()
#
# @app.on_event("shutdown")
# async def shutdown():
#     await app.state.db.close()
Converts a generator function into a context manager. The yield splits it into enter (startup) and exit (shutdown) phases.
The dividing line — everything before is startup, everything after is shutdown. The server is live between enter and exit.
A starlette.datastructures.State object — you can attach anything to it. Access it via request.app.state in handlers.
Registers the lifespan context manager. Replaces both on_event("startup") and on_event("shutdown").
Creates a connection pool once at startup. Re-using pool connections per request is far faster than opening a new connection each time.
💬 Interview Tip
Interviewers often ask where to initialize expensive shared resources. Answer: in the lifespan function, stored on app.state. This ensures the resource is created once, reused across requests, and properly closed on shutdown — no global variable initialization order issues.
★ Important
If an exception occurs in the startup phase (before yield), the server will fail to start and the shutdown code will NOT run. Guard startup with try/except and clean up partial initialization manually.
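That guard pattern can be sketched with stdlib stand-ins (Resource is a hypothetical placeholder for a pool or client; the real versions would be asyncpg/httpx objects): the try/except around the second resource closes the first one before re-raising, because the code after yield will never run.

```python
import asyncio
from contextlib import asynccontextmanager

class Resource:
    """Stand-in for a pool or client; records whether close() ran."""
    def __init__(self, name: str):
        self.name, self.closed = name, False
    async def close(self):
        self.closed = True

@asynccontextmanager
async def lifespan(created: list, fail_cache: bool):
    pool = Resource("pool")          # first resource opens fine
    created.append(pool)
    try:
        if fail_cache:
            raise RuntimeError("cache unavailable")
        cache = Resource("cache")
        created.append(cache)
    except Exception:
        await pool.close()           # clean up partial init ourselves
        raise                        # the app still fails to start, but cleanly
    yield
    await cache.close()              # normal shutdown path
    await pool.close()

async def demo():
    created = []
    try:
        async with lifespan(created, fail_cache=True):
            pass
    except RuntimeError:
        pass                         # startup failed, as expected
    return created

resources = asyncio.run(demo())
print([(r.name, r.closed) for r in resources])  # [('pool', True)]
```

Without the except block, the pool would stay open with no owner: the shutdown half of the context manager only runs if the yield was reached.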
Advanced Dependency Patterns
Beyond simple Depends(get_db), FastAPI’s DI system supports several powerful patterns. Caching: by default, a dependency required multiple times within the same request is executed only once (the same instance is returned). Factory pattern: a function that builds a dependency from parameters, enabling route-configurable behavior. Overrides: app.dependency_overrides lets you swap any dependency for tests or feature flags without touching route code. Class-based dependencies: a class whose __init__ parameters are injected, bundling related parameters into one object.
from fastapi import FastAPI, Depends, Query, HTTPException, status
from typing import Annotated

app = FastAPI()

# ── 1. DEPENDENCY CACHING (use_cache=True by default) ────────────────────
# This function is only called ONCE per request, even if two dependencies
# in the same call chain require it.
class DBConnection:
    def __init__(self):
        print("Opening DB connection")  # prints once per request, not twice

    def query(self, sql: str):
        return f"result of: {sql}"

def get_db() -> DBConnection:
    return DBConnection()

def get_audit_db(db: Annotated[DBConnection, Depends(get_db)]) -> DBConnection:
    # Sub-dependency that also requires get_db — receives the cached instance
    return db

@app.get("/a")
async def route_a(
    db: Annotated[DBConnection, Depends(get_db)],
    audit: Annotated[DBConnection, Depends(get_audit_db)],
):
    # db is audit → True: both resolve to the same cached instance
    return {"data": db.query("SELECT 1"), "same_instance": db is audit}

# To opt OUT of caching (get a fresh instance each time):
#   Depends(get_db, use_cache=False)

# ── 2. FACTORY PATTERN (parameterized dependencies) ───────────────────────
# A function that returns a dependency function — lets you configure
# behavior at route definition time, not just at request time.
def require_role(role: str):
    """Factory: creates a dependency that checks for a specific role."""
    def check_role(token: str = Query(...)):
        # In real code: decode JWT and verify role claim
        if token != f"valid-{role}-token":
            raise HTTPException(status.HTTP_403_FORBIDDEN, f"Requires {role} role")
        return token
    return check_role  # ← returns the actual dependency callable

@app.get("/admin")
async def admin_route(
    _: Annotated[str, Depends(require_role("admin"))]
):
    return {"access": "admin panel"}

@app.get("/editor")
async def editor_route(
    _: Annotated[str, Depends(require_role("editor"))]
):
    return {"access": "editor panel"}

# ── 3. DEPENDENCY OVERRIDES (testing / feature flags) ────────────────────
# Replace any dependency at runtime — most commonly used in tests.
def get_real_settings():
    return {"db_url": "postgresql://prod/db"}

@app.get("/settings")
async def show_settings(
    cfg: Annotated[dict, Depends(get_real_settings)]
):
    return cfg

# In tests:
#   def get_fake_settings():
#       return {"db_url": "sqlite:///:memory:"}
#
#   app.dependency_overrides[get_real_settings] = get_fake_settings
#   # ... run tests ...
#   app.dependency_overrides.clear()  # always clean up after tests

# ── 4. CLASS-BASED DEPENDENCIES ──────────────────────────────────────────
# Use a class when the dependency should bundle related state
# (e.g., a query parser that normalizes common parameters).
class PaginationParams:
    def __init__(
        self,
        skip: int = Query(0, ge=0, description="Records to skip"),
        limit: int = Query(20, ge=1, le=100, description="Max records"),
    ):
        self.skip = skip
        self.limit = limit

@app.get("/items")
async def list_items(
    page: Annotated[PaginationParams, Depends(PaginationParams)]
):
    return {"skip": page.skip, "limit": page.limit}
Default behavior: same dependency instance is reused within a single request. Set False to force a fresh call each time (e.g., for randomness).
A function that returns a dependency function. Used when you need to pass arguments at route-definition time (like role names, permission strings).
A dict on the app object: {original_dep: replacement_dep}. FastAPI swaps them at call time. Key pattern for test isolation.
FastAPI calls __init__ to inject Query/Path params, then passes the fully constructed object to the route. Clean way to bundle related params.
The modern way to declare dependencies — keeps the function signature clean and the type hint accurate for IDEs and type checkers.
💬 Interview Tip
The factory pattern (a function returning a dependency) is a common interview follow-up. Example: “How would you implement role-based access without duplicating code?” — Answer: a require_role("admin") factory that returns a reusable Depends-able callable.
⚠ Gotcha
After setting dependency_overrides in tests, always call app.dependency_overrides.clear() in teardown. If you forget, overrides from one test bleed into the next.
Custom Middleware & Request Context
Middleware sits between the ASGI server and your route handlers, intercepting every request and response. FastAPI offers two approaches: the simpler BaseHTTPMiddleware (subclass and override dispatch), or pure ASGI middleware for zero overhead. A related pattern is request-scoped context via contextvars.ContextVar — a Python standard library feature that stores a value per-coroutine (like thread-locals but for async code), enabling you to set a request ID in middleware and read it anywhere downstream without threading it through function arguments.
import uuid, time
from contextvars import ContextVar
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
from starlette.types import ASGIApp, Receive, Scope, Send

app = FastAPI()

# ── ContextVar: per-request storage ──────────────────────────────────────
# ContextVar is isolated per async task — safe to set in middleware and
# read anywhere in the same request, even in nested async calls.
request_id_var: ContextVar[str] = ContextVar("request_id", default="unknown")

# ── Approach 1: BaseHTTPMiddleware (simple, slight overhead) ──────────────
class RequestIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Reuse the caller's ID if provided, else generate a unique one
        req_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
        # Store in ContextVar — visible to all code in this request's coroutine
        token = request_id_var.set(req_id)
        # Attach to request.state too, for route handlers that prefer it
        request.state.request_id = req_id
        start = time.perf_counter()
        try:
            response: Response = await call_next(request)
        finally:
            # Always reset ContextVar to avoid leaking into other requests
            request_id_var.reset(token)
        elapsed = (time.perf_counter() - start) * 1000
        response.headers["X-Request-ID"] = req_id
        response.headers["X-Response-Time"] = f"{elapsed:.1f}ms"
        return response

app.add_middleware(RequestIDMiddleware)

# ── Approach 2: Pure ASGI middleware (no overhead, more complex) ──────────
class PureASGIMiddleware:
    """Wraps the ASGI app directly — no Starlette abstractions."""
    def __init__(self, app: ASGIApp):
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send):
        if scope["type"] == "http":
            # scope is a plain dict — headers are (bytes, bytes) pairs
            headers = dict(scope.get("headers", []))
            req_id = headers.get(b"x-request-id", b"").decode() or str(uuid.uuid4())
            scope["state"] = scope.get("state", {})
            scope["state"]["request_id"] = req_id
        await self.app(scope, receive, send)

# app.add_middleware(PureASGIMiddleware)  # uncomment to use

# ── Route that reads from ContextVar ─────────────────────────────────────
@app.get("/trace")
async def trace_example():
    # No need to pass request_id through parameters — ContextVar handles it
    current_id = request_id_var.get()
    return {"request_id": current_id, "message": "traceable request"}

# ── Middleware execution order note ──────────────────────────────────────
# Middleware is applied LIFO (last-added, first-executed).
#   app.add_middleware(A)
#   app.add_middleware(B)  # ← B executes first on the way in, last on the way out
Stores a value scoped to the current async task (coroutine chain). Like thread-locals but for async code — each request gets its own isolated value.
Returns a Token object. Pass this to .reset(token) to restore the previous value — important for cleanup in async contexts.
Starlette’s convenience class. Override dispatch(request, call_next). Slightly slower than pure ASGI due to extra wrapping overhead.
Passes control to the next middleware or the route handler. The response it returns can be modified before returning to the client.
In pure ASGI, check this before processing — values are "http", "websocket", or "lifespan". Handle each appropriately.
Last middleware added is first to run on request and last on response. Think of it as wrapping layers — innermost layer is added first.
💬 Interview Tip
If asked “how do you propagate a request ID through async code without passing it everywhere?” — the answer is ContextVar. Set it in middleware, read it in logging, DB calls, or anywhere downstream. No parameter threading needed.
⚠ Gotcha
BaseHTTPMiddleware has a known issue with streaming responses in some Starlette versions — the response body may be buffered entirely. For streaming endpoints (video, large files), use pure ASGI middleware instead.
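The isolation guarantee behind this whole pattern can be verified with the standard library alone: two concurrent tasks each set the same ContextVar, and neither observes the other's value, even though their awaits interleave.

```python
import asyncio
from contextvars import ContextVar

request_id_var: ContextVar[str] = ContextVar("request_id", default="unknown")

async def handle(req_id: str) -> str:
    token = request_id_var.set(req_id)   # what the middleware's .set() does
    try:
        await asyncio.sleep(0.01)        # other "requests" run in the meantime
        return request_id_var.get()      # still this task's value
    finally:
        request_id_var.reset(token)      # the middleware's finally-block cleanup

async def main():
    # Two concurrent "requests": neither sees the other's ID
    return await asyncio.gather(handle("req-a"), handle("req-b"))

ids = asyncio.run(main())
print(ids)  # ['req-a', 'req-b']
```

A plain module-level variable in place of the ContextVar would fail this test: whichever task set it last would win, and both handlers would report the same ID.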
Streaming Responses
Normal FastAPI responses serialize the entire body to a string/bytes, store it in memory, then send it. For large files, database exports, or AI-generated text, this wastes memory and makes the client wait for everything before it sees anything. StreamingResponse takes an async (or sync) generator — FastAPI sends each yielded chunk immediately as it’s produced. The client starts receiving data right away, and your server never holds the full payload in memory at once.
import asyncio, time
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import aiofiles

app = FastAPI()

# ── 1. Stream a large file from disk ─────────────────────────────────────
@app.get("/download/{filename}")
async def download_file(filename: str):
    # NOTE: in real code, sanitize filename (path traversal risk)
    async def file_generator():
        # aiofiles reads asynchronously — doesn't block the event loop
        async with aiofiles.open(f"./files/{filename}", "rb") as f:
            while chunk := await f.read(64 * 1024):  # 64KB chunks
                yield chunk  # send each chunk immediately to the client
    return StreamingResponse(
        file_generator(),
        media_type="application/octet-stream",
        headers={"Content-Disposition": f"attachment; filename={filename}"},
    )

# ── 2. Stream a database query result row by row ─────────────────────────
@app.get("/export/users")
async def export_users():
    async def csv_generator():
        yield "id,name,email\n"  # header row
        # In real code: use an async cursor that fetches rows lazily
        fake_rows = [
            (1, "Alice", "alice@example.com"),
            (2, "Bob", "bob@example.com"),
        ]
        for row in fake_rows:
            yield f"{row[0]},{row[1]},{row[2]}\n"
            await asyncio.sleep(0)  # yield control to event loop between rows
    return StreamingResponse(csv_generator(), media_type="text/csv")

# ── 3. Stream AI / LLM token output ──────────────────────────────────────
@app.get("/ai/stream")
async def stream_ai():
    async def token_generator():
        words = ["The", " quick", " brown", " fox", " jumps"]
        for word in words:
            yield word  # send each token as it arrives
            await asyncio.sleep(0.1)  # simulate LLM latency per token
    # text/plain streaming; use text/event-stream for SSE format (see topic 30)
    return StreamingResponse(token_generator(), media_type="text/plain")

# ── 4. Sync generator (FastAPI handles it in a thread) ───────────────────
@app.get("/sync-stream")
def sync_stream():
    def generate():
        for i in range(5):
            yield f"line {i}\n"
            time.sleep(0.5)  # blocking is OK here — FastAPI threads it
    return StreamingResponse(generate(), media_type="text/plain")
Accepts a generator (sync or async). Each yield sends a chunk. No full-body buffering in memory — constant memory regardless of file size.
A function with async def + yield. Can await between yields, keeping the event loop free while waiting for the next chunk.
Async file I/O library. Wraps Python’s open() to read in a thread pool, making it non-blocking from the event loop’s perspective.
while chunk := await f.read(n) — assign and test in one step. Loop ends when read() returns empty bytes (end of file).
Tells the browser how to handle the stream. application/octet-stream triggers download; text/event-stream enables SSE; text/plain for raw text.
★ Important
If an exception occurs mid-stream, the HTTP response header has already been sent (with status 200). The client won’t see an error status code — it’ll just see the stream end unexpectedly. Add error handling inside the generator and signal errors via your data format.
◆ Rarely Known
You can set Content-Length in the response headers if you know the file size upfront. This lets browsers show accurate download progress bars. With a generator you normally can’t know size in advance, so browsers show indeterminate progress.
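The chunked-read loop that StreamingResponse consumes is worth seeing in isolation; a stdlib sketch using an in-memory "file" shows why memory stays constant: only one chunk of at most 64 KB exists at a time, and the walrus loop ends when read() returns empty bytes.

```python
import io

def chunks(f, size: int):
    # Same loop as the file_generator above: read until EOF
    while chunk := f.read(size):
        yield chunk

payload = io.BytesIO(b"x" * 200_000)          # stand-in for a 200 KB file
sizes = [len(c) for c in chunks(payload, 64 * 1024)]
print(sizes)  # [65536, 65536, 65536, 3392]
```

Three full 64 KB chunks and one 3392-byte tail: the sum is the file size, but the peak memory held by the generator is one chunk.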
Server-Sent Events (SSE)
SSE is a simple protocol on top of HTTP where the server pushes data to the browser over a long-lived HTTP connection. The browser’s EventSource API handles reconnection automatically. Unlike WebSockets, SSE is one-directional (server → client only) and works over plain HTTP/1.1 with no protocol upgrade. It’s ideal for live dashboards, notification feeds, and AI token streaming. The format is just newline-delimited text: data: {json}\n\n.
import asyncio, json
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

# ── SSE event format ──────────────────────────────────────────────────────
# Each SSE message follows this text format:
#   event: <optional event name>
#   id: <optional event id>
#   data: <your data>
#   (blank line ends the message)
#
# The browser's EventSource reads these and fires addEventListener handlers.
def make_sse(data: dict, event: str | None = None, id: str | None = None) -> str:
    """Format a Python dict as an SSE-compliant text chunk."""
    lines = []
    if event:
        lines.append(f"event: {event}")
    if id:
        lines.append(f"id: {id}")
    lines.append(f"data: {json.dumps(data)}")
    lines.append("")  # blank line = end of message
    return "\n".join(lines) + "\n"

# ── SSE endpoint ──────────────────────────────────────────────────────────
@app.get("/events/live")
async def live_events(request: Request):
    async def event_stream():
        counter = 0
        while True:
            # Check if client disconnected — prevents zombie generators
            if await request.is_disconnected():
                break
            counter += 1
            # Send a named event every second
            yield make_sse(
                data={"counter": counter, "status": "ok"},
                event="update",
                id=str(counter),
            )
            await asyncio.sleep(1)
        # Inform clients the stream is intentionally ending
        yield make_sse({"done": True}, event="close")
    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",  # ← triggers EventSource in browser
        headers={
            "Cache-Control": "no-cache",   # prevent proxies from caching
            "X-Accel-Buffering": "no",     # disable nginx buffering
            "Connection": "keep-alive",
        },
    )

# ── SSE for AI token streaming ─────────────────────────────────────────
@app.get("/ai/tokens")
async def stream_tokens():
    async def token_stream():
        tokens = ["Hello", ", ", "world", "! ", "How", " are", " you?"]
        for token in tokens:
            yield make_sse({"token": token, "done": False})
            await asyncio.sleep(0.05)
        yield make_sse({"token": "", "done": True})
    return StreamingResponse(token_stream(), media_type="text/event-stream",
                             headers={"Cache-Control": "no-cache"})

# ── Client-side JavaScript (for reference) ────────────────────────────────
# const source = new EventSource("/events/live");
# source.addEventListener("update", (e) => {
#     console.log(JSON.parse(e.data));
# });
# source.addEventListener("close", () => source.close());
The MIME type that activates the browser’s EventSource protocol. Without it, the browser treats the response as a regular stream download.
The minimal SSE format is data: ...\n\n (double newline ends a message). Multi-line data is allowed: multiple data: lines are concatenated by the browser.
Optional named event. Client uses addEventListener("name", handler). Default unnamed events fire the onmessage handler.
Last event ID. Browser sends Last-Event-ID header on reconnect — lets server resume from where it left off after network interruption.
Async check that returns True when the client closes the connection. Essential to break infinite generators and avoid resource leaks.
Nginx header that disables its response buffering. Without it, nginx queues chunks and the client sees delayed bursts instead of real-time events.
💬 Interview Tip
“SSE vs WebSocket?” — SSE is HTTP-based (works through proxies, firewalls, CDNs), one-directional, auto-reconnecting, and simpler to implement. WebSocket is bidirectional, requires protocol upgrade, and is better for interactive real-time apps (chat, games). SSE is the right choice for dashboards, notifications, and AI streaming.
⚠ Gotcha
Browsers cap HTTP/1.1 at about 6 concurrent connections per origin. If you open many SSE connections to the same server, the extras queue. Use HTTP/2 to avoid this limit — it multiplexes many streams over one connection.
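To make the wire format concrete from the consumer's side, here is a simplified Python parser for the text make_sse produces (the browser's EventSource does this for you; a spec-complete parser also handles comments, "data:" without a space, and bare "data" lines):

```python
def parse_sse(raw: str) -> list[dict]:
    """Parse SSE wire text into events (simplified: assumes 'field: value')."""
    events = []
    for block in raw.split("\n\n"):
        if not block.strip():
            continue
        event = {"event": "message", "data": []}   # unnamed events fire onmessage
        for line in block.split("\n"):
            field, _, value = line.partition(": ")
            if field == "data":
                event["data"].append(value)
            elif field in ("event", "id"):
                event[field] = value
        event["data"] = "\n".join(event["data"])   # multi-line data concatenated
        events.append(event)
    return events

raw = 'event: update\nid: 1\ndata: {"n": 1}\n\ndata: hello\ndata: world\n\n'
events = parse_sse(raw)
print(events[0])  # {'event': 'update', 'data': '{"n": 1}', 'id': '1'}
print(events[1])  # {'event': 'message', 'data': 'hello\nworld'}
```

Note the second event: two data: lines in one block become a single payload joined by a newline, which is exactly how EventSource delivers multi-line data.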
OpenAPI Customization
FastAPI auto-generates an OpenAPI 3.1 schema from your routes and Pydantic models. You can customize nearly every aspect: the schema document itself (title, version, contact info), per-route documentation (summaries, tags, examples), the Swagger UI and ReDoc appearance, and even generate a completely custom schema. This is important for external APIs where documentation quality matters, and for enterprise environments requiring specific API contract formats.
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field

# ── 1. Rich metadata on app creation ─────────────────────────────────────
app = FastAPI(
    title="My API",
    version="2.1.0",
    description="""
## My API
A well-documented API example.
### Features
- Full CRUD for users
- JWT authentication
""",
    contact={"name": "Support Team", "email": "support@example.com"},
    license_info={"name": "MIT", "url": "https://opensource.org/licenses/MIT"},
    # Disable auto-generated docs so we can customize them
    docs_url=None,
    redoc_url=None,
)

# ── 2. Rich model documentation ───────────────────────────────────────────
class UserCreate(BaseModel):
    name: str = Field(..., description="Full name", examples=["Alice Smith"])
    email: str = Field(..., description="Email address", examples=["alice@example.com"])
    age: int = Field(..., ge=0, le=150, description="Age in years", examples=[30])
    model_config = {
        "json_schema_extra": {
            "examples": [
                {"name": "Alice Smith", "email": "alice@example.com", "age": 30}
            ]
        }
    }

# ── 3. Per-route metadata ─────────────────────────────────────────────────
@app.post(
    "/users",
    summary="Create a new user",  # short title in Swagger
    description="Creates a user and sends a welcome email.",  # longer text
    response_description="The created user with ID",
    tags=["Users"],  # groups routes in Swagger UI
    operation_id="create_user",  # custom operationId for SDK generation
    responses={
        409: {"description": "User with this email already exists"},
        422: {"description": "Validation error"},
    },
)
async def create_user(user: UserCreate):
    return {"id": 1, **user.model_dump()}

# ── 4. Custom Swagger UI with self-hosted assets ──────────────────────────
# Serve swagger-ui assets locally (needed in air-gapped environments):
#   app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/docs", include_in_schema=False)  # exclude from the schema itself
async def custom_swagger_ui():
    return get_swagger_ui_html(
        openapi_url="/openapi.json",
        title="My API — Docs",
        swagger_js_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5/swagger-ui-bundle.js",
        swagger_css_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5/swagger-ui.css",
        swagger_favicon_url="/static/favicon.ico",
        oauth2_redirect_url="/docs/oauth2-redirect",
        swagger_ui_parameters={"persistAuthorization": True},  # remember auth token
    )

# ── 5. Fully custom OpenAPI schema ────────────────────────────────────────
def custom_openapi():
    if app.openapi_schema:
        return app.openapi_schema  # cached after first call
    schema = get_openapi(
        title=app.title,
        version=app.version,
        description=app.description,
        routes=app.routes,
    )
    # Inject a global security scheme (setdefault guards a missing key)
    schema.setdefault("components", {})["securitySchemes"] = {
        "BearerAuth": {
            "type": "http",
            "scheme": "bearer",
            "bearerFormat": "JWT",
        }
    }
    # Apply it to every operation by default
    for path in schema["paths"].values():
        for operation in path.values():
            operation.setdefault("security", [{"BearerAuth": []}])
    app.openapi_schema = schema
    return schema

app.openapi = custom_openapi  # replace the default schema generator
Disables the auto-generated /docs endpoint so you can mount a custom one. Same for redoc_url=None.
Hides a route from the OpenAPI schema entirely (useful for internal endpoints, health checks, or custom docs routes).
Custom ID for the route. SDK generators (openapi-generator, orval) use this as the method name. Stable IDs prevent breaking SDK clients on route rename.
Replaces the schema generation function entirely. Called lazily on first /openapi.json request. Cache the result on app.openapi_schema for performance.
Pydantic v2 way to add raw JSON Schema fields (like examples) that Pydantic doesn’t generate automatically.
◆ Rarely Known
You can tag individual routes with tags=["Users"] and also define tag metadata (description, external docs link) at the app level: FastAPI(openapi_tags=[{"name": "Users", "description": "User management endpoints"}]). This enriches Swagger UI with grouped descriptions.
Security Scopes & RBAC
OAuth2 scopes let you express what a token is allowed to do, not just who it belongs to. FastAPI’s Security() function (a superset of Depends()) and SecurityScopes let you declare required scopes per route. The dependency receives the requested scopes at runtime and can verify them against what the token actually grants. This enables fine-grained Role-Based Access Control (RBAC) where different endpoints require different permission levels.
from fastapi import FastAPI, Depends, Security, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from pydantic import BaseModel
from typing import Annotated

app = FastAPI()

# OAuth2 with explicit scopes listed — these appear in Swagger UI
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={
        "users:read": "Read user information",
        "users:write": "Create and update users",
        "admin": "Full administrative access",
    },
)

class TokenData(BaseModel):
    username: str
    scopes: list[str] = []

# ── Core scope-checking dependency ────────────────────────────────────────
# SecurityScopes is injected by FastAPI — contains the scopes declared
# by the route that invoked this dependency (via Security()).
async def get_current_user(
    security_scopes: SecurityScopes,  # FastAPI injects this automatically
    token: Annotated[str, Depends(oauth2_scheme)],
) -> TokenData:
    # Build the WWW-Authenticate header describing what scopes are needed
    auth_value = f'Bearer scope="{security_scopes.scope_str}"'
    # In production: decode JWT here.
    # For demo, simulate a token with limited scopes:
    fake_token_data = TokenData(
        username="alice",
        scopes=["users:read"],  # this user only has read access
    )
    # Check each required scope against what the token actually grants
    for required_scope in security_scopes.scopes:
        if required_scope not in fake_token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Insufficient permissions. Required: {required_scope}",
                headers={"WWW-Authenticate": auth_value},
            )
    return fake_token_data

# ── Type alias for convenience ────────────────────────────────────────────
CurrentUser = Annotated[TokenData, Depends(get_current_user)]

# ── Routes with different scope requirements ──────────────────────────────
@app.get("/users/me")
async def read_my_profile(
    # Security() is like Depends() but also passes scopes to the dependency
    current_user: Annotated[TokenData, Security(get_current_user, scopes=["users:read"])]
):
    return {"user": current_user.username, "scopes": current_user.scopes}

@app.post("/users")
async def create_user(
    current_user: Annotated[TokenData, Security(get_current_user, scopes=["users:write"])]
):
    return {"created_by": current_user.username}

@app.delete("/users/{uid}")
async def delete_user(
    uid: int,
    # Requires BOTH write access AND admin — user must have all listed scopes
    current_user: Annotated[TokenData, Security(get_current_user, scopes=["users:write", "admin"])]
):
    return {"deleted": uid, "by": current_user.username}
Like Depends() but additionally injects SecurityScopes into the dependency with the listed scopes. Used for per-route permission declarations.
Injected by FastAPI into a dependency. Contains .scopes (list) and .scope_str (space-separated string) of what the calling route required.
Used in the WWW-Authenticate header to tell the client exactly what permissions it needs to retry with. Important for OAuth2 compliance.
Declares available scopes in the schema — Swagger UI shows them in the “Authorize” dialog so developers can request specific permissions.
Return 401 Unauthorized when no credentials are provided. Return 403 Forbidden when credentials are valid but lack the required permission.
💬 Interview Tip
“How do you implement RBAC in FastAPI?” — Answer: OAuth2 scopes via Security() + SecurityScopes for fine-grained control, or simpler role-checking in a dependency by reading the role claim from the JWT. Scopes are better for resource-level permissions; roles are better for user-level categories.
⚠ Gotcha
Scopes in the JWT must match exactly — “admin” ≠ “Admin”. Normalize scope strings to lowercase everywhere, and document the canonical scope names. Case mismatches are a common production bug.
Rate Limiting
Advanced
Rate limiting prevents abuse by capping how many requests a client can make in a given time window. The most common FastAPI approach uses slowapi, a port of Flask-Limiter that integrates with Starlette. For production, rate limit state should live in Redis so it’s shared across multiple server instances — an in-memory counter resets on each pod restart and can’t coordinate across horizontal replicas.
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
# ── Setup ─────────────────────────────────────────────────────────────────
# get_remote_address extracts the client IP from the request.
# For Redis-backed limiting: Limiter(key_func=…, storage_uri="redis://localhost")
limiter = Limiter(
    key_func=get_remote_address,            # key = client IP
    default_limits=["200/day", "50/hour"],  # global defaults
)
app = FastAPI()
app.state.limiter = limiter # slowapi reads limiter from app.state
# Register the 429 handler — SlowAPI raises RateLimitExceeded on violation
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)
# ── Per-route limits ──────────────────────────────────────────────────────
@app.get("/public")
@limiter.limit("10/minute")  # 10 req/min per IP for this route
async def public_endpoint(request: Request):  # request MUST be in the signature
    return {"data": "public info"}
@app.get("/expensive")
@limiter.limit("2/minute")  # very strict — expensive computation
async def expensive_endpoint(request: Request):
    return {"result": "heavy computation done"}
# ── Per-user limiting (authenticated routes) ──────────────────────────────
def get_user_id(request: Request) -> str:
    """Extract user ID from JWT for per-user rate limiting."""
    auth = request.headers.get("Authorization", "")
    if auth.startswith("Bearer "):
        # In production: decode JWT and return user ID
        return auth.split(" ")[1][:16]  # first 16 chars of token as demo key
    return get_remote_address(request)  # fallback to IP
user_limiter = Limiter(key_func=get_user_id)
@app.get("/user-scoped")
@user_limiter.limit("100/hour")
async def user_scoped(request: Request):
    return {"limited": "per user id"}
# ── Manual rate limit check (without decorator) ───────────────────────────
# _check_request_limit is a private slowapi API (underscore-prefixed, so it
# may change between versions). It evaluates the limits registered for this
# endpoint: here only the global default_limits, since /dynamic has no
# @limiter.limit decorator. For truly dynamic per-tier limits, prefer
# passing a callable limit value to @limiter.limit (see the slowapi docs).
@app.get("/dynamic")
async def dynamic_limit(request: Request, is_premium: bool = False):
    limiter._check_request_limit(request, endpoint_func=dynamic_limit,
                                 in_middleware=False)
    return {"premium": is_premium}
Determines what to count requests against. Common: client IP, user ID from JWT, API key. Each unique key gets its own independent counter.
Limit string format: count/period. Periods: second, minute, hour, day. Multiple limits: ["10/min", "100/hour"].
Exception raised by slowapi when a limit is hit. The registered handler returns HTTP 429; with headers_enabled=True on the Limiter, responses also carry Retry-After and X-RateLimit-* headers.
Required — slowapi needs access to the Request object to extract the key and check limits. Forgetting this causes a runtime error.
Stores counters in Redis. Required for multi-instance deployments — otherwise each server pod tracks limits independently.
💬 Interview Tip
Common question: “How do you rate limit across multiple servers?” — Redis-backed distributed counters. Each server atomically increments a Redis key with a TTL. Redis INCR + EXPIRE is atomic and fast. Mention that in-memory limiters won’t work in Kubernetes with multiple pods.
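The INCR + EXPIRE pattern from the answer above can be sketched as a fixed-window counter. This is a minimal in-process stand-in (`FixedWindowLimiter` and its methods are illustrative names, not slowapi API); in production the dict would be a Redis key per client so all pods share one counter.

```python
import time

class FixedWindowLimiter:
    """Fixed-window counter mirroring Redis INCR + EXPIRE semantics.
    In production, replace the dict with Redis so all instances share state."""
    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window = window_seconds
        self._counts: dict[str, tuple[int, float]] = {}  # key -> (count, reset_at)

    def allow(self, key: str) -> bool:
        now = time.monotonic()
        count, reset_at = self._counts.get(key, (0, now + self.window))
        if now >= reset_at:                   # window elapsed: like the TTL expiring
            count, reset_at = 0, now + self.window
        count += 1                            # like atomic INCR on the Redis key
        self._counts[key] = (count, reset_at)
        return count <= self.limit

limiter = FixedWindowLimiter(limit=3, window_seconds=60)
results = [limiter.allow("client-1") for _ in range(5)]
# first 3 calls allowed, calls 4 and 5 rejected
```

Note the check-then-update here is not atomic across processes, which is exactly why the real thing uses Redis INCR.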
★ Important
Always put @limiter.limit() after @app.get() (closer to the function). Python decorators apply bottom-up — the limiter decorator must wrap the actual function, not FastAPI’s route wrapper.
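The bottom-up rule is easy to demonstrate with two plain decorators (`tag_a`/`tag_b` are throwaway names): the decorator written closest to the function wraps it first, which is why `@limiter.limit` must sit below `@app.get` so FastAPI registers the already-limited function.

```python
def tag_a(fn):
    def wrapper():
        return f"a({fn()})"
    return wrapper

def tag_b(fn):
    def wrapper():
        return f"b({fn()})"
    return wrapper

@tag_a  # applied second: sees the already-wrapped function
@tag_b  # applied first: wraps the raw function (the @limiter.limit position)
def handler():
    return "handler"

result = handler()  # -> "a(b(handler))"
```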
Caching Strategies
Advanced
Caching stores the result of an expensive operation so subsequent calls return it instantly. In FastAPI apps there are three levels: in-process cache (Python dicts, cachetools.TTLCache — fast, but lost on restart and not shared across instances), Redis cache (shared across all instances, survives restarts, supports TTL), and HTTP cache headers (Cache-Control, ETag — lets CDNs and browsers cache responses, reducing server load entirely). Choose based on whether data changes, how stale is acceptable, and whether you run multiple instances.
import asyncio, hashlib, json
from functools import lru_cache
from fastapi import FastAPI, Request, Response
from cachetools import TTLCache
from typing import Any
app = FastAPI()
# ── 1. In-process TTL cache ───────────────────────────────────────────────
# TTLCache: LRU with automatic expiry. maxsize=1000 items, ttl=300 seconds.
_cache: TTLCache = TTLCache(maxsize=1000, ttl=300)
_cache_lock = asyncio.Lock()  # protect concurrent access to the cache
async def get_expensive_data(key: str) -> dict:
    """Fetch data with in-process cache. Lost on restart."""
    async with _cache_lock:
        if key in _cache:
            return _cache[key]
        # Cache miss — fetch while still holding the lock, so concurrent
        # requests for the same missing key wait for one fetch instead of
        # all hitting the DB. (One global lock serializes every miss;
        # production code would use a lock per key.)
        await asyncio.sleep(0.5)  # simulate slow DB
        result = {"key": key, "data": "expensive result"}
        _cache[key] = result  # store for the next 300 seconds
        return result
@app.get("/data/{key}")
async def get_data(key: str):
    return await get_expensive_data(key)
# ── 2. Redis cache ─────────────────────────────────────────────────────────
# Requires: pip install redis[asyncio]
# import redis.asyncio as aioredis
#
# redis_client = aioredis.from_url("redis://localhost")
#
# async def get_or_cache(key: str, ttl: int = 300) -> dict:
# cached = await redis_client.get(key)
# if cached:
# return json.loads(cached) # cache hit
# result = await fetch_from_db(key) # cache miss
# await redis_client.set(key, json.dumps(result), ex=ttl)
# return result
# ── 3. HTTP Cache-Control headers ─────────────────────────────────────────
# These instruct browsers and CDNs to cache the response.
@app.get("/static-config")
async def static_config(response: Response):
    # Cache for 1 hour publicly (CDN + browser)
    response.headers["Cache-Control"] = "public, max-age=3600"
    return {"config": "rarely changes"}
# ── 4. ETag-based conditional caching ────────────────────────────────────
# ETag = fingerprint of the response. Browser sends it back; server
# returns 304 Not Modified if data hasn't changed — no body transferred.
@app.get("/users/{uid}")
async def get_user_etag(uid: int, request: Request, response: Response):
    user_data = {"id": uid, "name": "Alice", "version": 5}
    # Generate ETag from the content
    content_bytes = json.dumps(user_data, sort_keys=True).encode()
    etag = f'"{hashlib.md5(content_bytes).hexdigest()}"'
    # Check if client already has this version
    if request.headers.get("If-None-Match") == etag:
        return Response(status_code=304)  # Not Modified — no body needed
    response.headers["ETag"] = etag
    response.headers["Cache-Control"] = "private, max-age=60"
    return user_data
# ── 5. Cache invalidation ─────────────────────────────────────────────────
async def invalidate_user_cache(uid: int):
    """Remove a user from cache when their data changes."""
    key = f"user:{uid}"
    async with _cache_lock:
        _cache.pop(key, None)  # remove if present, no error if missing
Cachetools LRU cache with time-based expiry. Items auto-expire after ttl seconds. When maxsize is reached, the least-recently-used item is evicted.
Prevents cache stampede — without a lock, 100 concurrent requests for the same missing key would all simultaneously call the slow DB.
CDNs (Cloudflare, CloudFront) can cache this response. Use private for user-specific data — only the browser should cache it.
A content fingerprint. Browser caches the response + ETag. On next request, sends If-None-Match: "etag". Server returns 304 if unchanged — saves bandwidth.
The hard part of caching. When data changes, must proactively remove or update cached entries. Event-driven or write-through strategies help keep cache consistent.
⚠ Gotcha
Cache stampede (thundering herd): when a popular cache key expires, all concurrent requests miss and hit the DB simultaneously. Solutions: background cache refresh before expiry, probabilistic early expiration, or a “single-flight” lock pattern (only one request fetches; others wait for it).
◆ Rarely Known
stale-while-revalidate in Cache-Control: Cache-Control: max-age=60, stale-while-revalidate=30 — CDN serves stale content for 30s while it fetches a fresh copy in the background. Eliminates cache expiry latency spikes.
Pagination Patterns
Advanced
Returning millions of database rows in one response isn’t feasible. Pagination splits results into pages. Offset/limit pagination is simple (OFFSET 100 LIMIT 20) but becomes slow on large datasets (the database must scan and discard the first 100 rows). Cursor-based pagination uses a pointer to the last seen item (WHERE id > cursor): an index seek whose cost stays flat regardless of page depth, and it’s safe for real-time feeds where rows can be inserted between pages. Most APIs offer offset for convenience, cursor for performance.
from fastapi import FastAPI, Query
from pydantic import BaseModel
from typing import Generic, TypeVar, Annotated
import base64, json
app = FastAPI()
T = TypeVar("T")
# ── Generic Page response model ───────────────────────────────────────────
# Generic[T] lets this work for any data type: Page[User], Page[Product], etc.
class Page(BaseModel, Generic[T]):
    items: list[T]
    total: int
    skip: int
    limit: int
    has_next: bool
    has_prev: bool
class User(BaseModel):
    id: int
    name: str
# Simulate a database
FAKE_USERS = [User(id=i, name=f"User {i}") for i in range(1, 101)]
# ── 1. Offset / Limit pagination ──────────────────────────────────────────
@app.get("/users", response_model=Page[User])
async def list_users(
    skip: Annotated[int, Query(ge=0, description="Records to skip")] = 0,
    limit: Annotated[int, Query(ge=1, le=100, description="Max items")] = 20,
) -> Page[User]:
    total = len(FAKE_USERS)
    items = FAKE_USERS[skip : skip + limit]
    return Page(
        items=items,
        total=total,
        skip=skip,
        limit=limit,
        has_next=(skip + limit) < total,
        has_prev=skip > 0,
    )
# ── 2. Cursor-based pagination ─────────────────────────────────────────────
class CursorPage(BaseModel, Generic[T]):
    items: list[T]
    next_cursor: str | None  # None means no more pages
    prev_cursor: str | None
def encode_cursor(data: dict) -> str:
    """Encode cursor as URL-safe base64 JSON."""
    return base64.urlsafe_b64encode(json.dumps(data).encode()).decode()
def decode_cursor(cursor: str) -> dict:
    """Decode cursor back to dict."""
    return json.loads(base64.urlsafe_b64decode(cursor))
@app.get("/users/cursor", response_model=CursorPage[User])
async def list_users_cursor(
    cursor: Annotated[str | None, Query(description="Pagination cursor")] = None,
    limit: Annotated[int, Query(ge=1, le=100)] = 20,
) -> CursorPage[User]:
    # Decode cursor to get the last seen ID
    after_id = 0
    if cursor:
        cursor_data = decode_cursor(cursor)
        after_id = cursor_data.get("id", 0)
    # Filter users after the cursor ID — an index seek when backed by a DB
    # index on id, fast at any page depth
    remaining = [u for u in FAKE_USERS if u.id > after_id]
    items = remaining[:limit]
    # Build next cursor from the last item's ID
    next_cursor = None
    if len(remaining) > limit:
        next_cursor = encode_cursor({"id": items[-1].id})
    prev_cursor = None
    if after_id > 0:
        prev_cursor = encode_cursor({"id": max(after_id - limit, 0)})
    return CursorPage(items=items, next_cursor=next_cursor, prev_cursor=prev_cursor)
# ── 3. Page number style (convenience wrapper) ────────────────────────────
@app.get("/products")
async def list_products(
    page: Annotated[int, Query(ge=1, description="Page number (1-based)")] = 1,
    per_page: Annotated[int, Query(ge=1, le=100)] = 20,
):
    skip = (page - 1) * per_page  # convert 1-based page to 0-based offset
    items = FAKE_USERS[skip : skip + per_page]
    return {
        "page": page,
        "per_page": per_page,
        "total_pages": -(-len(FAKE_USERS) // per_page),  # ceiling division
        "items": items,
    }
class Page(BaseModel, Generic[T]) — lets one model work for any item type. FastAPI generates correct OpenAPI schemas for Page[User], Page[Product], etc.
Simple: OFFSET skip LIMIT limit. Problem: slow on large tables (database scans rows 1–skip just to discard them). Fine for small-to-medium datasets.
WHERE id > cursor_id LIMIT n. Uses an index seek — same speed whether page 1 or page 1000. Correct behavior when new rows are inserted between requests.
Base64-encode the cursor so clients treat it as a black box, not a manipulable integer. This lets you change the internal cursor format without breaking clients.
A Python trick: -(-100 // 3) = 34, the correct ceiling. Equivalent to math.ceil(n/d) but avoids a float conversion.
💬 Interview Tip
“What’s the problem with offset pagination at page 5000?” — The DB performs OFFSET 100000, which forces a full scan of 100,000 rows just to discard them, even with indexes. Cursor-based pagination (keyset pagination) avoids this with an indexed WHERE clause. Always mention this trade-off when designing paginated APIs.
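The two query shapes behind this trade-off are easy to compare side by side. A small sketch with an in-memory SQLite table (table and column names are illustrative): both queries return the same page, but at depth the OFFSET form reads and discards every skipped row while the keyset form seeks straight to the cursor via the primary-key index.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)",
                 [(i, f"User {i}") for i in range(1, 101)])

# Offset pagination: the engine walks past the first 40 rows to discard them
offset_page = conn.execute(
    "SELECT id FROM users ORDER BY id LIMIT 20 OFFSET 40").fetchall()

# Keyset pagination: index seek to the cursor (last seen id = 40)
keyset_page = conn.execute(
    "SELECT id FROM users WHERE id > 40 ORDER BY id LIMIT 20").fetchall()

# Same page either way; only the cost at depth differs
```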
⚠ Gotcha
With offset pagination and a live feed, if a new item is inserted while a user pages through results, they may see a duplicate on the next page (the item shifts others forward). Cursor pagination is immune to this because it’s anchored to a stable ID.
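The duplicate effect is easy to reproduce with a plain list standing in for a newest-first feed (ids only, paged two at a time): after a new item arrives between page requests, offset paging re-serves an item the client already saw, while the cursor stays anchored to the last seen id.

```python
# Newest-first feed of item ids, paged two at a time
feed = [5, 4, 3, 2, 1]

page1 = feed[0:2]      # both styles return [5, 4] for the first page
cursor = page1[-1]     # cursor pagination remembers the last seen id (4)

feed.insert(0, 6)      # a new item arrives between the two page requests

page2_offset = feed[2:4]                             # id 4 appears AGAIN
page2_cursor = [i for i in feed if i < cursor][:2]   # no duplicate
```

The comparison uses `<` rather than `>` only because this feed is sorted newest-first; the anchoring principle is the same.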