Skip to content

Commit 1e8622c

Browse files
authored
Update useRealtime.ts to add filter functionality based on date and status
1 parent 49e6470 commit 1e8622c

1 file changed

Lines changed: 113 additions & 58 deletions

File tree

packages/react-hooks/src/hooks/useRealtime.ts

Lines changed: 113 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,7 @@
11
"use client";
22

3-
import {
4-
AnyTask,
5-
ApiClient,
6-
InferRunTypes,
7-
RealtimeRun,
8-
RealtimeRunSkipColumns,
9-
} from "@trigger.dev/core/v3";
10-
import { useCallback, useEffect, useId, useRef, useState } from "react";
3+
import { AnyTask, ApiClient, InferRunTypes, RealtimeRun } from "@trigger.dev/core/v3";
4+
import { useCallback, useEffect, useId, useRef, useState, useMemo } from "react";
115
import { KeyedMutator, useSWR } from "../utils/trigger-swr.js";
126
import { useApiClient, UseApiClientOptions } from "./useApiClient.js";
137
import { createThrottledQueue } from "../utils/throttle.js";
@@ -35,13 +29,6 @@ export type UseRealtimeSingleRunOptions<TTask extends AnyTask = AnyTask> = UseRe
3529
* Set this to false if you are making updates to the run metadata after completion through child runs
3630
*/
3731
stopOnCompletion?: boolean;
38-
39-
/**
40-
* Skip columns from the subscription.
41-
*
42-
* @default []
43-
*/
44-
skipColumns?: RealtimeRunSkipColumns;
4532
};
4633

