Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,18 @@ export const UserNotificationSettings: React.FC<{
<Switch id="account" />
</Form.Item>{" "}
</div>
<div className="flex flex-row w-full justify-between items-center border-x border-t border-collapse p-4">
<label htmlFor="summarizeBatchNotificationsByTable" className="font-main flex flex-col gap-1">
Summarize Batch Notifications by Table
<span className="text-xs text-textLight">
When a batch connection writes to multiple tables, send a single PARTIAL notification summarizing
failures across tables instead of one notification per table.
</span>
</label>
<Form.Item name="summarizeBatchNotificationsByTable" noStyle>
<Switch id="summarizeBatchNotificationsByTable" />
</Form.Item>
</div>
<div className="flex flex-row w-full justify-between items-center border-x border-t border-collapse p-4">
<label htmlFor="recurringAlertsPeriodHours" className="font-main flex flex-col gap-1">
Recurring Alerts Period (hours)
Expand Down
1 change: 1 addition & 0 deletions webapps/console/lib/schema/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ export const NotificationChannel = ConfigEntityBase.merge(
// allWorkspaceEmails: z.boolean().default(true).optional(),
emails: z.array(z.string()).optional(),
recurringAlertsPeriodHours: z.number().max(720).min(0).default(168),
summarizeBatchNotificationsByTable: z.boolean().default(true),
})
);
export type NotificationChannel = z.infer<typeof NotificationChannel>;
Expand Down
2 changes: 2 additions & 0 deletions webapps/console/lib/server/user-preferences.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export const UserNotificationsPreferences = z.object({
syncs: z.boolean().default(true),
dead: z.boolean().default(true),
account: z.boolean().default(true),
summarizeBatchNotificationsByTable: z.boolean().default(true),
recurringAlertsPeriodHours: z.coerce.number().max(720).min(0).default(168),
subscriptionCode: z.string().optional(),
});
Expand All @@ -18,6 +19,7 @@ export const DefaultUserNotificationsPreferences: UserNotificationsPreferences =
syncs: true,
dead: true,
account: true,
summarizeBatchNotificationsByTable: true,
recurringAlertsPeriodHours: 168,
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,15 @@ const NotificationChannelList: React.FC<{}> = () => {
</>
),
},
summarizeBatchNotificationsByTable: {
displayName: "Summarize Batch Notifications by Table",
documentation: (
<>
When a batch connection writes to multiple tables, send a single PARTIAL notification summarizing failures
across tables instead of one notification per table.
</>
),
},
},
noun: "Slack Notification Channel",
type: "notification",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export default createRoute()
events: ["all"],
recurringAlertsPeriodHours: recurringAlertsPeriodHours || 168,
name: "Test Slack Channel",
summarizeBatchNotificationsByTable: true,
};
const statusChange: StatusChange = {
type: con?.type === "sync" ? "sync" : "batch",
Expand Down
268 changes: 267 additions & 1 deletion webapps/console/pages/api/admin/notifications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const adminChannel: NotificationChannel = {
slackWebhookUrl: SLACK_WEBHOOK_URL,
events: ["all"],
recurringAlertsPeriodHours: 24,
summarizeBatchNotificationsByTable: true,
};

export type StatusChangeEntity = Omit<StatusChange, "type"> & {
Expand Down Expand Up @@ -282,7 +283,12 @@ async function loadNotificationsChannels() {
channelsByWorkspace = [];
channels[row.workspaceId] = channelsByWorkspace;
}
channelsByWorkspace.push({ ...omit(row, "config"), ...(row.config as any) } as unknown as NotificationChannel);
const merged = { ...omit(row, "config"), ...(row.config as any) } as unknown as NotificationChannel;
// Channels saved before this field existed have it undefined — default to true.
if (merged.summarizeBatchNotificationsByTable === undefined) {
merged.summarizeBatchNotificationsByTable = true;
}
channelsByWorkspace.push(merged);
}
});

Expand Down Expand Up @@ -325,6 +331,7 @@ async function loadNotificationsChannels() {
: 168,
type: "notification",
workspaceId: row.workspaceId,
summarizeBatchNotificationsByTable: settings.summarizeBatchNotificationsByTable !== false,
});
}
}
Expand Down Expand Up @@ -381,6 +388,11 @@ async function processStatusChanges(
if (!channel.events.includes(entity.type) && !channel.events.includes("all")) {
continue;
}
// For channels that summarize batch notifications by table, skip per-table changes —
// those are handled below as one connection-level aggregate.
if (entity.type === "batch" && entity.tableName && channel.summarizeBatchNotificationsByTable) {
continue;
}
const cStatuses = [...statuses];
const chkey = chKey(channel.id, lastStatus.actorId, lastStatus.type, lastStatus.tableName);
let state = channelStates[chkey];
Expand Down Expand Up @@ -481,9 +493,263 @@ async function processStatusChanges(
}
}

