Skip to content

Commit d57ddeb

Browse files
committed
feat(webhooks): harden hooks and idempotency
1 parent 49a4dd9 commit d57ddeb

File tree

17 files changed

+589
-137
lines changed

17 files changed

+589
-137
lines changed

src/app/api/hooks/files/__tests__/route.test.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
/** @vitest-environment node */
22

33
import { beforeEach, describe, expect, it, vi } from "vitest";
4+
import type { ReserveKeyOptions } from "@/lib/idempotency/redis";
45
import type { WebhookPayload } from "@/lib/webhooks/payload";
56
import { createMockNextRequest, getMockCookiesForTest } from "@/test/helpers/route";
67

78
type ParseAndVerify = (req: Request) => Promise<ParseResult>;
89
type BuildEventKey = (payload: WebhookPayload) => string;
9-
type TryReserveKey = (key: string, ttlSeconds?: number) => Promise<boolean>;
10+
type TryReserveKey = (
11+
key: string,
12+
ttlSecondsOrOptions?: number | ReserveKeyOptions
13+
) => Promise<boolean>;
1014

1115
type ParseResult = { ok: boolean; payload?: WebhookPayload };
1216
type FilesRouteModule = typeof import("../route");
@@ -50,7 +54,8 @@ vi.mock("@/lib/idempotency/redis", () => ({
5054
this.name = "IdempotencyServiceUnavailableError";
5155
}
5256
},
53-
tryReserveKey: (key: string, ttl?: number) => tryReserveKeyMock(key, ttl),
57+
tryReserveKey: (key: string, ttlSecondsOrOptions?: number | ReserveKeyOptions) =>
58+
tryReserveKeyMock(key, ttlSecondsOrOptions),
5459
}));
5560

