From 7ae081b5a94e0fd7f3fb0390f1e79dceb56edaa1 Mon Sep 17 00:00:00 2001 From: Joao Morais Date: Wed, 13 May 2026 15:31:04 -0300 Subject: [PATCH] improve dcm tests to reduce flakiness in endpoints Addressing a race when patching the endpoints resource. This update adds polling to ensure the endpoints resource has been created. Also, dump all Endpoints and EndpointSlice resources in the test namespace when a test fails, which helps to correlate the current state with what the test scenario expects. Example of the deployment state log: ``` deployment state: replicas=4 pods=route-scale-in-5b7b4f6b8c-9ncjg/Running/10.128.1.27 // route-scale-in-5b7b4f6b8c-ck45n/Running/10.128.1.28 // route-scale-in-5b7b4f6b8c-srffr/Running/10.128.1.29 // route-scale-in-5b7b4f6b8c-v9hm2/Running/10.128.1.26 ``` Example of Endpoints and EndpointSlice resources listed if the test fails: ``` Endpoints: NAME ADDRESSES NOT READY ADDRESSES PORTS route-scale-in 10.128.1.26,10.128.1.27,10.128.1.28,10.128.1.29 9376 route-scale-in-khbh5 10.128.1.26 9376 EndpointSlices: NAME SERVICE ADDRESSES NOT READY ADDRESSES PORTS route-scale-in-khbh5-8ncmh route-scale-in-khbh5 10.128.1.26 9376 route-scale-in-rrhzv route-scale-in 10.128.1.27,10.128.1.28,10.128.1.29,10.128.1.26 9376 ``` https://redhat.atlassian.net/browse/OCPBUGS-85426 --- .../extended/router/config_manager_ingress.go | 108 +++++++++++++++--- test/extended/router/stress.go | 60 ++++++++++ 2 files changed, 149 insertions(+), 19 deletions(-) diff --git a/test/extended/router/config_manager_ingress.go b/test/extended/router/config_manager_ingress.go index 5325c72943c3..fed186a23d7f 100644 --- a/test/extended/router/config_manager_ingress.go +++ b/test/extended/router/config_manager_ingress.go @@ -39,6 +39,7 @@ import ( exutil "github.com/openshift/origin/test/extended/util" ) +// execPodRef defines the attributes of the router pod used to execute local HTTP requests type execPodRef struct { types.NamespacedName ipAddress string @@ -47,21 +48,46 @@ 
type execPodRef struct { var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.io][OCPFeatureGate:IngressControllerDynamicConfigurationManager]", func() { defer g.GinkgoRecover() + // dcmIngressTimeout defines the maximum amount of time to wait for test operations to complete. const dcmIngressTimeout = 2 * time.Minute + // maxDynamicServers defines the number of empty dynamic servers to be allocated when a reload happens. + // Some tests use this value as a premise to limit the number of new servers on scale-out operations. const maxDynamicServers = 4 ctx := context.Background() oc := exutil.NewCLIWithPodSecurityLevel("router-dcm-ingress", api.LevelPrivileged).AsAdmin() kubeClient := oc.AdminKubeClient() + routeClient := oc.AdminRouteClient() - // variables updated on every new test - var ( - execPod execPodRef - controller types.NamespacedName - routeSelectorSet labels.Set - ) + // execPod is the pod used to run requests against the router. + var execPod execPodRef + // controller is the fully qualified name of the ingress controller resource used in the current test. + var controller types.NamespacedName + // routeSelectorSet is the label key/value pair that the ingress controller uses to filter route resources. + // All route resources should add this label, so the router can find and process them. + var routeSelectorSet labels.Set g.AfterEach(func() { + if g.CurrentSpecReport().Failed() { + routes, err := routeClient.RouteV1().Routes(oc.Namespace()).List(ctx, metav1.ListOptions{}) + if err != nil { + framework.Logf("failed to list Routes in namespace %q: %v", oc.Namespace(), err) + } else { + outputIngress(routes.Items...) + } + endpoints, err := kubeClient.CoreV1().Endpoints(oc.Namespace()).List(ctx, metav1.ListOptions{}) + if err != nil { + framework.Logf("failed to list Endpoints in namespace %q: %v", oc.Namespace(), err) + } else { + outputEndpoints(endpoints.Items...) 
+ } + epsList, err := kubeClient.DiscoveryV1().EndpointSlices(oc.Namespace()).List(ctx, metav1.ListOptions{}) + if err != nil { + framework.Logf("failed to list EndpointSlices in namespace %q: %v", oc.Namespace(), err) + } else { + outputEndpointSlice(epsList.Items...) + } + } if controller.Name != "" { err := oc.AdminOperatorClient().OperatorV1().IngressControllers(controller.Namespace).Delete(ctx, controller.Name, *metav1.NewDeleteOptions(1)) o.Expect(err).NotTo(o.HaveOccurred()) @@ -73,7 +99,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift. nsOperator := "openshift-ingress-operator" controllerName := names.SimpleNameGenerator.GenerateName("e2e-dcm-") - // ... and its router is created on router's namespace + // ... and its router and service are created in router's namespace nsRouter := "openshift-ingress" svcName := "router-internal-" + controllerName @@ -189,12 +215,12 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift. // scaling-out to 4 replicas, one at a time for replicas := initReplicas + 1; replicas <= 4; replicas++ { - g.By(fmt.Sprintf("scaling-out to %d replicas", replicas)) + framework.Logf("scaling-out to %d replicas", replicas) currentServers, err := builder.scaleDeployment(ctx, replicas, dcmIngressTimeout) o.Expect(err).NotTo(o.HaveOccurred()) - g.By("waiting router to add all the backend servers to the load balance") + framework.Logf("waiting router to add all the backend servers to the load balance") // router should eventually reach all the known replicas eventuallyRouteAllServers(execPod, hostname, false, currentServers, dcmIngressTimeout) @@ -235,7 +261,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift. // instead of HAProxy removing it from the balance due to health-check starting to fail. 
for replicas := initReplicas - 1; replicas >= 1; replicas-- { - g.By(fmt.Sprintf("scaling-in to %d replicas", replicas)) + framework.Logf("scaling-in to %d replicas", replicas) currentServers, err := builder.scaleInEndpoints(ctx, serviceName, replicas) o.Expect(err).NotTo(o.HaveOccurred()) @@ -471,6 +497,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift. // ... k8s recreates it and we wait it to be fully functional err = builder.waitDeployment(replicas, dcmIngressTimeout) + builder.printDeploymentState(ctx) o.Expect(err).NotTo(o.HaveOccurred()) } @@ -637,7 +664,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift. // Iterates over a number of scaling operations, always checking if the change is being applied. for i, replicas := range changingReplicas { - g.By(fmt.Sprintf("iteration %d, scaling from %d to %d replicas", i+1, prevReplicas, replicas)) + framework.Logf("iteration %d, scaling from %d to %d replicas", i+1, prevReplicas, replicas) currentServers, err := builder.scaleDeployment(ctx, replicas, dcmIngressTimeout) o.Expect(err).NotTo(o.HaveOccurred()) @@ -778,7 +805,12 @@ func (r *routeStackBuilder) createDeploymentStack(ctx context.Context, routetype if err = r.waitDeployment(replicas, timeout); err != nil { return nil, err } - return r.exposeDeployment(ctx) + backendServers, err = r.exposeDeployment(ctx) + if err != nil { + return nil, err + } + r.printDeploymentState(ctx) + return backendServers, nil } // scaleDeployment scales-in/out the common deployment to the specified replicas. It waits for all the pods to be created and returns their names. 
@@ -796,6 +828,7 @@ func (r *routeStackBuilder) scaleDeployment(ctx context.Context, replicas int, t } return len(backendServers) == replicas, nil }) + r.printDeploymentState(ctx) return backendServers, err } @@ -824,7 +857,14 @@ func (r *routeStackBuilder) createDetachedService(ctx context.Context) (serviceN } // we also need the deprecated Endpoints API, since router still uses it depending on the ROUTER_WATCH_ENDPOINTS envvar - epCurrent, err := r.kubeClient.CoreV1().Endpoints(svcCurrent.Namespace).Get(ctx, svcCurrent.Name, metav1.GetOptions{}) + var epCurrent *corev1.Endpoints + err = wait.PollUntilContextTimeout(ctx, time.Second, 5*time.Second, false, func(ctx context.Context) (done bool, err error) { + epCurrent, err = r.kubeClient.CoreV1().Endpoints(svcCurrent.Namespace).Get(ctx, svcCurrent.Name, metav1.GetOptions{}) + if err != nil { + framework.Logf("error fetching Endpoints: %s", err.Error()) + } + return err == nil, nil + }) if err != nil { return "", err } @@ -910,7 +950,7 @@ func (r *routeStackBuilder) scaleInEndpoints(ctx context.Context, detachedServic if err != nil { return err } - // deleting addresses, from all subnets, whose IP address is not found in the patched `eps` + // deleting addresses, from all subsets, whose IP address is not found in the patched `eps` for i := range ep.Subsets { ss := &ep.Subsets[i] ss.Addresses = slices.DeleteFunc(ss.Addresses, func(addr corev1.EndpointAddress) bool { @@ -939,6 +979,8 @@ func (r *routeStackBuilder) createServeHostnameDeployment(replicas int) error { // createDeployment creates the deployment resource. It should be called just once, since it uses the OC namespace and the common resource name. func (r *routeStackBuilder) createDeployment(image string, replicas, port int, cmd ...string) error { + // This deployment is safe under Single Node OpenShift (SNO): although it can create more replicas, + // those replicas do not configure anti-affinity, so all of them can run on the same node. 
runArgs := []string{"deployment", r.resourceName, "--image", image, "--replicas", strconv.Itoa(replicas), "--port", strconv.Itoa(port), "--"} runArgs = append(runArgs, cmd...) return r.oc.Run("create").Args(runArgs...).Execute() @@ -953,6 +995,25 @@ func (r *routeStackBuilder) exposeDeployment(ctx context.Context) (backendServer return r.fetchServiceReplicas(ctx) } +// printDeploymentState logs the name, phase, and IP addresses of each deployment pod. It is best effort: if the pods cannot be fetched, it logs the error instead. +// It requires that `exposeDeployment()` was already called. +func (r *routeStackBuilder) printDeploymentState(ctx context.Context) { + pods, err := r.fetchPods(ctx) + if err != nil { + framework.Logf("deployment state: error reading deployment pods: %v", err) + return + } + var podDescription []string + for _, pod := range pods { + var podIPs []string + for _, ip := range pod.Status.PodIPs { + podIPs = append(podIPs, ip.IP) + } + podDescription = append(podDescription, fmt.Sprintf("%s/%s/%s", pod.Name, pod.Status.Phase, strings.Join(podIPs, ","))) + } + framework.Logf("deployment state: replicas=%d pods=%s", len(pods), strings.Join(podDescription, " // ")) +} + // fetchEndpointSlice fetches the EndpointSlice of the provided service name. It currently supports only one EndpointSlice instance for simplicity. func (r *routeStackBuilder) fetchEndpointSlice(ctx context.Context, serviceName string) (*discoveryv1.EndpointSlice, error) { listOpts := metav1.ListOptions{LabelSelector: discoveryv1.LabelServiceName + "=" + serviceName} @@ -967,8 +1028,8 @@ func (r *routeStackBuilder) fetchEndpointSlice(ctx context.Context, serviceName return &epsList.Items[0], nil } -// fetchServiceReplicas fetches the pod names from the exposed common deployment. It requires that `exposeDeployment()` was already called. -func (r *routeStackBuilder) fetchServiceReplicas(ctx context.Context) ([]string, error) { +// fetchPods fetches the pods from the exposed common deployment. 
It requires that `exposeDeployment()` was already called. +func (r *routeStackBuilder) fetchPods(ctx context.Context) ([]corev1.Pod, error) { svc, err := r.kubeClient.CoreV1().Services(r.namespace).Get(ctx, r.resourceName, metav1.GetOptions{}) if err != nil { return nil, err @@ -978,9 +1039,18 @@ func (r *routeStackBuilder) fetchServiceReplicas(ctx context.Context) ([]string, if err != nil { return nil, err } - backendServers := make([]string, len(pods.Items)) - for i := range pods.Items { - backendServers[i] = pods.Items[i].Name + return pods.Items, nil +} + +// fetchServiceReplicas fetches the pod names from the exposed common deployment. It requires that `exposeDeployment()` was already called. +func (r *routeStackBuilder) fetchServiceReplicas(ctx context.Context) ([]string, error) { + pods, err := r.fetchPods(ctx) + if err != nil { + return nil, err + } + backendServers := make([]string, len(pods)) + for i := range pods { + backendServers[i] = pods[i].Name } return backendServers, nil } diff --git a/test/extended/router/stress.go b/test/extended/router/stress.go index 04c7699f8579..9d4c88f56df9 100644 --- a/test/extended/router/stress.go +++ b/test/extended/router/stress.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "strconv" "strings" "text/tabwriter" "time" @@ -16,6 +17,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -656,6 +658,64 @@ func outputIngress(routes ...routev1.Route) { e2e.Logf("Routes:\n%s", b.String()) } +func outputEndpoints(endpoints ...corev1.Endpoints) { + b := &bytes.Buffer{} + w := tabwriter.NewWriter(b, 0, 0, 2, ' ', 0) + fmt.Fprintf(w, "NAME\tADDRESSES\tNOT READY ADDRESSES\tPORTS\n") + for _, ep := range endpoints { + for _, ss := range ep.Subsets { + resumeAddrs := func(addrs []corev1.EndpointAddress) string { + var addrList []string + for _, addr := range 
addrs { + val := "-" + if addr.IP != "" { + val = addr.IP + } else if addr.Hostname != "" { + val = addr.Hostname + } + addrList = append(addrList, val) + } + return strings.Join(addrList, ",") + } + var portList []string + for _, port := range ss.Ports { + portList = append(portList, strconv.Itoa(int(port.Port))) + } + fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", ep.Name, resumeAddrs(ss.Addresses), resumeAddrs(ss.NotReadyAddresses), strings.Join(portList, ",")) + } + } + w.Flush() + e2e.Logf("Endpoints:\n%s", b.String()) +} + +func outputEndpointSlice(epss ...discoveryv1.EndpointSlice) { + b := &bytes.Buffer{} + w := tabwriter.NewWriter(b, 0, 0, 2, ' ', 0) + fmt.Fprintf(w, "NAME\tSERVICE\tADDRESSES\tNOT READY ADDRESSES\tPORTS\n") + for _, eps := range epss { + var addrList, notReadyAddrList []string + for _, ep := range eps.Endpoints { + addrs := strings.Join(ep.Addresses, "+") + if ready := ep.Conditions.Ready; ready == nil || *ready == true { + addrList = append(addrList, addrs) + } else { + notReadyAddrList = append(notReadyAddrList, addrs) + } + } + var portList []string + for _, port := range eps.Ports { + val := "-" + if port.Port != nil { + val = strconv.Itoa(int(*port.Port)) + } + portList = append(portList, val) + } + fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", eps.Name, eps.Labels[discoveryv1.LabelServiceName], strings.Join(addrList, ","), strings.Join(notReadyAddrList, ","), strings.Join(portList, ",")) + } + w.Flush() + e2e.Logf("EndpointSlices:\n%s", b.String()) +} + // findMostRecentConditionTime returns the time of the most recent condition. func findMostRecentConditionTime(conditions []routev1.RouteIngressCondition) time.Time { var recent time.Time