Skip to content

Commit f7e495d

Browse files
committed
onSuccess with different schema
onFailure and onCancel (new)
1 parent e1df692 commit f7e495d

11 files changed

Lines changed: 558 additions & 60 deletions

File tree

src/client/index.ts

Lines changed: 351 additions & 30 deletions
Large diffs are not rendered by default.

src/component/_generated/component.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,10 @@ export type ComponentApi<Name extends string | undefined = string | undefined> =
6969
fnHandle: string;
7070
fnName: string;
7171
fnType: "action" | "mutation" | "query";
72+
onCancel?: { context?: any; fnHandle: string };
7273
onComplete?: { context?: any; fnHandle: string };
74+
onFailure?: { context?: any; fnHandle: string };
75+
onSuccess?: { context?: any; fnHandle: string };
7376
retryBehavior?: {
7477
base: number;
7578
initialBackoffMs: number;
@@ -93,7 +96,10 @@ export type ComponentApi<Name extends string | undefined = string | undefined> =
9396
fnHandle: string;
9497
fnName: string;
9598
fnType: "action" | "mutation" | "query";
99+
onCancel?: { context?: any; fnHandle: string };
96100
onComplete?: { context?: any; fnHandle: string };
101+
onFailure?: { context?: any; fnHandle: string };
102+
onSuccess?: { context?: any; fnHandle: string };
97103
retryBehavior?: {
98104
base: number;
99105
initialBackoffMs: number;

src/component/complete.test.ts

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,82 @@ describe("complete", () => {
288288
});
289289
});
290290

