Skip to content
This repository was archived by the owner on Apr 25, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 52 additions & 8 deletions pkg/controller/sync/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,19 @@ func (s *KubeFedSyncController) reconcile(qualifiedName util.QualifiedName) util
apiResource := s.typeConfig.GetTargetType()
gvk := apiResourceToGVK(&apiResource)
klog.V(2).Infof("Ensuring the removal of the label %q from %s %q in member clusters.", util.ManagedByKubeFedLabelKey, gvk.Kind, qualifiedName)
err = s.removeManagedLabel(gvk, qualifiedName)
// We can't compute resource placement, therefore we try to
// remove it from all member clusters.
clusters, err := s.informer.GetClusters()
if err != nil {
wrappedErr := errors.Wrap(err, "failed to get member clusters")
runtime.HandleError(wrappedErr)
return util.StatusError
}
clusterNames := sets.NewString()
for _, cluster := range clusters {
clusterNames = clusterNames.Insert(cluster.Name)
}
err = s.removeManagedLabel(gvk, qualifiedName, clusterNames)
if err != nil {
wrappedErr := errors.Wrapf(err, "failed to remove the label %q from %s %q in member clusters", util.ManagedByKubeFedLabelKey, gvk.Kind, qualifiedName)
runtime.HandleError(wrappedErr)
Expand Down Expand Up @@ -501,7 +513,19 @@ func (s *KubeFedSyncController) ensureDeletion(fedResource FederatedResource) ut
return util.StatusError
}
klog.V(2).Infof("Initiating the removal of the label %q from resources previously managed by %s %q.", util.ManagedByKubeFedLabelKey, kind, key)
err = s.removeManagedLabel(fedResource.TargetGVK(), fedResource.TargetName())
clusters, err := s.informer.GetClusters()
if err != nil {
wrappedErr := errors.Wrap(err, "failed to get member clusters")
runtime.HandleError(wrappedErr)
return util.StatusError
}
targetClusters, err := fedResource.ComputePlacement(clusters)
if err != nil {
wrappedErr := errors.Wrapf(err, "failed to compute placement for %s %q", kind, key)
runtime.HandleError(wrappedErr)
return util.StatusError
}
err = s.removeManagedLabel(fedResource.TargetGVK(), fedResource.TargetName(), targetClusters)
if err != nil {
wrappedErr := errors.Wrapf(err, "failed to remove the label %q from all resources previously managed by %s %q", util.ManagedByKubeFedLabelKey, kind, key)
runtime.HandleError(wrappedErr)
Expand Down Expand Up @@ -533,8 +557,8 @@ func (s *KubeFedSyncController) ensureDeletion(fedResource FederatedResource) ut

// removeManagedLabel attempts to remove the managed label from
// resources with the given name in member clusters.
func (s *KubeFedSyncController) removeManagedLabel(gvk schema.GroupVersionKind, qualifiedName util.QualifiedName) error {
ok, err := s.handleDeletionInClusters(gvk, qualifiedName, func(dispatcher dispatch.UnmanagedDispatcher, clusterName string, clusterObj *unstructured.Unstructured) {
func (s *KubeFedSyncController) removeManagedLabel(gvk schema.GroupVersionKind, qualifiedName util.QualifiedName, clusters sets.String) error {
ok, err := s.handleDeletionInClusters(gvk, qualifiedName, clusters, func(dispatcher dispatch.UnmanagedDispatcher, clusterName string, clusterObj *unstructured.Unstructured) {
if clusterObj.GetDeletionTimestamp() != nil {
return
}
Expand All @@ -554,8 +578,17 @@ func (s *KubeFedSyncController) deleteFromClusters(fedResource FederatedResource
gvk := fedResource.TargetGVK()
qualifiedName := fedResource.TargetName()

clusters, err := s.informer.GetClusters()
if err != nil {
return false, err
}
targetClusters, err := fedResource.ComputePlacement(clusters)
if err != nil {
return false, err
}

remainingClusters := []string{}
ok, err := s.handleDeletionInClusters(gvk, qualifiedName, func(dispatcher dispatch.UnmanagedDispatcher, clusterName string, clusterObj *unstructured.Unstructured) {
ok, err := s.handleDeletionInClusters(gvk, qualifiedName, targetClusters, func(dispatcher dispatch.UnmanagedDispatcher, clusterName string, clusterObj *unstructured.Unstructured) {
// If the containing namespace of a FederatedNamespace is
// marked for deletion, it is impossible to require the
// removal of the namespace in advance of removal of the sync
Expand Down Expand Up @@ -615,9 +648,17 @@ func (s *KubeFedSyncController) ensureRemovedOrUnmanaged(fedResource FederatedRe
return errors.Wrap(err, "failed to get a list of clusters")
}

targetClusters, err := fedResource.ComputePlacement(clusters)
if err != nil {
return errors.Wrapf(err, "failed to compute placement for %s %q", fedResource.FederatedKind(), fedResource.FederatedName().Name)
}

dispatcher := dispatch.NewCheckUnmanagedDispatcher(s.informer.GetClientForCluster, fedResource.TargetGVK(), fedResource.TargetName())
unreadyClusters := []string{}
for _, cluster := range clusters {
if !targetClusters.Has(cluster.Name) {
continue
}
if !util.IsClusterReady(&cluster.Status) {
unreadyClusters = append(unreadyClusters, cluster.Name)
continue
Expand All @@ -639,18 +680,21 @@ func (s *KubeFedSyncController) ensureRemovedOrUnmanaged(fedResource FederatedRe

// handleDeletionInClusters invokes the provided deletion handler for
// each managed resource in member clusters.
func (s *KubeFedSyncController) handleDeletionInClusters(gvk schema.GroupVersionKind, qualifiedName util.QualifiedName,
func (s *KubeFedSyncController) handleDeletionInClusters(gvk schema.GroupVersionKind, qualifiedName util.QualifiedName, clusters sets.String,
deletionFunc func(dispatcher dispatch.UnmanagedDispatcher, clusterName string, clusterObj *unstructured.Unstructured)) (bool, error) {
clusters, err := s.informer.GetClusters()
memberClusters, err := s.informer.GetClusters()
if err != nil {
return false, errors.Wrap(err, "failed to get a list of clusters")
}

dispatcher := dispatch.NewUnmanagedDispatcher(s.informer.GetClientForCluster, gvk, qualifiedName)
retrievalFailureClusters := []string{}
unreadyClusters := []string{}
for _, cluster := range clusters {
for _, cluster := range memberClusters {
clusterName := cluster.Name
if !clusters.Has(clusterName) {
continue
}

if !util.IsClusterReady(&cluster.Status) {
unreadyClusters = append(unreadyClusters, clusterName)
Expand Down
16 changes: 16 additions & 0 deletions scripts/pre-commit.sh
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,19 @@ function run-e2e-tests-with-in-memory-controllers() {
${IN_MEMORY_E2E_TEST_CMD}
}

function run-e2e-tests-with-not-ready-clusters() {
# Run the tests without any verbosity. The unhealthy nodes generate
# too much logs.
go test -timeout 900s ./test/e2e \
-args -kubeconfig=${HOME}/.kube/config \
-single-call-timeout=2m \
-ginkgo.randomizeAllSpecs \
-limited-scope=true \
-in-memory-controllers=true \
-simulate-federation=true \
-ginkgo.focus='\[NOT_READY\]'
}

function run-namespaced-e2e-tests() {
local namespaced_e2e_test_cmd="${E2E_TEST_CMD} -kubefed-namespace=foo -limited-scope=true"
# Run the placement test separately to avoid crud failures if
Expand Down Expand Up @@ -200,6 +213,9 @@ kubectl scale deployments kubefed-controller-manager -n kube-federation-system -
echo "Running e2e tests with race detector against cluster-scoped kubefed with in-memory controllers"
run-e2e-tests-with-in-memory-controllers

echo "Running e2e tests with not-ready clusters"
run-e2e-tests-with-not-ready-clusters

# FederatedTypeConfig controller is needed to remove finalizers from
# FederatedTypeConfigs in order to successfully delete the KubeFed
# control plane in the next step.
Expand Down
72 changes: 51 additions & 21 deletions test/common/crudtester.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"sigs.k8s.io/kubefed/pkg/apis/core/common"
"sigs.k8s.io/kubefed/pkg/apis/core/typeconfig"
fedv1a1 "sigs.k8s.io/kubefed/pkg/apis/core/v1alpha1"
"sigs.k8s.io/kubefed/pkg/apis/core/v1beta1"
genericclient "sigs.k8s.io/kubefed/pkg/client/generic"
"sigs.k8s.io/kubefed/pkg/controller/sync"
"sigs.k8s.io/kubefed/pkg/controller/sync/status"
Expand All @@ -65,6 +66,7 @@ type FederatedTypeCrudTester struct {
// operation that involves member clusters may take longer due to
// propagation latency.
clusterWaitTimeout time.Duration
clustersNamespace string
}

type TestClusterConfig struct {
Expand All @@ -77,7 +79,7 @@ type TestCluster struct {
Client util.ResourceClient
}

func NewFederatedTypeCrudTester(testLogger TestLogger, typeConfig typeconfig.Interface, kubeConfig *rest.Config, testClusters map[string]TestCluster, waitInterval, clusterWaitTimeout time.Duration) (*FederatedTypeCrudTester, error) {
func NewFederatedTypeCrudTester(testLogger TestLogger, typeConfig typeconfig.Interface, kubeConfig *rest.Config, testClusters map[string]TestCluster, clustersNamespace string, waitInterval, clusterWaitTimeout time.Duration) (*FederatedTypeCrudTester, error) {
return &FederatedTypeCrudTester{
tl: testLogger,
typeConfig: typeConfig,
Expand All @@ -87,11 +89,12 @@ func NewFederatedTypeCrudTester(testLogger TestLogger, typeConfig typeconfig.Int
testClusters: testClusters,
waitInterval: waitInterval,
clusterWaitTimeout: clusterWaitTimeout,
clustersNamespace: clustersNamespace,
}, nil
}

func (c *FederatedTypeCrudTester) CheckLifecycle(targetObject *unstructured.Unstructured, overrides []interface{}) {
fedObject := c.CheckCreate(targetObject, overrides)
func (c *FederatedTypeCrudTester) CheckLifecycle(targetObject *unstructured.Unstructured, overrides []interface{}, selectors map[string]string) {
fedObject := c.CheckCreate(targetObject, overrides, selectors)

c.CheckStatusCreated(util.NewQualifiedName(fedObject))

Expand All @@ -104,7 +107,7 @@ func (c *FederatedTypeCrudTester) CheckLifecycle(targetObject *unstructured.Unst
c.CheckDelete(fedObject, false)
}

func (c *FederatedTypeCrudTester) Create(targetObject *unstructured.Unstructured, overrides []interface{}) *unstructured.Unstructured {
func (c *FederatedTypeCrudTester) Create(targetObject *unstructured.Unstructured, overrides []interface{}, selectors map[string]string) *unstructured.Unstructured {
qualifiedName := util.NewQualifiedName(targetObject)
kind := c.typeConfig.GetTargetType().Kind
fedKind := c.typeConfig.GetFederatedType().Kind
Expand All @@ -113,10 +116,7 @@ func (c *FederatedTypeCrudTester) Create(targetObject *unstructured.Unstructured
c.tl.Fatalf("Error obtaining %s from %s %q: %v", fedKind, kind, qualifiedName, err)
}

fedObject, err = c.setAdditionalTestData(fedObject, overrides, targetObject.GetGenerateName())
if err != nil {
c.tl.Fatalf("Error setting overrides and placement on %s %q: %v", fedKind, qualifiedName, err)
}
fedObject = c.setAdditionalTestData(fedObject, overrides, selectors, targetObject.GetGenerateName())

return c.createResource(c.typeConfig.GetFederatedType(), fedObject)
}
Expand All @@ -141,15 +141,15 @@ func (c *FederatedTypeCrudTester) resourceClient(apiResource metav1.APIResource)
return client
}

func (c *FederatedTypeCrudTester) CheckCreate(targetObject *unstructured.Unstructured, overrides []interface{}) *unstructured.Unstructured {
fedObject := c.Create(targetObject, overrides)
func (c *FederatedTypeCrudTester) CheckCreate(targetObject *unstructured.Unstructured, overrides []interface{}, selectors map[string]string) *unstructured.Unstructured {
fedObject := c.Create(targetObject, overrides, selectors)

c.CheckPropagation(fedObject)
return fedObject
}

// AdditionalTestData additionally sets fixture overrides and placement clusternames into federated object
func (c *FederatedTypeCrudTester) setAdditionalTestData(fedObject *unstructured.Unstructured, overrides []interface{}, generateName string) (*unstructured.Unstructured, error) {
func (c *FederatedTypeCrudTester) setAdditionalTestData(fedObject *unstructured.Unstructured, overrides []interface{}, selectors map[string]string, generateName string) *unstructured.Unstructured {
fedKind := c.typeConfig.GetFederatedType().Kind
qualifiedName := util.NewQualifiedName(fedObject)

Expand All @@ -159,17 +159,23 @@ func (c *FederatedTypeCrudTester) setAdditionalTestData(fedObject *unstructured.
c.tl.Fatalf("Error updating overrides in %s %q: %v", fedKind, qualifiedName, err)
}
}
clusterNames := []string{}
for name := range c.testClusters {
clusterNames = append(clusterNames, name)
}
err := util.SetClusterNames(fedObject, clusterNames)
if err != nil {
c.tl.Fatalf("Error setting cluster names in %s %q: %v", fedKind, qualifiedName, err)
if selectors != nil {
if err := util.SetClusterSelector(fedObject, selectors); err != nil {
c.tl.Fatalf("Error setting cluster selectors for %s/%s: %v", fedObject.GetKind(), fedObject.GetName(), err)
}
} else {
clusterNames := []string{}
for name := range c.testClusters {
clusterNames = append(clusterNames, name)
}
err := util.SetClusterNames(fedObject, clusterNames)
if err != nil {
c.tl.Fatalf("Error setting cluster names in %s %q: %v", fedKind, qualifiedName, err)
}
}
fedObject.SetGenerateName(generateName)

return fedObject, err
return fedObject
}

func (c *FederatedTypeCrudTester) CheckUpdate(fedObject *unstructured.Unstructured) {
Expand Down Expand Up @@ -334,7 +340,14 @@ func (c *FederatedTypeCrudTester) CheckDelete(fedObject *unstructured.Unstructur
if deletingInCluster {
stateMsg = "not present"
}
clusters, err := util.ComputePlacement(fedObject, c.getClusters(), false)
if err != nil {
c.tl.Fatalf("Couldn't retrieve clusters for %s/%s: %v", federatedKind, name, err)
}
for clusterName, testCluster := range c.testClusters {
if !clusters.Has(clusterName) {
continue
}
namespace = util.QualifiedNameForCluster(clusterName, qualifiedName).Namespace
err = wait.PollImmediate(c.waitInterval, waitTimeout, func() (bool, error) {
obj, err := testCluster.Client.Resources(namespace).Get(context.Background(), name, metav1.GetOptions{})
Expand Down Expand Up @@ -425,16 +438,33 @@ func (c *FederatedTypeCrudTester) CheckReplicaSet(fedObject *unstructured.Unstru
}
}

func (c *FederatedTypeCrudTester) getClusters() []*v1beta1.KubeFedCluster {
client, err := genericclient.New(c.kubeConfig)
if err != nil {
c.tl.Fatalf("Failed to get kubefed clientset: %v", err)
}

fedClusters := []*v1beta1.KubeFedCluster{}
for cluster := range c.testClusters {
clusterResource := &v1beta1.KubeFedCluster{}
err = client.Get(context.Background(), clusterResource, c.clustersNamespace, cluster)
if err != nil {
c.tl.Fatalf("Cannot get cluster %s: %v", cluster, err)
}
fedClusters = append(fedClusters, clusterResource)
}
return fedClusters
}

// CheckPropagation checks propagation for the crud tester's clients
func (c *FederatedTypeCrudTester) CheckPropagation(fedObject *unstructured.Unstructured) {
federatedKind := c.typeConfig.GetFederatedType().Kind
qualifiedName := util.NewQualifiedName(fedObject)

clusterNames, err := util.GetClusterNames(fedObject)
selectedClusters, err := util.ComputePlacement(fedObject, c.getClusters(), false)
if err != nil {
c.tl.Fatalf("Error retrieving cluster names for %s %q: %v", federatedKind, qualifiedName, err)
}
selectedClusters := sets.NewString(clusterNames...)

templateVersion, err := sync.GetTemplateHash(fedObject.Object)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,10 @@ overrides:
return targetObj, overrides, nil
}

crudTester, targetObject, overrides := initCrudTest(f, tl, typeConfig, testObjectsFunc)
crudTester, targetObject, overrides := initCrudTest(f, tl, f.KubeFedSystemNamespace(), typeConfig, testObjectsFunc)
// Make a copy for use in the orphan check.
deletionTargetObject := targetObject.DeepCopy()
crudTester.CheckLifecycle(targetObject, overrides)
crudTester.CheckLifecycle(targetObject, overrides, nil)

if namespaced {
// This check should not fail so long as the main test loop
Expand All @@ -228,7 +228,7 @@ overrides:
tl.Fatalf("Test of orphaned deletion assumes deletion of the containing namespace")
}
// Perform a check of orphan deletion.
fedObject := crudTester.CheckCreate(deletionTargetObject, nil)
fedObject := crudTester.CheckCreate(deletionTargetObject, nil, nil)
orphanDeletion := true
crudTester.CheckDelete(fedObject, orphanDeletion)
}
Expand Down
Loading