
Commit 01452bf

Authored by KyleAMathews, claude, autofix-ci[bot], and samwillis
Fix: deleted items not disappearing from live queries with .limit() (#1044)
* fix: emit delete events when subscribing with includeInitialState: false

  When subscribing to a collection with includeInitialState: false, delete events were being filtered out because the sentKeys set was empty. This affected live queries with limit/offset where users would subscribe to get future changes after already loading initial data via preload() or values().

  Changes:
  - Add a skipFiltering flag, separate from loadedInitialState, to allow filtering to be skipped while still allowing requestSnapshot to work
  - Call markAllStateAsSeen() when includeInitialState is explicitly false
  - Change internal subscriptions to not pass includeInitialState: false explicitly, so they can be distinguished from user subscriptions
  - Add tests for optimistic delete behavior with limit

  Fixes the issue where deleted items would not disappear from live queries when using .limit() and subscribing with includeInitialState: false.

* debug: add extensive logging to track delete event flow

  This is a DEBUG BUILD with [TanStack-DB-DEBUG] logs to help track down why delete events may not be reaching subscribers when using limit/offset. The debug logs cover:
  - subscribeChanges: when subscriptions are created
  - emitEvents: when events are emitted to subscriptions
  - Subscription.emitEvents: when individual subscriptions receive events
  - filterAndFlipChanges: when events are filtered or passed through
  - recomputeOptimisticState: when optimistic state is recomputed and events emitted
  - sendChangesToPipeline: when changes flow through the D2 pipeline
  - applyChanges: when the D2 pipeline outputs to the live query collection

  To use, filter the browser console for "[TanStack-DB-DEBUG]". Also includes the fix for includeInitialState: false not emitting deletes.

* ci: apply automated fixes

* debug: add more logging to track delete event flow in live queries

  Add comprehensive debug logging to:
  - createFilteredCallback in change-events.ts for whereExpression filtering
  - sendChangesToInput for D2 pipeline input
  - subscribeToOrderedChanges for the orderBy/limit path
  - splitUpdates for update event handling
  - recomputeOptimisticState for pending sync key filtering

  This additional logging helps track where delete events may be filtered out when using live queries with limit/offset and where clauses.

* ci: apply automated fixes

* debug: add logging to graph scheduling and execution

  Add debug logging to track:
  - scheduleGraphRun: when a graph run is scheduled
  - executeGraphRun: when a graph run executes or returns early
  - maybeRunGraph: when the graph actually runs, and its pending work status

  This helps diagnose issues where deletes are sent to the D2 pipeline but never appear in the output (applyChanges not called).

* ci: apply automated fixes

* debug: add detailed logging to D2 reduce and topK operators

  Add debug logging to track:
  - ReduceOperator: input processing, key handling, and result output
  - topK: consolidation, sorting, slicing, and result details

  Also add two new test cases:
  1. Delete from a different page (page 1 delete while viewing page 2) - verifies items shift correctly when a delete occurs on an earlier page
  2. Delete beyond the TopK window (no-op case) - verifies that deleting an item outside the window doesn't affect results

  These tests and debug logs will help diagnose issues where deleted items don't disappear from live queries when using limit/offset.

* ci: apply automated fixes

* debug: add more detailed logging to D2 graph and subscription

  Add additional debug logging to help diagnose delete issues:

  D2 graph (d2.ts):
  - Log when run() starts and completes, with step count
  - Log pendingWork() results with operator IDs
  - Log when operators have pending work in step()

  Output operator (output.ts):
  - Log when run is called, with message count
  - Log the items in each message being processed

  Subscription (subscription.ts):
  - Log trackSentKeys with the keys being added
  - Show the total sentKeys count

  This should help diagnose scenarios where delete events are sent to D2 but no applyChanges output is produced.

* ci: apply automated fixes

* debug: add operator type logging to trace the D2 pipeline

  Add an operatorType property to the Operator base class and log it when operators run. This will help identify which operators are processing the delete and where the data is being lost.

  Also add detailed logging to LinearUnaryOperator.run() to show:
  - Input message count
  - Input/output item counts
  - A sample of input and output items

  This should reveal exactly which operator is dropping the delete.

* debug: add logging to TopKWithFractionalIndexOperator

  This is the key operator for orderBy+limit queries. Add detailed logging to:
  - run(): show message count and index size
  - processElement(): show key, multiplicity changes, and action (INSERT/DELETE/NO_CHANGE)
  - processElement result: show moveIn/moveOut keys

  This should reveal exactly why deletes aren't producing output changes when the item exists in the TopK index.

* ci: apply automated fixes

* fix: filter duplicate inserts in subscription to prevent D2 multiplicity issues

  When an item is inserted multiple times without a delete in between, D2 multiplicity goes above 1. Then when a single delete arrives, multiplicity goes from 2 to 1 (not 0), so TopK doesn't emit a DELETE event.

  This fix:
  1. Filters out duplicate inserts in filterAndFlipChanges when the key is already in sentKeys
  2. Removes keys from sentKeys on delete in both filterAndFlipChanges and trackSentKeys
  3. Updates a test expectation to reflect the correct behavior (2 events instead of 3)

  Root cause: multiple subscriptions or sync mechanisms could send duplicate insert events for the same key, causing D2 to track multiplicity > 1.

* fix: add D2 input level deduplication to prevent multiplicity > 1

  The previous fix in CollectionSubscription.filterAndFlipChanges was only catching duplicates at the subscription level. But each live query has its own CollectionSubscriber with its own D2 pipeline.

  This fix adds a sentToD2Keys set in CollectionSubscriber to track which keys have been sent to the D2 input, preventing duplicate inserts at the D2 level regardless of which code path triggers them. Also clears the tracking on truncate events.

* docs: add detailed changeset for delete fix

* ci: apply automated fixes

* chore: remove debug logging from D2 pipeline and subscription code

  Remove all TanStack-DB-DEBUG console statements that were added while investigating the "deleted items not disappearing from live queries" bug. The fix for duplicate D2 inserts is preserved - this just removes the verbose debug output now that the issue is resolved.

* debug: add logging to trace the source of duplicate D2 inserts

  Add targeted debug logging to understand where duplicate inserts originate:
  1. recomputeOptimisticState: track what events are generated and when
  2. CollectionSubscription.filterAndFlipChanges: trace filtering decisions
  3. CollectionSubscriber.sendChangesToPipeline: track D2-level deduplication

  This will help determine whether duplicates come from:
  - Multiple calls to recomputeOptimisticState for the same key
  - Overlap between the initial snapshot and change events
  - Multiple code paths feeding the D2 pipeline

* ci: apply automated fixes

* fix: prevent race condition in snapshot loading by adding keys to sentKeys before the callback

  The race condition occurred because the snapshot methods (requestSnapshot, requestLimitedSnapshot) added keys to sentKeys AFTER calling the callback, while filterAndFlipChanges added keys BEFORE. If a change event arrived during callback execution, it would not yet see the keys in sentKeys, allowing duplicate inserts.

  Changes:
  - Add keys to sentKeys BEFORE calling the callback in requestSnapshot and requestLimitedSnapshot
  - Remove the redundant D2-level deduplication (sentToD2Keys) - subscription-level filtering is sufficient
  - Remove debug logging added during investigation

* docs: update changeset to reflect race condition fix

* cleanup

* simplify changeset

---------

Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Sam Willis <sam.willis@gmail.com>
1 parent 5eb8300 commit 01452bf

5 files changed

Lines changed: 672 additions & 9 deletions


Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
+---
+'@tanstack/db': patch
+---
+
+fix: deleted items not disappearing from live queries with `.limit()`
+
+Fixed a bug where deleting an item from a live query with `.orderBy()` and `.limit()` would not remove it from the query results. The `subscribeChanges` callback would never fire with a delete event.
+
+The issue was caused by duplicate inserts reaching the D2 pipeline, which corrupted the multiplicity tracking used by `TopKWithFractionalIndexOperator`. A delete would decrement multiplicity from 2 to 1 instead of 1 to 0, so the item remained visible.
+
+Fixed by ensuring `sentKeys` is updated before callbacks execute (preventing race conditions) and filtering duplicate inserts in `filterAndFlipChanges`.
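The multiplicity corruption this changeset describes can be reproduced with a toy counter. This is a sketch of the idea only, not the actual D2 operator:

```typescript
// Toy multiplicity counter, illustrating the bug described in the changeset.
// Not the real D2 implementation - just the bookkeeping concept.
const multiplicity = new Map<number, number>()

function apply(type: "insert" | "delete", key: number): void {
  const next = (multiplicity.get(key) ?? 0) + (type === "insert" ? 1 : -1)
  if (next === 0) {
    multiplicity.delete(key) // count reaches zero: item is gone
  } else {
    multiplicity.set(key, next)
  }
}

// A duplicate insert corrupts the count: the later delete only brings it
// from 2 down to 1, so a TopK-style operator still considers the item present.
apply("insert", 1)
apply("insert", 1) // duplicate - multiplicity becomes 2
apply("delete", 1)
console.log(multiplicity.get(1)) // 1, not undefined: the item never disappears
```

Filtering the duplicate insert keeps the count at 1, so the delete correctly drives it to 0.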

packages/db/src/collection/changes.ts

Lines changed: 4 additions & 0 deletions
@@ -109,6 +109,10 @@ export class CollectionChangesManager<

     if (options.includeInitialState) {
       subscription.requestSnapshot({ trackLoadSubsetPromise: false })
+    } else if (options.includeInitialState === false) {
+      // When explicitly set to false (not just undefined), mark all state as "seen"
+      // so that all future changes (including deletes) pass through unfiltered.
+      subscription.markAllStateAsSeen()
     }

     // Add to batched listeners
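The hunk above relies on `includeInitialState` being a tri-state option: `true`, explicit `false`, and omitted (`undefined`) all behave differently. A small sketch of that dispatch, where `route` is a hypothetical helper rather than part of the library:

```typescript
interface SubscribeOptions {
  includeInitialState?: boolean
}

// Hypothetical helper illustrating the tri-state branch in the diff above.
function route(options: SubscribeOptions): "snapshot" | "markAllSeen" | "none" {
  if (options.includeInitialState) {
    // true: request a snapshot so the subscriber also gets initial state.
    return "snapshot"
  } else if (options.includeInitialState === false) {
    // Only an explicit `false` reaches here. Internal subscriptions that
    // simply omit the option fall through and keep their filtering.
    return "markAllSeen"
  }
  return "none"
}

console.log(route({ includeInitialState: true })) // "snapshot"
console.log(route({ includeInitialState: false })) // "markAllSeen"
console.log(route({})) // "none"
```

The `=== false` check is what lets user subscriptions that opted out of initial state receive all future deletes, while internal subscriptions (which omit the option) are left untouched.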

packages/db/src/collection/subscription.ts

Lines changed: 57 additions & 7 deletions
@@ -52,6 +52,11 @@ export class CollectionSubscription
 {
   private loadedInitialState = false

+  // Flag to skip filtering in filterAndFlipChanges.
+  // This is separate from loadedInitialState because we want to allow
+  // requestSnapshot to still work even when filtering is skipped.
+  private skipFiltering = false
+
   // Flag to indicate that we have sent at least 1 snapshot.
   // While `snapshotSent` is false we filter out all changes from subscription to the collection.
   private snapshotSent = false
@@ -244,6 +249,13 @@ export class CollectionSubscription
       (change) => !this.sentKeys.has(change.key),
     )

+    // Add keys to sentKeys BEFORE calling callback to prevent race condition.
+    // If a change event arrives while the callback is executing, it will see
+    // the keys already in sentKeys and filter out duplicates correctly.
+    for (const change of filteredSnapshot) {
+      this.sentKeys.add(change.key)
+    }
+
     this.snapshotSent = true
     this.callback(filteredSnapshot)
     return true
@@ -367,6 +379,13 @@ export class CollectionSubscription
     // Use the current count as the offset for this load
     const currentOffset = this.limitedSnapshotRowCount

+    // Add keys to sentKeys BEFORE calling callback to prevent race condition.
+    // If a change event arrives while the callback is executing, it will see
+    // the keys already in sentKeys and filter out duplicates correctly.
+    for (const change of changes) {
+      this.sentKeys.add(change.key)
+    }
+
     this.callback(changes)

     // Update the row count and last key after sending (for next call's offset/cursor)
@@ -441,43 +460,74 @@
    * Filters and flips changes for keys that have not been sent yet.
    * Deletes are filtered out for keys that have not been sent yet.
    * Updates are flipped into inserts for keys that have not been sent yet.
+   * Duplicate inserts are filtered out to prevent D2 multiplicity > 1.
    */
   private filterAndFlipChanges(changes: Array<ChangeMessage<any, any>>) {
-    if (this.loadedInitialState) {
-      // We loaded the entire initial state
+    if (this.loadedInitialState || this.skipFiltering) {
+      // We loaded the entire initial state or filtering is explicitly skipped
       // so no need to filter or flip changes
       return changes
     }

     const newChanges = []
     for (const change of changes) {
       let newChange = change
-      if (!this.sentKeys.has(change.key)) {
+      const keyInSentKeys = this.sentKeys.has(change.key)
+
+      if (!keyInSentKeys) {
         if (change.type === `update`) {
           newChange = { ...change, type: `insert`, previousValue: undefined }
         } else if (change.type === `delete`) {
           // filter out deletes for keys that have not been sent
           continue
         }
         this.sentKeys.add(change.key)
+      } else {
+        // Key was already sent - handle based on change type
+        if (change.type === `insert`) {
+          // Filter out duplicate inserts - the key was already inserted.
+          // This prevents D2 multiplicity from going above 1, which would
+          // cause deletes to not properly remove items (multiplicity would
+          // go from 2 to 1 instead of 1 to 0).
+          continue
+        } else if (change.type === `delete`) {
+          // Remove from sentKeys so future inserts for this key are allowed
+          // (e.g., after truncate + reinsert)
+          this.sentKeys.delete(change.key)
+        }
       }
       newChanges.push(newChange)
     }
     return newChanges
   }

   private trackSentKeys(changes: Array<ChangeMessage<any, string | number>>) {
-    if (this.loadedInitialState) {
-      // No need to track sent keys if we loaded the entire state.
-      // Since we sent everything, all keys must have been observed.
+    if (this.loadedInitialState || this.skipFiltering) {
+      // No need to track sent keys if we loaded the entire state or filtering is skipped.
+      // Since filtering won't be applied, all keys are effectively "observed".
       return
     }

     for (const change of changes) {
-      this.sentKeys.add(change.key)
+      if (change.type === `delete`) {
+        // Remove deleted keys from sentKeys so future re-inserts are allowed
+        this.sentKeys.delete(change.key)
+      } else {
+        // For inserts and updates, track the key as sent
+        this.sentKeys.add(change.key)
+      }
     }
   }

+  /**
+   * Mark that the subscription should not filter any changes.
+   * This is used when includeInitialState is explicitly set to false,
+   * meaning the caller doesn't want initial state but does want ALL future changes.
+   */
+  markAllStateAsSeen() {
+    this.skipFiltering = true
+  }
+
   unsubscribe() {
     // Unload all subsets that this subscription loaded
     // We pass the exact same LoadSubsetOptions we used for loadSubset
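The patched `filterAndFlipChanges` logic can be sketched as a standalone function. This is a simplified, illustrative version (plain objects instead of `ChangeMessage`, no `previousValue` handling), not the library code itself:

```typescript
type ChangeType = "insert" | "update" | "delete"

interface Change {
  type: ChangeType
  key: number
}

// Standalone sketch of the patched filterAndFlipChanges behavior.
function filterAndFlip(sentKeys: Set<number>, changes: Array<Change>): Array<Change> {
  const out: Array<Change> = []
  for (const change of changes) {
    let next = change
    if (!sentKeys.has(change.key)) {
      if (change.type === "update") {
        next = { ...change, type: "insert" } // flip update into insert
      } else if (change.type === "delete") {
        continue // drop deletes for keys never sent
      }
      sentKeys.add(change.key)
    } else {
      if (change.type === "insert") {
        continue // drop duplicate insert - keeps D2 multiplicity at 1
      }
      if (change.type === "delete") {
        sentKeys.delete(change.key) // allow a future re-insert of this key
      }
    }
    out.push(next)
  }
  return out
}

const sent = new Set<number>()
const events = filterAndFlip(sent, [
  { type: "insert", key: 1 },
  { type: "insert", key: 1 }, // duplicate, filtered out
  { type: "delete", key: 1 }, // passes through and frees the key
])
console.log(events.map((e) => e.type)) // ["insert", "delete"]
```

The duplicate insert is dropped before it can push the pipeline's multiplicity to 2, so the single delete correctly removes the item, and the key is freed for later re-insertion.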

packages/db/tests/collection-subscribe-changes.test.ts

Lines changed: 4 additions & 2 deletions
@@ -1193,8 +1193,10 @@ describe(`Collection.subscribeChanges`, () => {
     f.write({ type: `insert`, value: { id: 1, value: `server-after` } })
     f.commit()

-    // Expect delete, insert with optimistic value, and an empty event from markReady
-    expect(changeEvents.length).toBe(3)
+    // Expect delete and insert with optimistic value
+    // Note: Previously there was a duplicate insert event that was incorrectly
+    // being sent, causing 3 events. Now duplicates are filtered correctly.
+    expect(changeEvents.length).toBe(2)
     expect(changeEvents[0]).toEqual({
       type: `delete`,
       key: 1,
