Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,20 @@ export type {
RealtimeEventType,
RealtimeEvent,
RealtimeCallback,
WatchChangeType,
WatchEvent,
WatchOptions,
WatchCallback,
} from "./modules/entities.types.js";

// Watch list manager for client-side sorting optimization
export {
WatchListManager,
createComparatorFromSort,
type WatchListManagerOptions,
type SortComparator,
} from "./utils/watch-list-manager.js";

export type {
AuthModule,
LoginResponse,
Expand Down
57 changes: 57 additions & 0 deletions src/modules/entities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import {
RealtimeCallback,
RealtimeEvent,
RealtimeEventType,
WatchCallback,
WatchChangeType,
WatchEvent,
WatchOptions,
} from "./entities.types";
import { RoomsSocket } from "../utils/socket-utils.js";

Expand Down Expand Up @@ -69,6 +73,26 @@ function parseRealtimeMessage(dataStr: string): RealtimeEvent | null {
}
}

/**
* Parses the watch (live query) message data and extracts event information.
* @internal
*/
function parseWatchMessage(dataStr: string): WatchEvent | null {
try {
const parsed = JSON.parse(dataStr);
return {
changeType: parsed.change_type as WatchChangeType,
eventType: parsed.type as RealtimeEventType,
data: parsed.data,
id: parsed.id || parsed.data?.id,
timestamp: parsed.timestamp || new Date().toISOString(),
};
} catch (error) {
console.warn("[Base44 SDK] Failed to parse watch message:", error);
return null;
}
}

/**
* Creates a handler for a specific entity.
*
Expand Down Expand Up @@ -186,5 +210,38 @@ function createEntityHandler(

return unsubscribe;
},

// Watch for changes to a filtered subset (live query)
watch(options: WatchOptions, callback: WatchCallback): () => void {
const socket = getSocket();

// Use subscribeQuery to send subscription options to the server
const unsubscribe = socket.subscribeQuery(
appId,
entityName,
{
filter: options.filter,
sort: options.sort,
fields: options.fields,
limit: options.limit,
},
{
update_model: (msg) => {
const event = parseWatchMessage(msg.data);
if (!event) {
return;
}

try {
callback(event);
} catch (error) {
console.error("[Base44 SDK] Watch callback error:", error);
}
},
}
);

return unsubscribe;
},
};
}
101 changes: 101 additions & 0 deletions src/modules/entities.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@
*/
export type RealtimeEventType = "create" | "update" | "delete";

/**
* Change types for live query (watch) subscriptions.
* - "added": Entity now matches the filter but didn't before (or was created matching)
* - "modified": Entity matched before and still matches
* - "removed": Entity matched before but no longer matches (or was deleted)
*/
export type WatchChangeType = "added" | "modified" | "removed";

/**
* Payload received when a realtime event occurs.
*/
Expand All @@ -17,6 +25,41 @@ export interface RealtimeEvent {
timestamp: string;
}

/**
* Payload received when a watch (live query) event occurs.
*/
export interface WatchEvent {
/** The type of change relative to the subscription filter */
changeType: WatchChangeType;
/** The CUD event type that triggered this change */
eventType: RealtimeEventType;
/** The entity data after the change */
data: any;
/** The unique identifier of the affected entity */
id: string;
/** ISO 8601 timestamp of when the event occurred */
timestamp: string;
}

/**
* Options for watch (live query) subscriptions.
*/
export interface WatchOptions {
/** MongoDB-style filter query */
filter?: Record<string, any>;
/** Sort field with optional '-' prefix for descending */
sort?: string;
/** Array of field names to include in the response */
fields?: string[];
/** Maximum number of results */
limit?: number;
}

/**
* Callback function invoked when a watch (live query) event occurs.
*/
export type WatchCallback = (event: WatchEvent) => void;

