-
Notifications
You must be signed in to change notification settings - Fork 16.8k
Add cursor based pagination for get_task_instances endpoint #64845
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
pierrejeambrun
wants to merge
8
commits into
apache:main
Choose a base branch
from
astronomer:feature/cursor-pagination-task-instances
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
fea992d
Add cursor-based pagination to get_task_instances endpoint
pierrejeambrun a2206f1
Simplify cursor token and support first page without sentinel
pierrejeambrun edcefc1
Small adjustments
pierrejeambrun d11e2a5
Adjustments
pierrejeambrun 2df0f71
Narrow endpoint return types and encode cursor value types
pierrejeambrun 92abebb
Use msgpack for cursor tokens and nested keyset predicate
pierrejeambrun 38f1043
Flatten TaskInstanceCollectionRes
pierrejeambrun d55e4f2
Fix UI
pierrejeambrun File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,143 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| """ | ||
| Cursor-based (keyset) pagination helpers. | ||
|
|
||
| :meta private: | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import base64 | ||
| import uuid as uuid_mod | ||
| from typing import Any | ||
|
|
||
| import msgspec | ||
| from fastapi import HTTPException, status | ||
| from sqlalchemy import and_, or_ | ||
| from sqlalchemy.sql import Select | ||
| from sqlalchemy.sql.elements import ColumnElement | ||
| from sqlalchemy.sql.sqltypes import Uuid | ||
|
|
||
| from airflow.api_fastapi.common.parameters import SortParam | ||
|
|
||
|
|
||
| def _b64url_decode_padded(token: str) -> bytes: | ||
| padding = 4 - (len(token) % 4) | ||
| if padding != 4: | ||
| token = token + ("=" * padding) | ||
| return base64.urlsafe_b64decode(token.encode("ascii")) | ||
|
|
||
|
|
||
| def _nonstrict_bound(col: ColumnElement, value: Any, is_desc: bool) -> ColumnElement[bool]: | ||
| """Inclusive range edge on the leading column at each nesting level (``>=`` / ``<=``).""" | ||
| return col <= value if is_desc else col >= value | ||
|
|
||
|
|
||
| def _strict_bound(col: ColumnElement, value: Any, is_desc: bool) -> ColumnElement[bool]: | ||
| """Strict inequality for ``or_`` branches (``<`` / ``>``).""" | ||
| return col < value if is_desc else col > value | ||
|
|
||
|
|
||
| def _nested_keyset_predicate( | ||
| resolved: list[tuple[str, ColumnElement, bool]], values: list[Any] | ||
| ) -> ColumnElement[bool]: | ||
| """ | ||
| Keyset predicate for rows strictly after the cursor in ``ORDER BY`` order. | ||
|
|
||
| Uses nested ``and_(non-strict, or_(strict, ...))`` so leading sort keys use | ||
| inclusive range bounds and inner branches use strict inequalities—friendly | ||
| for composite index range scans. Logically equivalent to an OR-of-prefix- | ||
| equalities formulation. | ||
| """ | ||
| n = len(resolved) | ||
| _, col, is_desc = resolved[n - 1] | ||
| inner: ColumnElement[bool] = _strict_bound(col, values[n - 1], is_desc) | ||
| for i in range(n - 2, -1, -1): | ||
| _, col_i, is_desc_i = resolved[i] | ||
| inner = and_( | ||
| _nonstrict_bound(col_i, values[i], is_desc_i), | ||
| or_(_strict_bound(col_i, values[i], is_desc_i), inner), | ||
| ) | ||
| return inner | ||
|
|
||
|
|
||
| def _coerce_value(column: ColumnElement, value: Any) -> Any: | ||
| """Normalize decoded values for SQL bind parameters (e.g. UUID columns).""" | ||
| if value is None or not isinstance(value, str): | ||
| return value | ||
| ctype = getattr(column, "type", None) | ||
| if isinstance(ctype, Uuid): | ||
| try: | ||
| return uuid_mod.UUID(value) | ||
| except ValueError: | ||
| return value | ||
| return value | ||
|
|
||
|
|
||
| def encode_cursor(row: Any, sort_param: SortParam) -> str: | ||
| """ | ||
| Encode cursor token from the boundary row of a result set. | ||
|
|
||
| The token is a url-safe base64 encoding of a MessagePack list of sort-key | ||
| values (no padding ``=``), so the cursor is compact and safe in query strings. | ||
| Binary msgpack is not URL-safe by itself, so base64 is still required. | ||
| """ | ||
| resolved = sort_param.get_resolved_columns() | ||
| if not resolved: | ||
| raise ValueError("SortParam has no resolved columns.") | ||
|
|
||
| parts = [getattr(row, attr_name, None) for attr_name, _col, _desc in resolved] | ||
| payload = msgspec.msgpack.encode(parts) | ||
| return base64.urlsafe_b64encode(payload).decode("ascii").rstrip("=") | ||
|
|
||
|
|
||
| def decode_cursor(token: str) -> list[Any]: | ||
| """Decode a cursor token to the list of sort-key values.""" | ||
| try: | ||
| raw = _b64url_decode_padded(token) | ||
| except Exception: | ||
| raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid cursor token") | ||
|
|
||
| try: | ||
| data: Any = msgspec.msgpack.decode(raw) | ||
| except Exception: | ||
| raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid cursor token") | ||
|
|
||
| if not isinstance(data, list): | ||
| raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid cursor token structure") | ||
|
|
||
| return data | ||
|
|
||
|
|
||
| def apply_cursor_filter(statement: Select, cursor: str, sort_param: SortParam) -> Select: | ||
| """ | ||
| Apply a keyset pagination WHERE clause from a cursor token. | ||
|
|
||
| Uses nested ``and_(col <=/>= v, or_(col </> v, ...))`` so each leading sort | ||
| key carries a range-friendly non-strict bound, with strict inequalities on | ||
| the ``or_`` branches—aligned with common composite index range scans. | ||
| """ | ||
| raw_values = decode_cursor(cursor) | ||
|
|
||
| resolved = sort_param.get_resolved_columns() | ||
| if len(raw_values) != len(resolved): | ||
| raise HTTPException(status.HTTP_400_BAD_REQUEST, "Cursor token does not match current query shape") | ||
|
|
||
| parsed_values = [_coerce_value(col, val) for (_, col, _), val in zip(resolved, raw_values, strict=True)] | ||
|
|
||
| return statement.where(_nested_keyset_predicate(resolved, parsed_values)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah nice. I was about to say "so does this break compat" but you've covered it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, backward comp is handled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well with flattening the models 'total_entires' field can be "None | int" now, not only 'int'. Technically that's breaking, do you think it's fine, should we add a newsfragment ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me have a play and see if we can get discrimination working again