291+
const handlerTypes = [
292+
{
293+
handler: "onSuccess" as const,
294+
runResult: { kind: "success", returnValue: "test result" },
295+
} as const,
296+
{
297+
handler: "onFailure" as const,
298+
runResult: {
299+
kind: "failed",
300+
error: "error result",
301+
},
302+
} as const,
303+
{
304+
handler: "onCancel" as const,
305+
runResult: {
306+
kind: "canceled",
307+
},
308+
} as const,
309+
];
310+
311+
it.each(handlerTypes)(
312+
"should call the correct onComplete handler",
313+
async ({ handler, runResult }) => {
314+
// Create a spy on runMutation
315+
const runMutationSpy = vi.fn();
316+
317+
const testHandle = `${runResult.kind}Handle`;
318+
319+
// Enqueue a work item with a onComplete handler
320+
const workId = await t.mutation(api.lib.enqueue, {
321+
fnHandle: "testHandle",
322+
fnName: "testFunction",
323+
fnArgs: { test: "data" },
324+
fnType: "mutation",
325+
runAt: Date.now(),
326+
config: {
327+
maxParallelism: 10,
328+
logLevel: "WARN",
329+
},
330+
[handler]: {
331+
fnHandle: testHandle,
332+
context: { someContext: "value" },
333+
},
334+
});
335+
336+
// Simulate a job completion with a spy on runMutation
337+
await t.run(async (ctx) => {
338+
// Create a modified context with a spy on runMutation
339+
const spyCtx = {
340+
...ctx,
341+
runMutation: runMutationSpy,
342+
};
343+
344+
await completeHandler(spyCtx, {
345+
jobs: [
346+
{
347+
workId,
348+
runResult,
349+
attempt: 0,
350+
},
351+
],
352+
});
353+
354+
// Verify the handler was called with the right arguments
355+
expect(runMutationSpy).toHaveBeenCalledWith(
356+
testHandle,
357+
expect.objectContaining({
358+
workId,
359+
context: { someContext: "value" },
360+
result: runResult,
361+
}),
362+
);
363+
});
364+
},
365+
);
366+
291367
it("should handle multiple jobs in a single call", async () => {
292368
// Enqueue multiple work items
293369
const workId1 = await t.mutation(api.lib.enqueue, {

src/component/complete.ts

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { FunctionHandle } from "convex/server";
22
import { getConvexSize, type Infer, v } from "convex/values";
3-
import type { Id } from "./_generated/dataModel.js";
3+
import type { Doc, Id } from "./_generated/dataModel.js";
44
import { internal } from "./_generated/api.js";
55
import { internalMutation, type MutationCtx } from "./_generated/server.js";
66
import { kickMainLoop } from "./kick.js";
@@ -116,18 +116,19 @@ export async function completeHandler(
116116
!!maxAttempts &&
117117
work.attempts < maxAttempts;
118118
if (!retry) {
119-
if (work.onComplete) {
119+
const onCompleteHandler = getOnCompleteHandler(work, job.runResult);
120+
if (onCompleteHandler) {
120121
try {
121122
// Retrieve large context if stored separately
122-
let context = work.onComplete.context;
123+
let context = onCompleteHandler.context;
123124
if (context === undefined && work.payloadId) {
124125
const payload = await ctx.db.get(work.payloadId);
125126
if (payload) {
126127
context = payload.context;
127128
}
128129
}
129130

130-
const handle = work.onComplete.fnHandle as FunctionHandle<
131+
const handle = onCompleteHandler.fnHandle as FunctionHandle<
131132
"mutation",
132133
OnCompleteArgs,
133134
void
@@ -179,6 +180,21 @@ export async function completeHandler(
179180
}
180181
}
181182

183+
function getOnCompleteHandler(work: Doc<"work">, jobResult: RunResult) {
184+
if (work.onComplete) {
185+
return work.onComplete;
186+
} else {
187+
switch (jobResult.kind) {
188+
case "success":
189+
return work.onSuccess;
190+
case "failed":
191+
return work.onFailure;
192+
case "canceled":
193+
return work.onCancel;
194+
}
195+
}
196+
}
197+
182198
function stripResult(result: RunResult): RunResult {
183199
if (result.kind === "success") {
184200
return { kind: "success", returnValue: null };

src/component/lib.ts

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { type ObjectType, v, getConvexSize } from "convex/values";
1+
import { type ObjectType, v, getConvexSize, type Value } from "convex/values";
22
import type { WithoutSystemFields } from "convex/server";
33
import { api } from "./_generated/api.js";
44
import { type Doc, type Id } from "./_generated/dataModel.js";
@@ -33,6 +33,13 @@ const INLINE_METADATA_THRESHOLD = 8_000; // 8KB threshold
3333
const MAX_DOC_SIZE = 1_000_000; // Some buffer for 1MiB actual limit
3434
const PAYLOAD_DOC_OVERHEAD = 78; // Size of { args: null, context: null }
3535

36+
const onCompleteHandlers = [
37+
"onComplete",
38+
"onSuccess",
39+
"onFailure",
40+
"onCancel",
41+
] as const;
42+
3643
const itemArgs = {
3744
fnHandle: v.string(),
3845
fnName: v.string(),
@@ -41,8 +48,12 @@ const itemArgs = {
4148
runAt: v.number(),
4249
// TODO: annotation?
4350
onComplete: v.optional(vOnCompleteFnContext),
51+
onSuccess: v.optional(vOnCompleteFnContext),
52+
onFailure: v.optional(vOnCompleteFnContext),
53+
onCancel: v.optional(vOnCompleteFnContext),
4454
retryBehavior: v.optional(retryBehavior),
4555
};
56+
type ItemArgs = ObjectType<typeof itemArgs>;
4657
const enqueueArgs = {
4758
...itemArgs,
4859
config: vConfig.partial(),
@@ -61,21 +72,31 @@ async function enqueueHandler(
6172
ctx: MutationCtx,
6273
console: Logger,
6374
kickSegment: bigint,
64-
{ runAt, ...workArgs }: ObjectType<typeof itemArgs>,
75+
{ runAt, ...workArgs }: ItemArgs,
6576
) {
6677
runAt = boundScheduledTime(runAt, console);
6778

79+
if (
80+
workArgs.onComplete &&
81+
(workArgs.onSuccess || workArgs.onFailure || workArgs.onCancel)
82+
) {
83+
throw new Error(
84+
`Function has an onComplete handler and a onSuccess, onFailure, or onCancel handler. onComplete handler cannot be defined when a onSuccess, onFailure, or onCancel handler exists`,
85+
);
86+
}
87+
6888
const fnArgsSize = getConvexSize(workArgs.fnArgs);
6989
if (fnArgsSize > MAX_DOC_SIZE) {
7090
throw new Error(
7191
`Function arguments for function ${workArgs.fnName} too large: ${fnArgsSize} bytes (max: ${MAX_DOC_SIZE} bytes)`,
7292
);
7393
}
7494

95+
// Consider all the context from onSuccess, onFailure, and onCancel all together
7596
let contextSize = 0;
76-
const context = workArgs.onComplete?.context;
77-
if (context !== undefined) {
78-
contextSize = getConvexSize(context);
97+
const context = getContext(workArgs);
98+
if (context.length > 0) {
99+
contextSize = context.map(getConvexSize).reduce((a, b) => a + b, 0);
79100
if (contextSize > MAX_DOC_SIZE) {
80101
throw new Error(
81102
`OnComplete context for function ${workArgs.fnName} too large: ${contextSize} bytes (max: ${MAX_DOC_SIZE} bytes)`,
@@ -88,6 +109,12 @@ async function enqueueHandler(
88109
attempts: 0,
89110
};
90111

112+
const deleteContext = () => {
113+
for (const handler of onCompleteHandlers) {
114+
delete workItem[handler]?.context;
115+
}
116+
};
117+
91118
if (fnArgsSize >= INLINE_METADATA_THRESHOLD) {
92119
// Args are large, store separately
93120
const payloadDoc: { args: Record<string, any>; context?: unknown } = {
@@ -99,14 +126,14 @@ async function enqueueHandler(
99126
// Context is also too big to inline
100127
payloadDoc.context = context;
101128
workItem.payloadSize += contextSize;
102-
delete workItem.onComplete!.context;
129+
deleteContext();
103130
}
104131
workItem.payloadId = await ctx.db.insert("payload", payloadDoc);
105132
} else if (fnArgsSize + contextSize >= INLINE_METADATA_THRESHOLD) {
106133
// Args are small enough, but combined with context it's too big.
107134
// Store just context in this case.
108135
workItem.payloadId = await ctx.db.insert("payload", { context });
109-
delete workItem.onComplete!.context;
136+
deleteContext();
110137
workItem.payloadSize = contextSize + PAYLOAD_DOC_OVERHEAD;
111138
}
112139

@@ -261,5 +288,11 @@ async function shouldCancelWorkItem(
261288
return true;
262289
}
263290

291+
function getContext(workArgs: Partial<ItemArgs>): Value[] {
292+
return onCompleteHandlers
293+
.map((handler) => workArgs[handler]?.context)
294+
.filter((context) => context !== undefined);
295+
}
296+
264297
// eslint-disable-next-line @typescript-eslint/no-unused-vars
265298
const console = "THIS IS A REMINDER TO USE createLogger";

src/component/loop.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ describe("loop", () => {
7171
fnArgs: {},
7272
logLevel: "WARN",
7373
attempt: 0,
74+
hasOnSuccess: false,
7475
});
7576
}
7677

src/component/loop.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,15 @@ async function beginWork(
563563
throw new Error("work not found");
564564
}
565565
const { attempts: attempt, fnHandle, fnArgs, payloadId } = work;
566-
const args = { workId, fnHandle, fnArgs, payloadId, logLevel, attempt };
566+
const args = {
567+
workId,
568+
fnHandle,
569+
fnArgs,
570+
payloadId,
571+
logLevel,
572+
attempt,
573+
hasOnSuccess: Boolean(work.onSuccess),
574+
};
567575
let scheduleId;
568576
if (work.fnType === "action") {
569577
scheduleId = await ctx.scheduler.runAfter(

src/component/recovery.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ describe("recovery", () => {
5757
fnArgs: {},
5858
logLevel: "WARN",
5959
attempt: 0,
60+
hasOnSuccess: false,
6061
});
6162
}
6263

src/component/schema.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ export default defineSchema({
6868
payloadSize: v.optional(v.number()),
6969
attempts: v.number(), // number of completed attempts
7070
onComplete: v.optional(vOnCompleteFnContext),
71+
// onComplete should be undefined when any of onSuccess, onFailure, or onCancel are defined. This is enforced at the client level, but not in the schema for backwards compatibility.
72+
onSuccess: v.optional(vOnCompleteFnContext),
73+
onFailure: v.optional(vOnCompleteFnContext),
74+
onCancel: v.optional(vOnCompleteFnContext),
7175
retryBehavior: v.optional(retryBehavior),
7276
canceled: v.optional(v.boolean()),
7377
}),

src/component/shared.ts

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -70,20 +70,25 @@ export const DEFAULT_RETRY_BEHAVIOR: RetryBehavior = {
7070
// This ensures that the type satisfies the schema.
7171
const _ = {} as RetryBehavior satisfies Infer<typeof retryBehavior>;
7272

73-
export const vResult = v.union(
74-
v.object({
75-
kind: v.literal("success"),
76-
returnValue: v.any(),
77-
}),
78-
v.object({
79-
kind: v.literal("failed"),
80-
error: v.string(),
81-
}),
82-
v.object({
83-
kind: v.literal("canceled"),
84-
}),
85-
);
73+
export const vSuccessResult = v.object({
74+
kind: v.literal("success"),
75+
returnValue: v.any(),
76+
});
77+
78+
export const vFailureResult = v.object({
79+
kind: v.literal("failed"),
80+
error: v.string(),
81+
});
82+
83+
export const vCancelResult = v.object({
84+
kind: v.literal("canceled"),
85+
});
86+
87+
export const vResult = v.union(vSuccessResult, vFailureResult, vCancelResult);
8688
export type RunResult = Infer<typeof vResult>;
89+
export type SuccessResult = Infer<typeof vSuccessResult>;
90+
export type FailureResult = Infer<typeof vFailureResult>;
91+
export type CancelResult = Infer<typeof vCancelResult>;
8792

8893
export const vOnCompleteFnContext = v.object({
8994
fnHandle: v.string(), // mutation

0 commit comments

Comments
 (0)