Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,30 @@

from __future__ import annotations

from collections.abc import Iterable
from datetime import datetime
from uuid import UUID

from pydantic import AliasPath, Field

from airflow.api_fastapi.core_api.base import BaseModel


class DeadlineResponse(BaseModel):
"""Deadline data for the DAG run deadlines tab."""
"""Deadline serializer for responses."""

id: UUID
deadline_time: datetime
missed: bool
created_at: datetime
alert_name: str | None = None
alert_description: str | None = None
alert_name: str | None = Field(validation_alias=AliasPath("deadline_alert", "name"), default=None)
alert_description: str | None = Field(
validation_alias=AliasPath("deadline_alert", "description"), default=None
)


class DealineCollectionResponse(BaseModel):
"""Deadline Collection serializer for responses."""

deadlines: Iterable[DeadlineResponse]
total_entries: int
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ paths:
security:
- OAuth2PasswordBearer: []
- HTTPBearer: []
/ui/deadlines/{dag_id}/{run_id}:
/ui/dags/{dag_id}/dagRuns/{dag_run_id}/deadlines:
get:
tags:
- Deadlines
Expand All @@ -495,22 +495,51 @@ paths:
schema:
type: string
title: Dag Id
- name: run_id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Run Id
title: Dag Run Id
- name: limit
in: query
required: false
schema:
type: integer
minimum: 0
default: 50
title: Limit
- name: offset
in: query
required: false
schema:
type: integer
minimum: 0
default: 0
title: Offset
- name: order_by
in: query
required: false
schema:
type: array
items:
type: string
description: 'Attributes to order by, multi criteria sort is supported.
Prefix with `-` for descending order. Supported attributes: `id, deadline_time,
created_at, alert_name`'
default:
- deadline_time
title: Order By
description: 'Attributes to order by, multi criteria sort is supported. Prefix
with `-` for descending order. Supported attributes: `id, deadline_time,
created_at, alert_name`'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/DeadlineResponse'
title: Response Get Dag Run Deadlines
$ref: '#/components/schemas/DealineCollectionResponse'
'404':
content:
application/json:
Expand Down Expand Up @@ -2004,7 +2033,23 @@ components:
- missed
- created_at
title: DeadlineResponse
description: Deadline data for the DAG run deadlines tab.
description: Deadline serializer for responses.
DealineCollectionResponse:
properties:
deadlines:
items:
$ref: '#/components/schemas/DeadlineResponse'
type: array
title: Deadlines
total_entries:
type: integer
title: Total Entries
type: object
required:
- deadlines
- total_entries
title: DealineCollectionResponse
description: Deadline Collection serializer for responses.
EdgeResponse:
properties:
source_id:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,28 @@

from __future__ import annotations

from typing import Annotated

from fastapi import Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.orm import joinedload

from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset, SortParam
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.ui.deadline import DeadlineResponse
from airflow.api_fastapi.core_api.datamodels.ui.deadline import DealineCollectionResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import requires_access_dag
from airflow.models.dagrun import DagRun
from airflow.models.deadline import Deadline
from airflow.models.deadline_alert import DeadlineAlert

deadlines_router = AirflowRouter(prefix="/deadlines", tags=["Deadlines"])
deadlines_router = AirflowRouter(prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/deadlines", tags=["Deadlines"])


@deadlines_router.get(
"/{dag_id}/{run_id}",
"",
responses=create_openapi_http_exception_doc(
[
status.HTTP_404_NOT_FOUND,
Expand All @@ -51,36 +55,50 @@
)
def get_dag_run_deadlines(
dag_id: str,
run_id: str,
dag_run_id: str,
session: SessionDep,
) -> list[DeadlineResponse]:
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
SortParam,
Depends(
SortParam(
["id", "deadline_time", "created_at"],
Deadline,
to_replace={
"alert_name": DeadlineAlert.name,
},
).dynamic_depends(default="deadline_time")
),
],
) -> DealineCollectionResponse:
"""Get all deadlines for a specific DAG run."""
dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id == run_id))
dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id))

if not dag_run:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
f"No DAG run found for dag_id={dag_id} run_id={run_id}",
f"No DAG run found for dag_id={dag_id} dag_run_id={dag_run_id}",
)

