diff --git a/task-sdk/src/airflow/sdk/io/path.py b/task-sdk/src/airflow/sdk/io/path.py index cf78bb4ebc6be..14efa2b130a25 100644 --- a/task-sdk/src/airflow/sdk/io/path.py +++ b/task-sdk/src/airflow/sdk/io/path.py @@ -17,6 +17,7 @@ from __future__ import annotations +import logging import shutil from typing import TYPE_CHECKING, Any, ClassVar from urllib.parse import urlsplit @@ -33,6 +34,8 @@ from typing_extensions import Self from upath.types import JoinablePathLike +log = logging.getLogger(__name__) + class _TrackingFileWrapper: """Wrapper that tracks file operations to intercept lineage.""" @@ -116,6 +119,13 @@ def __init__( # to the underlying fsspec filesystem, which doesn't understand it self._conn_id = storage_options.pop("conn_id", None) super().__init__(*args, protocol=protocol, **storage_options) + # ProxyUPath delegates all operations to self.__wrapped__, which was + # constructed with empty storage_options (conn_id stripped above). + # Pre-populating __wrapped__._fs_cached with the Airflow-authenticated + # filesystem fixes every delegated method (exists, mkdir, iterdir, glob, + # walk, rename, read_bytes, write_bytes, …) in one place rather than + # requiring individual overrides for each one. + self._inject_authenticated_fs(self.__wrapped__) @classmethod_or_method # type: ignore[arg-type] def _from_upath(cls_or_self, upath, /): @@ -127,8 +137,36 @@ def _from_upath(cls_or_self, upath, /): obj = object.__new__(cls) obj.__wrapped__ = upath obj._conn_id = getattr(cls_or_self, "_conn_id", None) if is_instance else None + # If the wrapped UPath has not yet had its fs cached (e.g. when _from_upath is + # called as a classmethod with a fresh UPath), inject the authenticated fs now. + # Child UPaths produced by __wrapped__ operations (iterdir, glob, etc.) already + # inherit _fs_cached from the parent UPath, so the hasattr check is a no-op for them. + if not hasattr(upath, "_fs_cached"): + obj._inject_authenticated_fs(upath) return obj + def _inject_authenticated_fs(self, wrapped: UPath) -> None: + """ + Inject the Airflow-authenticated filesystem into wrapped._fs_cached. + + This ensures that all ProxyUPath-delegated operations use the connection-aware + filesystem rather than an unauthenticated one constructed from empty storage_options. + Failures are logged at DEBUG level and silently skipped so that construction always + succeeds — errors will surface naturally at first use of the path. + """ + if self._conn_id is None: + return + try: + wrapped._fs_cached = attach(wrapped.protocol or "file", self._conn_id).fs + except Exception: + log.debug( + "Could not pre-populate authenticated filesystem for %r (conn_id=%r); " + "operations will attempt lazy resolution at first use.", + self, + self._conn_id, + exc_info=True, + ) + @property def conn_id(self) -> str | None: """Return the connection ID for this path.""" diff --git a/task-sdk/tests/task_sdk/io/test_path.py b/task-sdk/tests/task_sdk/io/test_path.py index e65c65842931d..3113270a47f4c 100644 --- a/task-sdk/tests/task_sdk/io/test_path.py +++ b/task-sdk/tests/task_sdk/io/test_path.py @@ -26,6 +26,7 @@ import pytest from fsspec.implementations.local import LocalFileSystem from fsspec.implementations.memory import MemoryFileSystem +from upath import UPath from airflow.sdk import Asset, ObjectStoragePath from airflow.sdk._shared.module_loading import qualname @@ -228,6 +229,114 @@ def test_standard_extended_api(self, fake_files, fn, args, fn2, path, expected_a method.assert_called_once_with(expected_args, **expected_kwargs) +class TestConnIdCredentialResolution: + """ + Regression tests for https://github.com/apache/airflow/issues/64632 + + When ObjectStoragePath was migrated from CloudPath to ProxyUPath (3.2.0), + methods like exists(), mkdir(), is_dir(), is_file() were delegated to + self.__wrapped__ which carries empty storage_options (conn_id is stored + separately). This caused NoCredentialsError / 401 errors for remote stores + even when a valid conn_id was provided. + """ + + @pytest.fixture(autouse=True) + def restore_cache(self): + cache = _STORE_CACHE.copy() + yield + _STORE_CACHE.clear() + _STORE_CACHE.update(cache) + + @pytest.fixture + def fake_fs_with_conn(self): + fs = _FakeRemoteFileSystem(conn_id="my_conn") + attach(protocol="ffs2", conn_id="my_conn", fs=fs) + try: + yield fs + finally: + _FakeRemoteFileSystem.store.clear() + _FakeRemoteFileSystem.pseudo_dirs[:] = [""] + + def test_exists_uses_authenticated_fs(self, fake_fs_with_conn): + """exists() must use self.fs (Airflow-attached) not __wrapped__.fs (unauthenticated).""" + p = ObjectStoragePath("ffs2://my_conn@bucket/some_file.txt", conn_id="my_conn") + # Verify the correct fs instance was injected, not merely any _FakeRemoteFileSystem + assert p.__wrapped__._fs_cached is fake_fs_with_conn + fake_fs_with_conn.touch("bucket/some_file.txt") + + assert p.exists() is True + assert ( + ObjectStoragePath("ffs2://my_conn@bucket/no_such_file.txt", conn_id="my_conn").exists() is False + ) + + def test_mkdir_uses_authenticated_fs(self, fake_fs_with_conn): + """mkdir() must use self.fs (Airflow-attached) not __wrapped__.fs (unauthenticated).""" + p = ObjectStoragePath("ffs2://my_conn@bucket/new_dir/", conn_id="my_conn") + p.mkdir(parents=True, exist_ok=True) + assert fake_fs_with_conn.isdir("bucket/new_dir") + + def test_is_dir_uses_authenticated_fs(self, fake_fs_with_conn): + """is_dir() must use self.fs (Airflow-attached) not __wrapped__.fs (unauthenticated).""" + fake_fs_with_conn.mkdir("bucket/a_dir") + p = ObjectStoragePath("ffs2://my_conn@bucket/a_dir", conn_id="my_conn") + assert p.is_dir() is True + + def test_is_file_uses_authenticated_fs(self, fake_fs_with_conn): + """is_file() must use self.fs (Airflow-attached) not __wrapped__.fs (unauthenticated).""" + fake_fs_with_conn.touch("bucket/a_file.txt") + p = ObjectStoragePath("ffs2://my_conn@bucket/a_file.txt", conn_id="my_conn") + assert p.is_file() is True + + def test_touch_uses_authenticated_fs(self, fake_fs_with_conn): + """touch() must use self.fs (Airflow-attached) not __wrapped__.fs (unauthenticated).""" + p = ObjectStoragePath("ffs2://my_conn@bucket/touched_file.txt", conn_id="my_conn") + p.touch() + assert fake_fs_with_conn.exists("bucket/touched_file.txt") + + def test_unlink_uses_authenticated_fs(self, fake_fs_with_conn): + """unlink() must use self.fs (Airflow-attached) not __wrapped__.fs (unauthenticated).""" + fake_fs_with_conn.touch("bucket/to_delete.txt") + p = ObjectStoragePath("ffs2://my_conn@bucket/to_delete.txt", conn_id="my_conn") + p.unlink() + assert not fake_fs_with_conn.exists("bucket/to_delete.txt") + + def test_rmdir_uses_authenticated_fs(self, fake_fs_with_conn): + """rmdir() must use self.fs (Airflow-attached) not __wrapped__.fs (unauthenticated).""" + fake_fs_with_conn.mkdir("bucket/empty_dir") + p = ObjectStoragePath("ffs2://my_conn@bucket/empty_dir", conn_id="my_conn") + # upath's rmdir(recursive=False) calls next(self.iterdir()) without a default, + # which raises StopIteration on empty dirs — a upath bug. Use the default (recursive=True). + p.rmdir() + assert not fake_fs_with_conn.exists("bucket/empty_dir") + + def test_conn_id_in_uri_works_for_exists(self, fake_fs_with_conn): + """conn_id embedded in URI (user@host) should also work for exists().""" + fake_fs_with_conn.touch("bucket/target.txt") + p = ObjectStoragePath("ffs2://my_conn@bucket/target.txt") + assert p.conn_id == "my_conn" + assert p.exists() is True + + def test_from_upath_injects_fs_when_no_cache(self, fake_fs_with_conn): + """_from_upath must inject authenticated fs into a fresh UPath with no _fs_cached.""" + # Simulate _from_upath called as an instance method with a fresh UPath that has + # no _fs_cached set (e.g. cwd() / home() or a cross-protocol _from_upath call). + p_instance = ObjectStoragePath("ffs2://my_conn@bucket/root", conn_id="my_conn") + fresh_upath = UPath("ffs2://bucket/other") + assert not hasattr(fresh_upath, "_fs_cached") + child = p_instance._from_upath(fresh_upath) + assert child.__wrapped__._fs_cached is fake_fs_with_conn + + def test_iterdir_children_use_authenticated_fs(self, fake_fs_with_conn): + """Children yielded by iterdir() must also carry the authenticated filesystem.""" + fake_fs_with_conn.touch("bucket/dir/file1.txt") + fake_fs_with_conn.touch("bucket/dir/file2.txt") + p = ObjectStoragePath("ffs2://my_conn@bucket/dir", conn_id="my_conn") + children = list(p.iterdir()) + assert len(children) == 2 + # Each child path must use the same authenticated fs, not a fresh unauthenticated one + assert all(c.__wrapped__._fs_cached is fake_fs_with_conn for c in children) + + class TestRemotePath: def test_bucket_key_protocol(self): bucket = "bkt"