Fastapihooks is a high-performance, and pluggable webhook management system for FastAPI. It allows you to "webhook-enable" any router with a single decorator, offloading the heavy lifting to a scalable sidecar worker while keeping your API response times near zero.
- Full multi-page docs: docs/index.md
-
⚡ Zero-Impact Emission: Uses FastAPI BackgroundTasks to ensure webhook processing never slows down your API flow.
-
🔌 Backend by Design, Extensions by You: Fastapihooks ships with
BackgroundTaskBackendonly. Build Redis/Kafka/SQS backends by implementingBaseBackend. -
🛠️ Sidecar Worker: A dedicated async engine designed for high-concurrency delivery with built-in retries and HMAC signing.
-
🛡️ Secure by Default: Optional HMAC-SHA256 signing — when a
signing_secretis provided, every delivery includes a verifiableX-Fastapihooks-Signatureheader. -
📊 Observable: Telemetry support (Logfire, OpenTelemetry, Prometheus) planned for a future release.
-
📝 Type-Safe: Powered by Pydantic for flexible payload transformations.
- Install
pip install fastapihooks
- Configure
from contextlib import asynccontextmanager from fastapi import FastAPI, BackgroundTasks, Request from fastapihooks import Fastapihooks from fastapihooks.backends import BackgroundTaskBackend hooks = Fastapihooks(backend=BackgroundTaskBackend(signing_secret="your-secret")) @asynccontextmanager async def lifespan(app: FastAPI): async with hooks: # closes HTTP connections on shutdown yield app = FastAPI(lifespan=lifespan) @app.post("/orders") @hooks.hook("order.created") async def create_order(request: Request, background_tasks: BackgroundTasks): # Your business logic here return {"id": "ord_123", "status": "confirmed"}
- Run the Worker (The Sidecar)
# Not required for BackgroundTaskBackend; dispatch runs in FastAPI BackgroundTasks. # Use the sidecar only when you implement a queue backend with consume()/ack(). fastapihooks start --backend-module myapp.backends:custom_backend --store-module myapp.stores:store --signing-secret "your-secret"
Don't just dump your API response. Use the FastapihooksContext to shape exactly what your subscribers see.
def my_transformer(ctx: FastapihooksContext):
return {
"event": ctx.event_name,
"order_id": ctx.response_payload["id"],
"user_agent": ctx.headers.get("user-agent")
}
@app.post("/orders")
@hooks.hook("order.created", transform=my_transformer, include_headers=True)
async def create_order(...):
...
include_request=Truecaveat — When FastAPI parses a Pydantic model parameter (e.g.body: OrderIn), it consumes the request body before your handler runs. After that,await request.body()returns empty bytes andctx.request_payloadwill beb"". Useinclude_request=Trueonly when you inject the rawRequestand read the body yourself. If you only need request headers, preferinclude_headers=True— that is always safe.
| Component | Responsibility | Available Drivers |
|---|---|---|
| Backend | Transport Layer | BackgroundTaskBackend (Built-in), custom via BaseBackend |
| Store | Subscription Data | SQLStore (PostgreSQL, MySQL, SQLite via SQLAlchemy), MemoryStore (dev/testing), custom via BaseStore |
| Telemetry | Observability | Planned — Logfire, OpenTelemetry, Prometheus |
Use BaseStore to bring your own subscription storage — MongoDB, DynamoDB, Redis, or anything else.
from collections.abc import Iterable
from typing import Any, Literal
from fastapihooks.stores import BaseStore, StoredWebhookSubscription
class MyMongoStore(BaseStore):
async def add_subscription(self, event_name, target_url, auth_type="none", auth_value=None, metadata=None) -> str:
... # insert and return generated ID
async def remove_subscription(self, subscription_id: str) -> bool:
... # delete by ID, return True if found
async def get_subscriptions(self, event_name: str) -> Iterable[StoredWebhookSubscription]:
... # query by event_name
async def update_subscription(self, subscription_id, target_url=None, auth_type=None, auth_value=None, metadata=None) -> bool:
... # partial update, return True if foundUse BaseBackend to add your own transport backend while keeping the main library lightweight.
from typing import Any
from fastapihooks.backends import BaseBackend
from fastapihooks.stores.base_store import WebhookSubscription
class MyQueueBackend(BaseBackend):
async def publish(
self,
event_name: str,
payload: Any,
owner_id: str | None,
subscribers: list[WebhookSubscription] | None = None,
):
# enqueue event to your transport
...
async def consume(self):
# yield queued events for worker mode
...
async def ack(self, event_id: str):
# ack successful processing
...Fastapihooks is designed for horizontal scale. By using an asynchronous backend (eg: Redis Stream Backend), you can run multiple sidecar workers in a Consumer Group. This allows you to process millions of webhooks across a cluster of workers without duplicate deliveries.
Fastapihooks signs every payload. Your users can verify the authenticity of a webhook using the X-Fastapihooks-Signature header.
The header value is formatted as sha256=<hex-digest>, matching the convention used by GitHub, Stripe, and most webhook providers.
import hashlib
import hmac
def verify_signature(payload: bytes, secret: str, header: str) -> bool:
algorithm, _, received = header.partition("=")
expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, received)