deadlines = (
session.scalars(
select(Deadline)
.where(Deadline.dagrun_id == dag_run.id)
.options(joinedload(Deadline.deadline_alert))
.order_by(Deadline.deadline_time.asc())
)
.unique()
.all()
query = (
select(Deadline)
.join(Deadline.dagrun)
.outerjoin(Deadline.deadline_alert)
.where(Deadline.dagrun_id == dag_run.id)
.where(DagRun.dag_id == dag_id)
.options(joinedload(Deadline.deadline_alert))
)

return [
DeadlineResponse(
id=d.id,
deadline_time=d.deadline_time,
missed=d.missed,
created_at=d.created_at,
alert_name=d.deadline_alert.name if d.deadline_alert else None,
alert_description=d.deadline_alert.description if d.deadline_alert else None,
)
for d in deadlines
]
deadlines_select, total_entries = paginated_select(
statement=query,
filters=None,
order_by=order_by,
offset=offset,
limit=limit,
session=session,
)

deadlines = session.scalars(deadlines_select)

return DealineCollectionResponse(deadlines=deadlines, total_entries=total_entries)
9 changes: 6 additions & 3 deletions airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -810,10 +810,13 @@ export const UseDashboardServiceDagStatsKeyFn = (queryKey?: Array<unknown>) => [
export type DeadlinesServiceGetDagRunDeadlinesDefaultResponse = Awaited<ReturnType<typeof DeadlinesService.getDagRunDeadlines>>;
export type DeadlinesServiceGetDagRunDeadlinesQueryResult<TData = DeadlinesServiceGetDagRunDeadlinesDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useDeadlinesServiceGetDagRunDeadlinesKey = "DeadlinesServiceGetDagRunDeadlines";
export const UseDeadlinesServiceGetDagRunDeadlinesKeyFn = ({ dagId, runId }: {
export const UseDeadlinesServiceGetDagRunDeadlinesKeyFn = ({ dagId, dagRunId, limit, offset, orderBy }: {
dagId: string;
runId: string;
}, queryKey?: Array<unknown>) => [useDeadlinesServiceGetDagRunDeadlinesKey, ...(queryKey ?? [{ dagId, runId }])];
dagRunId: string;
limit?: number;
offset?: number;
orderBy?: string[];
}, queryKey?: Array<unknown>) => [useDeadlinesServiceGetDagRunDeadlinesKey, ...(queryKey ?? [{ dagId, dagRunId, limit, offset, orderBy }])];
export type StructureServiceStructureDataDefaultResponse = Awaited<ReturnType<typeof StructureService.structureData>>;
export type StructureServiceStructureDataQueryResult<TData = StructureServiceStructureDataDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useStructureServiceStructureDataKey = "StructureServiceStructureData";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1536,14 +1536,20 @@ export const ensureUseDashboardServiceDagStatsData = (queryClient: QueryClient)
* Get all deadlines for a specific DAG run.
* @param data The data for the request.
* @param data.dagId
* @param data.runId
* @returns DeadlineResponse Successful Response
* @param data.dagRunId
* @param data.limit
* @param data.offset
* @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `id, deadline_time, created_at, alert_name`
* @returns DealineCollectionResponse Successful Response
* @throws ApiError
*/
export const ensureUseDeadlinesServiceGetDagRunDeadlinesData = (queryClient: QueryClient, { dagId, runId }: {
export const ensureUseDeadlinesServiceGetDagRunDeadlinesData = (queryClient: QueryClient, { dagId, dagRunId, limit, offset, orderBy }: {
dagId: string;
runId: string;
}) => queryClient.ensureQueryData({ queryKey: Common.UseDeadlinesServiceGetDagRunDeadlinesKeyFn({ dagId, runId }), queryFn: () => DeadlinesService.getDagRunDeadlines({ dagId, runId }) });
dagRunId: string;
limit?: number;
offset?: number;
orderBy?: string[];
}) => queryClient.ensureQueryData({ queryKey: Common.UseDeadlinesServiceGetDagRunDeadlinesKeyFn({ dagId, dagRunId, limit, offset, orderBy }), queryFn: () => DeadlinesService.getDagRunDeadlines({ dagId, dagRunId, limit, offset, orderBy }) });
/**
* Structure Data
* Get Structure Data.
Expand Down
16 changes: 11 additions & 5 deletions airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1536,14 +1536,20 @@ export const prefetchUseDashboardServiceDagStats = (queryClient: QueryClient) =>
* Get all deadlines for a specific DAG run.
* @param data The data for the request.
* @param data.dagId
* @param data.runId
* @returns DeadlineResponse Successful Response
* @param data.dagRunId
* @param data.limit
* @param data.offset
* @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `id, deadline_time, created_at, alert_name`
* @returns DealineCollectionResponse Successful Response
* @throws ApiError
*/
export const prefetchUseDeadlinesServiceGetDagRunDeadlines = (queryClient: QueryClient, { dagId, runId }: {
export const prefetchUseDeadlinesServiceGetDagRunDeadlines = (queryClient: QueryClient, { dagId, dagRunId, limit, offset, orderBy }: {
dagId: string;
runId: string;
}) => queryClient.prefetchQuery({ queryKey: Common.UseDeadlinesServiceGetDagRunDeadlinesKeyFn({ dagId, runId }), queryFn: () => DeadlinesService.getDagRunDeadlines({ dagId, runId }) });
dagRunId: string;
limit?: number;
offset?: number;
orderBy?: string[];
}) => queryClient.prefetchQuery({ queryKey: Common.UseDeadlinesServiceGetDagRunDeadlinesKeyFn({ dagId, dagRunId, limit, offset, orderBy }), queryFn: () => DeadlinesService.getDagRunDeadlines({ dagId, dagRunId, limit, offset, orderBy }) });
/**
* Structure Data
* Get Structure Data.
Expand Down
16 changes: 11 additions & 5 deletions airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1536,14 +1536,20 @@ export const useDashboardServiceDagStats = <TData = Common.DashboardServiceDagSt
* Get all deadlines for a specific DAG run.
* @param data The data for the request.
* @param data.dagId
* @param data.runId
* @returns DeadlineResponse Successful Response
* @param data.dagRunId
* @param data.limit
* @param data.offset
* @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `id, deadline_time, created_at, alert_name`
* @returns DealineCollectionResponse Successful Response
* @throws ApiError
*/
export const useDeadlinesServiceGetDagRunDeadlines = <TData = Common.DeadlinesServiceGetDagRunDeadlinesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, runId }: {
export const useDeadlinesServiceGetDagRunDeadlines = <TData = Common.DeadlinesServiceGetDagRunDeadlinesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, limit, offset, orderBy }: {
dagId: string;
runId: string;
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseDeadlinesServiceGetDagRunDeadlinesKeyFn({ dagId, runId }, queryKey), queryFn: () => DeadlinesService.getDagRunDeadlines({ dagId, runId }) as TData, ...options });
dagRunId: string;
limit?: number;
offset?: number;
orderBy?: string[];
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseDeadlinesServiceGetDagRunDeadlinesKeyFn({ dagId, dagRunId, limit, offset, orderBy }, queryKey), queryFn: () => DeadlinesService.getDagRunDeadlines({ dagId, dagRunId, limit, offset, orderBy }) as TData, ...options });
/**
* Structure Data
* Get Structure Data.
Expand Down
16 changes: 11 additions & 5 deletions airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1536,14 +1536,20 @@ export const useDashboardServiceDagStatsSuspense = <TData = Common.DashboardServ
* Get all deadlines for a specific DAG run.
* @param data The data for the request.
* @param data.dagId
* @param data.runId
* @returns DeadlineResponse Successful Response
* @param data.dagRunId
* @param data.limit
* @param data.offset
* @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `id, deadline_time, created_at, alert_name`
* @returns DealineCollectionResponse Successful Response
* @throws ApiError
*/
export const useDeadlinesServiceGetDagRunDeadlinesSuspense = <TData = Common.DeadlinesServiceGetDagRunDeadlinesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, runId }: {
export const useDeadlinesServiceGetDagRunDeadlinesSuspense = <TData = Common.DeadlinesServiceGetDagRunDeadlinesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, limit, offset, orderBy }: {
dagId: string;
runId: string;
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseDeadlinesServiceGetDagRunDeadlinesKeyFn({ dagId, runId }, queryKey), queryFn: () => DeadlinesService.getDagRunDeadlines({ dagId, runId }) as TData, ...options });
dagRunId: string;
limit?: number;
offset?: number;
orderBy?: string[];
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseDeadlinesServiceGetDagRunDeadlinesKeyFn({ dagId, dagRunId, limit, offset, orderBy }, queryKey), queryFn: () => DeadlinesService.getDagRunDeadlines({ dagId, dagRunId, limit, offset, orderBy }) as TData, ...options });
/**
* Structure Data
* Get Structure Data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7867,7 +7867,27 @@ export const $DeadlineResponse = {
type: 'object',
required: ['id', 'deadline_time', 'missed', 'created_at'],
title: 'DeadlineResponse',
description: 'Deadline data for the DAG run deadlines tab.'
description: 'Deadline serializer for responses.'
} as const;

export const $DealineCollectionResponse = {
properties: {
deadlines: {
items: {
'$ref': '#/components/schemas/DeadlineResponse'
},
type: 'array',
title: 'Deadlines'
},
total_entries: {
type: 'integer',
title: 'Total Entries'
}
},
type: 'object',
required: ['deadlines', 'total_entries'],
title: 'DealineCollectionResponse',
description: 'Deadline Collection serializer for responses.'
} as const;

export const $EdgeResponse = {
Expand Down
Loading
Loading