From 730ad39cf8914a4497d570c67790cdf856711bd5 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 26 Apr 2026 05:58:34 +0000 Subject: [PATCH 1/7] Harden home connector observability Co-authored-by: Kent C. Dodds --- packages/home-connector/Dockerfile | 2 +- packages/home-connector/index.ts | 1 + packages/home-connector/server/index.ts | 67 +++++ .../home-connector/src/sentry.node.test.ts | 35 ++- packages/home-connector/src/sentry.ts | 8 + .../src/transport/worker-connector.ts | 256 +++++++++++++----- packages/worker/src/home/session.node.test.ts | 131 +++++++++ packages/worker/src/home/session.ts | 60 +++- 8 files changed, 488 insertions(+), 72 deletions(-) create mode 100644 packages/worker/src/home/session.node.test.ts diff --git a/packages/home-connector/Dockerfile b/packages/home-connector/Dockerfile index be3cb9c26..bc57f5083 100644 --- a/packages/home-connector/Dockerfile +++ b/packages/home-connector/Dockerfile @@ -24,4 +24,4 @@ ENV SENTRY_TRACES_SAMPLE_RATE=1.0 EXPOSE 4040 -CMD ["node", "index.ts"] +CMD ["node", "--import", "./src/sentry-init.ts", "index.ts"] diff --git a/packages/home-connector/index.ts b/packages/home-connector/index.ts index 439564bde..4c404d9ed 100644 --- a/packages/home-connector/index.ts +++ b/packages/home-connector/index.ts @@ -1,4 +1,5 @@ import 'dotenv/config' +import './src/sentry-init.ts' if (process.env.MOCKS === 'true') { await import('./mocks/index.ts') diff --git a/packages/home-connector/server/index.ts b/packages/home-connector/server/index.ts index d5ab70162..984e784dc 100644 --- a/packages/home-connector/server/index.ts +++ b/packages/home-connector/server/index.ts @@ -2,11 +2,73 @@ import http from 'node:http' import { createRequestListener } from '@remix-run/node-fetch-server' import { createHomeConnectorRouter } from '../app/router.ts' import { + closeHomeConnectorSentry, captureHomeConnectorException, flushHomeConnectorSentry, } from '../src/sentry.ts' import { startHomeConnectorApp } from '../src/index.ts' +const signalExitCodeByName = { + SIGINT: 130, + SIGTERM: 143, +} as const + +function installGracefulShutdownHandlers(input: { + server: http.Server + connector: Awaited> +}) { + let isShuttingDown = false + + async function shutdown(reason: string, closeSentry: boolean) { + if (isShuttingDown) { + return + } + isShuttingDown = true + console.info(`Shutting down home connector reason=${reason}`) + input.connector.workerConnector.stop() + await new Promise((resolve) => { + input.server.close(() => resolve()) + }) + if (closeSentry) { + await closeHomeConnectorSentry() + return + } + await flushHomeConnectorSentry() + } + + for (const signal of ['SIGINT', 'SIGTERM'] as const) { + process.once(signal, () => { + void shutdown(`signal:${signal}`, true).finally(() => { + process.exit(signalExitCodeByName[signal]) + }) + }) + } + + process.once('uncaughtException', (error) => { + captureHomeConnectorException(error, { + tags: { + area: 'process', + process_event: 'uncaughtException', + }, + }) + void shutdown('uncaughtException', true).finally(() => { + process.exit(1) + }) + }) + + process.once('unhandledRejection', (reason) => { + captureHomeConnectorException(reason, { + tags: { + area: 'process', + process_event: 'unhandledRejection', + }, + }) + void shutdown('unhandledRejection', true).finally(() => { + process.exit(1) + }) + }) +} + async function main() { const connector = await startHomeConnectorApp() const router = createHomeConnectorRouter( @@ -51,6 +113,11 @@ async function main() { `home-connector listening on http://localhost:${connector.config.port}`, ) }) + + installGracefulShutdownHandlers({ + server, + connector, + }) } try { diff --git a/packages/home-connector/src/sentry.node.test.ts b/packages/home-connector/src/sentry.node.test.ts index 7ab2b2238..ee0cc18fb 100644 --- a/packages/home-connector/src/sentry.node.test.ts +++ b/packages/home-connector/src/sentry.node.test.ts @@ -1,8 +1,22 @@ -import { expect, test } from 'vitest' -import { +import { expect, test, vi } from 'vitest' + +const sentryMock = vi.hoisted(() => ({ + close: vi.fn(), + flush: vi.fn(), + init: vi.fn(), + isEnabled: vi.fn(() => false), + setContext: vi.fn(), + setTag: vi.fn(), +})) + +vi.mock('@sentry/node', () => sentryMock) + +const { buildHomeConnectorSentryOptions, + closeHomeConnectorSentry, + flushHomeConnectorSentry, initializeHomeConnectorSentry, -} from './sentry.ts' +} = await import('./sentry.ts') function createTemporaryEnv(values: Record) { const previousValues = Object.fromEntries( @@ -73,6 +87,7 @@ test('buildHomeConnectorSentryOptions falls back to defaults for invalid sample }) test('initializeHomeConnectorSentry skips initialization without a DSN', () => { + sentryMock.isEnabled.mockReturnValue(false) using _env = createTemporaryEnv({ SENTRY_DSN: undefined, SENTRY_ENVIRONMENT: undefined, @@ -82,3 +97,17 @@ test('initializeHomeConnectorSentry skips initialization without a DSN', () => { expect(() => initializeHomeConnectorSentry()).not.toThrow() }) + +test('flushHomeConnectorSentry returns true when Sentry is disabled', async () => { + sentryMock.isEnabled.mockReturnValue(false) + sentryMock.flush.mockReset() + await expect(flushHomeConnectorSentry()).resolves.toBe(true) + expect(sentryMock.flush).not.toHaveBeenCalled() +}) + +test('closeHomeConnectorSentry returns true when Sentry is disabled', async () => { + sentryMock.isEnabled.mockReturnValue(false) + sentryMock.close.mockReset() + await expect(closeHomeConnectorSentry()).resolves.toBe(true) + expect(sentryMock.close).not.toHaveBeenCalled() +}) diff --git a/packages/home-connector/src/sentry.ts b/packages/home-connector/src/sentry.ts index d5ea00929..7f4c0f78a 100644 --- a/packages/home-connector/src/sentry.ts +++ b/packages/home-connector/src/sentry.ts @@ -119,3 +119,11 @@ export async function flushHomeConnectorSentry(timeout = 2_000) { return Sentry.flush(timeout) } + +export async function closeHomeConnectorSentry(timeout = 2_000) { + if (!Sentry.isEnabled()) { + return true + } + + return Sentry.close(timeout) +} diff --git a/packages/home-connector/src/transport/worker-connector.ts b/packages/home-connector/src/transport/worker-connector.ts index e8e7d2a1d..709503923 100644 --- a/packages/home-connector/src/transport/worker-connector.ts +++ b/packages/home-connector/src/transport/worker-connector.ts @@ -72,6 +72,19 @@ function getReconnectDelayMs(consecutiveReconnects: number) { ) } +function createSocketEventContext(input: { + config: HomeConnectorConfig + connectionAttempt: number + consecutiveReconnects: number +}) { + return { + attempt: input.connectionAttempt, + consecutiveReconnects: input.consecutiveReconnects, + connectorId: input.config.homeConnectorId, + url: input.config.workerWebSocketUrl, + } +} + async function handleJsonRpcRequest( message: JSONRPCRequest, toolRegistry: HomeConnectorToolRegistry, @@ -215,6 +228,23 @@ export function createWorkerConnector(input: { console.info( `Scheduling home connector websocket reconnect in ${reconnectDelayMs}ms consecutiveReconnects=${consecutiveReconnects}`, ) + captureHomeConnectorMessage( + 'Scheduling home connector websocket reconnect.', + { + level: 'info', + tags: { + connector_event: 'websocket.reconnect_scheduled', + }, + extra: { + ...createSocketEventContext({ + config: input.config, + connectionAttempt, + consecutiveReconnects, + }), + reconnectDelayMs, + }, + }, + ) reconnectTimer = setTimeout(() => { reconnectTimer = null connect() @@ -237,96 +267,198 @@ export function createWorkerConnector(input: { console.info( `Opening home connector websocket attempt=${connectionAttempt} url=${input.config.workerWebSocketUrl}`, ) + captureHomeConnectorMessage('Opening home connector websocket.', { + level: 'info', + tags: { + connector_event: 'websocket.connecting', + }, + extra: createSocketEventContext({ + config: input.config, + connectionAttempt, + consecutiveReconnects, + }), + }) socket = new WebSocket(input.config.workerWebSocketUrl) socket.addEventListener('open', () => { console.info( `Home connector websocket opened attempt=${connectionAttempt} connectorId=${input.config.homeConnectorId}`, ) + captureHomeConnectorMessage('Home connector websocket opened.', { + level: 'info', + tags: { + connector_event: 'websocket.open', + }, + extra: createSocketEventContext({ + config: input.config, + connectionAttempt, + consecutiveReconnects, + }), + }) const hello: HomeConnectorHelloMessage = { type: 'connector.hello', connectorKind: 'home', connectorId: input.config.homeConnectorId, sharedSecret: input.config.sharedSecret!, } + captureHomeConnectorMessage('Sending home connector websocket hello.', { + level: 'info', + tags: { + connector_event: 'websocket.hello_sent', + }, + extra: createSocketEventContext({ + config: input.config, + connectionAttempt, + consecutiveReconnects, + }), + }) socket?.send(stringifyHomeConnectorMessage(hello)) }) socket.addEventListener('message', async (event) => { - const value = JSON.parse(String(event.data)) as - | HomeConnectorClientMessage - | HomeConnectorJsonRpcEnvelope - switch (value.type) { - case 'server.ping': - hasReportedSocketIssue = false - updateConnectionState(input.state, { - lastSyncAt: new Date().toISOString(), - lastError: null, - }) - return - case 'server.error': - updateConnectionState(input.state, { - connected: false, - lastError: value.message, - }) - captureHomeConnectorMessage(value.message, { - level: 'error', - tags: { - connector_event: 'server.error', - }, - }) - console.error(`Home connector error: ${value.message}`) - return - case 'server.ack': - hasReportedSocketIssue = false - consecutiveReconnects = 0 - updateConnectionState(input.state, { - connected: true, - lastSyncAt: new Date().toISOString(), - lastError: null, - }) - console.info( - `Home connector websocket acknowledged connectorId=${value.connectorId}`, - ) - if (isAckMessage(value) && socket?.readyState === WebSocket.OPEN) { - socket.send( - stringifyHomeConnectorMessage({ - type: 'connector.jsonrpc', - message: createToolsChangedNotification(), - }), - ) - } - return - case 'connector.jsonrpc': { - const message = value.message - if (isJsonRpcEnvelope(value) && isJsonRpcResponse(message)) { + try { + const value = JSON.parse(String(event.data)) as + | HomeConnectorClientMessage + | HomeConnectorJsonRpcEnvelope + switch (value.type) { + case 'server.ping': + hasReportedSocketIssue = false updateConnectionState(input.state, { lastSyncAt: new Date().toISOString(), lastError: null, }) return - } - if ( - isJsonRpcEnvelope(value) && - isJsonRpcRequest(message) && - socket?.readyState === WebSocket.OPEN - ) { - const response = await handleJsonRpcRequest( - message, - input.toolRegistry, - ) - socket.send( - stringifyHomeConnectorMessage({ - type: 'connector.jsonrpc', - message: response, + case 'server.error': + updateConnectionState(input.state, { + connected: false, + lastError: value.message, + }) + captureHomeConnectorMessage(value.message, { + level: 'error', + tags: { + connector_event: 'server.error', + }, + extra: createSocketEventContext({ + config: input.config, + connectionAttempt, + consecutiveReconnects, }), - ) + }) + console.error(`Home connector error: ${value.message}`) + return + case 'server.ack': + hasReportedSocketIssue = false + consecutiveReconnects = 0 updateConnectionState(input.state, { + connected: true, lastSyncAt: new Date().toISOString(), lastError: null, }) + console.info( + `Home connector websocket acknowledged connectorId=${value.connectorId}`, + ) + captureHomeConnectorMessage( + 'Home connector websocket acknowledged.', + { + level: 'info', + tags: { + connector_event: 'websocket.ack', + }, + extra: { + ...createSocketEventContext({ + config: input.config, + connectionAttempt, + consecutiveReconnects, + }), + acknowledgedConnectorId: value.connectorId, + }, + }, + ) + if (isAckMessage(value) && socket?.readyState === WebSocket.OPEN) { + captureHomeConnectorMessage( + 'Sending home connector tools changed notification.', + { + level: 'info', + tags: { + connector_event: 'tools.list_changed.sent', + }, + extra: createSocketEventContext({ + config: input.config, + connectionAttempt, + consecutiveReconnects, + }), + }, + ) + socket.send( + stringifyHomeConnectorMessage({ + type: 'connector.jsonrpc', + message: createToolsChangedNotification(), + }), + ) + } + return + case 'connector.jsonrpc': { + const message = value.message + if (isJsonRpcEnvelope(value) && isJsonRpcResponse(message)) { + updateConnectionState(input.state, { + lastSyncAt: new Date().toISOString(), + lastError: null, + }) + return + } + if ( + isJsonRpcEnvelope(value) && + isJsonRpcRequest(message) && + socket?.readyState === WebSocket.OPEN + ) { + const response = await handleJsonRpcRequest( + message, + input.toolRegistry, + ) + socket.send( + stringifyHomeConnectorMessage({ + type: 'connector.jsonrpc', + message: response, + }), + ) + updateConnectionState(input.state, { + lastSyncAt: new Date().toISOString(), + lastError: null, + }) + } + return } } + } catch (error) { + updateConnectionState(input.state, { + connected: false, + lastError: + error instanceof Error + ? error.message + : 'Home connector websocket message handling failed.', + }) + captureHomeConnectorException(error, { + tags: { + connector_event: 'websocket.message_error', + }, + contexts: { + websocket: { + ...createSocketEventContext({ + config: input.config, + connectionAttempt, + consecutiveReconnects, + }), + readyState: socket?.readyState ?? null, + }, + }, + extra: { + rawMessage: String(event.data), + }, + }) + console.error( + 'Home connector websocket message handling failed', + error, + ) } }) diff --git a/packages/worker/src/home/session.node.test.ts b/packages/worker/src/home/session.node.test.ts new file mode 100644 index 000000000..a7632f606 --- /dev/null +++ b/packages/worker/src/home/session.node.test.ts @@ -0,0 +1,131 @@ +import { expect, test, vi } from 'vitest' + +const captureMessageMock = vi.fn() + +vi.mock('@sentry/cloudflare', () => ({ + captureMessage: (...args: Array) => captureMessageMock(...args), + instrumentDurableObjectWithSentry: ( + _getOptions: unknown, + durableObjectClass: unknown, + ) => durableObjectClass, +})) + +vi.mock('cloudflare:workers', () => ({ + DurableObject: class { + protected readonly ctx: DurableObjectState + protected readonly env: Env + + constructor(ctx: DurableObjectState, env: Env) { + this.ctx = ctx + this.env = env + } + }, +})) + +const { HomeConnectorSession } = await import('./session.ts') + +function createState(input: { + storedState?: { + persisted: { + connectorId: string | null + connectorKind: string | null + connectedAt: string | null + lastSeenAt: string | null + } + tools: Array<{ name: string }> + } | null + webSockets?: Array +} = {}) { + const storedState = input.storedState ?? null + const webSockets = input.webSockets ?? [] + const persistedEntries = new Map() + if (storedState) { + persistedEntries.set('home-connector-session-state', storedState) + } + + return { + state: { + storage: { + get: vi.fn(async (key: string) => persistedEntries.get(key)), + put: vi.fn(async (key: string, value: unknown) => { + persistedEntries.set(key, value) + }), + }, + getWebSockets: vi.fn(() => webSockets), + acceptWebSocket: vi.fn(), + } as unknown as DurableObjectState, + persistedEntries, + } +} + +test('snapshot returns null when persisted connector has no live websocket', async () => { + captureMessageMock.mockReset() + const { state } = createState({ + storedState: { + persisted: { + connectorId: 'default', + connectorKind: 'home', + connectedAt: '2026-04-26T05:00:00.000Z', + lastSeenAt: '2026-04-26T05:01:00.000Z', + }, + tools: [{ name: 'bond_shade_set_position' }], + }, + }) + const session = new HomeConnectorSession( + { + storage: state.storage, + getWebSockets: state.getWebSockets, + acceptWebSocket: state.acceptWebSocket, + } as unknown as DurableObjectState, + {} as Env, + ) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + const response = await session.fetch( + new Request('https://home-connectors/home/connectors/default/snapshot'), + ) + expect(await response.json()).toBeNull() +}) + +test('websocket close clears connectedAt and tools from persisted state', async () => { + captureMessageMock.mockReset() + const { state, persistedEntries } = createState({ + storedState: { + persisted: { + connectorId: 'default', + connectorKind: 'home', + connectedAt: '2026-04-26T05:00:00.000Z', + lastSeenAt: '2026-04-26T05:01:00.000Z', + }, + tools: [{ name: 'bond_shade_set_position' }], + }, + webSockets: [{} as WebSocket], + }) + const session = new HomeConnectorSession( + { + storage: state.storage, + getWebSockets: state.getWebSockets, + acceptWebSocket: state.acceptWebSocket, + } as unknown as DurableObjectState, + {} as Env, + ) + + await new Promise((resolve) => setTimeout(resolve, 0)) + session.webSocketClose({} as WebSocket, 1006, 'network', false) + await new Promise((resolve) => setTimeout(resolve, 0)) + + expect(captureMessageMock).toHaveBeenCalledWith( + 'Home connector session websocket closed code=1006 wasClean=false reason=network', + expect.objectContaining({ + level: 'warning', + }), + ) + expect(persistedEntries.get('home-connector-session-state')).toMatchObject({ + persisted: { + connectorId: 'default', + connectedAt: null, + }, + tools: [], + }) +}) diff --git a/packages/worker/src/home/session.ts b/packages/worker/src/home/session.ts index 23142b724..ee5be7d0f 100644 --- a/packages/worker/src/home/session.ts +++ b/packages/worker/src/home/session.ts @@ -54,6 +54,28 @@ class HomeConnectorSessionBase extends DurableObject { void this.restoreState() } + private clearConnectionState() { + this.stateSnapshot.persisted.connectedAt = null + this.stateSnapshot.tools = [] + } + + private captureSessionMessage( + message: string, + input: { + level?: 'warning' | 'error' + extra?: Record + } = {}, + ) { + Sentry.captureMessage(message, { + level: input.level ?? 'warning', + tags: { + service: 'worker', + worker_component: 'home-connector-session', + }, + ...(input.extra ? { extra: input.extra } : {}), + }) + } + async fetch(request: Request): Promise { const url = new URL(request.url) if (request.headers.get('Upgrade') === 'websocket') { @@ -113,14 +135,11 @@ class HomeConnectorSessionBase extends DurableObject { wasClean: boolean, ): void { this.stateSnapshot.persisted.lastSeenAt = new Date().toISOString() + this.clearConnectionState() const closeMessage = `Home connector session websocket closed code=${code} wasClean=${wasClean}${reason ? ` reason=${reason}` : ''}` console.warn(closeMessage) - Sentry.captureMessage(closeMessage, { + this.captureSessionMessage(closeMessage, { level: 'warning', - tags: { - service: 'worker', - worker_component: 'home-connector-session', - }, extra: { code, reason, @@ -149,6 +168,9 @@ class HomeConnectorSessionBase extends DurableObject { const { connectorId, connectorKind, connectedAt, lastSeenAt } = this.stateSnapshot.persisted if (!connectorId || !connectedAt || !lastSeenAt) return null + if (this.ctx.getWebSockets(connectorTag).length === 0) { + return null + } const kind = (connectorKind && connectorKind.trim()) || ('home' as const) return { ...(kind !== 'home' ? { connectorKind: kind } : {}), @@ -202,6 +224,16 @@ class HomeConnectorSessionBase extends DurableObject { try { parsed = parseHomeConnectorMessage(message) } catch (error) { + this.captureSessionMessage( + 'Home connector session received invalid websocket payload.', + { + level: 'error', + extra: { + connectorId: this.stateSnapshot.persisted.connectorId, + error: error instanceof Error ? error.message : String(error), + }, + }, + ) ws.send( stringifyHomeConnectorMessage({ type: 'server.error', @@ -328,7 +360,23 @@ class HomeConnectorSessionBase extends DurableObject { 'method' in parsed && parsed.method === 'notifications/tools/list_changed' ) { - await this.refreshToolsSnapshot() + try { + await this.refreshToolsSnapshot() + } catch (error) { + this.clearConnectionState() + this.captureSessionMessage( + 'Home connector tools snapshot refresh failed.', + { + level: 'error', + extra: { + connectorId: this.stateSnapshot.persisted.connectorId, + error: error instanceof Error ? error.message : String(error), + }, + }, + ) + await this.persistState() + throw error + } } } From a807a8b49510b3fcae7edac49deb2b75aca6866d Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 26 Apr 2026 17:29:08 +0000 Subject: [PATCH 2/7] Address PR review feedback Co-authored-by: Kent C. Dodds --- packages/home-connector/server/index.ts | 48 ++++++++++++------- .../src/transport/worker-connector.ts | 23 +++++++-- packages/worker/src/home/session.ts | 14 ++---- 3 files changed, 52 insertions(+), 33 deletions(-) diff --git a/packages/home-connector/server/index.ts b/packages/home-connector/server/index.ts index 984e784dc..ab4816814 100644 --- a/packages/home-connector/server/index.ts +++ b/packages/home-connector/server/index.ts @@ -17,28 +17,39 @@ function installGracefulShutdownHandlers(input: { server: http.Server connector: Awaited> }) { - let isShuttingDown = false + let shutdownPromise: Promise | null = null - async function shutdown(reason: string, closeSentry: boolean) { - if (isShuttingDown) { - return - } - isShuttingDown = true - console.info(`Shutting down home connector reason=${reason}`) - input.connector.workerConnector.stop() + async function closeServerWithWatchdog() { await new Promise((resolve) => { - input.server.close(() => resolve()) + const watchdog = setTimeout(() => { + input.server.closeAllConnections() + resolve() + }, 5_000) + input.server.close(() => { + clearTimeout(watchdog) + resolve() + }) }) - if (closeSentry) { - await closeHomeConnectorSentry() - return + } + + function shutdown(reason: string) { + if (shutdownPromise) { + return shutdownPromise } - await flushHomeConnectorSentry() + + shutdownPromise = (async () => { + console.info(`Shutting down home connector reason=${reason}`) + input.connector.workerConnector.stop() + await closeServerWithWatchdog() + await closeHomeConnectorSentry() + })() + + return shutdownPromise } for (const signal of ['SIGINT', 'SIGTERM'] as const) { process.once(signal, () => { - void shutdown(`signal:${signal}`, true).finally(() => { + void shutdown(`signal:${signal}`).finally(() => { process.exit(signalExitCodeByName[signal]) }) }) @@ -51,19 +62,22 @@ function installGracefulShutdownHandlers(input: { process_event: 'uncaughtException', }, }) - void shutdown('uncaughtException', true).finally(() => { + void shutdown('uncaughtException').finally(() => { process.exit(1) }) }) - process.once('unhandledRejection', (reason) => { + process.once('unhandledRejection', (reason, promise) => { captureHomeConnectorException(reason, { tags: { area: 'process', process_event: 'unhandledRejection', }, + extra: { + promise: String(promise), + }, }) - void shutdown('unhandledRejection', true).finally(() => { + void shutdown('unhandledRejection').finally(() => { process.exit(1) }) }) diff --git a/packages/home-connector/src/transport/worker-connector.ts b/packages/home-connector/src/transport/worker-connector.ts index 709503923..1134921e8 100644 --- a/packages/home-connector/src/transport/worker-connector.ts +++ b/packages/home-connector/src/transport/worker-connector.ts @@ -85,6 +85,21 @@ function createSocketEventContext(input: { } } +function createSocketPayloadPreview(data: unknown) { + const raw = String(data) + const maxPreviewLength = 500 + if (raw.length <= maxPreviewLength) { + return { + rawMessagePreview: raw, + rawMessageLength: raw.length, + } + } + return { + rawMessagePreview: `${raw.slice(0, maxPreviewLength)}...[truncated]`, + rawMessageLength: raw.length, + } +} + async function handleJsonRpcRequest( message: JSONRPCRequest, toolRegistry: HomeConnectorToolRegistry, @@ -374,7 +389,7 @@ export function createWorkerConnector(input: { }, }, ) - if (isAckMessage(value) && socket?.readyState === WebSocket.OPEN) { + if (socket?.readyState === WebSocket.OPEN) { captureHomeConnectorMessage( 'Sending home connector tools changed notification.', { @@ -399,7 +414,7 @@ export function createWorkerConnector(input: { return case 'connector.jsonrpc': { const message = value.message - if (isJsonRpcEnvelope(value) && isJsonRpcResponse(message)) { + if (isJsonRpcResponse(message)) { updateConnectionState(input.state, { lastSyncAt: new Date().toISOString(), lastError: null, @@ -407,7 +422,6 @@ export function createWorkerConnector(input: { return } if ( - isJsonRpcEnvelope(value) && isJsonRpcRequest(message) && socket?.readyState === WebSocket.OPEN ) { @@ -431,7 +445,6 @@ export function createWorkerConnector(input: { } } catch (error) { updateConnectionState(input.state, { - connected: false, lastError: error instanceof Error ? error.message @@ -452,7 +465,7 @@ export function createWorkerConnector(input: { }, }, extra: { - rawMessage: String(event.data), + ...createSocketPayloadPreview(event.data), }, }) console.error( diff --git a/packages/worker/src/home/session.ts b/packages/worker/src/home/session.ts index ee5be7d0f..f6d1daea3 100644 --- a/packages/worker/src/home/session.ts +++ b/packages/worker/src/home/session.ts @@ -268,14 +268,10 @@ class HomeConnectorSessionBase extends DurableObject { ) const ingressSessionKey = this.loadIngressSessionKey(ws) if (ingressSessionKey && ingressSessionKey !== expectedSessionKey) { - Sentry.captureMessage( + this.captureSessionMessage( 'Remote connector session rejected hello (session key mismatch).', { level: 'error', - tags: { - service: 'worker', - worker_component: 'home-connector-session', - }, extra: { connectorId: canonicalInstanceId, declaredKind, @@ -300,14 +296,10 @@ class HomeConnectorSessionBase extends DurableObject { this.env, ) if (!expectedSecret || message.sharedSecret !== expectedSecret) { - Sentry.captureMessage( + this.captureSessionMessage( 'Home connector session rejected websocket hello.', { level: 'error', - tags: { - service: 'worker', - worker_component: 'home-connector-session', - }, extra: { connectorId: canonicalInstanceId, declaredKind, @@ -375,7 +367,7 @@ class HomeConnectorSessionBase extends DurableObject { }, ) await this.persistState() - throw error + return } } } From 480f0b373dc45b1c2b7c230458481ecaac4f0f7f Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 27 Apr 2026 04:49:40 +0000 Subject: [PATCH 3/7] Refine connector review follow-ups Co-authored-by: Kent C. Dodds --- packages/home-connector/server/index.ts | 18 ++++++++++++--- .../home-connector/src/sentry.node.test.ts | 14 ++++++++++++ packages/home-connector/src/sentry.ts | 22 +++++++++++++++++++ .../src/transport/worker-connector.ts | 22 +++++++++++-------- packages/worker/src/home/session.ts | 4 ++-- 5 files changed, 66 insertions(+), 14 deletions(-) diff --git a/packages/home-connector/server/index.ts b/packages/home-connector/server/index.ts index ab4816814..a74183b11 100644 --- a/packages/home-connector/server/index.ts +++ b/packages/home-connector/server/index.ts @@ -62,7 +62,7 @@ function installGracefulShutdownHandlers(input: { process_event: 'uncaughtException', }, }) - void shutdown('uncaughtException').finally(() => { + void flushHomeConnectorSentry().finally(() => { process.exit(1) }) }) @@ -74,10 +74,22 @@ function installGracefulShutdownHandlers(input: { process_event: 'unhandledRejection', }, extra: { - promise: String(promise), + ...(typeof reason === 'string' || + typeof reason === 'number' || + typeof reason === 'boolean' + ? { reason: String(reason) } + : {}), + promiseConstructor: + promise && + typeof promise === 'object' && + 'constructor' in promise && + typeof promise.constructor === 'function' && + promise.constructor.name + ? promise.constructor.name + : 'Promise', }, }) - void shutdown('unhandledRejection').finally(() => { + void flushHomeConnectorSentry().finally(() => { process.exit(1) }) }) diff --git a/packages/home-connector/src/sentry.node.test.ts b/packages/home-connector/src/sentry.node.test.ts index ee0cc18fb..351414396 100644 --- a/packages/home-connector/src/sentry.node.test.ts +++ b/packages/home-connector/src/sentry.node.test.ts @@ -1,6 +1,7 @@ import { expect, test, vi } from 'vitest' const sentryMock = vi.hoisted(() => ({ + addBreadcrumb: vi.fn(), close: vi.fn(), flush: vi.fn(), init: vi.fn(), @@ -13,6 +14,7 @@ vi.mock('@sentry/node', () => sentryMock) const { buildHomeConnectorSentryOptions, + addHomeConnectorBreadcrumb, closeHomeConnectorSentry, flushHomeConnectorSentry, initializeHomeConnectorSentry, @@ -111,3 +113,15 @@ test('closeHomeConnectorSentry returns true when Sentry is disabled', async () = await expect(closeHomeConnectorSentry()).resolves.toBe(true) expect(sentryMock.close).not.toHaveBeenCalled() }) + +test('addHomeConnectorBreadcrumb is a no-op when Sentry is disabled', () => { + sentryMock.isEnabled.mockReturnValue(false) + sentryMock.addBreadcrumb.mockReset() + + addHomeConnectorBreadcrumb({ + message: 'Opening home connector websocket.', + category: 'websocket.lifecycle', + }) + + expect(sentryMock.addBreadcrumb).not.toHaveBeenCalled() +}) diff --git a/packages/home-connector/src/sentry.ts b/packages/home-connector/src/sentry.ts index 7f4c0f78a..d01d967c3 100644 --- a/packages/home-connector/src/sentry.ts +++ b/packages/home-connector/src/sentry.ts @@ -112,6 +112,28 @@ export function captureHomeConnectorMessage( }) } +export function addHomeConnectorSentryBreadcrumb(input: { + message: string + category: string + level?: 'info' | 'warning' | 'error' + data?: Record +}) { + if (!Sentry.isEnabled()) { + return + } + + Sentry.addBreadcrumb({ + type: 'default', + category: input.category, + message: input.message, + level: input.level ?? 'info', + data: { + service: 'home-connector', + ...(input.data ?? {}), + }, + }) +} + export async function flushHomeConnectorSentry(timeout = 2_000) { if (!Sentry.isEnabled()) { return true diff --git a/packages/home-connector/src/transport/worker-connector.ts b/packages/home-connector/src/transport/worker-connector.ts index 1134921e8..a4ebbd825 100644 --- a/packages/home-connector/src/transport/worker-connector.ts +++ b/packages/home-connector/src/transport/worker-connector.ts @@ -14,6 +14,7 @@ import { type HomeConnectorConfig } from '../config.ts' import { type HomeConnectorState, updateConnectionState } from '../state.ts' import { type HomeConnectorToolRegistry } from '../mcp/server.ts' import { + addHomeConnectorSentryBreadcrumb, captureHomeConnectorException, captureHomeConnectorMessage, } from '../sentry.ts' @@ -243,7 +244,7 @@ export function createWorkerConnector(input: { console.info( `Scheduling home connector websocket reconnect in ${reconnectDelayMs}ms consecutiveReconnects=${consecutiveReconnects}`, ) - captureHomeConnectorMessage( + addHomeConnectorSentryBreadcrumb( 'Scheduling home connector websocket reconnect.', { level: 'info', @@ -282,7 +283,7 @@ export function createWorkerConnector(input: { console.info( `Opening home connector websocket attempt=${connectionAttempt} url=${input.config.workerWebSocketUrl}`, ) - captureHomeConnectorMessage('Opening home connector websocket.', { + addHomeConnectorSentryBreadcrumb('Opening home connector websocket.', { level: 'info', tags: { connector_event: 'websocket.connecting', @@ -299,7 +300,7 @@ export function createWorkerConnector(input: { console.info( `Home connector websocket opened attempt=${connectionAttempt} connectorId=${input.config.homeConnectorId}`, ) - captureHomeConnectorMessage('Home connector websocket opened.', { + addHomeConnectorSentryBreadcrumb('Home connector websocket opened.', { level: 'info', tags: { connector_event: 'websocket.open', @@ -316,7 +317,7 @@ export function createWorkerConnector(input: { connectorId: input.config.homeConnectorId, sharedSecret: input.config.sharedSecret!, } - captureHomeConnectorMessage('Sending home connector websocket hello.', { + addHomeConnectorSentryBreadcrumb('Sending home connector websocket hello.', { level: 'info', tags: { connector_event: 'websocket.hello_sent', @@ -363,7 +364,7 @@ export function createWorkerConnector(input: { return case 'server.ack': hasReportedSocketIssue = false - consecutiveReconnects = 0 + const previousConsecutiveReconnects = consecutiveReconnects updateConnectionState(input.state, { connected: true, lastSyncAt: new Date().toISOString(), @@ -372,7 +373,7 @@ export function createWorkerConnector(input: { console.info( `Home connector websocket acknowledged connectorId=${value.connectorId}`, ) - captureHomeConnectorMessage( + addHomeConnectorSentryBreadcrumb( 'Home connector websocket acknowledged.', { level: 'info', @@ -383,14 +384,15 @@ export function createWorkerConnector(input: { ...createSocketEventContext({ config: input.config, connectionAttempt, - consecutiveReconnects, + consecutiveReconnects: + previousConsecutiveReconnects, }), acknowledgedConnectorId: value.connectorId, }, }, ) if (socket?.readyState === WebSocket.OPEN) { - captureHomeConnectorMessage( + addHomeConnectorSentryBreadcrumb( 'Sending home connector tools changed notification.', { level: 'info', @@ -400,7 +402,8 @@ export function createWorkerConnector(input: { extra: createSocketEventContext({ config: input.config, connectionAttempt, - consecutiveReconnects, + consecutiveReconnects: + previousConsecutiveReconnects, }), }, ) @@ -411,6 +414,7 @@ export function createWorkerConnector(input: { }), ) } + consecutiveReconnects = 0 return case 'connector.jsonrpc': { const message = value.message diff --git a/packages/worker/src/home/session.ts b/packages/worker/src/home/session.ts index f6d1daea3..effaf0846 100644 --- a/packages/worker/src/home/session.ts +++ b/packages/worker/src/home/session.ts @@ -72,7 +72,7 @@ class HomeConnectorSessionBase extends DurableObject { service: 'worker', worker_component: 'home-connector-session', }, - ...(input.extra ? { extra: input.extra } : {}), + extra: input.extra ?? {}, }) } @@ -355,7 +355,7 @@ class HomeConnectorSessionBase extends DurableObject { try { await this.refreshToolsSnapshot() } catch (error) { - this.clearConnectionState() + this.stateSnapshot.tools = [] this.captureSessionMessage( 'Home connector tools snapshot refresh failed.', { From eab74f8b1c0334abe4e619ca4b891217ccbafea3 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 27 Apr 2026 05:05:09 +0000 Subject: [PATCH 4/7] Fix connector telemetry regressions Co-authored-by: Kent C. Dodds --- .../home-connector/src/sentry.node.test.ts | 6 +- .../src/transport/worker-connector.ts | 120 ++++++++---------- packages/worker/src/home/session.ts | 76 +++++++++-- 3 files changed, 117 insertions(+), 85 deletions(-) diff --git a/packages/home-connector/src/sentry.node.test.ts b/packages/home-connector/src/sentry.node.test.ts index 351414396..3c543e55d 100644 --- a/packages/home-connector/src/sentry.node.test.ts +++ b/packages/home-connector/src/sentry.node.test.ts @@ -14,7 +14,7 @@ vi.mock('@sentry/node', () => sentryMock) const { buildHomeConnectorSentryOptions, - addHomeConnectorBreadcrumb, + addHomeConnectorSentryBreadcrumb, closeHomeConnectorSentry, flushHomeConnectorSentry, initializeHomeConnectorSentry, @@ -114,11 +114,11 @@ test('closeHomeConnectorSentry returns true when Sentry is disabled', async () = expect(sentryMock.close).not.toHaveBeenCalled() }) -test('addHomeConnectorBreadcrumb is a no-op when Sentry is disabled', () => { +test('addHomeConnectorSentryBreadcrumb is a no-op when Sentry is disabled', () => { sentryMock.isEnabled.mockReturnValue(false) sentryMock.addBreadcrumb.mockReset() - addHomeConnectorBreadcrumb({ + addHomeConnectorSentryBreadcrumb({ message: 'Opening home connector websocket.', category: 'websocket.lifecycle', }) diff --git a/packages/home-connector/src/transport/worker-connector.ts b/packages/home-connector/src/transport/worker-connector.ts index a4ebbd825..4559d5b37 100644 --- a/packages/home-connector/src/transport/worker-connector.ts +++ b/packages/home-connector/src/transport/worker-connector.ts @@ -34,12 +34,6 @@ function isJsonRpcRequest(message: JSONRPCMessage): message is JSONRPCRequest { return 'id' in message && 'method' in message } -function isAckMessage( - message: HomeConnectorClientMessage, -): message is HomeConnectorAckMessage { - return message.type === 'server.ack' -} - function isJsonRpcEnvelope( message: HomeConnectorClientMessage | HomeConnectorJsonRpcEnvelope, ): message is HomeConnectorJsonRpcEnvelope { @@ -244,23 +238,19 @@ export function createWorkerConnector(input: { console.info( `Scheduling home connector websocket reconnect in ${reconnectDelayMs}ms consecutiveReconnects=${consecutiveReconnects}`, ) - addHomeConnectorSentryBreadcrumb( - 'Scheduling home connector websocket reconnect.', - { - level: 'info', - tags: { - connector_event: 'websocket.reconnect_scheduled', - }, - extra: { - ...createSocketEventContext({ - config: input.config, - connectionAttempt, - consecutiveReconnects, - }), - reconnectDelayMs, - }, + addHomeConnectorSentryBreadcrumb({ + message: 'Scheduling home connector websocket reconnect.', + category: 'websocket.reconnect_scheduled', + level: 'info', + data: { + ...createSocketEventContext({ + config: input.config, + connectionAttempt, + consecutiveReconnects, + }), + reconnectDelayMs, }, - ) + }) reconnectTimer = setTimeout(() => { reconnectTimer = null connect() @@ -283,12 +273,11 @@ export function createWorkerConnector(input: { console.info( `Opening home connector websocket attempt=${connectionAttempt} url=${input.config.workerWebSocketUrl}`, ) - addHomeConnectorSentryBreadcrumb('Opening home connector websocket.', { + addHomeConnectorSentryBreadcrumb({ + message: 'Opening home connector websocket.', + category: 'websocket.connecting', level: 'info', - tags: { - connector_event: 'websocket.connecting', - }, - extra: createSocketEventContext({ + data: createSocketEventContext({ config: input.config, connectionAttempt, consecutiveReconnects, @@ -300,12 +289,11 @@ export function createWorkerConnector(input: { console.info( `Home connector websocket opened attempt=${connectionAttempt} connectorId=${input.config.homeConnectorId}`, ) - addHomeConnectorSentryBreadcrumb('Home connector websocket opened.', { + addHomeConnectorSentryBreadcrumb({ + message: 'Home connector websocket opened.', + category: 'websocket.open', level: 'info', - tags: { - connector_event: 'websocket.open', - }, - extra: createSocketEventContext({ + data: createSocketEventContext({ config: input.config, connectionAttempt, consecutiveReconnects, @@ -317,12 +305,11 @@ export function createWorkerConnector(input: { connectorId: input.config.homeConnectorId, sharedSecret: input.config.sharedSecret!, } - addHomeConnectorSentryBreadcrumb('Sending home connector websocket hello.', { + addHomeConnectorSentryBreadcrumb({ + message: 'Sending home connector websocket hello.', + category: 'websocket.hello_sent', level: 'info', - tags: { - connector_event: 'websocket.hello_sent', - }, - extra: createSocketEventContext({ + data: createSocketEventContext({ config: input.config, connectionAttempt, consecutiveReconnects, @@ -362,7 +349,7 @@ export function createWorkerConnector(input: { }) console.error(`Home connector error: ${value.message}`) return - case 'server.ack': + case 'server.ack': { hasReportedSocketIssue = false const previousConsecutiveReconnects = consecutiveReconnects updateConnectionState(input.state, { @@ -373,40 +360,32 @@ export function createWorkerConnector(input: { console.info( `Home connector websocket acknowledged connectorId=${value.connectorId}`, ) - addHomeConnectorSentryBreadcrumb( - 'Home connector websocket acknowledged.', - { - level: 'info', - tags: { - connector_event: 'websocket.ack', - }, - extra: { - ...createSocketEventContext({ - config: input.config, - connectionAttempt, - consecutiveReconnects: - previousConsecutiveReconnects, - }), - acknowledgedConnectorId: value.connectorId, - }, + addHomeConnectorSentryBreadcrumb({ + message: 'Home connector websocket acknowledged.', + category: 'websocket.ack', + level: 'info', + data: { + ...createSocketEventContext({ + config: input.config, + connectionAttempt, + consecutiveReconnects: + previousConsecutiveReconnects, + }), + acknowledgedConnectorId: value.connectorId, }, - ) + }) if (socket?.readyState === WebSocket.OPEN) { - addHomeConnectorSentryBreadcrumb( - 'Sending home connector tools changed notification.', - { - level: 'info', - tags: { - connector_event: 'tools.list_changed.sent', - }, - extra: createSocketEventContext({ - config: input.config, - connectionAttempt, - consecutiveReconnects: - previousConsecutiveReconnects, - }), - }, - ) + addHomeConnectorSentryBreadcrumb({ + message: 'Sending home connector tools changed notification.', + category: 'tools.list_changed.sent', + level: 'info', + data: createSocketEventContext({ + config: input.config, + connectionAttempt, + consecutiveReconnects: + previousConsecutiveReconnects, + }), + }) socket.send( stringifyHomeConnectorMessage({ type: 'connector.jsonrpc', @@ -416,6 +395,7 @@ export function createWorkerConnector(input: { } consecutiveReconnects = 0 return + } case 'connector.jsonrpc': { const message = value.message if (isJsonRpcResponse(message)) { diff --git a/packages/worker/src/home/session.ts b/packages/worker/src/home/session.ts index effaf0846..d2f5534c1 100644 --- a/packages/worker/src/home/session.ts +++ b/packages/worker/src/home/session.ts @@ -34,6 +34,16 @@ type HomeConnectorSessionState = { tools: Array } +function summarizeSessionKey(value: string | null) { + if (!value) { + return null + } + return { + length: value.length, + present: true, + } +} + class HomeConnectorSessionBase extends DurableObject { private stateSnapshot: HomeConnectorSessionState = { persisted: { @@ -243,16 +253,41 @@ class HomeConnectorSessionBase extends DurableObject { return } - switch (parsed.type) { - case 'connector.hello': - await this.handleHello(ws, parsed) - return - case 'connector.heartbeat': - await this.handleHeartbeat() - return - case 'connector.jsonrpc': - await this.handleJsonRpcMessage(parsed.message) - return + try { + switch (parsed.type) { + case 'connector.hello': + await this.handleHello(ws, parsed) + return + case 'connector.heartbeat': + await this.handleHeartbeat() + return + case 'connector.jsonrpc': + await this.handleJsonRpcMessage(parsed.message) + return + } + } catch (error) { + this.captureSessionMessage( + 'Home connector session message handler threw.', + { + level: 'error', + extra: { + connectorId: this.stateSnapshot.persisted.connectorId, + messageType: parsed.type, + error: error instanceof Error ? error.message : String(error), + }, + }, + ) + try { + ws.send( + stringifyHomeConnectorMessage({ + type: 'server.error', + message: error instanceof Error ? error.message : String(error), + }), + ) + } catch { + // Ignore send failures while we're already handling a websocket error. + } + return } } @@ -275,8 +310,9 @@ class HomeConnectorSessionBase extends DurableObject { extra: { connectorId: canonicalInstanceId, declaredKind, - ingressSessionKey, - expectedSessionKey, + ingressSessionKeySummary: summarizeSessionKey(ingressSessionKey), + expectedSessionKeySummary: summarizeSessionKey(expectedSessionKey), + sessionKeyMatch: false, }, }, ) @@ -331,6 +367,22 @@ class HomeConnectorSessionBase extends DurableObject { connectorId: canonicalInstanceId, }), ) + try { + await this.refreshToolsSnapshot() + } catch (error) { + this.stateSnapshot.tools = [] + this.captureSessionMessage( + 'Home connector tools snapshot refresh failed after websocket hello.', + { + level: 'error', + extra: { + connectorId: this.stateSnapshot.persisted.connectorId, + error: error instanceof Error ? error.message : String(error), + }, + }, + ) + await this.persistState() + } } private async handleHeartbeat() { From b7427342012ad2dd48273370584cb33d3d65628b Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 27 Apr 2026 10:33:35 +0000 Subject: [PATCH 5/7] Remove stale websocket type guards Co-authored-by: Kent C. Dodds --- packages/home-connector/src/transport/worker-connector.ts | 7 ------- 1 file changed, 7 deletions(-) diff --git a/packages/home-connector/src/transport/worker-connector.ts b/packages/home-connector/src/transport/worker-connector.ts index 4559d5b37..27cb8078a 100644 --- a/packages/home-connector/src/transport/worker-connector.ts +++ b/packages/home-connector/src/transport/worker-connector.ts @@ -4,7 +4,6 @@ import { type JSONRPCResponse, } from '@modelcontextprotocol/sdk/types.js' import { - type HomeConnectorAckMessage, type HomeConnectorHelloMessage, type HomeConnectorClientMessage, type HomeConnectorJsonRpcEnvelope, @@ -34,12 +33,6 @@ function isJsonRpcRequest(message: JSONRPCMessage): message is JSONRPCRequest { return 'id' in message && 'method' in message } -function isJsonRpcEnvelope( - message: HomeConnectorClientMessage | HomeConnectorJsonRpcEnvelope, -): message is HomeConnectorJsonRpcEnvelope { - return message.type === 'connector.jsonrpc' -} - function createToolsChangedNotification(): JSONRPCMessage { return { jsonrpc: '2.0', From b17f6c7cb3b002174f4378ab122d35a737380a81 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 27 Apr 2026 14:35:44 +0000 Subject: [PATCH 6/7] Harden connector reinitialization and socket close handling Co-authored-by: Kent C. Dodds --- packages/home-connector/index.ts | 6 ++- packages/worker/src/home/session.node.test.ts | 40 +++++++++++++++++++ packages/worker/src/home/session.ts | 20 +++++++++- 3 files changed, 63 insertions(+), 3 deletions(-) diff --git a/packages/home-connector/index.ts b/packages/home-connector/index.ts index 4c404d9ed..b47e7935f 100644 --- a/packages/home-connector/index.ts +++ b/packages/home-connector/index.ts @@ -1,5 +1,9 @@ import 'dotenv/config' -import './src/sentry-init.ts' +import { initializeHomeConnectorSentry } from './src/sentry.ts' + +// `--import ./src/sentry-init.ts` can run before `dotenv/config`, so initialize +// again after env-file loading to pick up `.env`-only DSNs. +initializeHomeConnectorSentry() if (process.env.MOCKS === 'true') { await import('./mocks/index.ts') diff --git a/packages/worker/src/home/session.node.test.ts b/packages/worker/src/home/session.node.test.ts index a7632f606..a775ae342 100644 --- a/packages/worker/src/home/session.node.test.ts +++ b/packages/worker/src/home/session.node.test.ts @@ -112,6 +112,7 @@ test('websocket close clears connectedAt and tools from persisted state', async ) await new Promise((resolve) => setTimeout(resolve, 0)) + state.getWebSockets.mockReturnValue([]) session.webSocketClose({} as WebSocket, 1006, 'network', false) await new Promise((resolve) => setTimeout(resolve, 0)) @@ -129,3 +130,42 @@ test('websocket close clears connectedAt and tools from persisted state', async tools: [], }) }) + +test('stale websocket close preserves active connection state', async () => { + captureMessageMock.mockReset() + const activeSocket = {} as WebSocket + const staleSocket = {} as WebSocket + const { state, persistedEntries } = createState({ + storedState: { + persisted: { + connectorId: 'default', + connectorKind: 'home', + connectedAt: '2026-04-26T05:00:00.000Z', + lastSeenAt: '2026-04-26T05:01:00.000Z', + }, + tools: [{ name: 'bond_shade_set_position' }], + }, + webSockets: [activeSocket, staleSocket], + }) + const session = new HomeConnectorSession( + { + storage: state.storage, + getWebSockets: state.getWebSockets, + acceptWebSocket: state.acceptWebSocket, + } as unknown as DurableObjectState, + {} as Env, + ) + + await new Promise((resolve) => setTimeout(resolve, 0)) + state.getWebSockets.mockReturnValue([activeSocket]) + session.webSocketClose(staleSocket, 1006, 'stale-socket', false) + await new Promise((resolve) => setTimeout(resolve, 0)) + + expect(persistedEntries.get('home-connector-session-state')).toMatchObject({ + persisted: { + connectorId: 'default', + connectedAt: '2026-04-26T05:00:00.000Z', + }, + tools: [{ name: 'bond_shade_set_position' }], + }) +}) diff --git a/packages/worker/src/home/session.ts b/packages/worker/src/home/session.ts index d2f5534c1..bb878225e 100644 --- a/packages/worker/src/home/session.ts +++ b/packages/worker/src/home/session.ts @@ -69,6 +69,14 @@ class HomeConnectorSessionBase extends DurableObject { this.stateSnapshot.tools = [] } + private rejectPendingRequests(reason: string) { + for (const [id, pending] of this.pendingRequests) { + clearTimeout(pending.timeout) + pending.reject(new Error(`${reason} requestId=${id}`)) + } + this.pendingRequests.clear() + } + private captureSessionMessage( message: string, input: { @@ -139,13 +147,21 @@ class HomeConnectorSessionBase extends DurableObject { } webSocketClose( - _ws: WebSocket, + ws: WebSocket, code: number, reason: string, wasClean: boolean, ): void { this.stateSnapshot.persisted.lastSeenAt = new Date().toISOString() - this.clearConnectionState() + const activeSockets = this.ctx + .getWebSockets(connectorTag) + .filter((socket) => socket !== ws) + if (activeSockets.length === 0) { + this.clearConnectionState() + this.rejectPendingRequests( + `Home connector websocket closed code=${code} wasClean=${wasClean}${reason ? ` reason=${reason}` : ''} before RPC response.`, + ) + } const closeMessage = `Home connector session websocket closed code=${code} wasClean=${wasClean}${reason ? ` reason=${reason}` : ''}` console.warn(closeMessage) this.captureSessionMessage(closeMessage, { From 20cc8430d7318570351b3a59b15d8efe719f26e8 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 27 Apr 2026 15:28:13 +0000 Subject: [PATCH 7/7] Polish final review feedback Co-authored-by: Kent C. Dodds --- packages/home-connector/server/index.ts | 16 ++++++++-------- .../src/transport/worker-connector.ts | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/packages/home-connector/server/index.ts b/packages/home-connector/server/index.ts index a74183b11..8d81dfdb1 100644 --- a/packages/home-connector/server/index.ts +++ b/packages/home-connector/server/index.ts @@ -49,6 +49,8 @@ function installGracefulShutdownHandlers(input: { for (const signal of ['SIGINT', 'SIGTERM'] as const) { process.once(signal, () => { + // For clean termination, close the client so it stops accepting events + // before the process exits. void shutdown(`signal:${signal}`).finally(() => { process.exit(signalExitCodeByName[signal]) }) @@ -62,6 +64,8 @@ function installGracefulShutdownHandlers(input: { process_event: 'uncaughtException', }, }) + // On fatal process paths, flush buffered events but avoid relying on a full + // async shutdown from an undefined runtime state. void flushHomeConnectorSentry().finally(() => { process.exit(1) }) @@ -79,16 +83,12 @@ function installGracefulShutdownHandlers(input: { typeof reason === 'boolean' ? { reason: String(reason) } : {}), - promiseConstructor: - promise && - typeof promise === 'object' && - 'constructor' in promise && - typeof promise.constructor === 'function' && - promise.constructor.name - ? promise.constructor.name - : 'Promise', + reasonType: typeof reason, + ...(reason instanceof Error ? { reasonName: reason.name } : {}), }, }) + // On fatal process paths, flush buffered events but avoid relying on a full + // async shutdown from an undefined runtime state. void flushHomeConnectorSentry().finally(() => { process.exit(1) }) diff --git a/packages/home-connector/src/transport/worker-connector.ts b/packages/home-connector/src/transport/worker-connector.ts index 27cb8078a..1f04fe5a6 100644 --- a/packages/home-connector/src/transport/worker-connector.ts +++ b/packages/home-connector/src/transport/worker-connector.ts @@ -345,6 +345,7 @@ export function createWorkerConnector(input: { case 'server.ack': { hasReportedSocketIssue = false const previousConsecutiveReconnects = consecutiveReconnects + consecutiveReconnects = 0 updateConnectionState(input.state, { connected: true, lastSyncAt: new Date().toISOString(), @@ -386,7 +387,6 @@ export function createWorkerConnector(input: { }), ) } - consecutiveReconnects = 0 return } case 'connector.jsonrpc': {