Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ad766ff
fix: emit delete events when subscribing with includeInitialState: false
claude Dec 17, 2025
0ebf869
debug: add extensive logging to track delete event flow
claude Dec 18, 2025
8303c34
ci: apply automated fixes
autofix-ci[bot] Dec 18, 2025
63af591
debug: add more logging to track delete event flow in live queries
claude Dec 18, 2025
478914f
ci: apply automated fixes
autofix-ci[bot] Dec 18, 2025
7d68052
debug: add logging to graph scheduling and execution
claude Dec 18, 2025
7a5a44f
ci: apply automated fixes
autofix-ci[bot] Dec 18, 2025
1bc3b6f
debug: add detailed logging to D2 reduce and topK operators
claude Dec 18, 2025
446afb9
ci: apply automated fixes
autofix-ci[bot] Dec 18, 2025
df7b9fe
debug: add more detailed logging to D2 graph and subscription
claude Dec 18, 2025
bf9e93c
ci: apply automated fixes
autofix-ci[bot] Dec 18, 2025
f4f6942
debug: add operator type logging to trace D2 pipeline
claude Dec 18, 2025
2e4f3b2
debug: add logging to TopKWithFractionalIndexOperator
claude Dec 18, 2025
3f62c53
ci: apply automated fixes
autofix-ci[bot] Dec 18, 2025
da72c72
fix: filter duplicate inserts in subscription to prevent D2 multiplic…
claude Dec 18, 2025
c64ba31
fix: add D2 input level deduplication to prevent multiplicity > 1
claude Dec 18, 2025
620f6dc
docs: add detailed changeset for delete fix
claude Dec 18, 2025
8d5c8f3
ci: apply automated fixes
autofix-ci[bot] Dec 18, 2025
e917126
chore: remove debug logging from D2 pipeline and subscription code
claude Dec 18, 2025
de2a8e7
debug: add logging to trace source of duplicate D2 inserts
claude Dec 18, 2025
fffcd66
ci: apply automated fixes
autofix-ci[bot] Dec 18, 2025
0662052
fix: prevent race condition in snapshot loading by adding keys to sen…
claude Dec 18, 2025
56e532b
docs: update changeset to reflect race condition fix
claude Dec 18, 2025
90fd10f
cleanup
samwillis Dec 19, 2025
8ddb26a
simplify changeset
samwillis Dec 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/fix-deleted-items-not-disappearing-from-live-queries.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
'@tanstack/db': patch
---

fix: deleted items not disappearing from live queries with `.limit()`

Fixed a bug where deleting an item from a live query with `.orderBy()` and `.limit()` would not remove it from the query results. The `subscribeChanges` callback would never fire with a delete event.

The issue was caused by duplicate inserts reaching the D2 pipeline, which corrupted the multiplicity tracking used by `TopKWithFractionalIndexOperator`. A delete would decrement multiplicity from 2 to 1 instead of 1 to 0, so the item remained visible.

Fixed by ensuring `sentKeys` is updated before callbacks execute (preventing race conditions) and filtering duplicate inserts in `filterAndFlipChanges`.
4 changes: 4 additions & 0 deletions packages/db/src/collection/changes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ export class CollectionChangesManager<

if (options.includeInitialState) {
subscription.requestSnapshot({ trackLoadSubsetPromise: false })
} else if (options.includeInitialState === false) {
// When explicitly set to false (not just undefined), mark all state as "seen"
// so that all future changes (including deletes) pass through unfiltered.
subscription.markAllStateAsSeen()
}

// Add to batched listeners
Expand Down
64 changes: 57 additions & 7 deletions packages/db/src/collection/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ export class CollectionSubscription
{
private loadedInitialState = false

// Flag to skip filtering in filterAndFlipChanges.
// This is separate from loadedInitialState because we want to allow
// requestSnapshot to still work even when filtering is skipped.
private skipFiltering = false

// Flag to indicate that we have sent at least 1 snapshot.
// While `snapshotSent` is false we filter out all changes from subscription to the collection.
private snapshotSent = false
Expand Down Expand Up @@ -244,6 +249,13 @@ export class CollectionSubscription
(change) => !this.sentKeys.has(change.key),
)

