Skip to content

Commit d0ac199

Browse files
committed
feat(qstash): add centralized client with DLQ and retry configuration
Add QStash infrastructure with standardized retry policy enforcement and dead letter queue for failed job recovery. - Create centralized QStash client with ADR-0048 retry configuration - Add DLQ with atomic LREM, payload sanitization, and alerting - Include error stack traces in DLQ entries for debugging - Add signing key fallback warning for operational clarity - Integrate with notify-collaborators job handler
1 parent 57a5fe0 commit d0ac199

File tree

5 files changed

+1000
-6
lines changed

5 files changed

+1000
-6
lines changed

src/app/api/jobs/notify-collaborators/route.ts

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,50 @@ import {
1515
import { getServerEnvVar, getServerEnvVarWithFallback } from "@/lib/env/server";
1616
import { tryReserveKey } from "@/lib/idempotency/redis";
1717
import { sendCollaboratorNotifications } from "@/lib/notifications/collaborators";
18+
import { pushToDLQ } from "@/lib/qstash/dlq";
1819
import { withTelemetrySpan } from "@/lib/telemetry/span";
1920

2021
/**
2122
* Creates a QStash receiver for webhook signature verification.
2223
*
23-
* @return QStash receiver instance or null if not configured.
24+
* Logs a warning if QSTASH_NEXT_SIGNING_KEY is not set, as this
25+
* could cause issues during key rotation.
26+
*
27+
* @return QStash receiver instance.
2428
*/
2529
function getQstashReceiver(): Receiver {
2630
const current = getServerEnvVar("QSTASH_CURRENT_SIGNING_KEY") as string;
27-
const next = getServerEnvVarWithFallback(
28-
"QSTASH_NEXT_SIGNING_KEY",
29-
current
30-
) as string;
31-
return new Receiver({ currentSigningKey: current, nextSigningKey: next });
31+
const next = getServerEnvVarWithFallback("QSTASH_NEXT_SIGNING_KEY", "");
32+
33+
if (!next) {
34+
// Log warning about fallback to help operators during key rotation
35+
console.warn(
36+
"[QStash Worker] QSTASH_NEXT_SIGNING_KEY not configured. " +
37+
"Using current key for both. This is normal for regular operation " +
38+
"but may cause request failures during key rotation if not addressed. " +
39+
"See: https://upstash.com/docs/qstash/howto/signature-validation"
40+
);
41+
}
42+
43+
return new Receiver({
44+
currentSigningKey: current,
45+
nextSigningKey: next || current,
46+
});
47+
}
48+
49+
/** Max retries configured for QStash (per ADR-0048) */
50+
const MAX_RETRIES = 5;
51+
52+
/**
53+
* Extract retry attempt information from QStash headers.
54+
*
55+
* @param req - Incoming request
56+
* @return Object with current attempt and max retries
57+
*/
58+
function getRetryInfo(req: Request): { attempt: number; maxRetries: number } {
59+
const retried = Number(req.headers.get("Upstash-Retried")) || 0;
60+
const maxRetries = Number(req.headers.get("Upstash-Max-Retries")) || MAX_RETRIES;
61+
return { attempt: retried + 1, maxRetries };
3262
}
3363

3464
/**
@@ -42,6 +72,13 @@ export async function POST(req: Request) {
4272
"jobs.notify-collaborators",
4373
{ attributes: { route: "/api/jobs/notify-collaborators" } },
4474
async (span) => {
75+
const { attempt, maxRetries } = getRetryInfo(req);
76+
span.setAttribute("qstash.attempt", attempt);
77+
span.setAttribute("qstash.max_retries", maxRetries);
78+
79+
// Store parsed job data for DLQ on failure
80+
let jobPayload: unknown = null;
81+
4582
try {
4683
let receiver: Receiver;
4784
try {
@@ -82,6 +119,7 @@ export async function POST(req: Request) {
82119
}
83120

84121
const json = (await req.json()) as unknown;
122+
jobPayload = json; // Store for DLQ
85123
const validation = validateSchema(notifyJobSchema, json);
86124
if ("error" in validation) {
87125
return validation.error;
@@ -102,6 +140,23 @@ export async function POST(req: Request) {
102140
return NextResponse.json({ ok: true, ...result });
103141
} catch (error) {
104142
span.recordException(error as Error);
143+
144+
// Check if this is the final retry attempt
145+
const isFinalAttempt = attempt >= maxRetries;
146+
span.setAttribute("qstash.final_attempt", isFinalAttempt);
147+
148+
if (isFinalAttempt) {
149+
// Push to DLQ on final failure per ADR-0048
150+
const dlqEntryId = await pushToDLQ(
151+
"notify-collaborators",
152+
jobPayload,
153+
error,
154+
attempt
155+
);
156+
span.setAttribute("qstash.dlq", true);
157+
span.setAttribute("qstash.dlq_entry_id", dlqEntryId ?? "unavailable");
158+
}
159+
105160
return errorResponse({
106161
err: error,
107162
error: "internal",

0 commit comments

Comments
 (0)