Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 89 additions & 19 deletions test/extended/router/config_manager_ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
exutil "github.com/openshift/origin/test/extended/util"
)

// execPodRef defines the attributes of the router pod used to execute local HTTP requests
type execPodRef struct {
types.NamespacedName
ipAddress string
Expand All @@ -47,21 +48,46 @@ type execPodRef struct {
var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.io][OCPFeatureGate:IngressControllerDynamicConfigurationManager]", func() {
defer g.GinkgoRecover()

// dcmIngressTimeout defines the maximum amount of time to wait for test operations to complete.
const dcmIngressTimeout = 2 * time.Minute
// maxDynamicServers defines the number of empty dynamic servers to be allocated when a reload happens.
// Some tests use this value as a premise to limit the number of new servers on scale-out operations.
const maxDynamicServers = 4

ctx := context.Background()
oc := exutil.NewCLIWithPodSecurityLevel("router-dcm-ingress", api.LevelPrivileged).AsAdmin()
kubeClient := oc.AdminKubeClient()
routeClient := oc.AdminRouteClient()

// variables updated on every new test
var (
execPod execPodRef
controller types.NamespacedName
routeSelectorSet labels.Set
)
// execPod is the pod used to run requests against the router.
var execPod execPodRef
// controller is the fully qualified name of the ingress controller resource used in the current test.
var controller types.NamespacedName
// routeSelectorSet is the label key/value pair that the ingress controller uses to filter route resources.
// All route resources should add this label, so the router can find and process them.
var routeSelectorSet labels.Set

g.AfterEach(func() {
if g.CurrentSpecReport().Failed() {
routes, err := routeClient.RouteV1().Routes(oc.Namespace()).List(ctx, metav1.ListOptions{})
if err != nil {
framework.Logf("failed to list Routes in namespace %q: %v", oc.Namespace(), err)
} else {
outputIngress(routes.Items...)
}
endpoints, err := kubeClient.CoreV1().Endpoints(oc.Namespace()).List(ctx, metav1.ListOptions{})
if err != nil {
framework.Logf("failed to list Endpoints in namespace %q: %v", oc.Namespace(), err)
} else {
outputEndpoints(endpoints.Items...)
}
epsList, err := kubeClient.DiscoveryV1().EndpointSlices(oc.Namespace()).List(ctx, metav1.ListOptions{})
if err != nil {
framework.Logf("failed to list EndpointSlices in namespace %q: %v", oc.Namespace(), err)
} else {
outputEndpointSlice(epsList.Items...)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
if controller.Name != "" {
err := oc.AdminOperatorClient().OperatorV1().IngressControllers(controller.Namespace).Delete(ctx, controller.Name, *metav1.NewDeleteOptions(1))
o.Expect(err).NotTo(o.HaveOccurred())
Expand All @@ -73,7 +99,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.
nsOperator := "openshift-ingress-operator"
controllerName := names.SimpleNameGenerator.GenerateName("e2e-dcm-")

// ... and its router is created on router's namespace
// ... and its router and service are created in router's namespace
nsRouter := "openshift-ingress"
svcName := "router-internal-" + controllerName

Expand Down Expand Up @@ -189,12 +215,12 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.
// scaling-out to 4 replicas, one at a time
for replicas := initReplicas + 1; replicas <= 4; replicas++ {

g.By(fmt.Sprintf("scaling-out to %d replicas", replicas))
framework.Logf("scaling-out to %d replicas", replicas)

currentServers, err := builder.scaleDeployment(ctx, replicas, dcmIngressTimeout)
o.Expect(err).NotTo(o.HaveOccurred())

g.By("waiting router to add all the backend servers to the load balance")
framework.Logf("waiting router to add all the backend servers to the load balance")

// router should eventually reach all the known replicas
eventuallyRouteAllServers(execPod, hostname, false, currentServers, dcmIngressTimeout)
Expand Down Expand Up @@ -235,7 +261,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.
// instead of HAProxy removing it from the balance due to health-check starting to fail.
for replicas := initReplicas - 1; replicas >= 1; replicas-- {

g.By(fmt.Sprintf("scaling-in to %d replicas", replicas))
framework.Logf("scaling-in to %d replicas", replicas)

currentServers, err := builder.scaleInEndpoints(ctx, serviceName, replicas)
o.Expect(err).NotTo(o.HaveOccurred())
Expand Down Expand Up @@ -471,6 +497,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.

// ... k8s recreates it and we wait it to be fully functional
err = builder.waitDeployment(replicas, dcmIngressTimeout)
builder.printDeploymentState(ctx)
o.Expect(err).NotTo(o.HaveOccurred())
}

Expand Down Expand Up @@ -637,7 +664,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.
// Iterates over a number of scaling operations, always checking if the change is being applied.
for i, replicas := range changingReplicas {

g.By(fmt.Sprintf("iteration %d, scaling from %d to %d replicas", i+1, prevReplicas, replicas))
framework.Logf("iteration %d, scaling from %d to %d replicas", i+1, prevReplicas, replicas)

currentServers, err := builder.scaleDeployment(ctx, replicas, dcmIngressTimeout)
o.Expect(err).NotTo(o.HaveOccurred())
Expand Down Expand Up @@ -778,7 +805,12 @@ func (r *routeStackBuilder) createDeploymentStack(ctx context.Context, routetype
if err = r.waitDeployment(replicas, timeout); err != nil {
return nil, err
}
return r.exposeDeployment(ctx)
backendServers, err = r.exposeDeployment(ctx)
if err != nil {
return nil, err
}
r.printDeploymentState(ctx)
return backendServers, nil
}

// scaleDeployment scales-in/out the common deployment to the specified replicas. It waits for all the pods to be created and returns their names.
Expand All @@ -796,6 +828,7 @@ func (r *routeStackBuilder) scaleDeployment(ctx context.Context, replicas int, t
}
return len(backendServers) == replicas, nil
})
r.printDeploymentState(ctx)
return backendServers, err
}

Expand Down Expand Up @@ -824,7 +857,14 @@ func (r *routeStackBuilder) createDetachedService(ctx context.Context) (serviceN
}

// we also need the deprecated Endpoints API, since router still uses it depending on the ROUTER_WATCH_ENDPOINTS envvar
epCurrent, err := r.kubeClient.CoreV1().Endpoints(svcCurrent.Namespace).Get(ctx, svcCurrent.Name, metav1.GetOptions{})
var epCurrent *corev1.Endpoints
err = wait.PollUntilContextTimeout(ctx, time.Second, 5*time.Second, false, func(ctx context.Context) (done bool, err error) {
epCurrent, err = r.kubeClient.CoreV1().Endpoints(svcCurrent.Namespace).Get(ctx, svcCurrent.Name, metav1.GetOptions{})
if err != nil {
framework.Logf("error fetching Endpoints: %s", err.Error())
}
return err == nil, nil
})
if err != nil {
return "", err
}
Expand Down Expand Up @@ -910,7 +950,7 @@ func (r *routeStackBuilder) scaleInEndpoints(ctx context.Context, detachedServic
if err != nil {
return err
}
// deleting addresses, from all subnets, whose IP address is not found in the patched `eps`
// deleting addresses, from all subsets, whose IP address is not found in the patched `eps`
for i := range ep.Subsets {
ss := &ep.Subsets[i]
ss.Addresses = slices.DeleteFunc(ss.Addresses, func(addr corev1.EndpointAddress) bool {
Expand Down Expand Up @@ -939,6 +979,8 @@ func (r *routeStackBuilder) createServeHostnameDeployment(replicas int) error {

// createDeployment creates the deployment resource. It should be called just once, since it uses the OC namespace and the common resource name.
func (r *routeStackBuilder) createDeployment(image string, replicas, port int, cmd ...string) error {
// This deployment is safe under Single Node OpenShift (SNO): although it can create more replicas,
// those replicas does not configure anti-affinity, so all of them can run in the same node.
runArgs := []string{"deployment", r.resourceName, "--image", image, "--replicas", strconv.Itoa(replicas), "--port", strconv.Itoa(port), "--"}
runArgs = append(runArgs, cmd...)
return r.oc.Run("create").Args(runArgs...).Execute()
Expand All @@ -953,6 +995,25 @@ func (r *routeStackBuilder) exposeDeployment(ctx context.Context) (backendServer
return r.fetchServiceReplicas(ctx)
}

// printDeploymentState outputs the pod names, status, and their IP addresses. Best effort, it outputs the error instead in case it happens.
// It requires that `exposeDeployment()` was already called.
func (r *routeStackBuilder) printDeploymentState(ctx context.Context) {
pods, err := r.fetchPods(ctx)
if err != nil {
framework.Logf("deployment state: error reading deployment pods: %v", err)
return
}
var podDescription []string
for _, pod := range pods {
var podIPs []string
for _, ip := range pod.Status.PodIPs {
podIPs = append(podIPs, ip.IP)
}
podDescription = append(podDescription, fmt.Sprintf("%s/%s/%s", pod.Name, pod.Status.Phase, strings.Join(podIPs, ",")))
}
framework.Logf("deployment state: replicas=%d pods=%s", len(pods), strings.Join(podDescription, " // "))
}

// fetchEndpointSlice fetches the EndpointSlice of the provided service name. It currently supports only one EndpointSlice instance for simplicity.
func (r *routeStackBuilder) fetchEndpointSlice(ctx context.Context, serviceName string) (*discoveryv1.EndpointSlice, error) {
listOpts := metav1.ListOptions{LabelSelector: discoveryv1.LabelServiceName + "=" + serviceName}
Expand All @@ -967,8 +1028,8 @@ func (r *routeStackBuilder) fetchEndpointSlice(ctx context.Context, serviceName
return &epsList.Items[0], nil
}

// fetchServiceReplicas fetches the pod names from the exposed common deployment. It requires that `exposeDeployment()` was already called.
func (r *routeStackBuilder) fetchServiceReplicas(ctx context.Context) ([]string, error) {
// fetchPods fetches the pods from the exposed common deployment. It requires that `exposeDeployment()` was already called.
func (r *routeStackBuilder) fetchPods(ctx context.Context) ([]corev1.Pod, error) {
svc, err := r.kubeClient.CoreV1().Services(r.namespace).Get(ctx, r.resourceName, metav1.GetOptions{})
if err != nil {
return nil, err
Expand All @@ -978,9 +1039,18 @@ func (r *routeStackBuilder) fetchServiceReplicas(ctx context.Context) ([]string,
if err != nil {
return nil, err
}
backendServers := make([]string, len(pods.Items))
for i := range pods.Items {
backendServers[i] = pods.Items[i].Name
return pods.Items, nil
}

// fetchServiceReplicas fetches the pod names from the exposed common deployment. It requires that `exposeDeployment()` was already called.
func (r *routeStackBuilder) fetchServiceReplicas(ctx context.Context) ([]string, error) {
pods, err := r.fetchPods(ctx)
if err != nil {
return nil, err
}
backendServers := make([]string, len(pods))
for i := range pods {
backendServers[i] = pods[i].Name
}
return backendServers, nil
}
Expand Down
60 changes: 60 additions & 0 deletions test/extended/router/stress.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"strconv"
"strings"
"text/tabwriter"
"time"
Expand All @@ -16,6 +17,7 @@ import (

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -656,6 +658,64 @@ func outputIngress(routes ...routev1.Route) {
e2e.Logf("Routes:\n%s", b.String())
}

func outputEndpoints(endpoints ...corev1.Endpoints) {
b := &bytes.Buffer{}
w := tabwriter.NewWriter(b, 0, 0, 2, ' ', 0)
fmt.Fprintf(w, "NAME\tADDRESSES\tNOT READY ADDRESSES\tPORTS\n")
for _, ep := range endpoints {
for _, ss := range ep.Subsets {
resumeAddrs := func(addrs []corev1.EndpointAddress) string {
var addrList []string
for _, addr := range addrs {
val := "-"
if addr.IP != "" {
val = addr.IP
} else if addr.Hostname != "" {
val = addr.Hostname
}
addrList = append(addrList, val)
}
return strings.Join(addrList, ",")
}
var portList []string
for _, port := range ss.Ports {
portList = append(portList, strconv.Itoa(int(port.Port)))
}
fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", ep.Name, resumeAddrs(ss.Addresses), resumeAddrs(ss.NotReadyAddresses), strings.Join(portList, ","))
}
}
w.Flush()
e2e.Logf("Endpoints:\n%s", b.String())
}

func outputEndpointSlice(epss ...discoveryv1.EndpointSlice) {
b := &bytes.Buffer{}
w := tabwriter.NewWriter(b, 0, 0, 2, ' ', 0)
fmt.Fprintf(w, "NAME\tSERVICE\tADDRESSES\tNOT READY ADDRESSES\tPORTS\n")
for _, eps := range epss {
var addrList, notReadyAddrList []string
for _, ep := range eps.Endpoints {
addrs := strings.Join(ep.Addresses, "+")
if ready := ep.Conditions.Ready; ready == nil || *ready == true {
addrList = append(addrList, addrs)
} else {
notReadyAddrList = append(notReadyAddrList, addrs)
}
}
var portList []string
for _, port := range eps.Ports {
val := "-"
if port.Port != nil {
val = strconv.Itoa(int(*port.Port))
}
portList = append(portList, val)
}
fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", eps.Name, eps.Labels[discoveryv1.LabelServiceName], strings.Join(addrList, ","), strings.Join(notReadyAddrList, ","), strings.Join(portList, ","))
}
w.Flush()
e2e.Logf("EndpointSlices:\n%s", b.String())
}

// findMostRecentConditionTime returns the time of the most recent condition.
func findMostRecentConditionTime(conditions []routev1.RouteIngressCondition) time.Time {
var recent time.Time
Expand Down