
Fix/ivan/mem2 concurrency #2240

Open
leshy wants to merge 1 commit into main from fix/ivan/mem2-concurrency
Conversation

@leshy

@leshy leshy commented May 24, 2026

No description provided.

Concurrent count()/query() on a shared SqliteStore connection returned 0
or raised TypeError because sqlite3 connections opened with
check_same_thread=False are not safe for parallel use. Each connection
borrowed by SqliteObservationStore / SqliteBlobStore / SqliteVectorStore
/ RegistryStore now shares a single threading.RLock looked up via
conn_lock(conn), and every _conn.execute() runs under it. query()
materializes via fetchall() so cursors aren't iterated across threads,
and per-thread scratch holds the python-side post-filter handoff to
Backend so concurrent query() callers don't clobber each other.
@leshy leshy force-pushed the fix/ivan/mem2-concurrency branch from d5f8097 to c88ead5 Compare May 24, 2026 06:56
@leshy leshy marked this pull request as ready for review May 24, 2026 06:57
@codecov

codecov Bot commented May 24, 2026

Codecov Report

❌ Patch coverage is 94.52055% with 8 lines in your changes missing coverage. Please review.
✅ All tests successful. No failed tests found.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| dimos/memory2/observationstore/sqlite.py | 92.59% | 2 Missing ⚠️ |
| dimos/memory2/test_sqlite_concurrency.py | 96.00% | 2 Missing ⚠️ |
| dimos/memory2/vectorstore/sqlite.py | 83.33% | 2 Missing ⚠️ |
| dimos/memory2/blobstore/sqlite.py | 92.85% | 0 Missing and 1 partial ⚠️ |
| dimos/memory2/utils/sqlite.py | 95.00% | 0 Missing and 1 partial ⚠️ |


@greptile-apps

greptile-apps Bot commented May 24, 2026

Greptile Summary

This PR serializes concurrent access to shared sqlite3.Connection objects across all memory2 store types by introducing a module-level per-connection RLock registry (conn_lock) in utils/sqlite.py, wrapping every connection use in the appropriate lock, and switching _pending_python_filters/_pending_query in the observation store from shared instance attributes to threading.local() so concurrent query() calls on different threads no longer clobber each other's post-filter state.

  • conn_lock registry (utils/sqlite.py): a side-table of {id(conn): RLock} is created and cleaned up via close_sqlite_connection; all store classes that share a connection obtain and share the same RLock, ensuring cross-store serialization on the same connection.
  • Query materialization (observationstore/sqlite.py): lazy cursor iteration is replaced with fetchall() under the lock, and per-thread TLS replaces the shared post-filter scratch fields.
  • New concurrency regression tests (test_sqlite_concurrency.py): 16 threads × 250 count() calls and a mixed count/iterate workload validate that the fix holds under load.
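
The materialization and per-thread scratch points above could be sketched roughly as follows. TinyStore, the obs table, and the pending_filter attribute are hypothetical stand-ins, not the PR's actual classes or fields.

```python
import sqlite3
import threading


class TinyStore:
    """Hypothetical stand-in for an observation store."""

    def __init__(self, conn: sqlite3.Connection, lock) -> None:
        self._conn = conn
        self._lock = lock
        self._tls = threading.local()  # per-thread post-filter scratch

    def query(self, min_value: int, python_filter):
        # Each thread writes its own slot, so concurrent callers
        # cannot overwrite each other's pending filter.
        self._tls.pending_filter = python_filter
        with self._lock:
            # Materialize while the lock is held; no live cursor escapes.
            rows = self._conn.execute(
                "SELECT value FROM obs WHERE value >= ? ORDER BY value",
                (min_value,),
            ).fetchall()
        keep = self._tls.pending_filter
        return [value for (value,) in rows if keep(value)]
```

The trade-off in this sketch is memory: fetchall() holds the whole result set at once, in exchange for never iterating a cursor outside the lock.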

Confidence Score: 3/5

The core serialization logic is sound, but two _ensure_table methods update their in-memory _tables cache outside the lock, and the VectorStore variant can corrupt the stream-to-dimensionality mapping under concurrent first-use of the same stream name with different dimensions.

The SqliteVectorStore._ensure_table writes self._tables[stream_name] = dim after releasing the lock; if two threads race on the first write for the same stream with different dim values, the SQL no-ops for the second thread but it still overwrites the cached dim, leaving the in-memory mapping inconsistent with the physical table. Additionally, stop() in SqliteStore removes the connection's lock entry from the registry while the lock is still held, so a thread already holding a reference to that lock can acquire it after stop() completes and operate on the closed connection.

dimos/memory2/vectorstore/sqlite.py and dimos/memory2/blobstore/sqlite.py need the _ensure_table double-check pattern extended to cover the _tables update; dimos/memory2/store/sqlite.py stop() needs review of the shutdown ordering.

Important Files Changed

| Filename | Overview |
|---|---|
| dimos/memory2/utils/sqlite.py | New module-level _locks registry providing per-connection RLocks via conn_lock(), with a double-checked locking pattern and cleanup via close_sqlite_connection(); safe under CPython's GIL. |
| dimos/memory2/blobstore/sqlite.py | All connection accesses wrapped in with self._lock, but _tables.add(stream_name) in _ensure_table is placed outside the lock, so concurrent callers can redundantly execute SQL and update _tables unsynchronized. |
| dimos/memory2/vectorstore/sqlite.py | Same pattern as BlobStore: connection accesses locked, but self._tables[stream_name] = dim in _ensure_table falls outside the lock, meaning the dim mapping can be overwritten by a racing thread after the table already exists. |
| dimos/memory2/observationstore/sqlite.py | Replaces shared _pending_python_filters/_pending_query instance attrs with threading.local(), queries are now materialized (fetchall()) under the lock, and commit/rollback are guarded; clean fix for the concurrent read regression. |
| dimos/memory2/registry.py | All CRUD operations on _streams table are now wrapped in conn_lock; straightforward and complete serialization. |
| dimos/memory2/store/sqlite.py | delete_stream and stop now use conn_lock; however stop calls close_sqlite_connection (which removes the lock entry from _locks) while the lock is still held, creating a window where a racing thread can acquire the released lock and operate on the closed connection. |
| dimos/memory2/test_sqlite_concurrency.py | New regression tests for the concurrent count/iterate race; well-structured with 16-thread × 250-call workloads covering both count() and iteration paths. |

Sequence Diagram

sequenceDiagram
    participant T1 as Thread 1
    participant T2 as Thread 2
    participant CL as conn_lock registry
    participant Conn as sqlite3.Connection

    Note over CL: _locks: {id(conn): RLock}

    T1->>CL: conn_lock(conn) → RLock_A
    T2->>CL: conn_lock(conn) → RLock_A (same object)

    T1->>RLock_A: acquire
    T2->>RLock_A: acquire (blocks)

    T1->>Conn: execute(sql) / fetchall()
    T1->>RLock_A: release
    T2->>RLock_A: acquired (unblocked)
    T2->>Conn: execute(sql) / fetchall()
    T2->>RLock_A: release

    Note over T1,T2: Shared connection serialized via shared RLock

    Note over T1,CL: On stop()
    T1->>RLock_A: acquire (via with conn_lock)
    T1->>Conn: conn.close()
    T1->>CL: _locks.pop(id(conn))
    Note over T2,CL: T2 already holds ref to RLock_A
    T1->>RLock_A: release (with block exits)
    T2->>RLock_A: acquired after stop — uses closed conn ⚠️

Reviews (1): Last reviewed commit: "memory2: serialize sqlite reads through ..."

Comment on lines 68 to 77
 def _ensure_table(self, stream_name: str) -> None:
     if stream_name in self._tables:
         return
     validate_identifier(stream_name)
-    self._conn.execute(
-        f'CREATE TABLE IF NOT EXISTS "{stream_name}_blob" '
-        "(id INTEGER PRIMARY KEY, data BLOB NOT NULL)"
-    )
+    with self._lock:
+        self._conn.execute(
+            f'CREATE TABLE IF NOT EXISTS "{stream_name}_blob" '
+            "(id INTEGER PRIMARY KEY, data BLOB NOT NULL)"
+        )
     self._tables.add(stream_name)

P2 The _tables.add() call sits outside the with self._lock block. Two threads can both pass the initial if stream_name in self._tables guard, both execute the SQL (harmless due to IF NOT EXISTS), and both update _tables without synchronization. In CPython the GIL protects set.add, but under free-threaded Python (3.13+ built with --disable-gil) this is a data race. Moving the update inside the lock, together with a second membership check, eliminates both the redundant SQL executions and the unguarded write.

Suggested change
def _ensure_table(self, stream_name: str) -> None:
    if stream_name in self._tables:
        return
    validate_identifier(stream_name)
    with self._lock:
        if stream_name not in self._tables:
            self._conn.execute(
                f'CREATE TABLE IF NOT EXISTS "{stream_name}_blob" '
                "(id INTEGER PRIMARY KEY, data BLOB NOT NULL)"
            )
            self._tables.add(stream_name)

Comment on lines 66 to 75
 def _ensure_table(self, stream_name: str, dim: int) -> None:
     if stream_name in self._tables:
         return
     validate_identifier(stream_name)
-    self._conn.execute(
-        f'CREATE VIRTUAL TABLE IF NOT EXISTS "{stream_name}_vec" '
-        f"USING vec0(embedding float[{dim}] distance_metric=cosine)"
-    )
+    with self._lock:
+        self._conn.execute(
+            f'CREATE VIRTUAL TABLE IF NOT EXISTS "{stream_name}_vec" '
+            f"USING vec0(embedding float[{dim}] distance_metric=cosine)"
+        )
     self._tables[stream_name] = dim

P1 Same pattern as SqliteBlobStore._ensure_table: self._tables[stream_name] = dim is written outside the lock. Two threads with different dim values could both pass the outer guard; the second executes a no-op CREATE … IF NOT EXISTS but then overwrites the correct dim with its own value, corrupting the _tables mapping. Moving both the SQL and the dict write inside the lock with a double-check prevents this.

Suggested change
def _ensure_table(self, stream_name: str, dim: int) -> None:
    if stream_name in self._tables:
        return
    validate_identifier(stream_name)
    with self._lock:
        if stream_name not in self._tables:
            self._conn.execute(
                f'CREATE VIRTUAL TABLE IF NOT EXISTS "{stream_name}_vec" '
                f"USING vec0(embedding float[{dim}] distance_metric=cosine)"
            )
            self._tables[stream_name] = dim
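
One way to exercise the double-checked variant without sqlite-vec installed is a small stand-in where the CREATE statement is replaced by a counter. TableCache and create_calls are hypothetical names used only for this sketch.

```python
import threading


class TableCache:
    """Hypothetical stand-in for the _tables cache; SQL is replaced by a counter."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._tables = {}  # stream name -> dimensionality
        self.create_calls = 0

    def ensure_table(self, stream_name: str, dim: int) -> None:
        if stream_name in self._tables:  # fast path without the lock
            return
        with self._lock:
            if stream_name not in self._tables:  # re-check under the lock
                self.create_calls += 1  # stands in for CREATE VIRTUAL TABLE
                self._tables[stream_name] = dim


cache = TableCache()
start = threading.Barrier(8)  # release all threads at once to provoke the race


def first_use(dim: int) -> None:
    start.wait()
    cache.ensure_table("stream", dim)


threads = [threading.Thread(target=first_use, args=(d,)) for d in range(1, 9)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Under these assumptions, only the thread that wins the lock records its dim; later callers see the entry on the re-check and leave the mapping alone.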

Comment on lines 222 to +225
 def stop(self) -> None:
     super().stop()
-    self._registry_conn.close()
+    with conn_lock(self._registry_conn):
+        close_sqlite_connection(self._registry_conn)

P2 Lock entry removed while lock is still held in stop()

close_sqlite_connection calls conn.close() and then _locks.pop(id(conn)), both while the outer with conn_lock(self._registry_conn) is holding that same lock. Any thread that called conn_lock(self._registry_conn) before the pop and is now blocked waiting for the lock will acquire it after stop() releases it, but the connection is already closed — resulting in ProgrammingError: Cannot operate on a closed database. The lock should be released (i.e. the with block should finish) before the connection is closed, or the store should use a separate shutdown flag to reject operations after stop().
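
The shutdown-flag option mentioned above might look roughly like this. ClosableStore, _closed, and the obs table are hypothetical; the sketch only shows the flag being checked and set under the same lock that guards the connection.

```python
import sqlite3
import threading


class ClosableStore:
    """Hypothetical wrapper; names are illustrative, not from the PR."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn
        self._lock = threading.RLock()
        self._closed = False

    def count(self) -> int:
        with self._lock:
            # A thread that was blocked during stop() sees the flag here
            # and fails cleanly instead of touching the closed connection.
            if self._closed:
                raise RuntimeError("store is stopped")
            return self._conn.execute("SELECT COUNT(*) FROM obs").fetchone()[0]

    def stop(self) -> None:
        with self._lock:
            if self._closed:
                return  # stop() is idempotent in this sketch
            self._closed = True
            self._conn.close()
```

In this sketch a late caller gets an explicit RuntimeError rather than sqlite3's ProgrammingError, which makes use-after-stop easier to distinguish from other database failures.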