/**
* Callback function invoked when a realtime event occurs.
*/
Expand Down Expand Up @@ -312,6 +355,64 @@ export interface EntityHandler {
* ```
*/
subscribe(callback: RealtimeCallback): () => void;

/**
* Watches for changes to a filtered subset of records (live query).
*
* Similar to `subscribe`, but allows you to specify filter, sort, fields,
* and limit options. The callback receives events with a `changeType` that
* indicates how the change affects the filtered result set:
* - `'added'`: A record now matches your filter (created or updated to match)
* - `'modified'`: A record still matches your filter but was updated
* - `'removed'`: A record no longer matches your filter (deleted or updated to not match)
*
* @param options - Options for the watch subscription:
* - `filter`: MongoDB-style filter query to match records
* - `sort`: Sort field with optional '-' prefix for descending order
* - `fields`: Array of field names to include in the response
* - `limit`: Maximum number of records to track
* @param callback - Callback function called when a matching entity changes. The callback receives an event object with:
* - `changeType`: How the change affects your filtered results - `'added'`, `'modified'`, or `'removed'`
* - `eventType`: The underlying CUD operation - `'create'`, `'update'`, or `'delete'`
* - `data`: The entity data after the change
* - `id`: The unique identifier of the affected entity
* - `timestamp`: ISO 8601 timestamp of when the event occurred
* @returns Unsubscribe function to stop receiving updates.
*
* @example
* ```typescript
* // Watch for changes to active high-priority tasks
* const unsubscribe = base44.entities.Task.watch(
* {
* filter: { status: 'active', priority: 'high' },
* sort: '-created_date',
* limit: 10
* },
* (event) => {
* if (event.changeType === 'added') {
* console.log('New high-priority task:', event.data);
* } else if (event.changeType === 'removed') {
* console.log('Task no longer high-priority:', event.id);
* }
* }
* );
*
* // Later, clean up the subscription
* unsubscribe();
* ```
*
* @example
* ```typescript
* // Watch for changes to current user's tasks
* const unsubscribe = base44.entities.Task.watch(
* { filter: { assignee: currentUser.id } },
* (event) => {
* console.log(`My task ${event.id} was ${event.changeType}:`, event.data);
* }
* );
* ```
*/
watch(options: WatchOptions, callback: WatchCallback): () => void;
}

/**
Expand Down
80 changes: 80 additions & 0 deletions src/utils/socket-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,46 @@
export type TSocketRoom = string;
export type TJsonStr = string;

/**
* Options for watch (live query) subscriptions.
*/
export interface WatchSubscriptionOptions {
filter?: Record<string, any>;
sort?: string;
fields?: string[];
limit?: number;
}

type RoomsSocketEventsMap = {
listen: {
connect: () => Promise<void> | void;
update_model: (msg: {
room: string;
data: TJsonStr;
}) => Promise<void> | void;
subscribed: (msg: {
room: string;
entity_name: string;
options: WatchSubscriptionOptions;
}) => Promise<void> | void;
unsubscribed: (msg: {
room: string;
entity_name: string;
}) => Promise<void> | void;
error: (error: Error) => Promise<void> | void;
};
emit: {
join: (room: string) => void;
leave: (room: string) => void;
subscribe_query: (data: {
app_id: string;
entity_name: string;
options: WatchSubscriptionOptions;
}) => void;
unsubscribe_query: (data: {
app_id: string;
entity_name: string;
}) => void;
};
};

Expand Down Expand Up @@ -53,6 +81,14 @@
return handlers.update_model?.(msg);
});

socket.on("subscribed", async (msg) => {
return handlers.subscribed?.(msg);
});

socket.on("unsubscribed", async (msg) => {
return handlers.unsubscribed?.(msg);
});

socket.on("error", async (error) => {
return handlers.error?.(error);
});
Expand All @@ -74,7 +110,7 @@
Partial<RoomsSocketEventsMap["listen"]>[]
> = {};

const handlers: RoomsSocketEventsMap["listen"] = {

Check failure on line 113 in src/utils/socket-utils.ts

View workflow job for this annotation

GitHub Actions / publish-preview

Type '{ connect: () => Promise<void>; update_model: (msg: { room: string; data: string; }) => Promise<void>; error: (error: Error) => Promise<void>; }' is missing the following properties from type '{ connect: () => void | Promise<void>; update_model: (msg: { room: string; data: string; }) => void | Promise<void>; subscribed: (msg: { room: string; entity_name: string; options: WatchSubscriptionOptions; }) => void | Promise<...>; unsubscribed: (msg: { ...; }) => void | Promise<...>; error: (error: Error) => void...': subscribed, unsubscribed
connect: async () => {
const promises: Promise<void>[] = [];
Object.keys(roomsToListeners).forEach((room) => {
Expand Down Expand Up @@ -162,9 +198,53 @@
};
};

/**
* Subscribe to a live query with filter, sort, fields, and limit options.
* This sends subscribe_query to the server and sets up listeners for updates.
*/
const subscribeQuery = (
appId: string,
entityName: string,
options: WatchSubscriptionOptions,
handlers: Partial<{ [k in TEvent]: THandler<k> }>
) => {
// The room name matches the backend format
const room = `entities:${appId}:${entityName}:watch`;

// Add handlers for this room
if (!roomsToListeners[room]) {
roomsToListeners[room] = [];
}
roomsToListeners[room].push(handlers);

// Send subscribe_query event to server
socket.emit("subscribe_query", {
app_id: appId,
entity_name: entityName,
options,
});

// Return unsubscribe function
return () => {
roomsToListeners[room] =
roomsToListeners[room]?.filter((listener) => listener !== handlers) ??
[];

if (roomsToListeners[room].length === 0) {
// Send unsubscribe_query event to server
socket.emit("unsubscribe_query", {
app_id: appId,
entity_name: entityName,
});
delete roomsToListeners[room];
}
};
};

return {
socket,
subscribeToRoom,
subscribeQuery,
updateConfig,
updateModel,
disconnect,
Expand Down
Loading
Loading