// Add keys to sentKeys BEFORE calling callback to prevent race condition.
// If a change event arrives while the callback is executing, it will see
// the keys already in sentKeys and filter out duplicates correctly.
for (const change of filteredSnapshot) {
this.sentKeys.add(change.key)
}

this.snapshotSent = true
this.callback(filteredSnapshot)
return true
Expand Down Expand Up @@ -367,6 +379,13 @@ export class CollectionSubscription
// Use the current count as the offset for this load
const currentOffset = this.limitedSnapshotRowCount

// Add keys to sentKeys BEFORE calling callback to prevent race condition.
// If a change event arrives while the callback is executing, it will see
// the keys already in sentKeys and filter out duplicates correctly.
for (const change of changes) {
this.sentKeys.add(change.key)
}

this.callback(changes)

// Update the row count and last key after sending (for next call's offset/cursor)
Expand Down Expand Up @@ -441,43 +460,74 @@ export class CollectionSubscription
* Filters and flips changes for keys that have not been sent yet.
* Deletes are filtered out for keys that have not been sent yet.
* Updates are flipped into inserts for keys that have not been sent yet.
* Duplicate inserts are filtered out to prevent D2 multiplicity > 1.
*/
private filterAndFlipChanges(changes: Array<ChangeMessage<any, any>>) {
if (this.loadedInitialState) {
// We loaded the entire initial state
if (this.loadedInitialState || this.skipFiltering) {
// We loaded the entire initial state or filtering is explicitly skipped
// so no need to filter or flip changes
return changes
}

const newChanges = []
for (const change of changes) {
let newChange = change
if (!this.sentKeys.has(change.key)) {
const keyInSentKeys = this.sentKeys.has(change.key)

if (!keyInSentKeys) {
if (change.type === `update`) {
newChange = { ...change, type: `insert`, previousValue: undefined }
} else if (change.type === `delete`) {
// filter out deletes for keys that have not been sent
continue
}
this.sentKeys.add(change.key)
} else {
// Key was already sent - handle based on change type
if (change.type === `insert`) {
// Filter out duplicate inserts - the key was already inserted.
// This prevents D2 multiplicity from going above 1, which would
// cause deletes to not properly remove items (multiplicity would
// go from 2 to 1 instead of 1 to 0).
continue
} else if (change.type === `delete`) {
// Remove from sentKeys so future inserts for this key are allowed
// (e.g., after truncate + reinsert)
this.sentKeys.delete(change.key)
}
}
newChanges.push(newChange)
}
return newChanges
}

private trackSentKeys(changes: Array<ChangeMessage<any, string | number>>) {
if (this.loadedInitialState) {
// No need to track sent keys if we loaded the entire state.
// Since we sent everything, all keys must have been observed.
if (this.loadedInitialState || this.skipFiltering) {
// No need to track sent keys if we loaded the entire state or filtering is skipped.
// Since filtering won't be applied, all keys are effectively "observed".
return
}

for (const change of changes) {
this.sentKeys.add(change.key)
if (change.type === `delete`) {
// Remove deleted keys from sentKeys so future re-inserts are allowed
this.sentKeys.delete(change.key)
} else {
// For inserts and updates, track the key as sent
this.sentKeys.add(change.key)
}
}
}

/**
* Mark that the subscription should not filter any changes.
* This is used when includeInitialState is explicitly set to false,
* meaning the caller doesn't want initial state but does want ALL future changes.
*/
markAllStateAsSeen() {
this.skipFiltering = true
}

unsubscribe() {
// Unload all subsets that this subscription loaded
// We pass the exact same LoadSubsetOptions we used for loadSubset
Expand Down
6 changes: 4 additions & 2 deletions packages/db/tests/collection-subscribe-changes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1193,8 +1193,10 @@ describe(`Collection.subscribeChanges`, () => {
f.write({ type: `insert`, value: { id: 1, value: `server-after` } })
f.commit()

// Expect delete, insert with optimistic value, and an empty event from markReady
expect(changeEvents.length).toBe(3)
// Expect delete and insert with optimistic value
// Note: Previously there was a duplicate insert event that was incorrectly
// being sent, causing 3 events. Now duplicates are filtered correctly.
expect(changeEvents.length).toBe(2)
expect(changeEvents[0]).toEqual({
type: `delete`,
key: 1,
Expand Down
Loading
Loading