diff --git a/airflow-core/src/airflow/api_fastapi/common/cursors.py b/airflow-core/src/airflow/api_fastapi/common/cursors.py
new file mode 100644
index 0000000000000..a9d2191197f2f
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/common/cursors.py
@@ -0,0 +1,176 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Cursor-based (keyset) pagination helpers.
+
+:meta private:
+"""
+
+from __future__ import annotations
+
+import base64
+import uuid as uuid_mod
+from typing import Any
+
+import msgspec
+from fastapi import HTTPException, status
+from sqlalchemy import and_, or_
+from sqlalchemy.sql import Select
+from sqlalchemy.sql.elements import ColumnElement
+from sqlalchemy.sql.sqltypes import Uuid
+
+from airflow.api_fastapi.common.parameters import SortParam
+
+
+def _b64url_decode_padded(token: str) -> bytes:
+    """
+    Decode a url-safe base64 token whose trailing ``=`` padding was stripped.
+
+    Raises ``ValueError`` (``binascii.Error`` / ``UnicodeEncodeError``) for input that
+    is not ASCII or not decodable base64; ``decode_cursor`` maps that to HTTP 400.
+    """
+    # ``-len(token) % 4`` is the number of ``=`` needed to reach a multiple of four.
+    padded = token + "=" * (-len(token) % 4)
+    return base64.urlsafe_b64decode(padded.encode("ascii"))
+
+
+def _nonstrict_bound(col: ColumnElement, value: Any, is_desc: bool) -> ColumnElement[bool]:
+    """Inclusive range edge on the leading column at each nesting level (``>=`` / ``<=``)."""
+    return col <= value if is_desc else col >= value
+
+
+def _strict_bound(col: ColumnElement, value: Any, is_desc: bool) -> ColumnElement[bool]:
+    """Strict inequality for ``or_`` branches (``<`` / ``>``)."""
+    return col < value if is_desc else col > value
+
+
+def _nested_keyset_predicate(
+    resolved: list[tuple[str, ColumnElement, bool]], values: list[Any]
+) -> ColumnElement[bool]:
+    """
+    Keyset predicate for rows strictly after the cursor in ``ORDER BY`` order.
+
+    Uses nested ``and_(non-strict, or_(strict, ...))`` so leading sort keys use
+    inclusive range bounds and inner branches use strict inequalities—friendly
+    for composite index range scans. Logically equivalent to an OR-of-prefix-
+    equalities formulation.
+
+    :param resolved: ``(attr_name, column, is_descending)`` tuples in ``ORDER BY`` order.
+    :param values: Cursor values, one per entry of ``resolved`` and in the same order.
+    :raises ValueError: If ``resolved`` is empty or the two lists differ in length.
+    """
+    # NOTE(review): a ``None`` value compares as SQL ``NULL``, so the predicate matches no
+    # rows; paging over nullable sort columns looks unsupported here -- confirm.
+    if not resolved or len(resolved) != len(values):
+        raise ValueError("Keyset predicate needs exactly one value per resolved sort column.")
+
+    *leading, (_, last_col, last_desc) = resolved
+    # The innermost (last) sort key is the tie-breaker and is always strict.
+    predicate: ColumnElement[bool] = _strict_bound(last_col, values[-1], last_desc)
+    # Wrap outwards, from the second-to-last sort key up to the first.
+    for (_, col, is_desc), value in zip(reversed(leading), reversed(values[:-1])):
+        predicate = and_(
+            _nonstrict_bound(col, value, is_desc),
+            or_(_strict_bound(col, value, is_desc), predicate),
+        )
+    return predicate
+
+
+def _coerce_value(column: ColumnElement, value: Any) -> Any:
+    """
+    Normalize a decoded cursor value for use as a SQL bind parameter.
+
+    Strings destined for a ``Uuid`` column are parsed into ``uuid.UUID``; every other
+    value is returned unchanged.
+
+    :raises HTTPException: 400 if a string for a ``Uuid`` column is not a valid UUID.
+    """
+    if not isinstance(value, str):
+        return value
+    if isinstance(getattr(column, "type", None), Uuid):
+        try:
+            return uuid_mod.UUID(value)
+        except ValueError as err:
+            # The token is client-supplied: reject it here instead of binding a
+            # non-UUID string to a UUID column and failing inside the database layer.
+            raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid cursor token") from err
+    return value
+
+
+def encode_cursor(row: Any, sort_param: SortParam) -> str:
+    """
+    Encode cursor token from the boundary row of a result set.
+
+    The token is a url-safe base64 encoding of a MessagePack list of sort-key
+    values (no padding ``=``), so the cursor is compact and safe in query strings.
+    Binary msgpack is not URL-safe by itself, so base64 is still required.
+
+    :param row: Boundary row; each resolved sort attribute is read from it by name.
+    :param sort_param: Sort parameter whose resolved columns define the cursor layout.
+    :raises ValueError: If ``sort_param`` resolves to no columns.
+    """
+    resolved = sort_param.get_resolved_columns()
+    if not resolved:
+        raise ValueError("SortParam has no resolved columns.")
+
+    # NOTE(review): an attribute missing from ``row`` is encoded as ``None`` -- confirm
+    # that every sortable name is readable on the row object.
+    parts = [getattr(row, attr_name, None) for attr_name, _col, _desc in resolved]
+    payload = msgspec.msgpack.encode(parts)
+    return base64.urlsafe_b64encode(payload).decode("ascii").rstrip("=")
+
+
+def decode_cursor(token: str) -> list[Any]:
+    """
+    Decode a cursor token to the list of sort-key values.
+
+    :raises HTTPException: 400 if the token is not url-safe base64, is not valid
+        MessagePack, or does not decode to a list.
+    """
+    try:
+        data: Any = msgspec.msgpack.decode(_b64url_decode_padded(token))
+    except Exception as err:
+        # Deliberately broad: the token is client-controlled, so any decoding
+        # failure is a bad request rather than a server error.
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid cursor token") from err
+
+    if not isinstance(data, list):
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid cursor token structure")
+
+    return data
+
+
+def apply_cursor_filter(statement: Select, cursor: str, sort_param: SortParam) -> Select:
+    """
+    Apply a keyset pagination WHERE clause from a cursor token.
+
+    Uses nested ``and_(col <=/>= v, or_(col </> v, ...))`` so each leading sort
+    key carries a range-friendly non-strict bound, with strict inequalities on
+    the ``or_`` branches—aligned with common composite index range scans.
+
+    :raises HTTPException: 400 if the token is malformed or its number of values
+        does not match the number of resolved sort columns.
+    """
+    raw_values = decode_cursor(cursor)
+
+    resolved = sort_param.get_resolved_columns()
+    if len(raw_values) != len(resolved):
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, "Cursor token does not match current query shape")
+
+    parsed_values = [_coerce_value(col, val) for (_, col, _), val in zip(resolved, raw_values, strict=True)]
+
+    return statement.where(_nested_keyset_predicate(resolved, parsed_values))
diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index 065dff8cda8bb..269d5ef0b9586 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -119,7 +119,10 @@ def to_orm(self, select: Select) -> Select: return select.offset(self.value) @classmethod - def depends(cls, offset: NonNegativeInt = 0) -> OffsetFilter: + def depends( + cls, + offset: NonNegativeInt = 0, + ) -> OffsetFilter: return cls().set_value(offset) @@ -281,10 +284,16 @@ def __init__( self.allowed_attrs = allowed_attrs self.model = model self.to_replace = to_replace + self._cached_resolution: list[tuple[str, ColumnElement, bool]] | None = None - def to_orm(self, select: Select) -> Select: - if self.skip_none is False: - raise ValueError(f"Cannot set 'skip_none' to False on a {type(self)}") + def set_value(self, value: list[str] | None) -> Self: + self._cached_resolution = None + return super().set_value(value) + + def _resolve(self) -> list[tuple[str, ColumnElement, bool]]: + """Resolve sort columns as (attr_name, column, is_descending) tuples. Cached after first call.""" + if self._cached_resolution is not None: + return self._cached_resolution if self.value is None: self.value = [self.get_primary_key_string()] @@ -296,9 +305,10 @@ def to_orm(self, select: Select) -> Select: f"Ordering with more than {self.MAX_SORT_PARAMS} parameters is not allowed.
Provided: {order_by_values}", ) - columns: list[ColumnElement] = [] + resolved: list[tuple[str, ColumnElement, bool]] = [] for order_by_value in order_by_values: lstriped_orderby = order_by_value.lstrip("-") + attr_name = lstriped_orderby column: Column | None = None if self.to_replace: replacement = self.to_replace.get(lstriped_orderby, lstriped_orderby) @@ -316,22 +326,28 @@ def to_orm(self, select: Select) -> Select: if column is None: column = getattr(self.model, lstriped_orderby) - if order_by_value.startswith("-"): - columns.append(column.desc()) - else: - columns.append(column.asc()) - - # Reset default sorting - select = select.order_by(None) + resolved.append((attr_name, column, order_by_value.startswith("-"))) primary_key_column = self.get_primary_key_column() - # Always add a final discriminator to enforce deterministic ordering. - if order_by_values and order_by_values[0].startswith("-"): - columns.append(primary_key_column.desc()) - else: - columns.append(primary_key_column.asc()) + pk_name = self.get_primary_key_string() + if not any(name == pk_name for name, _, _ in resolved): + pk_desc = bool(order_by_values and order_by_values[0].startswith("-")) + resolved.append((pk_name, primary_key_column, pk_desc)) + + self._cached_resolution = resolved + return self._cached_resolution + + def to_orm(self, select: Select) -> Select: + if self.skip_none is False: + raise ValueError(f"Cannot set 'skip_none' to False on a {type(self)}") + + resolved = self._resolve() + columns = [col.desc() if is_desc else col.asc() for _, col, is_desc in resolved] + return select.order_by(None).order_by(*columns) - return select.order_by(*columns) + def get_resolved_columns(self) -> list[tuple[str, ColumnElement, bool]]: + """Return resolved sort columns as (attr_name, column_element, is_descending) tuples.""" + return self._resolve() def get_primary_key_column(self) -> Column: """Get the primary key column of the model of SortParam object.""" diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py index c8e7ab2378dd1..c02f1f9eee592 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py @@ -83,10 +83,33 @@ class TaskInstanceResponse(BaseModel): class TaskInstanceCollectionResponse(BaseModel): - """Task Instance Collection serializer for responses.""" + """ + Task instance collection response supporting both offset and cursor pagination. + + A single flat model is used instead of a discriminated union + (``Annotated[Offset | Cursor, Field(discriminator=...)]``) because + the OpenAPI ``oneOf`` + ``discriminator`` construct is not handled + correctly by ``@hey-api/openapi-ts`` / ``@7nohe/openapi-react-query-codegen``: + return types degrade to ``unknown`` in JSDoc and can produce + incorrect TypeScript types (see hey-api/openapi-ts#1613, #3270). + """ task_instances: Iterable[TaskInstanceResponse] - total_entries: int + total_entries: int | None = Field( + default=None, + description="Total number of matching items. Populated for offset pagination, " + "``null`` when using cursor pagination.", + ) + next_cursor: str | None = Field( + default=None, + description="Token pointing to the next page. Populated for cursor pagination, " + "``null`` when using offset pagination or when there is no next page.", + ) + previous_cursor: str | None = Field( + default=None, + description="Token pointing to the previous page. 
Populated for cursor pagination, " + "``null`` when using offset pagination or when on the first page.", + ) class TaskDependencyResponse(BaseModel): diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index bd8e6aa62ba45..e610b93e65469 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -6524,10 +6524,22 @@ paths: description: 'Get list of task instances. - This endpoint allows specifying `~` as the dag_id, dag_run_id to retrieve - Task Instances for all DAGs + This endpoint allows specifying `~` as the dag_id, dag_run_id - and DAG runs.' + to retrieve task instances for all DAGs and DAG runs. + + + Supports two pagination modes: + + + **Offset (default):** use `limit` and `offset` query parameters. Returns `total_entries`. + + + **Cursor:** pass `cursor` (empty string for the first page, then `next_cursor` + from the response). + + When `cursor` is provided, `offset` is ignored and `total_entries` is not + returned.' operationId: get_task_instances security: - OAuth2PasswordBearer: [] @@ -6545,6 +6557,20 @@ paths: schema: type: string title: Dag Run Id + - name: cursor + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + description: Cursor for keyset-based pagination (mutually exclusive with + offset). Pass an empty string for the first page, then use ``next_cursor`` + from the response. + title: Cursor + description: Cursor for keyset-based pagination (mutually exclusive with offset). + Pass an empty string for the first page, then use ``next_cursor`` from the + response. 
- name: task_id in: query required: false @@ -12576,14 +12602,45 @@ components: type: array title: Task Instances total_entries: - type: integer + anyOf: + - type: integer + - type: 'null' title: Total Entries + description: Total number of matching items. Populated for offset pagination, + ``null`` when using cursor pagination. + next_cursor: + anyOf: + - type: string + - type: 'null' + title: Next Cursor + description: Token pointing to the next page. Populated for cursor pagination, + ``null`` when using offset pagination or when there is no next page. + previous_cursor: + anyOf: + - type: string + - type: 'null' + title: Previous Cursor + description: Token pointing to the previous page. Populated for cursor pagination, + ``null`` when using offset pagination or when on the first page. type: object required: - task_instances - - total_entries title: TaskInstanceCollectionResponse - description: Task Instance Collection serializer for responses. + description: 'Task instance collection response supporting both offset and cursor + pagination. + + + A single flat model is used instead of a discriminated union + + (``Annotated[Offset | Cursor, Field(discriminator=...)]``) because + + the OpenAPI ``oneOf`` + ``discriminator`` construct is not handled + + correctly by ``@hey-api/openapi-ts`` / ``@7nohe/openapi-react-query-codegen``: + + return types degrade to ``unknown`` in JSDoc and can produce + + incorrect TypeScript types (see hey-api/openapi-ts#1613, #3270).' 
TaskInstanceHistoryCollectionResponse: properties: task_instances: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index 1af56b565f309..71194e784d907 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -27,13 +27,14 @@ from sqlalchemy.sql.selectable import Select from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity +from airflow.api_fastapi.common.cursors import apply_cursor_filter, encode_cursor from airflow.api_fastapi.common.dagbag import ( DagBagDep, get_dag_for_run, get_dag_for_run_or_latest_version, get_latest_version_of_dag, ) -from airflow.api_fastapi.common.db.common import SessionDep, paginated_select +from airflow.api_fastapi.common.db.common import SessionDep, apply_filters_to_select, paginated_select from airflow.api_fastapi.common.db.task_instances import eager_load_TI_and_TIH_for_validation from airflow.api_fastapi.common.parameters import ( FilterOptionEnum, @@ -65,6 +66,7 @@ search_param_factory, ) from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.base import OrmClause from airflow.api_fastapi.core_api.datamodels.common import BulkBody, BulkResponse from airflow.api_fastapi.core_api.datamodels.task_instance_history import ( TaskInstanceHistoryCollectionResponse, @@ -470,13 +472,26 @@ def get_task_instances( ], readable_ti_filter: ReadableTIFilterDep, session: SessionDep, + cursor: str | None = Query( + None, + description="Cursor for keyset-based pagination (mutually exclusive with offset). " + "Pass an empty string for the first page, then use ``next_cursor`` from the response.", + ), ) -> TaskInstanceCollectionResponse: """ Get list of task instances. 
- This endpoint allows specifying `~` as the dag_id, dag_run_id to retrieve Task Instances for all DAGs - and DAG runs. + This endpoint allows specifying `~` as the dag_id, dag_run_id + to retrieve task instances for all DAGs and DAG runs. + + Supports two pagination modes: + + **Offset (default):** use `limit` and `offset` query parameters. Returns `total_entries`. + + **Cursor:** pass `cursor` (empty string for the first page, then `next_cursor` from the response). + When `cursor` is provided, `offset` is ignored and `total_entries` is not returned. """ + use_cursor = cursor is not None dag_run = None query = eager_load_TI_and_TIH_for_validation(select(TI)) if dag_run_id != "~": @@ -498,40 +513,53 @@ def get_task_instances( if dag: task_group_id.dag = dag + filters: list[OrmClause] = [ + run_after_range, + logical_date_range, + start_date_range, + end_date_range, + update_at_range, + duration_range, + state, + pool, + pool_name_pattern, + queue, + queue_name_pattern, + executor, + task_id, + task_display_name_pattern, + task_group_id, + dag_id_pattern, + run_id_pattern, + version_number, + readable_ti_filter, + try_number, + operator, + operator_name_pattern, + map_index, + ] + + if use_cursor: + task_instance_select = apply_filters_to_select(statement=query, filters=[*filters, order_by, limit]) + if cursor: + task_instance_select = apply_cursor_filter(task_instance_select, cursor, order_by) + + task_instances = list(session.scalars(task_instance_select)) + return TaskInstanceCollectionResponse( + task_instances=task_instances, + next_cursor=encode_cursor(task_instances[-1], order_by) if task_instances else None, + previous_cursor=encode_cursor(task_instances[0], order_by) if task_instances else None, + ) + task_instance_select, total_entries = paginated_select( statement=query, - filters=[ - run_after_range, - logical_date_range, - start_date_range, - end_date_range, - update_at_range, - duration_range, - state, - pool, - pool_name_pattern, - queue, - 
queue_name_pattern, - executor, - task_id, - task_display_name_pattern, - task_group_id, - dag_id_pattern, - run_id_pattern, - version_number, - readable_ti_filter, - try_number, - operator, - operator_name_pattern, - map_index, - ], + filters=filters, order_by=order_by, offset=offset, limit=limit, session=session, ) - - task_instances = session.scalars(task_instance_select) + task_instances = list(session.scalars(task_instance_select)) return TaskInstanceCollectionResponse( task_instances=task_instances, total_entries=total_entries, diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 849b8ba490abe..ab44ca991b053 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -478,7 +478,8 @@ export const UseTaskInstanceServiceGetMappedTaskInstanceKeyFn = ({ dagId, dagRun export type TaskInstanceServiceGetTaskInstancesDefaultResponse = Awaited>; export type TaskInstanceServiceGetTaskInstancesQueryResult = UseQueryResult; export const useTaskInstanceServiceGetTaskInstancesKey = "TaskInstanceServiceGetTaskInstances"; -export const UseTaskInstanceServiceGetTaskInstancesKeyFn = ({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { +export const UseTaskInstanceServiceGetTaskInstancesKeyFn = ({ cursor, dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, 
endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { + cursor?: string; dagId: string; dagIdPattern?: string; dagRunId: string; @@ -524,7 +525,7 @@ export const UseTaskInstanceServiceGetTaskInstancesKeyFn = ({ dagId, dagIdPatter updatedAtLt?: string; updatedAtLte?: string; versionNumber?: number[]; -}, queryKey?: Array) => [useTaskInstanceServiceGetTaskInstancesKey, ...(queryKey ?? [{ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }])]; +}, queryKey?: Array) => [useTaskInstanceServiceGetTaskInstancesKey, ...(queryKey ?? 
[{ cursor, dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }])]; export type TaskInstanceServiceGetTaskInstanceTryDetailsDefaultResponse = Awaited>; export type TaskInstanceServiceGetTaskInstanceTryDetailsQueryResult = UseQueryResult; export const useTaskInstanceServiceGetTaskInstanceTryDetailsKey = "TaskInstanceServiceGetTaskInstanceTryDetails"; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index f841cfdca4db5..690a0f95060bc 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -902,11 +902,19 @@ export const ensureUseTaskInstanceServiceGetMappedTaskInstanceData = (queryClien * Get Task Instances * Get list of task instances. * -* This endpoint allows specifying `~` as the dag_id, dag_run_id to retrieve Task Instances for all DAGs -* and DAG runs. +* This endpoint allows specifying `~` as the dag_id, dag_run_id +* to retrieve task instances for all DAGs and DAG runs. +* +* Supports two pagination modes: +* +* **Offset (default):** use `limit` and `offset` query parameters. Returns `total_entries`. +* +* **Cursor:** pass `cursor` (empty string for the first page, then `next_cursor` from the response). +* When `cursor` is provided, `offset` is ignored and `total_entries` is not returned. * @param data The data for the request. 
* @param data.dagId * @param data.dagRunId +* @param data.cursor Cursor for keyset-based pagination (mutually exclusive with offset). Pass an empty string for the first page, then use ``next_cursor`` from the response. * @param data.taskId * @param data.runAfterGte * @param data.runAfterGt @@ -953,7 +961,8 @@ export const ensureUseTaskInstanceServiceGetMappedTaskInstanceData = (queryClien * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ -export const ensureUseTaskInstanceServiceGetTaskInstancesData = (queryClient: QueryClient, { dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { +export const ensureUseTaskInstanceServiceGetTaskInstancesData = (queryClient: QueryClient, { cursor, dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { + cursor?: string; dagId: string; dagIdPattern?: string; dagRunId: string; @@ -999,7 +1008,7 @@ export const ensureUseTaskInstanceServiceGetTaskInstancesData = (queryClient: Qu updatedAtLt?: 
string; updatedAtLte?: string; versionNumber?: number[]; -}) => queryClient.ensureQueryData({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }), queryFn: () => TaskInstanceService.getTaskInstances({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) }); +}) => queryClient.ensureQueryData({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ cursor, dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, 
versionNumber }), queryFn: () => TaskInstanceService.getTaskInstances({ cursor, dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) }); /** * Get Task Instance Try Details * Get task instance details by try number. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index 7de17099b84da..7f674ab8eab80 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -902,11 +902,19 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstance = (queryClient: * Get Task Instances * Get list of task instances. * -* This endpoint allows specifying `~` as the dag_id, dag_run_id to retrieve Task Instances for all DAGs -* and DAG runs. +* This endpoint allows specifying `~` as the dag_id, dag_run_id +* to retrieve task instances for all DAGs and DAG runs. +* +* Supports two pagination modes: +* +* **Offset (default):** use `limit` and `offset` query parameters. Returns `total_entries`. +* +* **Cursor:** pass `cursor` (empty string for the first page, then `next_cursor` from the response). +* When `cursor` is provided, `offset` is ignored and `total_entries` is not returned. * @param data The data for the request. * @param data.dagId * @param data.dagRunId +* @param data.cursor Cursor for keyset-based pagination (mutually exclusive with offset). 
Pass an empty string for the first page, then use ``next_cursor`` from the response. * @param data.taskId * @param data.runAfterGte * @param data.runAfterGt @@ -953,7 +961,8 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstance = (queryClient: * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ -export const prefetchUseTaskInstanceServiceGetTaskInstances = (queryClient: QueryClient, { dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { +export const prefetchUseTaskInstanceServiceGetTaskInstances = (queryClient: QueryClient, { cursor, dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { + cursor?: string; dagId: string; dagIdPattern?: string; dagRunId: string; @@ -999,7 +1008,7 @@ export const prefetchUseTaskInstanceServiceGetTaskInstances = (queryClient: Quer updatedAtLt?: string; updatedAtLte?: string; versionNumber?: number[]; -}) => queryClient.prefetchQuery({ queryKey: 
Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }), queryFn: () => TaskInstanceService.getTaskInstances({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) }); +}) => queryClient.prefetchQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ cursor, dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }), queryFn: () => TaskInstanceService.getTaskInstances({ cursor, dagId, dagIdPattern, 
dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) }); /** * Get Task Instance Try Details * Get task instance details by try number. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 40b61f1e1f4c9..aa9cf439239d3 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -902,11 +902,19 @@ export const useTaskInstanceServiceGetMappedTaskInstance = = unknown[]>({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { +export const useTaskInstanceServiceGetTaskInstances = = unknown[]>({ cursor, dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, 
runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { + cursor?: string; dagId: string; dagIdPattern?: string; dagRunId: string; @@ -999,7 +1008,7 @@ export const useTaskInstanceServiceGetTaskInstances = , "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }, queryKey), queryFn: () => TaskInstanceService.getTaskInstances({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) as TData, ...options }); +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ cursor, dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, 
executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }, queryKey), queryFn: () => TaskInstanceService.getTaskInstances({ cursor, dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) as TData, ...options }); /** * Get Task Instance Try Details * Get task instance details by try number. 
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index e04f3f813b5c9..aa4c7ab1d571b 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -902,11 +902,19 @@ export const useTaskInstanceServiceGetMappedTaskInstanceSuspense = = unknown[]>({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { +export const useTaskInstanceServiceGetTaskInstancesSuspense = = unknown[]>({ cursor, dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { + cursor?: string; dagId: string; dagIdPattern?: string; dagRunId: string; @@ -999,7 +1008,7 @@ export const useTaskInstanceServiceGetTaskInstancesSuspense = , "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, 
endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }, queryKey), queryFn: () => TaskInstanceService.getTaskInstances({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) as TData, ...options }); +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ cursor, dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }, queryKey), queryFn: () => TaskInstanceService.getTaskInstances({ cursor, dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, 
durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) as TData, ...options }); /** * Get Task Instance Try Details * Get task instance details by try number. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index c37bb8400177f..4e0a537853d02 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -5307,14 +5307,53 @@ export const $TaskInstanceCollectionResponse = { title: 'Task Instances' }, total_entries: { - type: 'integer', - title: 'Total Entries' + anyOf: [ + { + type: 'integer' + }, + { + type: 'null' + } + ], + title: 'Total Entries', + description: 'Total number of matching items. Populated for offset pagination, ``null`` when using cursor pagination.' + }, + next_cursor: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Next Cursor', + description: 'Token pointing to the next page. Populated for cursor pagination, ``null`` when using offset pagination or when there is no next page.' + }, + previous_cursor: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Previous Cursor', + description: 'Token pointing to the previous page. Populated for cursor pagination, ``null`` when using offset pagination or when on the first page.' 
} }, type: 'object', - required: ['task_instances', 'total_entries'], + required: ['task_instances'], title: 'TaskInstanceCollectionResponse', - description: 'Task Instance Collection serializer for responses.' + description: `Task instance collection response supporting both offset and cursor pagination. + +A single flat model is used instead of a discriminated union +(\`\`Annotated[Offset | Cursor, Field(discriminator=...)]\`\`) because +the OpenAPI \`\`oneOf\`\` + \`\`discriminator\`\` construct is not handled +correctly by \`\`@hey-api/openapi-ts\`\` / \`\`@7nohe/openapi-react-query-codegen\`\`: +return types degrade to \`\`unknown\`\` in JSDoc and can produce +incorrect TypeScript types (see hey-api/openapi-ts#1613, #3270).` } as const; export const $TaskInstanceHistoryCollectionResponse = { diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index ce11a219fb0cd..890417b014bde 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -2319,11 +2319,19 @@ export class TaskInstanceService { * Get Task Instances * Get list of task instances. * - * This endpoint allows specifying `~` as the dag_id, dag_run_id to retrieve Task Instances for all DAGs - * and DAG runs. + * This endpoint allows specifying `~` as the dag_id, dag_run_id + * to retrieve task instances for all DAGs and DAG runs. + * + * Supports two pagination modes: + * + * **Offset (default):** use `limit` and `offset` query parameters. Returns `total_entries`. + * + * **Cursor:** pass `cursor` (empty string for the first page, then `next_cursor` from the response). + * When `cursor` is provided, `offset` is ignored and `total_entries` is not returned. * @param data The data for the request. 
* @param data.dagId * @param data.dagRunId + * @param data.cursor Cursor for keyset-based pagination (mutually exclusive with offset). Pass an empty string for the first page, then use ``next_cursor`` from the response. * @param data.taskId * @param data.runAfterGte * @param data.runAfterGt @@ -2379,6 +2387,7 @@ export class TaskInstanceService { dag_run_id: data.dagRunId }, query: { + cursor: data.cursor, task_id: data.taskId, run_after_gte: data.runAfterGte, run_after_gt: data.runAfterGt, diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 9a6c9fb79350a..23f502fef3302 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1375,11 +1375,29 @@ export type TaskInletAssetReference = { }; /** - * Task Instance Collection serializer for responses. + * Task instance collection response supporting both offset and cursor pagination. + * + * A single flat model is used instead of a discriminated union + * (``Annotated[Offset | Cursor, Field(discriminator=...)]``) because + * the OpenAPI ``oneOf`` + ``discriminator`` construct is not handled + * correctly by ``@hey-api/openapi-ts`` / ``@7nohe/openapi-react-query-codegen``: + * return types degrade to ``unknown`` in JSDoc and can produce + * incorrect TypeScript types (see hey-api/openapi-ts#1613, #3270). */ export type TaskInstanceCollectionResponse = { task_instances: Array; - total_entries: number; + /** + * Total number of matching items. Populated for offset pagination, ``null`` when using cursor pagination. + */ + total_entries?: number | null; + /** + * Token pointing to the next page. Populated for cursor pagination, ``null`` when using offset pagination or when there is no next page. + */ + next_cursor?: string | null; + /** + * Token pointing to the previous page. 
Populated for cursor pagination, ``null`` when using offset pagination or when on the first page. + */ + previous_cursor?: string | null; }; /** @@ -3069,6 +3087,10 @@ export type PatchTaskInstanceByMapIndexData = { export type PatchTaskInstanceByMapIndexResponse = TaskInstanceCollectionResponse; export type GetTaskInstancesData = { + /** + * Cursor for keyset-based pagination (mutually exclusive with offset). Pass an empty string for the first page, then use ``next_cursor`` from the response. + */ + cursor?: string | null; dagId: string; /** * SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. diff --git a/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx b/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx index 514d93799d52f..0b8920b8c5a77 100644 --- a/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx +++ b/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx @@ -99,7 +99,7 @@ const ActionAccordion = ({ affectedTasks, groupByRunId = false, note, setNote }: {translate("dags:runAndTaskActions.affectedTasks.title", { - count: affectedTasks.total_entries, + count: affectedTasks.total_entries ?? 
0, })} diff --git a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx index 5dc09c1f42141..0b5beb490ef5c 100644 --- a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx +++ b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx @@ -117,7 +117,7 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open, taskInstance }: Pr {translate("dags:runAndTaskActions.clear.title", { - type: translate("taskInstance", { count: affectedTasks.total_entries }), + type: translate("taskInstance", { count: affectedTasks.total_entries ?? 0 }), })} : {" "} diff --git a/airflow-core/src/airflow/ui/src/pages/Dag/Overview/Overview.tsx b/airflow-core/src/airflow/ui/src/pages/Dag/Overview/Overview.tsx index 7ea155fe84761..b284777b351ac 100644 --- a/airflow-core/src/airflow/ui/src/pages/Dag/Overview/Overview.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Dag/Overview/Overview.tsx @@ -60,6 +60,8 @@ export const Overview = () => { state: ["failed"], }); + const failedTaskCount = failedTasks?.total_entries ?? 0; + const [limit] = useLocalStorage(dagRunsLimitKey(dagId ?? ""), 10); const { data: failedRuns, isLoading: isLoadingFailedRuns } = useDagRunServiceGetDagRuns({ dagId: dagId ?? "", @@ -94,14 +96,14 @@ export const Overview = () => { ({ timestamp: ti.start_date ?? ti.logical_date, }))} isLoading={isLoading} - label={translate("overview.buttons.failedTask", { count: failedTasks?.total_entries ?? 
0 })} + label={translate("overview.buttons.failedTask", { count: failedTaskCount })} route={{ pathname: "tasks", search: `${SearchParamsKeys.STATE}=failed`, diff --git a/airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx b/airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx index 164b34c02d7ba..e9fc510ff6f93 100644 --- a/airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx @@ -42,7 +42,7 @@ export const Overview = () => { const refetchInterval = useAutoRefresh({}); - const { data: failedTaskInstances, isLoading: isFailedTaskInstancesLoading } = + const { data: failedTaskInstancesData, isLoading: isFailedTaskInstancesLoading } = useTaskInstanceServiceGetTaskInstances({ dagId, dagRunId: "~", @@ -54,6 +54,8 @@ export const Overview = () => { taskId: Boolean(groupId) ? undefined : taskId, }); + const failedTaskCount = failedTaskInstancesData?.total_entries ?? 0; + const { data: tiData, isLoading: isLoadingTaskInstances } = useTaskInstanceServiceGetTaskInstances( { dagId, @@ -84,15 +86,15 @@ export const Overview = () => { ({ + events={(failedTaskInstancesData?.task_instances ?? []).map((ti) => ({ timestamp: ti.start_date ?? ti.logical_date, }))} isLoading={isFailedTaskInstancesLoading} label={translate("overview.buttons.failedTaskInstance", { - count: failedTaskInstances?.total_entries ?? 
0, + count: failedTaskCount, })} route={{ pathname: "task_instances", diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx index e066436afd0cf..a796ae23ce495 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx @@ -360,7 +360,7 @@ export const TaskInstances = () => { isLoading={isLoading} modelName="common:taskInstance" onStateChange={setTableURLState} - total={data?.total_entries} + total={data?.total_entries ?? undefined} /> diff --git a/airflow-core/tests/unit/api_fastapi/common/test_cursors.py b/airflow-core/tests/unit/api_fastapi/common/test_cursors.py new file mode 100644 index 0000000000000..b863de9eb6f3e --- /dev/null +++ b/airflow-core/tests/unit/api_fastapi/common/test_cursors.py @@ -0,0 +1,144 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +import base64 +import uuid +from datetime import datetime, timezone +from unittest.mock import MagicMock + +import msgspec +import pytest +from fastapi import HTTPException +from sqlalchemy import select + +from airflow.api_fastapi.common.cursors import apply_cursor_filter, decode_cursor, encode_cursor +from airflow.api_fastapi.common.parameters import SortParam +from airflow.models.taskinstance import TaskInstance + + +def _msgpack_cursor_token(payload: object) -> str: + """Match production: msgpack + base64url without padding.""" + return base64.urlsafe_b64encode(msgspec.msgpack.encode(payload)).decode("ascii").rstrip("=") + + +class TestCursorPagination: + """Tests for cursor-based pagination helpers.""" + + def _make_sort_param_with_resolved_columns(self, order_by_values=None): + """Build a SortParam for TaskInstance and resolve its columns.""" + sp = SortParam(["id", "start_date", "map_index"], TaskInstance) + sp.set_value(order_by_values or ["map_index"]) + sp.to_orm(select(TaskInstance)) + return sp + + def test_encode_decode_cursor_roundtrip(self): + sp = self._make_sort_param_with_resolved_columns(["start_date"]) + row = MagicMock(spec=["start_date", "id"]) + row.start_date = "2024-01-15T10:00:00+00:00" + row.id = "019462ab-1234-5678-9abc-def012345678" + + token = encode_cursor(row, sp) + decoded = decode_cursor(token) + + assert decoded == [ + "2024-01-15T10:00:00+00:00", + "019462ab-1234-5678-9abc-def012345678", + ] + + def test_decode_cursor_invalid_base64(self): + with pytest.raises(HTTPException, match="Invalid cursor token"): + decode_cursor("not-valid-base64!!!") + + def test_decode_cursor_invalid_msgpack(self): + token = base64.urlsafe_b64encode(b"not-msgpack").decode().rstrip("=") + with pytest.raises(HTTPException, match="Invalid cursor token"): + decode_cursor(token) + + def test_decode_cursor_not_a_list(self): + token = _msgpack_cursor_token({"wrong": "type"}) + with pytest.raises(HTTPException, 
match="Invalid cursor token structure"): + decode_cursor(token) + + def test_encode_cursor_works_without_prior_to_orm(self): + """get_resolved_columns now lazily resolves, so to_orm is no longer required before encode.""" + sp = SortParam(["id"], TaskInstance) + sp.set_value(["id"]) + row = MagicMock(spec=["id"]) + row.id = "019462ab-1234-5678-9abc-def012345678" + token = encode_cursor(row, sp) + decoded = decode_cursor(token) + assert decoded == ["019462ab-1234-5678-9abc-def012345678"] + + def test_apply_cursor_filter_wrong_value_count(self): + sp = self._make_sort_param_with_resolved_columns(["start_date"]) + token = _msgpack_cursor_token(["only-one-value"]) + + with pytest.raises(HTTPException, match="does not match"): + apply_cursor_filter(select(TaskInstance), token, sp) + + def test_apply_cursor_filter_ascending(self): + sp = self._make_sort_param_with_resolved_columns(["start_date"]) + values = [ + datetime(2024, 1, 15, 10, 0, 0, tzinfo=timezone.utc), + uuid.UUID("019462ab-1234-5678-9abc-def012345678"), + ] + token = _msgpack_cursor_token(values) + + stmt = apply_cursor_filter(select(TaskInstance), token, sp) + sql = str(stmt) + assert ">" in sql + + def test_apply_cursor_filter_descending(self): + sp = self._make_sort_param_with_resolved_columns(["-start_date"]) + values = [ + datetime(2024, 1, 15, 10, 0, 0, tzinfo=timezone.utc), + uuid.UUID("019462ab-1234-5678-9abc-def012345678"), + ] + token = _msgpack_cursor_token(values) + + stmt = apply_cursor_filter(select(TaskInstance), token, sp) + sql = str(stmt) + assert "<" in sql + + def test_sort_param_get_resolved_columns(self): + sp = self._make_sort_param_with_resolved_columns(["start_date"]) + resolved = sp.get_resolved_columns() + + assert len(resolved) == 2 + assert resolved[0][0] == "start_date" + assert resolved[0][2] is False + assert resolved[1][0] == "id" + assert resolved[1][2] is False + + def test_sort_param_get_resolved_columns_descending(self): + sp = 
self._make_sort_param_with_resolved_columns(["-start_date"]) + resolved = sp.get_resolved_columns() + + assert len(resolved) == 2 + assert resolved[0][0] == "start_date" + assert resolved[0][2] is True + assert resolved[1][0] == "id" + assert resolved[1][2] is True + + def test_sort_param_pk_not_duplicated_when_sorting_by_id(self): + sp = self._make_sort_param_with_resolved_columns(["id"]) + resolved = sp.get_resolved_columns() + + assert len(resolved) == 1 + assert resolved[0][0] == "id" diff --git a/airflow-core/tests/unit/api_fastapi/common/test_parameters.py b/airflow-core/tests/unit/api_fastapi/common/test_parameters.py index 5ad860477d151..40076137467ce 100644 --- a/airflow-core/tests/unit/api_fastapi/common/test_parameters.py +++ b/airflow-core/tests/unit/api_fastapi/common/test_parameters.py @@ -24,7 +24,12 @@ from fastapi import Depends, FastAPI, HTTPException from sqlalchemy import select -from airflow.api_fastapi.common.parameters import FilterParam, SortParam, _SearchParam, filter_param_factory +from airflow.api_fastapi.common.parameters import ( + FilterParam, + SortParam, + _SearchParam, + filter_param_factory, +) from airflow.models import DagModel, DagRun, Log diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 00abc961b821e..5c115b8abbe88 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -1717,6 +1717,98 @@ def test_should_respond_200_for_pagination(self, test_client, session): assert (num_entries_batch1 + num_entries_batch2) == ti_count assert response_batch1 != response_batch2 + def test_cursor_pagination_first_page(self, test_client, session): + """First page with cursor='' returns cursor response without needing a real token.""" + dag_id = "example_python_operator" + 
self.create_task_instances( + session, + task_instances=[ + {"start_date": DEFAULT_DATETIME_1 + dt.timedelta(minutes=(i + 1))} for i in range(5) + ], + dag_id=dag_id, + ) + response = test_client.get( + "/dags/~/dagRuns/~/taskInstances", + params={"limit": 3, "order_by": ["map_index"], "cursor": ""}, + ) + assert response.status_code == 200, response.json() + body = response.json() + assert body["next_cursor"] is not None + assert body["previous_cursor"] is not None + assert body["total_entries"] is None + assert len(body["task_instances"]) == 3 + + def test_cursor_pagination_returns_cursor_response(self, test_client, session): + """When cursor param is provided, response has cursor fields and no total_entries.""" + dag_id = "example_python_operator" + self.create_task_instances( + session, + task_instances=[ + {"start_date": DEFAULT_DATETIME_1 + dt.timedelta(minutes=(i + 1))} for i in range(5) + ], + dag_id=dag_id, + ) + # First page in cursor mode (empty cursor) + response1 = test_client.get( + "/dags/~/dagRuns/~/taskInstances", + params={"limit": 3, "order_by": ["map_index"], "cursor": ""}, + ) + assert response1.status_code == 200 + body1 = response1.json() + assert body1["total_entries"] is None + assert len(body1["task_instances"]) == 3 + next_cursor = body1["next_cursor"] + assert next_cursor is not None + + # Second page using next_cursor from first page + response2 = test_client.get( + "/dags/~/dagRuns/~/taskInstances", + params={"limit": 100, "cursor": next_cursor, "order_by": ["map_index"]}, + ) + assert response2.status_code == 200 + body2 = response2.json() + assert body2["next_cursor"] is not None + assert body2["previous_cursor"] is not None + assert body2["total_entries"] is None + + def test_cursor_pagination_pages_do_not_overlap(self, test_client, session): + """Cursor-driven pages partition the result set without overlap.""" + dag_id = "example_python_operator" + self.create_task_instances( + session, + task_instances=[ + {"start_date": 
DEFAULT_DATETIME_1 + dt.timedelta(minutes=(i + 1))} for i in range(10) + ], + dag_id=dag_id, + ) + page_size = 4 + all_ids: list[str] = [] + cursor_token = "" + + for _ in range(5): + params: dict = {"limit": page_size, "order_by": ["map_index"], "cursor": cursor_token} + response = test_client.get("/dags/~/dagRuns/~/taskInstances", params=params) + assert response.status_code == 200, response.json() + body = response.json() + assert body["total_entries"] is None + page_ids = [ti["id"] for ti in body["task_instances"]] + all_ids.extend(page_ids) + + cursor_token = body.get("next_cursor") + if cursor_token is None: + break + + assert len(all_ids) == len(set(all_ids)), "Cursor pages should not have overlapping items" + + def test_cursor_pagination_invalid_token(self, test_client, session): + """Invalid cursor token returns 400.""" + self.create_task_instances(session) + response = test_client.get( + "/dags/~/dagRuns/~/taskInstances", + params={"cursor": "this-is-not-valid", "order_by": ["map_index"]}, + ) + assert response.status_code == 400 + def test_task_group_filter_uses_run_version_not_latest(self, test_client, dag_maker, session): """ Task group lookup should use the DAG version from the run, not the latest version. 
@@ -4247,6 +4339,8 @@ def test_should_call_mocked_api(self, mock_set_ti_state, test_client, session): } ], "total_entries": 1, + "next_cursor": None, + "previous_cursor": None, } mock_set_ti_state.assert_called_once_with( @@ -4521,6 +4615,8 @@ def test_should_raise_422_for_invalid_task_instance_state(self, payload, expecte } ], "total_entries": 1, + "next_cursor": None, + "previous_cursor": None, }, 1, ), @@ -4657,6 +4753,8 @@ def test_update_mask_set_note_should_respond_200( } ], "total_entries": 1, + "next_cursor": None, + "previous_cursor": None, } _check_task_instance_note(session, response_data["task_instances"][0]["id"], ti_note_data) @@ -4718,6 +4816,8 @@ def test_set_note_should_respond_200(self, test_client, session): } ], "total_entries": 1, + "next_cursor": None, + "previous_cursor": None, } _check_task_instance_note( @@ -4797,6 +4897,8 @@ def test_set_note_should_respond_200_mapped_task_with_rtif(self, test_client, se } ], "total_entries": 1, + "next_cursor": None, + "previous_cursor": None, } _check_task_instance_note( @@ -4994,6 +5096,8 @@ def test_should_call_mocked_api(self, mock_set_ti_state, test_client, session): } ], "total_entries": 1, + "next_cursor": None, + "previous_cursor": None, } mock_set_ti_state.assert_called_once_with( @@ -5280,6 +5384,8 @@ def test_should_raise_422_for_invalid_task_instance_state(self, payload, expecte } ], "total_entries": 1, + "next_cursor": None, + "previous_cursor": None, }, 1, ), @@ -5358,7 +5464,12 @@ def test_should_return_empty_list_for_updating_same_task_instance_state( }, ) assert response.status_code == 200 - assert response.json() == {"task_instances": [], "total_entries": 0} + assert response.json() == { + "task_instances": [], + "total_entries": 0, + "next_cursor": None, + "previous_cursor": None, + } class TestDeleteTaskInstance(TestTaskInstanceEndpoint): diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 
a0b32b7fbdd39..49abbba069efb 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -2041,11 +2041,38 @@ class TaskCollectionResponse(BaseModel): class TaskInstanceCollectionResponse(BaseModel): """ - Task Instance Collection serializer for responses. + Task instance collection response supporting both offset and cursor pagination. + + A single flat model is used instead of a discriminated union + (``Annotated[Offset | Cursor, Field(discriminator=...)]``) because + the OpenAPI ``oneOf`` + ``discriminator`` construct is not handled + correctly by ``@hey-api/openapi-ts`` / ``@7nohe/openapi-react-query-codegen``: + return types degrade to ``unknown`` in JSDoc and can produce + incorrect TypeScript types (see hey-api/openapi-ts#1613, #3270). """ task_instances: Annotated[list[TaskInstanceResponse], Field(title="Task Instances")] - total_entries: Annotated[int, Field(title="Total Entries")] + total_entries: Annotated[ + int | None, + Field( + description="Total number of matching items. Populated for offset pagination, ``null`` when using cursor pagination.", + title="Total Entries", + ), + ] = None + next_cursor: Annotated[ + str | None, + Field( + description="Token pointing to the next page. Populated for cursor pagination, ``null`` when using offset pagination or when there is no next page.", + title="Next Cursor", + ), + ] = None + previous_cursor: Annotated[ + str | None, + Field( + description="Token pointing to the previous page. Populated for cursor pagination, ``null`` when using offset pagination or when on the first page.", + title="Previous Cursor", + ), + ] = None class TaskInstanceHistoryCollectionResponse(BaseModel):