// Aggregate batch per-table status changes into one connection-level notification per channel.
// Each per-table StatusChange row remains intact in the DB; this only affects what gets sent.
const batchActorIdsWithChanges = new Set<string>();
for (const change of statusChanges) {
if (change.type === "batch" && change.tableName) {
batchActorIdsWithChanges.add(change.actorId);
}
}
for (const actorId of batchActorIdsWithChanges) {
const view = computeBatchConnectionAggregate(entities, actorId);
if (!view) continue;
const channelList = [...(channels[view.workspaceId] || []), ...(channels["admin"] || [])];
for (const channel of channelList) {
if (!channel.events.includes("batch") && !channel.events.includes("all")) continue;
if (!channel.summarizeBatchNotificationsByTable) continue;
const st = await processBatchAggregateNotification(channel, channelStates, view, publicEndpoints, dryRun);
if (st) sendStatuses.push(st);
}
}

return { date: processedTimestamp, statuses: sendStatuses };
}

type BatchConnectionAggregate = {
workspaceId: string;
actorId: string;
workspaceName: string;
slug: string;
fromName: string;
toName: string;
perTableCount: number;
failedTableNames: string[];
aggStatus: "SUCCESS" | "FAILED" | "PARTIAL";
aggDescription: string;
aggStreamsFailed?: string;
aggIncidentDetails: string;
aggMaxId: bigint;
aggTimestamp: Date;
aggStartedAt: Date;
aggQueueSize: number;
changesPerHours: number;
changesPerDay: number;
};

function computeBatchConnectionAggregate(
entities: Record<string, StatusChangeEntity>,
actorId: string
): BatchConnectionAggregate | undefined {
let template: StatusChangeEntity | undefined;
let maxId: bigint = 0n;
let maxIdEntity: StatusChangeEntity | undefined;
let earliestIncidentStart: Date | undefined;
let totalChangesPerHours = 0;
let totalChangesPerDay = 0;
const failedTableNames: string[] = [];
let perTableCount = 0;
let firstFailureDescription: string | undefined;
let aggQueueSize = 0;
for (const ent of Object.values(entities)) {
if (ent.actorId !== actorId || ent.type !== "batch" || !ent.tableName) continue;
perTableCount++;
template = template ?? ent;
if (ent.id) {
const idBig = BigInt(ent.id);
if (idBig > maxId) {
maxId = idBig;
maxIdEntity = ent;
}
}
totalChangesPerHours += ent.changesPerHours || 0;
totalChangesPerDay += ent.changesPerDay || 0;
if (ent.status !== "SUCCESS") {
failedTableNames.push(ent.tableName!);
if (ent.startedAt && (!earliestIncidentStart || ent.startedAt < earliestIncidentStart)) {
earliestIncidentStart = ent.startedAt;
}
if (!firstFailureDescription) {
firstFailureDescription = extractDescription(ent as unknown as StatusChange) ?? ent.description ?? undefined;
}
}
aggQueueSize += ent.queueSize || 0;
}
if (!template || !maxIdEntity || perTableCount === 0) return undefined;
const failedCount = failedTableNames.length;
const succeededCount = perTableCount - failedCount;
let aggStatus: "SUCCESS" | "FAILED" | "PARTIAL";
let aggStreamsFailed: string | undefined;
let aggIncidentDetails: string;
let aggDescription: string;
if (failedCount === 0) {
aggStatus = "SUCCESS";
aggDescription = "";
aggIncidentDetails = `All ${perTableCount} table(s) succeeded.`;
} else if (succeededCount === 0) {
aggStatus = "FAILED";
aggIncidentDetails = `All ${perTableCount} table(s) failed: ${failedTableNames.join(", ")}.\n\nLatest error:\n${
firstFailureDescription ?? ""
}`;
aggDescription =
_J_PREF +
JSON.stringify({
description: firstFailureDescription ?? "",
incidentDetails: aggIncidentDetails,
});
} else {
aggStatus = "PARTIAL";
aggStreamsFailed = `${failedCount} of ${perTableCount}`;
aggIncidentDetails = `${aggStreamsFailed} tables failed: ${failedTableNames.join(", ")}.\n\nLatest error:\n${
firstFailureDescription ?? ""
}`;
aggDescription =
_J_PREF +
JSON.stringify({
status: "PARTIAL",
description: aggIncidentDetails,
streamsFailed: aggStreamsFailed,
incidentDetails: aggIncidentDetails,
});
}
return {
workspaceId: template.workspaceId!,
actorId,
workspaceName: template.workspaceName,
slug: template.slug,
fromName: template.fromName,
toName: template.toName,
perTableCount,
failedTableNames,
aggStatus,
aggDescription: aggDescription ?? "",
aggStreamsFailed,
aggIncidentDetails,
aggMaxId: maxId,
aggTimestamp: maxIdEntity.timestamp!,
aggStartedAt: earliestIncidentStart ?? maxIdEntity.startedAt!,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For failedCount === 0 (the recovery case), earliestIncidentStart is always undefined, so aggStartedAt falls back to maxIdEntity.startedAt (the latest success timestamp). Later RECOVERED uses this as incidentStartedAt, so the email reports the incident as starting at recovery time instead of when failures began.

aggQueueSize,
changesPerHours: totalChangesPerHours,
changesPerDay: totalChangesPerDay,
};
}