4734
export type UseRealtimeRunInstance<TTask extends AnyTask = AnyTask> = {
@@ -114,7 +101,6 @@ export function useRealtimeRun<TTask extends AnyTask>(
114101

115102
await processRealtimeRun(
116103
runId,
117-
{ skipColumns: options?.skipColumns },
118104
apiClient,
119105
mutateRun,
120106
setError,
@@ -275,7 +261,6 @@ export function useRealtimeRunWithStreams<
275261

276262
await processRealtimeRunWithStreams(
277263
runId,
278-
{ skipColumns: options?.skipColumns },
279264
apiClient,
280265
mutateRun,
281266
mutateStreams,
@@ -349,32 +334,6 @@ export type UseRealtimeRunsInstance<TTask extends AnyTask = AnyTask> = {
349334
stop: () => void;
350335
};
351336

352-
export type UseRealtimeRunsWithTagOptions = UseRealtimeRunOptions & {
353-
/**
354-
* Filter runs by the time they were created. You must specify the duration string like "1h", "10s", "30m", etc.
355-
*
356-
* @example
357-
* "1h" - 1 hour ago
358-
* "10s" - 10 seconds ago
359-
* "30m" - 30 minutes ago
360-
* "1d" - 1 day ago
361-
* "1w" - 1 week ago
362-
*
363-
* The maximum duration is 1 week
364-
*
365-
* @note The timestamp will be calculated on the server side when you first subscribe to the runs.
366-
*
367-
*/
368-
createdAt?: string;
369-
370-
/**
371-
* Skip columns from the subscription.
372-
*
373-
* @default []
374-
*/
375-
skipColumns?: RealtimeRunSkipColumns;
376-
};
377-
378337
/**
379338
* Hook to subscribe to realtime updates of task runs filtered by tag(s).
380339
*
@@ -389,13 +348,73 @@ export type UseRealtimeRunsWithTagOptions = UseRealtimeRunOptions & {
389348
* const { runs, error } = useRealtimeRunsWithTag<typeof myTask>('my-tag');
390349
* // Or with multiple tags
391350
* const { runs, error } = useRealtimeRunsWithTag<typeof myTask>(['tag1', 'tag2']);
392-
* // Or with a createdAt filter
393-
* const { runs, error } = useRealtimeRunsWithTag<typeof myTask>('my-tag', { createdAt: '1h' });
394351
* ```
395352
*/
353+
354+
export interface RealtimeFilterOptions {
355+
/**
356+
* Inclusive start date. Accepts Date or ISO date string or epoch number.
357+
*/
358+
startDate?: Date | string | number;
359+
/**
360+
* Inclusive end date. Accepts Date or ISO date string or epoch number.
361+
*/
362+
endDate?: Date | string | number;
363+
/**
364+
* Allowed run statuses (exact string matches).
365+
*/
366+
statuses?: string[];
367+
}
368+
369+
/** Utility: normalize a possible date input to a Date instance, or null */
370+
function normalizeToDate(value?: Date | string | number): Date | null {
371+
if (value === undefined || value === null) return null;
372+
if (value instanceof Date) return value;
373+
if (typeof value === "number") return new Date(value);
374+
if (typeof value === "string") {
375+
const d = new Date(value);
376+
return Number.isNaN(d.getTime()) ? null : d;
377+
}
378+
return null;
379+
}
380+
381+
/** Utility: pull a best-effort Date from a run object in a type-safe way */
382+
function getRunDate<TTask extends AnyTask>(run: RealtimeRun<TTask>): Date | null {
383+
// Common timestamp fields used by different APIs
384+
const candidates = ["startedAt", "createdAt", "started_at", "created_at"] as const;
385+
386+
for (const key of candidates) {
387+
const val = (run as unknown as Record<string, unknown>)[key];
388+
if (val instanceof Date) return val;
389+
if (typeof val === "string" || typeof val === "number") {
390+
const d = normalizeToDate(val as string | number);
391+
if (d) return d;
392+
}
393+
}
394+
395+
return null;
396+
}
397+
398+
/** Utility: get status string from run in a safe way */
399+
function getRunStatus<TTask extends AnyTask>(run: RealtimeRun<TTask>): string {
400+
const status = (run as unknown as Record<string, unknown>).status;
401+
if (typeof status === "string") return status;
402+
if (typeof status === "number") return String(status);
403+
return "";
404+
}
405+
406+
/** Stable serialisation/keys for filters so deps are stable and readable */
407+
function createFiltersKey(filters?: RealtimeFilterOptions): string {
408+
if (!filters) return "";
409+
const startIso = normalizeToDate(filters.startDate)?.toISOString() ?? "";
410+
const endIso = normalizeToDate(filters.endDate)?.toISOString() ?? "";
411+
const statuses = Array.isArray(filters.statuses) ? filters.statuses.join(",") : "";
412+
return `${startIso}|${endIso}|${statuses}`;
413+
}
414+
396415
export function useRealtimeRunsWithTag<TTask extends AnyTask>(
397416
tag: string | string[],
398-
options?: UseRealtimeRunsWithTagOptions
417+
options?: UseRealtimeRunOptions & { filters?: RealtimeFilterOptions }
399418
): UseRealtimeRunsInstance<TTask> {
400419
const hookId = useId();
401420
const idKey = options?.id ?? hookId;
@@ -439,7 +458,6 @@ export function useRealtimeRunsWithTag<TTask extends AnyTask>(
439458

440459
await processRealtimeRunsWithTag(
441460
tag,
442-
{ createdAt: options?.createdAt, skipColumns: options?.skipColumns },
443461
apiClient,
444462
mutateRuns,
445463
runsRef,
@@ -471,9 +489,51 @@ export function useRealtimeRunsWithTag<TTask extends AnyTask>(
471489
return () => {
472490
stop();
473491
};
474-
}, [tag, stop, options?.enabled]);
492+
// Including filtersKey here to restart the streaming request when filters change
493+
// This ensures we get fresh data when filter criteria are modified
494+
// eslint-disable-next-line react-hooks/exhaustive-deps
495+
}, [tag, stop, options?.enabled, createFiltersKey(options?.filters)]);
496+
// Stable key for useMemo deps
497+
const filtersKey = useMemo(
498+
() => createFiltersKey(options?.filters),
499+
[options?.filters]
500+
); // Client-side filtering: useMemo + typed return
501+
const filteredRuns = useMemo<RealtimeRun<TTask>[]>(() => {
502+
const list = runs ?? [];
503+
const f = options?.filters;
504+
if (!f) return list;
505+
506+
const start = normalizeToDate(f.startDate);
507+
const end = normalizeToDate(f.endDate);
508+
const allowedStatuses = Array.isArray(f.statuses) && f.statuses.length > 0
509+
? f.statuses.map((s) => s.toString())
510+
: null;
511+
512+
// small, readable filter function
513+
return list.filter((run: any) => {
514+
const runDate = getRunDate(run);
515+
if (start && runDate) {
516+
if (runDate < start) return false;
517+
}
518+
if (end && runDate) {
519+
if (runDate > end) return false;
520+
}
475521

476-
return { runs: runs ?? [], error, stop };
522+
if (allowedStatuses) {
523+
const status = getRunStatus(run);
524+
if (!allowedStatuses.includes(status)) return false;
525+
}
526+
527+
return true;
528+
});
529+
// filtersKey so memo invalidates when filters change
530+
}, [runs, filtersKey]);
531+
532+
return {
533+
runs: filteredRuns,
534+
error,
535+
stop,
536+
};
477537
}
478538

479539
/**
@@ -591,7 +651,7 @@ async function processRealtimeBatch<TTask extends AnyTask = AnyTask>(
591651
}
592652
}
593653

594-
// Inserts and then orders by the run createdAt timestamp, and ensures that the run is not duplicated
654+
// Inserts and then orders by the run number, and ensures that the run is not duplicated
595655
function insertRunShapeInOrder<TTask extends AnyTask>(
596656
previousRuns: RealtimeRun<TTask>[],
597657
run: RealtimeRun<TTask>
@@ -601,8 +661,8 @@ function insertRunShapeInOrder<TTask extends AnyTask>(
601661
return previousRuns.map((r) => (r.id === run.id ? run : r));
602662
}
603663

604-
const runCreatedAt = run.createdAt;
605-
const index = previousRuns.findIndex((r) => r.createdAt > runCreatedAt);
664+
const runNumber = run.number;
665+
const index = previousRuns.findIndex((r) => r.number > runNumber);
606666
if (index === -1) {
607667
return [...previousRuns, run];
608668
}
@@ -612,14 +672,13 @@ function insertRunShapeInOrder<TTask extends AnyTask>(
612672

613673
async function processRealtimeRunsWithTag<TTask extends AnyTask = AnyTask>(
614674
tag: string | string[],
615-
filters: { createdAt?: string; skipColumns?: RealtimeRunSkipColumns },
616675
apiClient: ApiClient,
617676
mutateRunsData: KeyedMutator<RealtimeRun<TTask>[]>,
618677
existingRunsRef: React.MutableRefObject<RealtimeRun<TTask>[]>,
619678
onError: (e: Error) => void,
620679
abortControllerRef: React.MutableRefObject<AbortController | null>
621680
) {
622-
const subscription = apiClient.subscribeToRunsWithTag<InferRunTypes<TTask>>(tag, filters, {
681+
const subscription = apiClient.subscribeToRunsWithTag<InferRunTypes<TTask>>(tag, {
623682
signal: abortControllerRef.current?.signal,
624683
onFetchError: onError,
625684
});
@@ -655,7 +714,6 @@ async function processRealtimeRunWithStreams<
655714
TStreams extends Record<string, any> = Record<string, any>,
656715
>(
657716
runId: string,
658-
filters: { skipColumns?: RealtimeRunSkipColumns },
659717
apiClient: ApiClient,
660718
mutateRunData: KeyedMutator<RealtimeRun<TTask>>,
661719
mutateStreamData: KeyedMutator<StreamResults<TStreams>>,
@@ -669,7 +727,6 @@ async function processRealtimeRunWithStreams<
669727
signal: abortControllerRef.current?.signal,
670728
closeOnComplete: stopOnCompletion,
671729
onFetchError: onError,
672-
skipColumns: filters.skipColumns,
673730
});
674731

675732
type StreamUpdate = {
@@ -716,7 +773,6 @@ async function processRealtimeRunWithStreams<
716773

717774
async function processRealtimeRun<TTask extends AnyTask = AnyTask>(
718775
runId: string,
719-
filters: { skipColumns?: RealtimeRunSkipColumns },
720776
apiClient: ApiClient,
721777
mutateRunData: KeyedMutator<RealtimeRun<TTask>>,
722778
onError: (e: Error) => void,
@@ -727,7 +783,6 @@ async function processRealtimeRun<TTask extends AnyTask = AnyTask>(
727783
signal: abortControllerRef.current?.signal,
728784
closeOnComplete: stopOnCompletion,
729785
onFetchError: onError,
730-
skipColumns: filters.skipColumns,
731786
});
732787

733788
for await (const part of subscription) {

0 commit comments

Comments
 (0)