Skip to content

Commit 2b3234f

Browse files
swang392levan-m
andauthored
[AGENTONB-2759] Use k8s informer for helm metadata (#2488)
* informer on helm CMs/secrets * add watch processing logic to helm metadata * remove chartname from label filter * use sync.Map and add release snapshot ticker * cleanup unused args and methods * use workqueue for informer events * clean up uninstall deletion logic * reduce logging * update tests * fix make fmt * change ticker to 5 min * allow helm metadata collection for DAWO * change chart name * use manager.Runnable pattern for helm metadata * use getClient instead of getAPIReader * fix deletion logic w/ revision number and delete prefix * use releaseentry with mutex to prevent race condition * use multiple workers with crash handling * use 30 sec context timeout for all k8sclient calls * Update pkg/controller/utils/metadata/helm_metadata.go Co-authored-by: levan-m <116471169+levan-m@users.noreply.github.com> * fix syntax issue from merge conflict * make fmt --------- Co-authored-by: Levan Machablishvili <levan.machablishvili@datadoghq.com> Co-authored-by: levan-m <116471169+levan-m@users.noreply.github.com>
1 parent bdfc654 commit 2b3234f

7 files changed

Lines changed: 647 additions & 487 deletions

File tree

cmd/main.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -388,13 +388,18 @@ func run(opts *options) error {
388388
return setupErrorf(setupLog, err, "Unable to start controllers")
389389
}
390390

391+
// Register Helm metadata forwarder as a manager Runnable
392+
// This ensures it starts after cache sync and respects leader election
393+
if err = setupAndStartHelmMetadataForwarder(metadataLog, mgr, mgr.GetClient(), versionInfo.String(), options.CredsManager); err != nil {
394+
return setupErrorf(setupLog, err, "Unable to setup Helm metadata forwarder")
395+
}
396+
397+
// Start ticker-based metadata forwarders after leader election
391398
go func() {
392-
// Block until this controller manager is elected leader
393399
<-mgr.Elected()
394400
setupLog.Info("Starting metadata forwarders")
395-
setupAndStartOperatorMetadataForwarder(metadataLog, mgr.GetAPIReader(), versionInfo.String(), opts, options.CredsManager)
396-
setupAndStartHelmMetadataForwarder(metadataLog, mgr.GetAPIReader(), versionInfo.String(), options.CredsManager)
397-
setupAndStartCRDMetadataForwarder(metadataLog, mgr.GetAPIReader(), versionInfo.String(), opts, options.CredsManager)
401+
setupAndStartOperatorMetadataForwarder(metadataLog, mgr.GetClient(), versionInfo.String(), opts, options.CredsManager)
402+
setupAndStartCRDMetadataForwarder(metadataLog, mgr.GetClient(), versionInfo.String(), opts, options.CredsManager)
398403
}()
399404

400405
// +kubebuilder:scaffold:builder
@@ -629,7 +634,8 @@ func setupAndStartCRDMetadataForwarder(logger logr.Logger, client client.Reader,
629634
cmf.Start()
630635
}
631636

632-
func setupAndStartHelmMetadataForwarder(logger logr.Logger, client client.Reader, kubernetesVersion string, credsManager *config.CredentialManager) {
633-
hmf := metadata.NewHelmMetadataForwarder(logger, client, kubernetesVersion, version.GetVersion(), credsManager)
634-
hmf.Start()
637+
func setupAndStartHelmMetadataForwarder(logger logr.Logger, mgr manager.Manager, client client.Reader, kubernetesVersion string, credsManager *config.CredentialManager) error {
638+
hmf := metadata.NewHelmMetadataForwarderWithManager(logger, mgr, client, kubernetesVersion, version.GetVersion(), credsManager)
639+
// Register as a runnable with the manager - will be started after cache sync
640+
return mgr.Add(hmf)
635641
}

pkg/controller/utils/metadata/crd_metadata.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,10 @@ func (cmf *CRDMetadataForwarder) Start() {
110110
}
111111

112112
func (cmf *CRDMetadataForwarder) sendMetadata() error {
113-
allCRDs, listSuccess := cmf.getAllActiveCRDs()
113+
ctx, cancel := context.WithTimeout(context.Background(), DefaultOperationTimeout)
114+
defer cancel()
115+
116+
allCRDs, listSuccess := cmf.getAllActiveCRDs(ctx)
114117
crdsToSend := cmf.getCRDsToSend(allCRDs)
115118

116119
if len(crdsToSend) == 0 {
@@ -122,7 +125,7 @@ func (cmf *CRDMetadataForwarder) sendMetadata() error {
122125

123126
// Send individual payloads for each CRD
124127
for _, crd := range crdsToSend {
125-
if err := cmf.sendCRDMetadata(crd); err != nil {
128+
if err := cmf.sendCRDMetadata(ctx, crd); err != nil {
126129
cmf.logger.V(1).Info("Failed to send metadata", "error", err,
127130
"kind", crd.Kind, "name", crd.Name, "namespace", crd.Namespace)
128131
}
@@ -132,8 +135,8 @@ func (cmf *CRDMetadataForwarder) sendMetadata() error {
132135
return nil
133136
}
134137

135-
func (cmf *CRDMetadataForwarder) sendCRDMetadata(crdInstance CRDInstance) error {
136-
clusterUID, err := cmf.GetOrCreateClusterUID()
138+
func (cmf *CRDMetadataForwarder) sendCRDMetadata(ctx context.Context, crdInstance CRDInstance) error {
139+
clusterUID, err := cmf.GetOrCreateClusterUID(ctx)
137140
if err != nil {
138141
return fmt.Errorf("error getting cluster UID: %w", err)
139142
}
@@ -149,6 +152,8 @@ func (cmf *CRDMetadataForwarder) sendCRDMetadata(crdInstance CRDInstance) error
149152
return fmt.Errorf("error creating request: %w", err)
150153
}
151154

155+
req = req.WithContext(ctx)
156+
152157
resp, err := cmf.httpClient.Do(req)
153158
if err != nil {
154159
return fmt.Errorf("error sending metadata request: %w", err)
@@ -224,7 +229,7 @@ func (cmf *CRDMetadataForwarder) buildPayload(clusterUID string, crdInstance CRD
224229

225230
// getAllActiveCRDs returns all active CRDs and a map of list successes for each CRD type
226231
// Currently only DatadogAgent, DatadogAgentInternal, and DatadogAgentProfile are collected
227-
func (cmf *CRDMetadataForwarder) getAllActiveCRDs() ([]CRDInstance, map[string]bool) {
232+
func (cmf *CRDMetadataForwarder) getAllActiveCRDs(ctx context.Context) ([]CRDInstance, map[string]bool) {
228233
var crds []CRDInstance
229234
listSuccess := make(map[string]bool)
230235

@@ -234,7 +239,7 @@ func (cmf *CRDMetadataForwarder) getAllActiveCRDs() ([]CRDInstance, map[string]b
234239
// DDA
235240
if cmf.enabledCRDs.DatadogAgentEnabled {
236241
ddaList := &v2alpha1.DatadogAgentList{}
237-
if err := cmf.k8sClient.List(context.TODO(), ddaList); err == nil {
242+
if err := cmf.k8sClient.List(ctx, ddaList); err == nil {
238243
listSuccess["DatadogAgent"] = true
239244
for _, dda := range ddaList.Items {
240245
annotations := maps.Clone(dda.Annotations)
@@ -259,7 +264,7 @@ func (cmf *CRDMetadataForwarder) getAllActiveCRDs() ([]CRDInstance, map[string]b
259264
// DDAI
260265
if cmf.enabledCRDs.DatadogAgentInternalEnabled {
261266
ddaiList := &v1alpha1.DatadogAgentInternalList{}
262-
if err := cmf.k8sClient.List(context.TODO(), ddaiList); err == nil {
267+
if err := cmf.k8sClient.List(ctx, ddaiList); err == nil {
263268
listSuccess["DatadogAgentInternal"] = true
264269
for _, ddai := range ddaiList.Items {
265270
annotations := maps.Clone(ddai.Annotations)
@@ -284,7 +289,7 @@ func (cmf *CRDMetadataForwarder) getAllActiveCRDs() ([]CRDInstance, map[string]b
284289
// DAP
285290
if cmf.enabledCRDs.DatadogAgentProfileEnabled {
286291
dapList := &v1alpha1.DatadogAgentProfileList{}
287-
if err := cmf.k8sClient.List(context.TODO(), dapList); err == nil {
292+
if err := cmf.k8sClient.List(ctx, dapList); err == nil {
288293
listSuccess["DatadogAgentProfile"] = true
289294
for _, dap := range dapList.Items {
290295
annotations := maps.Clone(dap.Annotations)

pkg/controller/utils/metadata/credential_setup_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package metadata
77

88
import (
9+
"context"
910
"os"
1011
"strings"
1112
"testing"
@@ -334,7 +335,7 @@ func TestSetupRequestPrerequisites(t *testing.T) {
334335
assert.Equal(t, "application/json", req.Header.Get("Accept"), "Accept header should be set")
335336

336337
// Verify cluster UID is set
337-
clusterUID, err := omf.GetOrCreateClusterUID()
338+
clusterUID, err := omf.GetOrCreateClusterUID(context.Background())
338339
assert.NoError(t, err)
339340
assert.NotEmpty(t, clusterUID, "Cluster UID should be set")
340341
})

0 commit comments

Comments
 (0)