Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .changeset/brave-pens-sleep.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
'@tanstack/db': patch
---

fix: prevent duplicate inserts from reaching D2 pipeline in live queries

Added defensive measures to prevent duplicate INSERT events from reaching the D2 (differential dataflow) pipeline, which could cause items to not disappear when deleted (due to multiplicity going from 2 to 1 instead of 1 to 0).

Changes:

- Added `sentToD2Keys` tracking in `CollectionSubscriber` to filter duplicate inserts at the D2 pipeline entry point
- Fixed `includeInitialState` handling to only pass when `true`, preventing internal lazy-loading subscriptions from incorrectly disabling filtering
- Clear `sentToD2Keys` on truncate to allow re-inserts after collection reset
2 changes: 1 addition & 1 deletion examples/react/paced-mutations-demo/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
},
"dependencies": {
"@tanstack/db": "^0.5.11",
"@tanstack/react-db": "^0.1.56",
"@tanstack/react-db": "^0.1.58",
"mitt": "^3.0.1",
"react": "^19.2.1",
"react-dom": "^19.2.1"
Expand Down
4 changes: 2 additions & 2 deletions examples/react/todo/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
"dependencies": {
"@tanstack/electric-db-collection": "^0.2.12",
"@tanstack/query-core": "^5.90.12",
"@tanstack/query-db-collection": "^1.0.8",
"@tanstack/react-db": "^0.1.56",
"@tanstack/query-db-collection": "^1.0.10",
"@tanstack/react-db": "^0.1.58",
"@tanstack/react-router": "^1.140.0",
"@tanstack/react-start": "^1.140.0",
"@tanstack/trailbase-db-collection": "^0.1.55",
Expand Down
2 changes: 1 addition & 1 deletion examples/solid/todo/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"dependencies": {
"@tanstack/electric-db-collection": "^0.2.12",
"@tanstack/query-core": "^5.90.12",
"@tanstack/query-db-collection": "^1.0.8",
"@tanstack/query-db-collection": "^1.0.10",
"@tanstack/solid-db": "^0.1.54",
"@tanstack/solid-router": "^1.140.0",
"@tanstack/solid-start": "^1.140.0",
Expand Down
38 changes: 35 additions & 3 deletions packages/db/src/query/live/collection-subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ export class CollectionSubscriber<
{ resolve: () => void }
>()

// Track keys that have been sent to the D2 pipeline to prevent duplicate inserts
// This is necessary because different code paths (initial load, change events)
// can potentially send the same item to D2 multiple times.
private sentToD2Keys = new Set<string | number>()

constructor(
private alias: string,
private collectionId: string,
Expand Down Expand Up @@ -129,13 +134,33 @@ export class CollectionSubscriber<
changes: Iterable<ChangeMessage<any, string | number>>,
callback?: () => boolean,
) {
// Filter changes to prevent duplicate inserts to D2 pipeline.
// This ensures D2 multiplicity stays at 1 for visible items, so deletes
// properly reduce multiplicity to 0 (triggering DELETE output).
const changesArray = Array.isArray(changes) ? changes : [...changes]
const filteredChanges: Array<ChangeMessage<any, string | number>> = []
for (const change of changesArray) {
if (change.type === `insert`) {
if (this.sentToD2Keys.has(change.key)) {
// Skip duplicate insert - already sent to D2
continue
}
this.sentToD2Keys.add(change.key)
} else if (change.type === `delete`) {
// Remove from tracking so future re-inserts are allowed
this.sentToD2Keys.delete(change.key)
}
// Updates are handled as delete+insert by splitUpdates, so no special handling needed
filteredChanges.push(change)
}

// currentSyncState and input are always defined when this method is called
// (only called from active subscriptions during a sync session)
const input =
this.collectionConfigBuilder.currentSyncState!.inputs[this.alias]!
const sentChanges = sendChangesToInput(
input,
changes,
filteredChanges,
this.collection.config.getKey,
)

Expand All @@ -162,8 +187,13 @@ export class CollectionSubscriber<
this.sendChangesToPipeline(changes)
}

// Only pass includeInitialState when true. When it's false, we leave it
// undefined so that user subscriptions with explicit `includeInitialState: false`
// can be distinguished from internal lazy-loading subscriptions.
// If we pass `false`, changes.ts would call markAllStateAsSeen() which
// disables filtering - but internal subscriptions still need filtering.
const subscription = this.collection.subscribeChanges(sendChanges, {
includeInitialState,
...(includeInitialState && { includeInitialState }),
whereExpression,
})

Expand All @@ -190,10 +220,12 @@ export class CollectionSubscriber<
whereExpression,
})

// Listen for truncate events to reset cursor tracking state
// Listen for truncate events to reset cursor tracking state and sentToD2Keys
// This ensures that after a must-refetch/truncate, we don't use stale cursor data
// and allow re-inserts of previously sent keys
const truncateUnsubscribe = this.collection.on(`truncate`, () => {
this.biggest = undefined
this.sentToD2Keys.clear()
})

// Clean up truncate listener when subscription is unsubscribed
Expand Down
Loading
Loading