Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions task-sdk/src/airflow/sdk/io/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from __future__ import annotations

import logging
import shutil
from typing import TYPE_CHECKING, Any, ClassVar
from urllib.parse import urlsplit
Expand All @@ -33,6 +34,8 @@
from typing_extensions import Self
from upath.types import JoinablePathLike

log = logging.getLogger(__name__)


class _TrackingFileWrapper:
"""Wrapper that tracks file operations to intercept lineage."""
Expand Down Expand Up @@ -116,6 +119,13 @@ def __init__(
# to the underlying fsspec filesystem, which doesn't understand it
self._conn_id = storage_options.pop("conn_id", None)
super().__init__(*args, protocol=protocol, **storage_options)
# ProxyUPath delegates all operations to self.__wrapped__, which was
# constructed with empty storage_options (conn_id stripped above).
# Pre-populating __wrapped__._fs_cached with the Airflow-authenticated
# filesystem fixes every delegated method (exists, mkdir, iterdir, glob,
# walk, rename, read_bytes, write_bytes, …) in one place rather than
# requiring individual overrides for each one.
self._inject_authenticated_fs(self.__wrapped__)

@classmethod_or_method # type: ignore[arg-type]
def _from_upath(cls_or_self, upath, /):
Expand All @@ -127,8 +137,36 @@ def _from_upath(cls_or_self, upath, /):
obj = object.__new__(cls)
obj.__wrapped__ = upath
obj._conn_id = getattr(cls_or_self, "_conn_id", None) if is_instance else None
# If the wrapped UPath has not yet had its fs cached (e.g. when _from_upath is
# called as a classmethod with a fresh UPath), inject the authenticated fs now.
# Child UPaths produced by __wrapped__ operations (iterdir, glob, etc.) already
# inherit _fs_cached from the parent UPath, so the hasattr check is a no-op for them.
if not hasattr(upath, "_fs_cached"):
obj._inject_authenticated_fs(upath)
return obj

def _inject_authenticated_fs(self, wrapped: UPath) -> None:
    """
    Seed ``wrapped._fs_cached`` with the filesystem attached for this path's connection.

    Nothing happens when the path has no ``conn_id``. Otherwise the store is looked up
    via ``attach`` using the wrapped path's protocol (``"file"`` when the protocol is
    empty) and its ``fs`` is stored on ``wrapped`` as ``_fs_cached``.

    This is best-effort: any exception raised while resolving or assigning the
    filesystem is logged at DEBUG level (with traceback) and swallowed, leaving
    ``wrapped`` unchanged so that constructing the path never fails here.

    :param wrapped: The underlying UPath whose cached filesystem should be pre-populated.
    """
    conn_id = self._conn_id
    if conn_id is not None:
        try:
            # Keep both the lookup and the assignment inside the try block so that a
            # failure in either step is handled the same way.
            store = attach(wrapped.protocol or "file", conn_id)
            wrapped._fs_cached = store.fs
        except Exception:
            log.debug(
                "Could not pre-populate authenticated filesystem for %r (conn_id=%r); "
                "operations will attempt lazy resolution at first use.",
                self,
                conn_id,
                exc_info=True,
            )

@property
def conn_id(self) -> str | None:
"""Return the connection ID for this path."""
Expand Down
109 changes: 109 additions & 0 deletions task-sdk/tests/task_sdk/io/test_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import pytest
from fsspec.implementations.local import LocalFileSystem
from fsspec.implementations.memory import MemoryFileSystem
from upath import UPath

from airflow.sdk import Asset, ObjectStoragePath
from airflow.sdk._shared.module_loading import qualname
Expand Down Expand Up @@ -228,6 +229,114 @@ def test_standard_extended_api(self, fake_files, fn, args, fn2, path, expected_a
method.assert_called_once_with(expected_args, **expected_kwargs)


class TestConnIdCredentialResolution:
    """
    Regression tests for https://github.com/apache/airflow/issues/64632

    When ObjectStoragePath was migrated from CloudPath to ProxyUPath (3.2.0),
    methods like exists(), mkdir(), is_dir(), is_file() were delegated to
    self.__wrapped__ which carries empty storage_options (conn_id is stored
    separately). This caused NoCredentialsError / 401 errors for remote stores
    even when a valid conn_id was provided.

    Every test below attaches a fake filesystem for protocol ``ffs2`` and
    conn_id ``my_conn`` and then checks that the path operation is observable
    on that exact filesystem instance.
    """

    @pytest.fixture(autouse=True)
    def restore_cache(self):
        """Snapshot ``_STORE_CACHE`` before each test and put the snapshot back afterwards."""
        cache = _STORE_CACHE.copy()
        yield
        _STORE_CACHE.clear()
        _STORE_CACHE.update(cache)

    @pytest.fixture
    def fake_fs_with_conn(self):
        """Yield a ``_FakeRemoteFileSystem`` attached under protocol ``ffs2`` / conn_id ``my_conn``."""
        fs = _FakeRemoteFileSystem(conn_id="my_conn")
        attach(protocol="ffs2", conn_id="my_conn", fs=fs)
        try:
            yield fs
        finally:
            # ``store`` and ``pseudo_dirs`` are reset on the class itself, so wipe them
            # after every test to keep files created by one test out of the next.
            _FakeRemoteFileSystem.store.clear()
            _FakeRemoteFileSystem.pseudo_dirs[:] = [""]

    def test_exists_uses_authenticated_fs(self, fake_fs_with_conn):
        """exists() must be answered by the filesystem attached for the conn_id."""
        p = ObjectStoragePath("ffs2://my_conn@bucket/some_file.txt", conn_id="my_conn")
        # Verify the correct fs instance was injected, not merely any _FakeRemoteFileSystem
        assert p.__wrapped__._fs_cached is fake_fs_with_conn
        fake_fs_with_conn.touch("bucket/some_file.txt")

        assert p.exists() is True
        assert (
            ObjectStoragePath("ffs2://my_conn@bucket/no_such_file.txt", conn_id="my_conn").exists() is False
        )

    def test_mkdir_uses_authenticated_fs(self, fake_fs_with_conn):
        """mkdir() must create the directory on the filesystem attached for the conn_id."""
        p = ObjectStoragePath("ffs2://my_conn@bucket/new_dir/", conn_id="my_conn")
        p.mkdir(parents=True, exist_ok=True)
        assert fake_fs_with_conn.isdir("bucket/new_dir")

    def test_is_dir_uses_authenticated_fs(self, fake_fs_with_conn):
        """is_dir() must see a directory created on the filesystem attached for the conn_id."""
        fake_fs_with_conn.mkdir("bucket/a_dir")
        p = ObjectStoragePath("ffs2://my_conn@bucket/a_dir", conn_id="my_conn")
        assert p.is_dir() is True

    def test_is_file_uses_authenticated_fs(self, fake_fs_with_conn):
        """is_file() must see a file created on the filesystem attached for the conn_id."""
        fake_fs_with_conn.touch("bucket/a_file.txt")
        p = ObjectStoragePath("ffs2://my_conn@bucket/a_file.txt", conn_id="my_conn")
        assert p.is_file() is True

    def test_touch_uses_authenticated_fs(self, fake_fs_with_conn):
        """touch() must create the file on the filesystem attached for the conn_id."""
        p = ObjectStoragePath("ffs2://my_conn@bucket/touched_file.txt", conn_id="my_conn")
        p.touch()
        assert fake_fs_with_conn.exists("bucket/touched_file.txt")

    def test_unlink_uses_authenticated_fs(self, fake_fs_with_conn):
        """unlink() must remove the file from the filesystem attached for the conn_id."""
        fake_fs_with_conn.touch("bucket/to_delete.txt")
        p = ObjectStoragePath("ffs2://my_conn@bucket/to_delete.txt", conn_id="my_conn")
        p.unlink()
        assert not fake_fs_with_conn.exists("bucket/to_delete.txt")

    def test_rmdir_uses_authenticated_fs(self, fake_fs_with_conn):
        """rmdir() must remove the directory from the filesystem attached for the conn_id."""
        fake_fs_with_conn.mkdir("bucket/empty_dir")
        p = ObjectStoragePath("ffs2://my_conn@bucket/empty_dir", conn_id="my_conn")
        # upath's rmdir(recursive=False) calls next(self.iterdir()) without a default,
        # which raises StopIteration on empty dirs — a upath bug. Use the default (recursive=True).
        p.rmdir()
        assert not fake_fs_with_conn.exists("bucket/empty_dir")

    def test_conn_id_in_uri_works_for_exists(self, fake_fs_with_conn):
        """conn_id embedded in URI (user@host) should also work for exists()."""
        fake_fs_with_conn.touch("bucket/target.txt")
        # No conn_id keyword here: it has to be picked up from the ``my_conn@`` part of the URI.
        p = ObjectStoragePath("ffs2://my_conn@bucket/target.txt")
        assert p.conn_id == "my_conn"
        assert p.exists() is True

    def test_from_upath_injects_fs_when_no_cache(self, fake_fs_with_conn):
        """_from_upath must inject authenticated fs into a fresh UPath with no _fs_cached."""
        # Simulate _from_upath called as an instance method with a fresh UPath that has
        # no _fs_cached set (e.g. cwd() / home() or a cross-protocol _from_upath call).
        p_instance = ObjectStoragePath("ffs2://my_conn@bucket/root", conn_id="my_conn")
        fresh_upath = UPath("ffs2://bucket/other")
        # Precondition for the scenario: the plain UPath carries no cached filesystem yet.
        assert not hasattr(fresh_upath, "_fs_cached")
        child = p_instance._from_upath(fresh_upath)
        assert child.__wrapped__._fs_cached is fake_fs_with_conn

    def test_iterdir_children_use_authenticated_fs(self, fake_fs_with_conn):
        """Children yielded by iterdir() must also carry the authenticated filesystem."""
        fake_fs_with_conn.touch("bucket/dir/file1.txt")
        fake_fs_with_conn.touch("bucket/dir/file2.txt")
        p = ObjectStoragePath("ffs2://my_conn@bucket/dir", conn_id="my_conn")
        children = list(p.iterdir())
        assert len(children) == 2
        # Each child path must use the same authenticated fs, not a fresh unauthenticated one
        assert all(c.__wrapped__._fs_cached is fake_fs_with_conn for c in children)


class TestRemotePath:
def test_bucket_key_protocol(self):
bucket = "bkt"
Expand Down
Loading