Skip to content
This repository was archived by the owner on Apr 21, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ linters:
- usetesting
- wastedassign
- whitespace
- unused
settings:
goconst:
min-len: 2
Expand Down
9 changes: 9 additions & 0 deletions internal/actions/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,12 @@ func NewDefaultActionHandlers(
// Close releases resources held by the CreateEventHandler that is
// registered in the map under the *castai.ActionCreateEvent key.
func (h ActionHandlers) Close() error {
	createEventKey := reflect.TypeFor[*castai.ActionCreateEvent]()
	return h[createEventKey].(*CreateEventHandler).Close()
}

// ShouldWaitForVolumeDetach returns whether to wait for VolumeAttachments based on per-action config.
// Returns true only if explicitly enabled via the action field; defaults to false (disabled)
// when the field is unset or the request itself is nil.
func ShouldWaitForVolumeDetach(req *castai.ActionDrainNode) bool {
	// Guard against a nil request so callers don't need to pre-check;
	// the original would panic on the field dereference below.
	if req == nil || req.WaitForVolumeDetach == nil {
		return false
	}
	return *req.WaitForVolumeDetach
}
2 changes: 1 addition & 1 deletion internal/actions/check_node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (h *CheckNodeStatusHandler) checkNodeReady(ctx context.Context, _ *logrus.E
func isNodeReady(log logrus.FieldLogger, node *corev1.Node, castNodeID, providerID string) bool {
// if node has castai node id label, check if it matches the one we are waiting for
// if it doesn't match, we can skip this node.
if err := k8s.IsNodeIDProviderIDValid(node, castNodeID, providerID, log); err != nil {
if err := k8s.IsNodeIDProviderIDValid(node, castNodeID, providerID); err != nil {
log.WithFields(logrus.Fields{
"node": node.Name,
"node_id": castNodeID,
Expand Down
2 changes: 1 addition & 1 deletion internal/actions/delete_node_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (h *DeleteNodeHandler) Handle(ctx context.Context, action *castai.ClusterAc
// Create delete options with grace period 0 - force delete.
deleteOptions := metav1.NewDeleteOptions(0)
deletePod := func(ctx context.Context, pod v1.Pod) error {
return h.deletePod(ctx, *deleteOptions, pod)
return k8s.DeletePod(ctx, *deleteOptions, pod, h.cfg.deleteRetries, h.cfg.deleteRetryWait, h.clientset, h.log)
}

deletedPods, failedPods := k8s.ExecuteBatchPodActions(ctx, log, pods, deletePod, "delete-pod")
Expand Down
75 changes: 4 additions & 71 deletions internal/actions/drain_node_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/samber/lo"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
Expand All @@ -25,10 +24,6 @@ import (

var _ ActionHandler = &DrainNodeHandler{}

const (
minDrainTimeout = 0 // Minimal pod drain timeout.
)

type drainNodeConfig struct {
podsDeleteTimeout time.Duration
podDeleteRetries int
Expand Down Expand Up @@ -61,17 +56,6 @@ func NewDrainNodeHandler(
}
}

// getDrainTimeout returns the drain timeout adjusted for the time elapsed
// since the action was created. The result is clamped between 0s and the
// originally requested timeout.
func (h *DrainNodeHandler) getDrainTimeout(action *castai.ClusterAction) time.Duration {
	requested := time.Duration(action.ActionDrainNode.DrainTimeoutSeconds) * time.Second
	remaining := requested - time.Since(action.CreatedAt)
	return lo.Clamp(remaining, minDrainTimeout*time.Second, requested)
}

type DrainNodeHandler struct {
log logrus.FieldLogger
clientset kubernetes.Interface
Expand All @@ -92,7 +76,7 @@ func (h *DrainNodeHandler) Handle(ctx context.Context, action *castai.ClusterAct
if !ok {
return newUnexpectedTypeErr(action.Data(), req)
}
drainTimeout := h.getDrainTimeout(action)
drainTimeout := k8s.GetDrainTimeout(action)

log := h.log.WithFields(logrus.Fields{
"node_name": req.NodeName,
Expand All @@ -119,7 +103,7 @@ func (h *DrainNodeHandler) Handle(ctx context.Context, action *castai.ClusterAct

log.Info("cordoning node for draining")

if err := h.cordonNode(ctx, node); err != nil {
if err := k8s.CordonNode(ctx, h.log, h.clientset, node); err != nil {
return fmt.Errorf("cordoning node %q: %w", req.NodeName, err)
}

Expand Down Expand Up @@ -190,34 +174,11 @@ func (h *DrainNodeHandler) Handle(ctx context.Context, action *castai.ClusterAct
return deleteErr
}

// cordonNode marks the node unschedulable via a patch. It is a no-op when
// the node is already cordoned.
func (h *DrainNodeHandler) cordonNode(ctx context.Context, node *v1.Node) error {
	if node.Spec.Unschedulable {
		// Already cordoned; nothing to do.
		return nil
	}

	if err := k8s.PatchNode(ctx, h.log, h.clientset, node, func(n *v1.Node) {
		n.Spec.Unschedulable = true
	}); err != nil {
		return fmt.Errorf("patching node unschedulable: %w", err)
	}
	return nil
}

// shouldWaitForVolumeDetach returns whether to wait for VolumeAttachments based on per-action config.
// Returns true only if explicitly enabled via the action field; an unset field defaults to false.
func (h *DrainNodeHandler) shouldWaitForVolumeDetach(req *castai.ActionDrainNode) bool {
	if req.WaitForVolumeDetach == nil {
		return false
	}
	return *req.WaitForVolumeDetach
}

// waitForVolumeDetachIfEnabled waits for VolumeAttachments to be deleted if the feature is enabled.
// This is called after successful drain to give CSI drivers time to clean up volumes.
// nonEvictablePods are pods that won't be evicted (DaemonSet, static) - their VolumeAttachments are excluded from waiting.
func (h *DrainNodeHandler) waitForVolumeDetachIfEnabled(ctx context.Context, log logrus.FieldLogger, nodeName string, req *castai.ActionDrainNode, nonEvictablePods []v1.Pod) {
if !h.shouldWaitForVolumeDetach(req) || h.vaWaiter == nil {
if !ShouldWaitForVolumeDetach(req) || h.vaWaiter == nil {
return
}

Expand Down Expand Up @@ -314,7 +275,7 @@ func (h *DrainNodeHandler) deleteNodePods(ctx context.Context, log logrus.FieldL
}

deletePod := func(ctx context.Context, pod v1.Pod) error {
return h.deletePod(ctx, options, pod)
return k8s.DeletePod(ctx, options, pod, h.cfg.podDeleteRetries, h.cfg.podDeleteRetryDelay, h.clientset, h.log)
}

_, podsWithFailedDeletion := k8s.ExecuteBatchPodActions(ctx, log, nodePods.toEvict, deletePod, "delete-pod")
Expand Down Expand Up @@ -442,34 +403,6 @@ func (h *DrainNodeHandler) waitNodePodsTerminated(ctx context.Context, log logru
)
}

// deletePod deletes a single pod with constant-backoff retries, using the
// handler's configured retry count and delay. NotFound errors are treated as
// success (the pod is already gone); internal server errors abort retrying
// immediately; any other error is retried and logged via the warn callback.
func (h *DrainNodeHandler) deletePod(ctx context.Context, options metav1.DeleteOptions, pod v1.Pod) error {
	b := waitext.NewConstantBackoff(h.cfg.podDeleteRetryDelay)
	action := func(ctx context.Context) (bool, error) {
		err := h.clientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, options)
		if err != nil {
			// Pod is not found - ignore.
			if apierrors.IsNotFound(err) {
				return false, nil
			}

			// Pod is misconfigured - stop retry.
			if apierrors.IsInternalError(err) {
				return false, err
			}
		}

		// Other errors - retry.
		// NOTE(review): on success (err == nil) this also returns (true, nil);
		// presumably waitext.Retry treats a nil error as terminal success — confirm
		// against the waitext package contract.
		return true, err
	}
	err := waitext.Retry(ctx, b, h.cfg.podDeleteRetries, action, func(err error) {
		h.log.Warnf("deleting pod %s on node %s in namespace %s, will retry: %v", pod.Name, pod.Spec.NodeName, pod.Namespace, err)
	})
	if err != nil {
		return fmt.Errorf("deleting pod %s in namespace %s: %w", pod.Name, pod.Namespace, err)
	}
	return nil
}

func logCastPodsToEvict(log logrus.FieldLogger, castPods []v1.Pod) {
if len(castPods) == 0 {
return
Expand Down
22 changes: 4 additions & 18 deletions internal/actions/drain_node_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,7 @@ func TestGetDrainTimeout(t *testing.T) {
},
CreatedAt: time.Now().UTC(),
}
h := DrainNodeHandler{
log: log,
cfg: drainNodeConfig{},
}

timeout := h.getDrainTimeout(action)
timeout := k8s.GetDrainTimeout(action)

// We give some wiggle room as the test might get here a few milliseconds late.
r.InDelta((100 * time.Second).Milliseconds(), timeout.Milliseconds(), 10)
Expand All @@ -62,12 +57,8 @@ func TestGetDrainTimeout(t *testing.T) {
},
CreatedAt: time.Now().UTC().Add(-3 * time.Minute),
}
h := DrainNodeHandler{
log: log,
cfg: drainNodeConfig{},
}

timeout := h.getDrainTimeout(action)
timeout := k8s.GetDrainTimeout(action)
r.Less(int(math.Floor(timeout.Seconds())), 600)
})

Expand All @@ -82,12 +73,8 @@ func TestGetDrainTimeout(t *testing.T) {
},
CreatedAt: time.Now().UTC().Add(-60 * time.Minute),
}
h := DrainNodeHandler{
log: log,
cfg: drainNodeConfig{},
}

timeout := h.getDrainTimeout(action)
timeout := k8s.GetDrainTimeout(action)
r.Equal(0, int(timeout.Seconds()))
})
}
Expand Down Expand Up @@ -776,11 +763,10 @@ func TestShouldWaitForVolumeDetach(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
h := &DrainNodeHandler{}
req := &castai.ActionDrainNode{
WaitForVolumeDetach: tt.waitForVolumeDetach,
}
require.Equal(t, tt.want, h.shouldWaitForVolumeDetach(req))
require.Equal(t, tt.want, ShouldWaitForVolumeDetach(req))
})
}
}
6 changes: 2 additions & 4 deletions internal/informer/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -50,7 +49,6 @@ type Manager struct {
started bool
vaAvailable bool
cancelFunc context.CancelFunc
mu sync.RWMutex
}

// Option is a functional option for configuring the Manager.
Expand Down Expand Up @@ -390,8 +388,8 @@ func (m *Manager) reportCacheSize(ctx context.Context) {
}

if m.vaAvailable {
vas := m.volumeAttachments.Informer().GetStore().ListKeys()
size := len(vas)
was := m.volumeAttachments.Informer().GetStore().ListKeys()
size := len(was)
m.log.WithField("cache_size", size).Debug("volumeattachment informer cache size")
metrics.SetInformerCacheSize("volumeattachment", size)
}
Expand Down
Loading
Loading