Skip to content

Commit eb7a173

Browse files
committed
feat(genie): use client-provided requestId query param as SSE streamId
Replace conversation-derived stream IDs with a client-provided ?requestId=<uuid> query param, enabling reliable SSE reconnection. Falls back to crypto.randomUUID() when not provided. Signed-off-by: Jorge Calvar <jorge.calvar@databricks.com>
1 parent f4850e3 commit eb7a173

2 files changed

Lines changed: 13 additions & 11 deletions

File tree

packages/appkit/src/plugins/genie/genie.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { randomUUID } from "node:crypto";
12
import { Time, TimeUnits } from "@databricks/sdk-experimental";
23
import type {
34
GenieMessage,
@@ -134,6 +135,7 @@ export class GeniePlugin extends Plugin {
134135
);
135136

136137
const timeout = this.config.timeout ?? 120_000;
138+
const requestId = (req.query.requestId as string) || randomUUID();
137139

138140
const streamSettings: StreamExecutionSettings = {
139141
...genieStreamDefaults,
@@ -144,9 +146,7 @@ export class GeniePlugin extends Plugin {
144146
},
145147
stream: {
146148
...genieStreamDefaults.stream,
147-
streamId: conversationId
148-
? `genie:send:${spaceId}:${conversationId}`
149-
: `genie:send:${spaceId}:new-${Date.now()}`,
149+
streamId: requestId,
150150
},
151151
};
152152

@@ -294,6 +294,7 @@ export class GeniePlugin extends Plugin {
294294
}
295295

296296
const includeQueryResults = req.query.includeQueryResults !== "false";
297+
const requestId = (req.query.requestId as string) || randomUUID();
297298

298299
logger.debug(
299300
"Fetching conversation %s from space %s (alias=%s, includeQueryResults=%s)",
@@ -309,7 +310,7 @@ export class GeniePlugin extends Plugin {
309310
...genieStreamDefaults,
310311
stream: {
311312
...genieStreamDefaults.stream,
312-
streamId: `genie:conversation:${spaceId}:${conversationId}`,
313+
streamId: requestId,
313314
},
314315
};
315316

packages/appkit/src/plugins/genie/tests/genie.test.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -994,7 +994,7 @@ describe("Genie Plugin", () => {
994994
executeStreamSpy.mockRestore();
995995
});
996996

997-
test("sendMessage with conversationId should use deterministic streamId", async () => {
997+
test("sendMessage should use requestId query param as streamId", async () => {
998998
const plugin = new GeniePlugin(config);
999999
const { router, getHandler } = createMockRouter();
10001000

@@ -1003,6 +1003,7 @@ describe("Genie Plugin", () => {
10031003
const handler = getHandler("POST", "/:alias/messages");
10041004
const mockReq = createMockRequest({
10051005
params: { alias: "myspace" },
1006+
query: { requestId: "req-uuid-123" },
10061007
body: {
10071008
content: "follow-up question",
10081009
conversationId: "conv-42",
@@ -1021,14 +1022,14 @@ describe("Genie Plugin", () => {
10211022
expect.any(Function),
10221023
expect.objectContaining({
10231024
stream: expect.objectContaining({
1024-
streamId: "genie:send:test-space-id:conv-42",
1025+
streamId: "req-uuid-123",
10251026
bufferSize: 100,
10261027
}),
10271028
}),
10281029
);
10291030
});
10301031

1031-
test("sendMessage without conversationId should use timestamped streamId", async () => {
1032+
test("sendMessage without requestId should generate a random streamId", async () => {
10321033
const plugin = new GeniePlugin(config);
10331034
const { router, getHandler } = createMockRouter();
10341035

@@ -1053,15 +1054,15 @@ describe("Genie Plugin", () => {
10531054
expect.objectContaining({
10541055
stream: expect.objectContaining({
10551056
streamId: expect.stringMatching(
1056-
/^genie:send:test-space-id:new-\d+$/,
1057+
/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/,
10571058
),
10581059
bufferSize: 100,
10591060
}),
10601061
}),
10611062
);
10621063
});
10631064

1064-
test("getConversation should use deterministic streamId", async () => {
1065+
test("getConversation should use requestId query param as streamId", async () => {
10651066
const plugin = new GeniePlugin(config);
10661067
const { router, getHandler } = createMockRouter();
10671068

@@ -1073,7 +1074,7 @@ describe("Genie Plugin", () => {
10731074
);
10741075
const mockReq = createMockRequest({
10751076
params: { alias: "myspace", conversationId: "conv-99" },
1076-
query: {},
1077+
query: { requestId: "req-uuid-456" },
10771078
headers: {
10781079
"x-forwarded-access-token": "user-token",
10791080
"x-forwarded-user": "user-1",
@@ -1088,7 +1089,7 @@ describe("Genie Plugin", () => {
10881089
expect.any(Function),
10891090
expect.objectContaining({
10901091
stream: expect.objectContaining({
1091-
streamId: "genie:conversation:test-space-id:conv-99",
1092+
streamId: "req-uuid-456",
10921093
bufferSize: 100,
10931094
}),
10941095
}),

0 commit comments

Comments
 (0)