Fix/ivan/mem2 concurrency#2240
Conversation
Concurrent count()/query() on a shared SqliteStore connection returned 0 or raised TypeError because sqlite3 connections opened with check_same_thread=False are not safe for parallel use. Each connection borrowed by SqliteObservationStore / SqliteBlobStore / SqliteVectorStore / RegistryStore now shares a single threading.RLock looked up via conn_lock(conn), and every _conn.execute() runs under it. query() materializes via fetchall() so cursors aren't iterated across threads, and per-thread scratch holds the python-side post-filter handoff to Backend so concurrent query() callers don't clobber each other.
d5f8097 to
c88ead5
Compare
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
Greptile SummaryThis PR serializes concurrent access to shared
Confidence Score: 3/5The core serialization logic is sound, but two The
Important Files Changed
Sequence DiagramsequenceDiagram
participant T1 as Thread 1
participant T2 as Thread 2
participant CL as conn_lock registry
participant Conn as sqlite3.Connection
Note over CL: _locks: {id(conn): RLock}
T1->>CL: conn_lock(conn) → RLock_A
T2->>CL: conn_lock(conn) → RLock_A (same object)
T1->>RLock_A: acquire
T2->>RLock_A: acquire (blocks)
T1->>Conn: execute(sql) / fetchall()
T1->>RLock_A: release
T2->>RLock_A: acquired (unblocked)
T2->>Conn: execute(sql) / fetchall()
T2->>RLock_A: release
Note over T1,T2: Shared connection serialized via shared RLock
Note over T1,CL: On stop()
T1->>RLock_A: acquire (via with conn_lock)
T1->>Conn: conn.close()
T1->>CL: _locks.pop(id(conn))
Note over T2,CL: T2 already holds ref to RLock_A
T1->>RLock_A: release (with block exits)
T2->>RLock_A: acquired after stop — uses closed conn ⚠️
Reviews (1): Last reviewed commit: "memory2: serialize sqlite reads through ..." | Re-trigger Greptile |
| def _ensure_table(self, stream_name: str) -> None: | ||
| if stream_name in self._tables: | ||
| return | ||
| validate_identifier(stream_name) | ||
| self._conn.execute( | ||
| f'CREATE TABLE IF NOT EXISTS "{stream_name}_blob" ' | ||
| "(id INTEGER PRIMARY KEY, data BLOB NOT NULL)" | ||
| ) | ||
| with self._lock: | ||
| self._conn.execute( | ||
| f'CREATE TABLE IF NOT EXISTS "{stream_name}_blob" ' | ||
| "(id INTEGER PRIMARY KEY, data BLOB NOT NULL)" | ||
| ) | ||
| self._tables.add(stream_name) |
There was a problem hiding this comment.
The
_tables.add() call sits outside the with self._lock block. Two threads can both pass the initial if stream_name in self._tables guard, both execute the SQL (harmless due to IF NOT EXISTS), and both update _tables without synchronization. In CPython the GIL protects set.add, but under free-threaded Python (3.13+ --disable-gil) this is a data race. Moving the update inside the lock—along with a double-check—eliminates both the redundant SQL executions and the unguarded write.
| def _ensure_table(self, stream_name: str) -> None: | |
| if stream_name in self._tables: | |
| return | |
| validate_identifier(stream_name) | |
| self._conn.execute( | |
| f'CREATE TABLE IF NOT EXISTS "{stream_name}_blob" ' | |
| "(id INTEGER PRIMARY KEY, data BLOB NOT NULL)" | |
| ) | |
| with self._lock: | |
| self._conn.execute( | |
| f'CREATE TABLE IF NOT EXISTS "{stream_name}_blob" ' | |
| "(id INTEGER PRIMARY KEY, data BLOB NOT NULL)" | |
| ) | |
| self._tables.add(stream_name) | |
| def _ensure_table(self, stream_name: str) -> None: | |
| if stream_name in self._tables: | |
| return | |
| validate_identifier(stream_name) | |
| with self._lock: | |
| if stream_name not in self._tables: | |
| self._conn.execute( | |
| f'CREATE TABLE IF NOT EXISTS "{stream_name}_blob" ' | |
| "(id INTEGER PRIMARY KEY, data BLOB NOT NULL)" | |
| ) | |
| self._tables.add(stream_name) |
| def _ensure_table(self, stream_name: str, dim: int) -> None: | ||
| if stream_name in self._tables: | ||
| return | ||
| validate_identifier(stream_name) | ||
| self._conn.execute( | ||
| f'CREATE VIRTUAL TABLE IF NOT EXISTS "{stream_name}_vec" ' | ||
| f"USING vec0(embedding float[{dim}] distance_metric=cosine)" | ||
| ) | ||
| with self._lock: | ||
| self._conn.execute( | ||
| f'CREATE VIRTUAL TABLE IF NOT EXISTS "{stream_name}_vec" ' | ||
| f"USING vec0(embedding float[{dim}] distance_metric=cosine)" | ||
| ) | ||
| self._tables[stream_name] = dim |
There was a problem hiding this comment.
Same pattern as
SqliteBlobStore._ensure_table: self._tables[stream_name] = dim is written outside the lock. Two threads with different dim values could both pass the outer guard; the second executes a no-op CREATE … IF NOT EXISTS but then overwrites the correct dim with its own value, corrupting the _tables mapping. Moving both the SQL and the dict write inside the lock with a double-check prevents this.
| def _ensure_table(self, stream_name: str, dim: int) -> None: | |
| if stream_name in self._tables: | |
| return | |
| validate_identifier(stream_name) | |
| self._conn.execute( | |
| f'CREATE VIRTUAL TABLE IF NOT EXISTS "{stream_name}_vec" ' | |
| f"USING vec0(embedding float[{dim}] distance_metric=cosine)" | |
| ) | |
| with self._lock: | |
| self._conn.execute( | |
| f'CREATE VIRTUAL TABLE IF NOT EXISTS "{stream_name}_vec" ' | |
| f"USING vec0(embedding float[{dim}] distance_metric=cosine)" | |
| ) | |
| self._tables[stream_name] = dim | |
| def _ensure_table(self, stream_name: str, dim: int) -> None: | |
| if stream_name in self._tables: | |
| return | |
| validate_identifier(stream_name) | |
| with self._lock: | |
| if stream_name not in self._tables: | |
| self._conn.execute( | |
| f'CREATE VIRTUAL TABLE IF NOT EXISTS "{stream_name}_vec" ' | |
| f"USING vec0(embedding float[{dim}] distance_metric=cosine)" | |
| ) | |
| self._tables[stream_name] = dim |
| def stop(self) -> None: | ||
| super().stop() | ||
| self._registry_conn.close() | ||
| with conn_lock(self._registry_conn): | ||
| close_sqlite_connection(self._registry_conn) |
There was a problem hiding this comment.
Lock entry removed while lock is still held in
stop()
close_sqlite_connection calls conn.close() and then _locks.pop(id(conn)), both while the outer with conn_lock(self._registry_conn) is holding that same lock. Any thread that called conn_lock(self._registry_conn) before the pop and is now blocked waiting for the lock will acquire it after stop() releases it, but the connection is already closed — resulting in ProgrammingError: Cannot operate on a closed database. The lock should be released (i.e. the with block should finish) before the connection is closed, or the store should use a separate shutdown flag to reject operations after stop().
No description provided.