app/vlagent: feat: add namespace labels/annotations metadata and flags #949
base: master
@@ -23,6 +23,11 @@ type kubernetesCollector struct {
 	currentNode node
 
+	// namespaces caches metadata for all namespaces in the cluster.
+	// namespacesLock guards concurrent access, since refreshNamespaces may run while events are handled.
+	namespaces     map[string]namespace
+	namespacesLock sync.RWMutex
+
Member
Could you please double-check if we really need this mutex here?
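For context on the question above, here is a minimal, self-contained sketch (generic Go, not vlagent's types) of the pattern the RWMutex guards against: one goroutine replacing the cached map while another reads it. Whether vlagent can actually reach this state after startup is exactly what the review question is asking.

```go
// Sketch only: replacing a map that other goroutines read concurrently is a
// data race unless access is synchronized. With the RWMutex in place, the
// race detector stays quiet; remove the locking and `go run -race .` flags it.
package main

import (
	"fmt"
	"sync"
)

type cache struct {
	mu sync.RWMutex
	m  map[string]string
}

// replace swaps the whole map, as setNamespaces does.
func (c *cache) replace(m map[string]string) {
	c.mu.Lock()
	c.m = m
	c.mu.Unlock()
}

// get reads a single entry, as getNamespace does on the fast path.
func (c *cache) get(k string) (string, bool) {
	c.mu.RLock()
	v, ok := c.m[k]
	c.mu.RUnlock()
	return v, ok
}

func main() {
	c := &cache{m: map[string]string{"default": "ns"}}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); c.replace(map[string]string{"kube-system": "ns"}) }()
	go func() { defer wg.Done(); _, _ = c.get("default") }()
	wg.Wait()
	fmt.Println("done; run with -race to verify")
}
```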
 	ctx    context.Context
 	cancel context.CancelFunc
 	wg     sync.WaitGroup
 
@@ -59,8 +64,16 @@ func startKubernetesCollector(client *kubeAPIClient, currentNodeName, logsPath,
 		ctx:        ctx,
 		cancel:     cancel,
 		logsPath:   logsPath,
+		namespaces: make(map[string]namespace),
 	}
 
+	nsl, err := client.getAllNamespaces(ctx)
+	if err != nil {
+		cancel()
Contributor
@vadimalekseev , it is better from maintainability PoV to put the […]
@vadimalekseev , could you update all the […]
+		return nil, fmt.Errorf("cannot fetch namespaces: %w; this is required for proper filtering", err)
+	}
+	kc.setNamespaces(nsl.Items)
+
 	storage := &remotewrite.Storage{}
 	newProcessor := func(commonFields []logstorage.Field) processor {
 		return newLogFileProcessor(storage, commonFields)
@@ -171,8 +184,10 @@ func (kc *kubernetesCollector) startWatchCluster(ctx context.Context, resourceVe
 }
 
 func (kc *kubernetesCollector) startReadPodLogs(pod pod) {
+	ns := kc.getNamespace(pod.Metadata.Namespace)
+
 	startRead := func(pc podContainer, cs containerStatus) {
-		commonFields := getCommonFields(kc.currentNode, pod, cs)
+		commonFields := getCommonFields(kc.currentNode, ns, pod, cs)
 
 		filePath := kc.getLogFilePath(pod, pc, cs)
@@ -202,7 +217,7 @@ func (kc *kubernetesCollector) startReadPodLogs(pod pod) {
 // Must be synced with getCommonFields.
 var streamFieldNames = []string{"kubernetes.container_name", "kubernetes.pod_name", "kubernetes.pod_namespace"}
 
-func getCommonFields(n node, p pod, cs containerStatus) []logstorage.Field {
+func getCommonFields(n node, ns namespace, p pod, cs containerStatus) []logstorage.Field {
 	var fs logstorage.Fields
 
 	// Fields should match vector.dev kubernetes_source for easy migration.
@@ -222,6 +237,15 @@ func getCommonFields(n node, p pod, cs containerStatus) []logstorage.Field {
 		fs.Add(fieldName, v)
 	}
 
+	for k, v := range ns.Metadata.Labels {
+		fieldName := "kubernetes.namespace_labels." + k
+		fs.Add(fieldName, v)
+	}
+	for k, v := range ns.Metadata.Annotations {
+		fieldName := "kubernetes.namespace_annotations." + k
+		fs.Add(fieldName, v)
+	}
+
 	for k, v := range n.Metadata.Labels {
 		fieldName := "kubernetes.node_labels." + k
 		fs.Add(fieldName, v)
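To make the effect of the new loops concrete, here is a small self-contained sketch (hypothetical label and annotation values, plain maps instead of the project's namespace type) that reproduces the same field-name construction:

```go
package main

import "fmt"

func main() {
	// Hypothetical namespace metadata; the real values come from the Kubernetes API.
	labels := map[string]string{"team": "payments"}
	annotations := map[string]string{"owner": "sre"}

	// Same key construction as in the diff above.
	for k, v := range labels {
		fmt.Printf("%s=%s\n", "kubernetes.namespace_labels."+k, v)
	}
	for k, v := range annotations {
		fmt.Printf("%s=%s\n", "kubernetes.namespace_annotations."+k, v)
	}
	// Output:
	// kubernetes.namespace_labels.team=payments
	// kubernetes.namespace_annotations.owner=sre
}
```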
@@ -257,3 +281,54 @@ func (kc *kubernetesCollector) stop() {
 	kc.wg.Wait()
 	kc.fileCollector.stop()
 }
+
+// setNamespaces replaces the cached namespaces map with the provided list.
+func (kc *kubernetesCollector) setNamespaces(namespaces []namespace) {
+	m := make(map[string]namespace, len(namespaces))
+	for _, ns := range namespaces {
+		m[ns.Metadata.Name] = ns
+	}
+
+	kc.namespacesLock.Lock()
+	kc.namespaces = m
+	kc.namespacesLock.Unlock()
+}
+
+// getNamespace returns namespace metadata from cache.
+// If the namespace is not in cache (e.g., it was created after vlagent startup),
+// this method refreshes the namespace list from Kubernetes API.
+// It terminates vlagent if the namespace cannot be found or if the API call fails,
+// as processing logs without complete namespace metadata could bypass excludeFilter.
+func (kc *kubernetesCollector) getNamespace(namespaceName string) namespace {
+	kc.namespacesLock.RLock()
+	ns, ok := kc.namespaces[namespaceName]
+	kc.namespacesLock.RUnlock()
+	if ok {
+		return ns
+	}
+
+	if err := kc.refreshNamespaces(); err != nil {
+		logger.Fatalf("FATAL: cannot refresh namespaces: %s", err)
+	}
+
+	kc.namespacesLock.RLock()
+	ns, ok = kc.namespaces[namespaceName]
+	kc.namespacesLock.RUnlock()
+	if !ok {
+		logger.Fatalf("FATAL: cannot find namespace %q in Kubernetes API response", namespaceName)
+	}
+	return ns
+}
+
+// refreshNamespaces fetches all namespaces from Kubernetes API and updates the cache.
+func (kc *kubernetesCollector) refreshNamespaces() error {
+	nsl, err := kc.client.getAllNamespaces(kc.ctx)
+	if err != nil {
+		return fmt.Errorf("cannot fetch namespaces from Kubernetes API: %w", err)
+	}
+
+	kc.setNamespaces(nsl.Items)
+
+	logger.Infof("refreshed namespace cache: %d namespaces", len(nsl.Items))
+	return nil
+}
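As a summary of the lookup flow added above, the following self-contained sketch (hypothetical types, with the Kubernetes API call replaced by an injectable fetch function) mirrors the same cache-hit, refresh-on-miss, and fail-fast behavior:

```go
package main

import (
	"fmt"
	"log"
	"sync"
)

type nsCache struct {
	mu    sync.RWMutex
	items map[string]string              // namespace name -> some metadata
	fetch func() (map[string]string, error) // stand-in for the Kubernetes API call
}

func (c *nsCache) get(name string) string {
	// Fast path: serve from cache under a read lock.
	c.mu.RLock()
	v, ok := c.items[name]
	c.mu.RUnlock()
	if ok {
		return v
	}
	// Cache miss: refresh once, then fail fast if the namespace is still unknown.
	fresh, err := c.fetch()
	if err != nil {
		log.Fatalf("FATAL: cannot refresh namespaces: %s", err)
	}
	c.mu.Lock()
	c.items = fresh
	c.mu.Unlock()
	v, ok = fresh[name]
	if !ok {
		log.Fatalf("FATAL: cannot find namespace %q", name)
	}
	return v
}

func main() {
	c := &nsCache{
		items: map[string]string{"default": "cached"},
		fetch: func() (map[string]string, error) {
			return map[string]string{"default": "cached", "payments": "refreshed"}, nil
		},
	}
	fmt.Println(c.get("default"))  // served from cache
	fmt.Println(c.get("payments")) // cache miss triggers a refresh
}
```

The fail-fast choice matches the doc comment above: serving logs with incomplete namespace metadata could bypass excludeFilter, so an unexpected miss terminates the agent rather than continuing with partial data.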