Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions providers/amazon/tests/system/amazon/aws/example_emr_eks.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from airflow.utils.trigger_rule import TriggerRule # type: ignore[no-redef,attr-defined]

from system.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
from system.amazon.aws.utils.k8s import get_describe_pod_operator

DAG_ID = "example_emr_eks"

Expand Down Expand Up @@ -292,6 +293,10 @@ def delete_virtual_cluster(virtual_cluster_id):
)
# [END howto_sensor_emr_container]

# Describe pods only on failure to help diagnose EMR on EKS job issues.
describe_pod = get_describe_pod_operator(cluster_name=eks_cluster_name, namespace=eks_namespace)
describe_pod.trigger_rule = TriggerRule.ONE_FAILED

delete_eks_cluster = EksDeleteClusterOperator(
task_id="delete_eks_cluster",
cluster_name=eks_cluster_name,
Expand Down Expand Up @@ -330,6 +335,7 @@ def delete_virtual_cluster(virtual_cluster_id):
create_emr_eks_cluster,
job_starter,
job_waiter,
describe_pod,
# TEST TEARDOWN
delete_iam_oidc_identity_provider(eks_cluster_name),
delete_virtual_cluster(str(create_emr_eks_cluster.output)),
Expand Down
38 changes: 32 additions & 6 deletions providers/amazon/tests/system/amazon/aws/utils/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,50 @@
# under the License.
from __future__ import annotations

from airflow.utils.helpers import exactly_one

try:
from airflow.providers.standard.operators.bash import BashOperator
except ImportError:
# Fallback for older Airflow versions
from airflow.operators.bash import BashOperator # type: ignore[no-redef]


def get_describe_pod_operator(cluster_name: str, pod_name: str) -> BashOperator:
"""Returns an operator that'll print the output of a `k describe pod` in the airflow logs."""
def get_describe_pod_operator(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Beauty 👌

cluster_name: str,
*,
pod_name: str | None = None,
namespace: str | None = None,
) -> BashOperator:
"""Return an operator that prints ``kubectl describe pod(s)`` output in the Airflow logs.

Exactly one of *pod_name* or *namespace* must be provided.

:param cluster_name: Name of the EKS cluster
:param pod_name: Describe a single pod by name
:param namespace: List and describe *all* pods in the given namespace
"""
if not exactly_one(pod_name, namespace):
raise ValueError("Exactly one of 'pod_name' or 'namespace' must be provided.")

if pod_name:
kubectl_commands = f"""
echo "***** pod description *****";
kubectl describe pod {pod_name};"""
else:
kubectl_commands = f"""
echo "***** pods in namespace {namespace} *****";
kubectl get pods -n {namespace} -o wide;
echo "***** pod descriptions *****";
kubectl describe pods -n {namespace};"""

return BashOperator(
task_id="describe_pod",
bash_command=f"""
install_aws.sh;
install_kubectl.sh;
# configure kubectl to hit the right cluster
aws eks update-kubeconfig --name {cluster_name};
# once all this setup is done, actually describe the pod
echo "vvv pod description below vvv";
kubectl describe pod {pod_name};
echo "^^^ pod description above ^^^" """,
{kubectl_commands}
""",
)
Loading