Conversation
Pull request overview
This PR aims to make concurrent save() operations on the same dataset isolated and effectively atomic to prevent version-table corruption and duplicate inserts under concurrency.
Changes:
- Implemented a staging-table "write, then claim version, then rename" flow in `DatasetQuery.save()`.
- Added UUID-based conflict detection to `metastore.create_dataset_version()` and a retry loop for auto-versioned dataset creation in `Catalog.create_dataset()`.
- Updated functional tests to reflect the new `create_dataset_version()` return type and the fact that `create_dataset()` no longer creates a rows table.
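The staging-table flow can be sketched as follows. This is a hedged illustration using `sqlite3`; names such as `staged_save` and `claim_version` are hypothetical, not the actual `DatasetQuery.save()` implementation:

```python
import sqlite3
import uuid

def staged_save(conn, final_name, rows, claim_version):
    temp_name = f"tmp_{uuid.uuid4().hex[:8]}"
    # 1) write rows into a uniquely named temp table
    conn.execute(f'CREATE TABLE "{temp_name}" (val INTEGER)')
    conn.executemany(f'INSERT INTO "{temp_name}" (val) VALUES (?)',
                     [(r,) for r in rows])
    try:
        # 2) claim the version in metadata; may raise on conflict
        claim_version()
    except Exception:
        conn.execute(f'DROP TABLE "{temp_name}"')  # nothing leaks on failure
        raise
    # 3) only now does the table appear under its final name
    conn.execute(f'ALTER TABLE "{temp_name}" RENAME TO "{final_name}"')

conn = sqlite3.connect(":memory:")
staged_save(conn, "dataset_v1", [1, 2, 3], claim_version=lambda: None)
print(conn.execute("SELECT COUNT(*) FROM dataset_v1").fetchone()[0])  # → 3
```

The point of the pattern is that a concurrent reader never observes a partially written table under the final name: the rename is the commit step.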
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| tests/test_atomicity.py | Removes create_rows usage to align with new dataset creation semantics. |
| tests/func/test_metastore.py | Updates expectations for create_dataset_version() returning (ds, version_created). |
| tests/func/test_datasets.py | Refactors fixtures/tests away from create_rows and adds coverage for “no rows table created”. |
| src/datachain/query/dataset.py | Implements staging-table save: temp table insert → claim version metadata → rename → finalize. |
| src/datachain/lib/dc/records.py | Creates rows table explicitly since create_dataset() is now metadata-only. |
| src/datachain/data_storage/metastore.py | Returns (dataset, version_created) and uses UUID equality as conflict/ownership detection. |
| src/datachain/catalog/catalog.py | Removes create_rows behavior, adds version-claim retry logic, and changes create_dataset_version() to metadata-only + tuple return. |
```python
chain = dc.read_values(val=[1, 2, 3, 4], session=test_session).save(name)
catalog = test_session.catalog
return catalog.update_dataset(
    chain.dataset, {"description": "test dataset", "attrs": ["test", "dataset"]}
```
`Catalog.update_dataset` takes updates via keyword arguments; passing a dict as the second positional argument binds it to `conn`, so no fields are updated. Unpack the dict (`**{...}`) or pass `description=` / `attrs=` directly so the fixture actually updates the dataset metadata.
Suggested change:

```diff
-    chain.dataset, {"description": "test dataset", "attrs": ["test", "dataset"]}
+    chain.dataset,
+    description="test dataset",
+    attrs=["test", "dataset"],
```
```python
# Create the rows table (create_dataset only creates metadata).
table_name = warehouse.dataset_table_name(dsr, dsr.latest_version)
warehouse.create_dataset_rows_table(table_name, columns=columns)
```
After `create_dataset()` was changed to create metadata only, this function now creates and populates the rows table but never calls `catalog.update_dataset_version_with_warehouse_info(...)`. That leaves the version marked COMPLETE with `num_objects`/`size`/`preview` potentially unset; update the warehouse info before setting status COMPLETE (or explicitly document that `read_records` skips these fields).
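A minimal sketch of the ordering this comment asks for: attach the warehouse-derived stats to the version before flipping its status to COMPLETE. `finalize_version` and the dict fields are illustrative, not the real catalog API:

```python
COMPLETE = "COMPLETE"

def finalize_version(version: dict, num_objects: int, size: int) -> dict:
    # 1) update warehouse-derived info first ...
    version.update(num_objects=num_objects, size=size)
    # 2) ... then mark COMPLETE, so a COMPLETE version never has unset stats
    version["status"] = COMPLETE
    return version

v = finalize_version({"status": "CREATED"}, num_objects=4, size=128)
print(v["status"], v["num_objects"])  # COMPLETE 4
```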
This comment makes sense to me, can you check? Also, why did we move creating the warehouse table outside the catalog? Now we need to do all this stuff (create the table, update stats, and maybe other things) in multiple places.
I'll check; this is still kind of a draft and I'm iterating on it.
In short, the idea is to never create the warehouse table under its actual name initially: create a temporary table and rename it at the end. The temporary table is created in a different place.
This read_records path is unique (and probably should be redesigned).
I'm still not sure whether this design is right overall ...
Deploying datachain with:

- Latest commit: 68019c1
- Status: ✅ Deploy successful!
- Preview URL: https://a0a31138.datachain-2g6.pages.dev
- Branch Preview URL: https://fix-save-race-condition.datachain-2g6.pages.dev
```python
try:
    self.catalog.warehouse.rename_table(temp_table, final_table_name)
except Exception:
    with contextlib.suppress(Exception):
```
Should we cleanup dataset metadata or mark it as failed here?
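One way to implement this suggestion, sketched under assumptions: `drop_table` and `mark_version_failed` are hypothetical helper names, not the actual DataChain API.

```python
import contextlib

def finalize(warehouse, metastore, temp_table, final_table, version_id):
    try:
        warehouse.rename_table(temp_table, final_table)
    except Exception:
        with contextlib.suppress(Exception):
            warehouse.drop_table(temp_table)       # best-effort cleanup
        metastore.mark_version_failed(version_id)  # or delete the metadata row
        raise
```

This keeps the invariant that claimed version metadata either points at a real table or is explicitly marked failed.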
Codecov Report: ❌ Patch coverage is
Concurrent `save()` calls on the same dataset can corrupt data by double-inserting rows into the same version table.

UUID-based conflict detection in `create_dataset_version()` (works across SQLite, PostgreSQL, Studio API), a retry loop for auto-versioned saves, and a staging-table pattern in `save()`: write to a temp table first, claim version metadata, then rename.
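The conflict-detection-plus-retry idea can be sketched as follows. This is an illustration, not the real metastore: `versions` stands in for the metastore's version table, and all names are hypothetical.

```python
import uuid

versions: dict = {}  # (dataset, version) -> owner uuid

def create_dataset_version(dataset, version, job_uuid):
    """Return True if this writer created (or already owns) the version."""
    existing = versions.setdefault((dataset, version), job_uuid)
    return existing == job_uuid  # a different UUID means another writer won

def save_with_retry(dataset, max_retries=3):
    """Auto-versioned save: on conflict, retry with the next version number."""
    job_uuid = str(uuid.uuid4())
    version = max((v for d, v in versions if d == dataset), default=0) + 1
    for _ in range(max_retries):
        if create_dataset_version(dataset, version, job_uuid):
            return version
        version += 1  # another writer claimed it; try the next one
    raise RuntimeError("could not claim a dataset version")

versions[("ds", 1)] = "other-writer"  # simulate a concurrent writer
print(save_with_retry("ds"))  # → 2
```

Comparing the stored UUID to the writer's own makes the claim idempotent: a retry by the same job sees its own UUID and proceeds, while a different job sees a conflict.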