-
Notifications
You must be signed in to change notification settings - Fork 350
feat(console): summarize batch notifications by table #1323
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: newjitsu
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -72,6 +72,7 @@ const adminChannel: NotificationChannel = { | |
| slackWebhookUrl: SLACK_WEBHOOK_URL, | ||
| events: ["all"], | ||
| recurringAlertsPeriodHours: 24, | ||
| summarizeBatchNotificationsByTable: true, | ||
| }; | ||
|
|
||
| export type StatusChangeEntity = Omit<StatusChange, "type"> & { | ||
|
|
@@ -282,7 +283,12 @@ async function loadNotificationsChannels() { | |
| channelsByWorkspace = []; | ||
| channels[row.workspaceId] = channelsByWorkspace; | ||
| } | ||
| channelsByWorkspace.push({ ...omit(row, "config"), ...(row.config as any) } as unknown as NotificationChannel); | ||
| const merged = { ...omit(row, "config"), ...(row.config as any) } as unknown as NotificationChannel; | ||
| // Channels saved before this field existed have it undefined — default to true. | ||
| if (merged.summarizeBatchNotificationsByTable === undefined) { | ||
| merged.summarizeBatchNotificationsByTable = true; | ||
| } | ||
| channelsByWorkspace.push(merged); | ||
| } | ||
| }); | ||
|
|
||
|
|
@@ -325,6 +331,7 @@ async function loadNotificationsChannels() { | |
| : 168, | ||
| type: "notification", | ||
| workspaceId: row.workspaceId, | ||
| summarizeBatchNotificationsByTable: settings.summarizeBatchNotificationsByTable !== false, | ||
| }); | ||
| } | ||
| } | ||
|
|
@@ -381,6 +388,11 @@ async function processStatusChanges( | |
| if (!channel.events.includes(entity.type) && !channel.events.includes("all")) { | ||
| continue; | ||
| } | ||
| // For channels that summarize batch notifications by table, skip per-table changes — | ||
| // those are handled below as one connection-level aggregate. | ||
| if (entity.type === "batch" && entity.tableName && channel.summarizeBatchNotificationsByTable) { | ||
| continue; | ||
| } | ||
| const cStatuses = [...statuses]; | ||
| const chkey = chKey(channel.id, lastStatus.actorId, lastStatus.type, lastStatus.tableName); | ||
| let state = channelStates[chkey]; | ||
|
|
@@ -481,9 +493,263 @@ async function processStatusChanges( | |
| } | ||
| } | ||
|
|
||
| // Aggregate batch per-table status changes into one connection-level notification per channel. | ||
| // Each per-table StatusChange row remains intact in the DB; this only affects what gets sent. | ||
| const batchActorIdsWithChanges = new Set<string>(); | ||
| for (const change of statusChanges) { | ||
| if (change.type === "batch" && change.tableName) { | ||
| batchActorIdsWithChanges.add(change.actorId); | ||
| } | ||
| } | ||
| for (const actorId of batchActorIdsWithChanges) { | ||
| const view = computeBatchConnectionAggregate(entities, actorId); | ||
| if (!view) continue; | ||
| const channelList = [...(channels[view.workspaceId] || []), ...(channels["admin"] || [])]; | ||
| for (const channel of channelList) { | ||
| if (!channel.events.includes("batch") && !channel.events.includes("all")) continue; | ||
| if (!channel.summarizeBatchNotificationsByTable) continue; | ||
| const st = await processBatchAggregateNotification(channel, channelStates, view, publicEndpoints, dryRun); | ||
| if (st) sendStatuses.push(st); | ||
| } | ||
| } | ||
|
|
||
| return { date: processedTimestamp, statuses: sendStatuses }; | ||
| } | ||
|
|
||
| type BatchConnectionAggregate = { | ||
| workspaceId: string; | ||
| actorId: string; | ||
| workspaceName: string; | ||
| slug: string; | ||
| fromName: string; | ||
| toName: string; | ||
| perTableCount: number; | ||
| failedTableNames: string[]; | ||
| aggStatus: "SUCCESS" | "FAILED" | "PARTIAL"; | ||
| aggDescription: string; | ||
| aggStreamsFailed?: string; | ||
| aggIncidentDetails: string; | ||
| aggMaxId: bigint; | ||
| aggTimestamp: Date; | ||
| aggStartedAt: Date; | ||
| aggQueueSize: number; | ||
| changesPerHours: number; | ||
| changesPerDay: number; | ||
| }; | ||
|
|
||
| function computeBatchConnectionAggregate( | ||
| entities: Record<string, StatusChangeEntity>, | ||
| actorId: string | ||
| ): BatchConnectionAggregate | undefined { | ||
| let template: StatusChangeEntity | undefined; | ||
| let maxId: bigint = 0n; | ||
| let maxIdEntity: StatusChangeEntity | undefined; | ||
| let earliestIncidentStart: Date | undefined; | ||
| let totalChangesPerHours = 0; | ||
| let totalChangesPerDay = 0; | ||
| const failedTableNames: string[] = []; | ||
| let perTableCount = 0; | ||
| let firstFailureDescription: string | undefined; | ||
| let aggQueueSize = 0; | ||
| for (const ent of Object.values(entities)) { | ||
| if (ent.actorId !== actorId || ent.type !== "batch" || !ent.tableName) continue; | ||
| perTableCount++; | ||
| template = template ?? ent; | ||
| if (ent.id) { | ||
| const idBig = BigInt(ent.id); | ||
| if (idBig > maxId) { | ||
| maxId = idBig; | ||
| maxIdEntity = ent; | ||
| } | ||
| } | ||
| totalChangesPerHours += ent.changesPerHours || 0; | ||
| totalChangesPerDay += ent.changesPerDay || 0; | ||
| if (ent.status !== "SUCCESS") { | ||
| failedTableNames.push(ent.tableName!); | ||
| if (ent.startedAt && (!earliestIncidentStart || ent.startedAt < earliestIncidentStart)) { | ||
| earliestIncidentStart = ent.startedAt; | ||
| } | ||
| if (!firstFailureDescription) { | ||
| firstFailureDescription = extractDescription(ent as unknown as StatusChange) ?? ent.description ?? undefined; | ||
| } | ||
| } | ||
| aggQueueSize += ent.queueSize || 0; | ||
| } | ||
| if (!template || !maxIdEntity || perTableCount === 0) return undefined; | ||
| const failedCount = failedTableNames.length; | ||
| const succeededCount = perTableCount - failedCount; | ||
| let aggStatus: "SUCCESS" | "FAILED" | "PARTIAL"; | ||
| let aggStreamsFailed: string | undefined; | ||
| let aggIncidentDetails: string; | ||
| let aggDescription: string; | ||
| if (failedCount === 0) { | ||
| aggStatus = "SUCCESS"; | ||
| aggDescription = ""; | ||
| aggIncidentDetails = `All ${perTableCount} table(s) succeeded.`; | ||
| } else if (succeededCount === 0) { | ||
| aggStatus = "FAILED"; | ||
| aggIncidentDetails = `All ${perTableCount} table(s) failed: ${failedTableNames.join(", ")}.\n\nLatest error:\n${ | ||
| firstFailureDescription ?? "" | ||
| }`; | ||
| aggDescription = | ||
| _J_PREF + | ||
| JSON.stringify({ | ||
| description: firstFailureDescription ?? "", | ||
| incidentDetails: aggIncidentDetails, | ||
| }); | ||
| } else { | ||
| aggStatus = "PARTIAL"; | ||
| aggStreamsFailed = `${failedCount} of ${perTableCount}`; | ||
| aggIncidentDetails = `${aggStreamsFailed} tables failed: ${failedTableNames.join(", ")}.\n\nLatest error:\n${ | ||
| firstFailureDescription ?? "" | ||
| }`; | ||
| aggDescription = | ||
| _J_PREF + | ||
| JSON.stringify({ | ||
| status: "PARTIAL", | ||
| description: aggIncidentDetails, | ||
| streamsFailed: aggStreamsFailed, | ||
| incidentDetails: aggIncidentDetails, | ||
| }); | ||
| } | ||
| return { | ||
| workspaceId: template.workspaceId!, | ||
| actorId, | ||
| workspaceName: template.workspaceName, | ||
| slug: template.slug, | ||
| fromName: template.fromName, | ||
| toName: template.toName, | ||
| perTableCount, | ||
| failedTableNames, | ||
| aggStatus, | ||
| aggDescription: aggDescription ?? "", | ||
| aggStreamsFailed, | ||
| aggIncidentDetails, | ||
| aggMaxId: maxId, | ||
| aggTimestamp: maxIdEntity.timestamp!, | ||
| aggStartedAt: earliestIncidentStart ?? maxIdEntity.startedAt!, | ||
| aggQueueSize, | ||
| changesPerHours: totalChangesPerHours, | ||
| changesPerDay: totalChangesPerDay, | ||
| }; | ||
| } | ||
|
|
||
| async function processBatchAggregateNotification( | ||
| channel: NotificationChannel, | ||
| channelStates: Record<string, NotificationState>, | ||
| view: BatchConnectionAggregate, | ||
| publicEndpoints: PublicEndpoint, | ||
| dryRun: boolean | ||
| ): Promise<SendStatus | undefined> { | ||
| const chkey = chKey(channel.id, view.actorId, "batch", ""); | ||
| const state = channelStates[chkey]; | ||
| const sendRecurringTime = | ||
| (state?.lastNotification?.getTime() || 0) + channel.recurringAlertsPeriodHours * 60 * 60 * 1000; | ||
| let doNotify = false; | ||
| let renderStatus: string = view.aggStatus; | ||
| let prevStatus: string | undefined; | ||
|
|
||
| if (!state) { | ||
| // First notification on this channel for this connection aggregate. | ||
| if (view.aggStatus === "SUCCESS") { | ||
| // Only emit a FIRST_RUN-style notification if some per-table activity has been observed. | ||
| renderStatus = "FIRST_RUN"; | ||
| doNotify = true; | ||
| } else { | ||
| doNotify = true; | ||
| } | ||
| } else { | ||
| const prevId = BigInt(state.statusChangeId); | ||
| if (view.aggMaxId > prevId) { | ||
| doNotify = true; | ||
| if (view.aggStatus === "SUCCESS") { | ||
| // Only notify on SUCCESS if it represents a recovery from a prior failure. | ||
| try { | ||
| const prevRow = await db.prisma().statusChange.findUnique({ where: { id: state.statusChangeId } }); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| prevStatus = prevRow?.status; | ||
| if (prevRow && prevRow.status !== "SUCCESS") { | ||
| renderStatus = "RECOVERED"; | ||
| } else { | ||
| doNotify = false; | ||
| } | ||
| } catch (e: any) { | ||
| log.atWarn().log(`Failed to load previous statusChange ${state.statusChangeId}: ${e?.message}`); | ||
| doNotify = false; | ||
| } | ||
| } | ||
| } else if (view.aggStatus !== "SUCCESS" && view.aggTimestamp.getTime() > sendRecurringTime) { | ||
| doNotify = true; | ||
| renderStatus = "ONGOING"; | ||
| } | ||
| } | ||
|
|
||
| if (!doNotify) return; | ||
|
|
||
| let description: string; | ||
| if (renderStatus === "FIRST_RUN") { | ||
| description = _J_PREF + JSON.stringify({ status: "FIRST_RUN", incidentDetails: view.aggIncidentDetails }); | ||
| } else if (renderStatus === "RECOVERED") { | ||
| description = | ||
| _J_PREF + | ||
| JSON.stringify({ | ||
| status: "RECOVERED", | ||
| incidentStatus: prevStatus ?? "FAILED", | ||
| incidentStartedAt: view.aggStartedAt?.toISOString(), | ||
| incidentDetails: view.aggIncidentDetails, | ||
| }); | ||
| } else if (renderStatus === "ONGOING") { | ||
| description = | ||
| _J_PREF + | ||
| JSON.stringify({ | ||
| status: "ONGOING", | ||
| description: view.aggIncidentDetails, | ||
| streamsFailed: view.aggStreamsFailed, | ||
| incidentDetails: view.aggIncidentDetails, | ||
| incidentStatus: view.aggStatus, | ||
| }); | ||
| } else if (view.aggStatus === "PARTIAL") { | ||
| description = view.aggDescription; | ||
| } else { | ||
| description = view.aggDescription; | ||
| } | ||
|
|
||
| const syntheticStatus: StatusChange = { | ||
| id: view.aggMaxId, | ||
| workspaceId: view.workspaceId, | ||
| actorId: view.actorId, | ||
| type: "batch", | ||
| tableName: "", | ||
| timestamp: view.aggTimestamp, | ||
| startedAt: view.aggStartedAt, | ||
| status: view.aggStatus, | ||
| description, | ||
| counts: 1, | ||
| queueSize: view.aggQueueSize, | ||
| }; | ||
|
|
||
| const aggEntity: StatusChangeEntity = { | ||
| id: view.aggMaxId, | ||
| workspaceId: view.workspaceId, | ||
| actorId: view.actorId, | ||
| type: "batch", | ||
| tableName: "", | ||
| workspaceName: view.workspaceName, | ||
| slug: view.slug, | ||
| fromName: view.fromName, | ||
| toName: view.toName, | ||
| timestamp: view.aggTimestamp, | ||
| startedAt: view.aggStartedAt, | ||
| status: view.aggStatus, | ||
| description, | ||
| counts: 1, | ||
| queueSize: view.aggQueueSize, | ||
| changesPerHours: view.changesPerHours, | ||
| changesPerDay: view.changesPerDay, | ||
| }; | ||
|
|
||
| return await processNotification(channel, channelStates, [syntheticStatus], aggEntity, publicEndpoints, dryRun); | ||
| } | ||
|
|
||
| function makeNotificationState( | ||
| channel: NotificationChannel, | ||
| statusChange: StatusChange, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For
failedCount === 0(the recovery case),earliestIncidentStartis always undefined, soaggStartedAtfalls back tomaxIdEntity.startedAt(the latest success timestamp). LaterRECOVEREDuses this asincidentStartedAt, so the email reports the incident as starting at recovery time instead of when failures began.