5661
vi.mock("@/lib/supabase/admin", () => ({
@@ -162,7 +167,10 @@ describe("POST /api/hooks/files", () => {
162167
expect(res.status).toBe(200);
163168
expect(json.duplicate).toBe(true);
164169
expect(json.ok).toBe(true);
165-
expect(tryReserveKeyMock).toHaveBeenCalledWith("file-event-key-1", 300);
170+
expect(tryReserveKeyMock).toHaveBeenCalledWith("file-event-key-1", {
171+
degradedMode: "fail_closed",
172+
ttlSeconds: 300,
173+
});
166174
});
167175

168176
it("processes INSERT with uploading status successfully", async () => {

src/app/api/hooks/trips/__tests__/route.test.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@ const { afterAllHook: upstashAfterAllHook, beforeEachHook: upstashBeforeEachHook
1010

1111
type ParseAndVerify = (req: Request) => Promise<ParseResult>;
1212
type BuildEventKey = (payload: WebhookPayload) => string;
13-
type TryReserveKey = (key: string, ttlSeconds?: number) => Promise<boolean>;
13+
type TryReserveKeyOptions = { degradedMode?: string; ttlSeconds?: number };
14+
type TryReserveKey = (
15+
key: string,
16+
ttlSecondsOrOptions?: number | TryReserveKeyOptions
17+
) => Promise<boolean>;
1418
type SendNotifications = (
1519
payload: WebhookPayload,
1620
eventKey: string
@@ -63,7 +67,8 @@ vi.mock("@/lib/idempotency/redis", () => ({
6367
this.name = "IdempotencyServiceUnavailableError";
6468
}
6569
},
66-
tryReserveKey: (key: string, ttl?: number) => tryReserveKeyMock(key, ttl),
70+
tryReserveKey: (key: string, ttlSecondsOrOptions?: number | TryReserveKeyOptions) =>
71+
tryReserveKeyMock(key, ttlSecondsOrOptions),
6772
}));
6873

6974
vi.mock("@/lib/notifications/collaborators", () => ({
@@ -238,7 +243,10 @@ describe("POST /api/hooks/trips", () => {
238243
const json = await res.json();
239244
expect(res.status).toBe(200);
240245
expect(json.duplicate).toBe(true);
241-
expect(tryReserveKeyMock).toHaveBeenCalledWith("event-key-1", 300);
246+
expect(tryReserveKeyMock).toHaveBeenCalledWith("event-key-1", {
247+
degradedMode: "fail_closed",
248+
ttlSeconds: 300,
249+
});
242250
});
243251

244252
it("enqueues to QStash when configured", async () => {

src/app/api/jobs/notify-collaborators/__tests__/route.test.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ const {
1010
beforeEachHook: upstashBeforeEachHook,
1111
mocks: upstashMocks,
1212
} = setupUpstashTestEnvironment();
13-
type TryReserveKey = (key: string, ttlSeconds?: number) => Promise<boolean>;
13+
type TryReserveKey = (key: string, ttlSecondsOrOptions?: unknown) => Promise<boolean>;
1414
type SendNotifications = (
1515
payload: NotifyJob["payload"],
1616
eventKey: string
@@ -50,7 +50,8 @@ vi.mock("@/lib/env/server", () => ({
5050
}));
5151

5252
vi.mock("@/lib/idempotency/redis", () => ({
53-
tryReserveKey: (key: string, ttl?: number) => tryReserveKeyMock(key, ttl),
53+
tryReserveKey: (key: string, ttlSecondsOrOptions?: unknown) =>
54+
tryReserveKeyMock(key, ttlSecondsOrOptions),
5455
}));
5556

5657
vi.mock("@/lib/notifications/collaborators", () => ({

src/app/api/jobs/notify-collaborators/route.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ import "server-only";
77
import { notifyJobSchema } from "@schemas/webhooks";
88
import { NextResponse } from "next/server";
99
import { errorResponse, validateSchema } from "@/lib/api/route-helpers";
10-
import { getClientIpFromHeaders } from "@/lib/http/ip";
1110
import { tryReserveKey } from "@/lib/idempotency/redis";
1211
import { sendCollaboratorNotifications } from "@/lib/notifications/collaborators";
1312
import { pushToDLQ } from "@/lib/qstash/dlq";
1413
import { getQstashReceiver, verifyQstashRequest } from "@/lib/qstash/receiver";
14+
import { getTrustedRateLimitIdentifierFromHeaders } from "@/lib/ratelimit/identifier";
1515
import { withTelemetrySpan } from "@/lib/telemetry/span";
1616

1717
/** Max retries configured for QStash (per ADR-0048) */
@@ -70,12 +70,13 @@ export async function POST(req: Request) {
7070
const verified = await verifyQstashRequest(req, receiver);
7171
if (!verified.ok) {
7272
try {
73-
const ip = getClientIpFromHeaders(req.headers);
73+
const ipHash = getTrustedRateLimitIdentifierFromHeaders(req.headers);
74+
const pathname = new URL(req.url).pathname;
7475
span.addEvent("unauthorized_attempt", {
7576
hasSignature: verified.reason !== "missing_signature",
76-
ip: ip === "unknown" ? undefined : ip,
77+
ipHash: ipHash === "unknown" ? undefined : ipHash,
78+
path: pathname,
7779
reason: verified.reason,
78-
url: req.url,
7980
});
8081
} catch (spanError) {
8182
span.recordException(spanError as Error);
@@ -97,7 +98,10 @@ export async function POST(req: Request) {
9798
span.setAttribute("op", payload.type);
9899

99100
// De-duplicate at worker level as well to avoid double-send on retries
100-
const unique = await tryReserveKey(`notify:${eventKey}`, 300);
101+
const unique = await tryReserveKey(`notify:${eventKey}`, {
102+
degradedMode: "fail_closed",
103+
ttlSeconds: 300,
104+
});
101105
if (!unique) {
102106
span.setAttribute("event.duplicate", true);
103107
return NextResponse.json({ duplicate: true, ok: true });

src/lib/idempotency/__tests__/redis.test.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import { beforeEach, describe, expect, it, vi } from "vitest";
44

55
const warnRedisUnavailableMock = vi.fn();
6+
const emitOperationalAlertOncePerWindowMock = vi.fn();
67

78
// Mock redis client and factories
89
const existsMock = vi.fn(async (_key: string) => 0);
@@ -28,6 +29,11 @@ vi.mock("@/lib/telemetry/redis", () => ({
2829
warnRedisUnavailable: (...args: unknown[]) => warnRedisUnavailableMock(...args),
2930
}));
3031

32+
vi.mock("@/lib/telemetry/degraded-mode", () => ({
33+
emitOperationalAlertOncePerWindow: (...args: unknown[]) =>
34+
emitOperationalAlertOncePerWindowMock(...args),
35+
}));
36+
3137
describe("idempotency redis helpers", () => {
3238
beforeEach(() => {
3339
existsMock.mockReset();
@@ -36,6 +42,7 @@ describe("idempotency redis helpers", () => {
3642
redisClient = { del: delMock, exists: existsMock, set: setMock };
3743
getRedisMock.mockReset();
3844
warnRedisUnavailableMock.mockReset();
45+
emitOperationalAlertOncePerWindowMock.mockReset();
3946
});
4047

4148
describe("hasKey", () => {

src/lib/idempotency/redis.ts

Lines changed: 105 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import "server-only";
1212
import { z } from "zod";
1313

1414
import { getRedis } from "@/lib/redis";
15+
import { emitOperationalAlertOncePerWindow } from "@/lib/telemetry/degraded-mode";
1516
import { warnRedisUnavailable } from "@/lib/telemetry/redis";
1617

1718
const REDIS_FEATURE = "idempotency.keys";
@@ -61,6 +62,28 @@ export interface ReserveKeyOptions {
6162
* Can also be set globally via IDEMPOTENCY_FAIL_OPEN env var (read at module load).
6263
*/
6364
failOpen?: boolean;
65+
66+
/**
67+
* Alias for fail-open behavior with explicit naming.
68+
* When set, overrides failOpen.
69+
*/
70+
degradedMode?: "fail_closed" | "fail_open";
71+
}
72+
73+
function resolveDegradedMode(options: {
74+
degradedMode?: "fail_closed" | "fail_open";
75+
failOpen?: boolean;
76+
}): "fail_closed" | "fail_open" {
77+
if (options.degradedMode) return options.degradedMode;
78+
if (options.failOpen === true) return "fail_open";
79+
if (options.failOpen === false) return "fail_closed";
80+
return DEFAULT_FAIL_OPEN ? "fail_open" : "fail_closed";
81+
}
82+
83+
function getIdempotencyNamespace(key: string): string {
84+
const idx = key.indexOf(":");
85+
const namespace = idx === -1 ? key : key.slice(0, idx);
86+
return namespace.trim().slice(0, 64) || "unknown";
6487
}
6588

6689
/**
@@ -91,7 +114,8 @@ export async function tryReserveKey(
91114
: ttlSecondsOrOptions;
92115

93116
const ttlSeconds = options.ttlSeconds ?? 300;
94-
const failOpen = options.failOpen ?? DEFAULT_FAIL_OPEN;
117+
const degradedMode = resolveDegradedMode(options);
118+
const failOpen = degradedMode === "fail_open";
95119

96120
const redis = getRedis();
97121
if (!redis) {
@@ -101,13 +125,40 @@ export async function tryReserveKey(
101125
throw new IdempotencyServiceUnavailableError();
102126
}
103127

128+
emitOperationalAlertOncePerWindow({
129+
attributes: {
130+
degradedMode: "fail_open",
131+
namespace: getIdempotencyNamespace(key),
132+
reason: "redis_unavailable",
133+
},
134+
event: "idempotency.degraded",
135+
windowMs: 60_000,
136+
});
137+
104138
// Fail open: allow processing (may cause duplicates during Redis outage)
105139
return true;
106140
}
107141

108142
const namespaced = `idemp:${key}`;
109-
const result = await redis.set(namespaced, "1", { ex: ttlSeconds, nx: true });
110-
return result === "OK";
143+
try {
144+
const result = await redis.set(namespaced, "1", { ex: ttlSeconds, nx: true });
145+
return result === "OK";
146+
} catch (_error) {
147+
warnRedisUnavailable(REDIS_FEATURE);
148+
if (!failOpen) {
149+
throw new IdempotencyServiceUnavailableError();
150+
}
151+
emitOperationalAlertOncePerWindow({
152+
attributes: {
153+
degradedMode: "fail_open",
154+
namespace: getIdempotencyNamespace(key),
155+
reason: "redis_error",
156+
},
157+
event: "idempotency.degraded",
158+
windowMs: 60_000,
159+
});
160+
return true;
161+
}
111162
}
112163

113164
/**
@@ -121,7 +172,8 @@ export async function hasKey(
121172
key: string,
122173
options?: { failOpen?: boolean }
123174
): Promise<boolean> {
124-
const failOpen = options?.failOpen ?? DEFAULT_FAIL_OPEN;
175+
const degradedMode = resolveDegradedMode({ failOpen: options?.failOpen });
176+
const failOpen = degradedMode === "fail_open";
125177

126178
const redis = getRedis();
127179
if (!redis) {
@@ -132,8 +184,34 @@ export async function hasKey(
132184
}
133185

134186
const namespaced = `idemp:${key}`;
135-
const result = await redis.exists(namespaced);
136-
return result > 0;
187+
try {
188+
const result = await redis.exists(namespaced);
189+
return result > 0;
190+
} catch (_error) {
191+
warnRedisUnavailable(REDIS_FEATURE);
192+
if (!failOpen) {
193+
emitOperationalAlertOncePerWindow({
194+
attributes: {
195+
degradedMode: "fail_closed",
196+
namespace: getIdempotencyNamespace(key),
197+
reason: "redis_error",
198+
},
199+
event: "idempotency.degraded",
200+
windowMs: 60_000,
201+
});
202+
return true;
203+
}
204+
emitOperationalAlertOncePerWindow({
205+
attributes: {
206+
degradedMode: "fail_open",
207+
namespace: getIdempotencyNamespace(key),
208+
reason: "redis_error",
209+
},
210+
event: "idempotency.degraded",
211+
windowMs: 60_000,
212+
});
213+
return false;
214+
}
137215
}
138216

139217
/**
@@ -146,7 +224,8 @@ export async function releaseKey(
146224
key: string,
147225
options?: { failOpen?: boolean }
148226
): Promise<boolean> {
149-
const failOpen = options?.failOpen ?? DEFAULT_FAIL_OPEN;
227+
const degradedMode = resolveDegradedMode({ failOpen: options?.failOpen });
228+
const failOpen = degradedMode === "fail_open";
150229

151230
const redis = getRedis();
152231
if (!redis) {
@@ -158,6 +237,23 @@ export async function releaseKey(
158237
}
159238

160239
const namespaced = `idemp:${key}`;
161-
const result = await redis.del(namespaced);
162-
return result > 0;
240+
try {
241+
const result = await redis.del(namespaced);
242+
return result > 0;
243+
} catch (_error) {
244+
warnRedisUnavailable(REDIS_FEATURE);
245+
if (!failOpen) {
246+
throw new IdempotencyServiceUnavailableError();
247+
}
248+
emitOperationalAlertOncePerWindow({
249+
attributes: {
250+
degradedMode: "fail_open",
251+
namespace: getIdempotencyNamespace(key),
252+
reason: "redis_error",
253+
},
254+
event: "idempotency.degraded",
255+
windowMs: 60_000,
256+
});
257+
return false;
258+
}
163259
}

0 commit comments

Comments
 (0)