async function processBatchAggregateNotification(
channel: NotificationChannel,
channelStates: Record<string, NotificationState>,
view: BatchConnectionAggregate,
publicEndpoints: PublicEndpoint,
dryRun: boolean
): Promise<SendStatus | undefined> {
const chkey = chKey(channel.id, view.actorId, "batch", "");
const state = channelStates[chkey];
const sendRecurringTime =
(state?.lastNotification?.getTime() || 0) + channel.recurringAlertsPeriodHours * 60 * 60 * 1000;
let doNotify = false;
let renderStatus: string = view.aggStatus;
let prevStatus: string | undefined;

if (!state) {
// First notification on this channel for this connection aggregate.
if (view.aggStatus === "SUCCESS") {
// Only emit a FIRST_RUN-style notification if some per-table activity has been observed.
renderStatus = "FIRST_RUN";
doNotify = true;
} else {
doNotify = true;
}
} else {
const prevId = BigInt(state.statusChangeId);
if (view.aggMaxId > prevId) {
doNotify = true;
if (view.aggStatus === "SUCCESS") {
// Only notify on SUCCESS if it represents a recovery from a prior failure.
try {
const prevRow = await db.prisma().statusChange.findUnique({ where: { id: state.statusChangeId } });
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state.statusChangeId stores view.aggMaxId (a real per-table StatusChange id), not the previous aggregate status. That means recovery detection can be wrong: if the previous aggregate was PARTIAL but the max-id table row was SUCCESS, this branch suppresses RECOVERED (doNotify = false) even though the connection actually recovered.

prevStatus = prevRow?.status;
if (prevRow && prevRow.status !== "SUCCESS") {
renderStatus = "RECOVERED";
} else {
doNotify = false;
}
} catch (e: any) {
log.atWarn().log(`Failed to load previous statusChange ${state.statusChangeId}: ${e?.message}`);
doNotify = false;
}
}
} else if (view.aggStatus !== "SUCCESS" && view.aggTimestamp.getTime() > sendRecurringTime) {
doNotify = true;
renderStatus = "ONGOING";
}
}

if (!doNotify) return;

let description: string;
if (renderStatus === "FIRST_RUN") {
description = _J_PREF + JSON.stringify({ status: "FIRST_RUN", incidentDetails: view.aggIncidentDetails });
} else if (renderStatus === "RECOVERED") {
description =
_J_PREF +
JSON.stringify({
status: "RECOVERED",
incidentStatus: prevStatus ?? "FAILED",
incidentStartedAt: view.aggStartedAt?.toISOString(),
incidentDetails: view.aggIncidentDetails,
});
} else if (renderStatus === "ONGOING") {
description =
_J_PREF +
JSON.stringify({
status: "ONGOING",
description: view.aggIncidentDetails,
streamsFailed: view.aggStreamsFailed,
incidentDetails: view.aggIncidentDetails,
incidentStatus: view.aggStatus,
});
} else if (view.aggStatus === "PARTIAL") {
description = view.aggDescription;
} else {
description = view.aggDescription;
}

const syntheticStatus: StatusChange = {
id: view.aggMaxId,
workspaceId: view.workspaceId,
actorId: view.actorId,
type: "batch",
tableName: "",
timestamp: view.aggTimestamp,
startedAt: view.aggStartedAt,
status: view.aggStatus,
description,
counts: 1,
queueSize: view.aggQueueSize,
};

const aggEntity: StatusChangeEntity = {
id: view.aggMaxId,
workspaceId: view.workspaceId,
actorId: view.actorId,
type: "batch",
tableName: "",
workspaceName: view.workspaceName,
slug: view.slug,
fromName: view.fromName,
toName: view.toName,
timestamp: view.aggTimestamp,
startedAt: view.aggStartedAt,
status: view.aggStatus,
description,
counts: 1,
queueSize: view.aggQueueSize,
changesPerHours: view.changesPerHours,
changesPerDay: view.changesPerDay,
};

return await processNotification(channel, channelStates, [syntheticStatus], aggEntity, publicEndpoints, dryRun);
}

function makeNotificationState(
channel: NotificationChannel,
statusChange: StatusChange,
Expand Down
Loading