8 changes: 3 additions & 5 deletions metaflow/plugins/aws/step_functions/production_token.py
@@ -64,9 +64,7 @@ def load_token(token_prefix):


 def store_token(token_prefix, token):
+    from metaflow.util import atomic_json_update
+
     path = _path(token_prefix)
-    config = _load_config(path)
-    config["production_token"] = token
-    _makedirs(os.path.dirname(path))
-    with open(path, "w") as f:
-        json.dump(config, f)
+    atomic_json_update(path, lambda d: {**d, "production_token": token})
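
Note: a minimal sketch of the merge semantics of the new call (the prior contents shown here are hypothetical, not taken from this PR). Because the updater returns {**d, "production_token": token}, any other keys already stored in the token file are carried over:

# Hypothetical prior contents of the token file.
existing = {"owner": "alice"}
updated = (lambda d: {**d, "production_token": "token-123"})(existing)
assert updated == {"owner": "alice", "production_token": "token-123"}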
37 changes: 12 additions & 25 deletions metaflow/plugins/pypi/conda_environment.py
@@ -524,32 +524,19 @@ def read_from_environment_manifest(self, keys):
         return data

     def write_to_environment_manifest(self, keys, value):
+        from metaflow.util import atomic_json_update
+
         path = self.get_environment_manifest_path()
-        try:
-            os.makedirs(os.path.dirname(path))
-        except OSError as x:
-            if x.errno != errno.EEXIST:
-                raise
-        with os.fdopen(os.open(path, os.O_RDWR | os.O_CREAT), "r+") as f:
-            try:
-                fcntl.flock(f, fcntl.LOCK_EX)
-                d = {}
-                if os.path.getsize(path) > 0:
-                    f.seek(0)
-                    d = json.load(f)
-                data = d
-                for key in keys[:-1]:
-                    data = data.setdefault(key, {})
-                data[keys[-1]] = value
-                f.seek(0)
-                json.dump(d, f)
-                f.truncate()
-                return value
-            except IOError as e:
-                if e.errno != errno.EAGAIN:
-                    raise
-            finally:
-                fcntl.flock(f, fcntl.LOCK_UN)
+
+        def _update(d):
+            data = d
+            for key in keys[:-1]:
+                data = data.setdefault(key, {})
+            data[keys[-1]] = value
+            return d
+
+        atomic_json_update(path, _update)
+        return value


 class LazyOpen(BufferedIOBase):
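
Note: a quick illustration of how the rewritten write_to_environment_manifest builds nested entries (the keys and value below are made up for illustration). The _update closure walks all but the last key with setdefault, creating intermediate dicts on demand, and overwrites only the leaf:

keys, value = ["linux-64", "env_abc123"], {"id": "env_abc123"}

d = {}
data = d
for key in keys[:-1]:
    data = data.setdefault(key, {})  # create intermediate levels as needed
data[keys[-1]] = value               # overwrite only the leaf entry
assert d == {"linux-64": {"env_abc123": {"id": "env_abc123"}}}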
58 changes: 58 additions & 0 deletions metaflow/util.py
@@ -1,3 +1,6 @@
+import fcntl
+import json
+import tempfile
 import os
 import shutil
 import sys
@@ -661,3 +664,58 @@ def _recurse(root, skip_dirs, seen):
    for x in _recurse(top_root, skip_dirs, set()):
        skip_dirs = default_skip_dirs
        yield x


def atomic_json_update(path, updater_fn):
    """Read-modify-write a JSON file safely under concurrent access.

    Uses a separate .lock file so that flock serializes all writers on a
    stable inode, while the data file itself is replaced atomically via
    os.replace (crash-safe: readers never see a half-written file).

    Parameters
    ----------
    path : str
        Path to the JSON file. Created (with ``{}``) if it doesn't exist.
    updater_fn : callable
        Called with the current dict contents; must return the new dict
        to write back.

    Returns
    -------
    dict
        The updated dict that was written.
    """
    os.makedirs(os.path.dirname(path), exist_ok=True)

    lock_path = path + ".lock"
    with open(lock_path, "a+") as lock_f:
        fcntl.flock(lock_f, fcntl.LOCK_EX)
        try:
            # Read current contents
            d = {}
            if os.path.exists(path) and os.path.getsize(path) > 0:
                with open(path, "r") as f:
                    d = json.load(f)

            # Apply update
            d = updater_fn(d)

            # Atomic write via temp file + replace
            fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path), suffix=".tmp")
            try:
                with os.fdopen(fd, "w") as f:
                    json.dump(d, f)
                    f.flush()
                    os.fsync(f.fileno())
                os.replace(tmp, path)
            except BaseException:
                try:
                    os.unlink(tmp)
                except OSError:
                    pass
                raise

            return d
        finally:
            fcntl.flock(lock_f, fcntl.LOCK_UN)
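
Note: a minimal usage sketch of the new helper (the file path and counter key below are hypothetical). Each read-modify-write cycle runs while holding the .lock file, so concurrent updaters cannot overwrite each other's changes:

import threading
from metaflow.util import atomic_json_update

path = "/tmp/example_manifest.json"  # hypothetical location

def bump():
    # Read the current dict, bump a counter, write it back atomically.
    atomic_json_update(path, lambda d: {**d, "hits": d.get("hits", 0) + 1})

threads = [threading.Thread(target=bump) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Starting from an empty or absent file, it now holds {"hits": 8}.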
141 changes: 141 additions & 0 deletions test/plugins/conda/test_conda_environment_unit.py
@@ -0,0 +1,141 @@
"""
Unit tests for CondaEnvironment helper logic.

Tests the manifest read/write with file locking and the environment
hashing/dedup logic. Uses temp files — no conda installation needed.
"""

import json
import os
import tempfile
import threading

import pytest


class TestManifestConcurrentWrites:
    """Test that manifest file operations are thread-safe."""

    def test_concurrent_writes_no_corruption(self):
        """Multiple threads writing to the manifest should not corrupt it."""
        from metaflow.util import atomic_json_update

        errors = []
        num_threads = 10
        writes_per_thread = 5

        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".manifest", delete=False
        ) as f:
            manifest_path = f.name
            json.dump({}, f)

        def write_to_manifest(thread_id):
            try:
                for i in range(writes_per_thread):
                    key = f"env_{thread_id}_{i}"
                    atomic_json_update(
                        manifest_path,
                        lambda d, k=key, t=thread_id: {
                            **d,
                            k: {"platform": "linux-64", "thread": t},
                        },
                    )
            except Exception as e:
                errors.append(e)

        threads = [
            threading.Thread(target=write_to_manifest, args=(tid,))
            for tid in range(num_threads)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        try:
            assert len(errors) == 0, f"Errors during concurrent writes: {errors}"

            with open(manifest_path, "r") as f:
                data = json.load(f)

            expected_keys = num_threads * writes_per_thread
            assert len(data) == expected_keys, (
                f"Expected {expected_keys} entries, got {len(data)}. "
                "Some writes may have been lost due to race conditions."
            )
        finally:
            os.unlink(manifest_path)
            # Clean up lock file
            lock_path = manifest_path + ".lock"
            if os.path.exists(lock_path):
                os.unlink(lock_path)


class TestAtomicJsonUpdate:
    """Test the atomic_json_update utility directly."""

    def test_creates_file_if_missing(self):
        """atomic_json_update should create the file if it doesn't exist."""
        from metaflow.util import atomic_json_update

        with tempfile.TemporaryDirectory() as tmpdir:
            path = os.path.join(tmpdir, "new.json")
            result = atomic_json_update(path, lambda d: {**d, "key": "val"})
            assert result == {"key": "val"}
            with open(path, "r") as f:
                assert json.load(f) == {"key": "val"}

    def test_updates_existing_file(self):
        """atomic_json_update should merge into existing data."""
        from metaflow.util import atomic_json_update

        with tempfile.TemporaryDirectory() as tmpdir:
            path = os.path.join(tmpdir, "existing.json")
            with open(path, "w") as f:
                json.dump({"a": 1}, f)
            result = atomic_json_update(path, lambda d: {**d, "b": 2})
            assert result == {"a": 1, "b": 2}

    def test_crash_safety_no_partial_writes(self):
        """If updater_fn raises, the original file should be untouched."""
        from metaflow.util import atomic_json_update

        with tempfile.TemporaryDirectory() as tmpdir:
            path = os.path.join(tmpdir, "safe.json")
            with open(path, "w") as f:
                json.dump({"original": True}, f)

            def bad_update(d):
                raise ValueError("simulated crash")

            with pytest.raises(ValueError):
                atomic_json_update(path, bad_update)

            with open(path, "r") as f:
                assert json.load(f) == {"original": True}


class TestCleanupCondaFile:
    """Test temp file cleanup patterns used by the conda environment."""

    def test_cleanup_temp_file(self):
        """Verify temp files are cleaned up properly."""
        with tempfile.NamedTemporaryFile(suffix=".conda_tmp", delete=False) as f:
            tmp_path = f.name
            f.write(b"test data")

        assert os.path.exists(tmp_path)
        os.unlink(tmp_path)
        assert not os.path.exists(tmp_path)

    def test_cleanup_nonexistent_file_no_error(self):
        """Cleaning up a file that doesn't exist should not raise."""
        tmp_path = "/tmp/nonexistent_conda_test_file_12345.tmp"
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        # Should not raise
        try:
            os.unlink(tmp_path)
        except FileNotFoundError:
            pass  # Expected behavior