Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions app/vlagent/kubernetescollector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,20 +269,60 @@ func (c *kubeAPIClient) getPod(ctx context.Context, namespace, podName string) (
return p, nil
}

type nodeMetadata struct {
type objectMeta struct {
Name string `json:"name"`
Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
}

type node struct {
Metadata nodeMetadata `json:"metadata"`
Metadata objectMeta `json:"metadata"`
}

// nodeList describes the part of a Kubernetes node list JSON document that is decoded here:
// only the "items" array is kept, every other key is ignored.
type nodeList struct {
// Items holds one node entry per element of the "items" JSON array.
Items []node `json:"items"`
}

// namespaceList is the JSON shape decoded from the /api/v1/namespaces response
// by getAllNamespaces. Only the "metadata" and "items" keys are decoded.
type namespaceList struct {
// Metadata carries list-level metadata (see listMetadata).
Metadata listMetadata `json:"metadata"`
// Items holds one namespace entry per element of the "items" JSON array.
Items []namespace `json:"items"`
}

// namespace is a single entry of namespaceList.Items.
// Only the "metadata" object is decoded.
type namespace struct {
// Metadata is the object metadata of the namespace as described by objectMeta.
Metadata objectMeta `json:"metadata"`
}

// listMetadata is the list-level "metadata" object of a Kubernetes list response.
type listMetadata struct {
// ResourceVersion is decoded from the "resourceVersion" key.
// NOTE(review): presumably the version of the returned list that a subsequent watch can start from - confirm against callers.
ResourceVersion string `json:"resourceVersion"`
}

// getAllNamespaces fetches the namespace list from the Kubernetes API server.
//
// It issues a single GET request to /api/v1/namespaces and decodes the JSON body into a namespaceList.
// A failed request, a non-200 status code or an undecodable body is reported as an error;
// the returned namespaceList is empty in all of these cases.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#list-namespace-v1-core
func (c *kubeAPIClient) getAllNamespaces(ctx context.Context) (namespaceList, error) {
	var empty namespaceList

	req := c.mustCreateRequest(ctx, http.MethodGet, "/api/v1/namespaces", nil)
	resp, err := c.c.Do(req)
	if err != nil {
		return empty, fmt.Errorf("cannot do %q GET request: %w", req.URL.String(), err)
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code != http.StatusOK {
		// Attach the response body to the error for diagnostics.
		// If the body cannot be read, the read error text is attached instead.
		body, readErr := io.ReadAll(resp.Body)
		if readErr != nil {
			body = []byte(readErr.Error())
		}
		return empty, fmt.Errorf("unexpected status code %d from %q; response: %q", code, req.URL.String(), body)
	}

	var result namespaceList
	dec := json.NewDecoder(resp.Body)
	if decodeErr := dec.Decode(&result); decodeErr != nil {
		// Do not leak a partially decoded list to the caller.
		return empty, fmt.Errorf("cannot decode response body: %w", decodeErr)
	}
	return result, nil
}

// getNodes returns the list of node names in the Kubernetes cluster.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#list-node-v1-core
Expand Down
79 changes: 77 additions & 2 deletions app/vlagent/kubernetescollector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ type kubernetesCollector struct {

currentNode node

// namespaces caches metadata for all namespaces in the cluster.
// namespacesLock guards concurrent access, since refreshNamespaces may run while events are handled.
namespaces map[string]namespace
namespacesLock sync.RWMutex
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please double-check if we really need this mutex here?


ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
Expand Down Expand Up @@ -59,8 +64,16 @@ func startKubernetesCollector(client *kubeAPIClient, currentNodeName, logsPath,
ctx: ctx,
cancel: cancel,
logsPath: logsPath,
namespaces: make(map[string]namespace),
}

nsl, err := client.getAllNamespaces(ctx)
if err != nil {
cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vadimalekseev , from a maintainability point of view it is better to put the defer cancel() call directly after the context.WithCancel() call above. This prevents possible issues with missing cancel() calls on early-return paths from the function.

@vadimalekseev , could you update all the vlagent code according to this recommendation in a separate pull request?

return nil, fmt.Errorf("cannot fetch namespaces: %w; this is required for proper filtering", err)
}
kc.setNamespaces(nsl.Items)

storage := &remotewrite.Storage{}
newProcessor := func(commonFields []logstorage.Field) processor {
return newLogFileProcessor(storage, commonFields)
Expand Down Expand Up @@ -171,8 +184,10 @@ func (kc *kubernetesCollector) startWatchCluster(ctx context.Context, resourceVe
}

func (kc *kubernetesCollector) startReadPodLogs(pod pod) {
ns := kc.getNamespace(pod.Metadata.Namespace)

startRead := func(pc podContainer, cs containerStatus) {
commonFields := getCommonFields(kc.currentNode, pod, cs)
commonFields := getCommonFields(kc.currentNode, ns, pod, cs)

filePath := kc.getLogFilePath(pod, pc, cs)

Expand Down Expand Up @@ -202,7 +217,7 @@ func (kc *kubernetesCollector) startReadPodLogs(pod pod) {
// Must be synced with getCommonFields.
var streamFieldNames = []string{"kubernetes.container_name", "kubernetes.pod_name", "kubernetes.pod_namespace"}

func getCommonFields(n node, p pod, cs containerStatus) []logstorage.Field {
func getCommonFields(n node, ns namespace, p pod, cs containerStatus) []logstorage.Field {
var fs logstorage.Fields

// Fields should match vector.dev kubernetes_source for easy migration.
Expand All @@ -222,6 +237,15 @@ func getCommonFields(n node, p pod, cs containerStatus) []logstorage.Field {
fs.Add(fieldName, v)
}

for k, v := range ns.Metadata.Labels {
fieldName := "kubernetes.namespace_labels." + k
fs.Add(fieldName, v)
}
for k, v := range ns.Metadata.Annotations {
fieldName := "kubernetes.namespace_annotations." + k
fs.Add(fieldName, v)
}

for k, v := range n.Metadata.Labels {
fieldName := "kubernetes.node_labels." + k
fs.Add(fieldName, v)
Expand Down Expand Up @@ -257,3 +281,54 @@ func (kc *kubernetesCollector) stop() {
kc.wg.Wait()
kc.fileCollector.stop()
}

// setNamespaces rebuilds the namespace cache from the given list.
//
// The new map, keyed by namespace name, is fully built before namespacesLock is taken,
// so the write lock is held only for the assignment. The previous cache is discarded as a whole:
// namespaces missing from the list are no longer cached afterwards.
func (kc *kubernetesCollector) setNamespaces(namespaces []namespace) {
	byName := make(map[string]namespace, len(namespaces))
	for i := range namespaces {
		item := namespaces[i]
		byName[item.Metadata.Name] = item
	}

	kc.namespacesLock.Lock()
	defer kc.namespacesLock.Unlock()
	kc.namespaces = byName
}

// getNamespace returns the cached metadata of the namespace with the given name.
//
// On a cache miss (e.g., the namespace was created after vlagent startup) the whole
// namespace list is re-fetched from Kubernetes API and the cache is consulted once more.
// vlagent is terminated if the refresh fails or if the namespace is still unknown afterwards,
// since processing logs without complete namespace metadata could bypass excludeFilter.
func (kc *kubernetesCollector) getNamespace(namespaceName string) namespace {
	// lookup reads the cache entry for namespaceName under the read lock.
	lookup := func() (namespace, bool) {
		kc.namespacesLock.RLock()
		defer kc.namespacesLock.RUnlock()
		cached, found := kc.namespaces[namespaceName]
		return cached, found
	}

	if cached, found := lookup(); found {
		return cached
	}

	// Cache miss: the read lock is not held during the API call.
	err := kc.refreshNamespaces()
	if err != nil {
		logger.Fatalf("FATAL: cannot refresh namespaces: %s", err)
	}

	refreshed, found := lookup()
	if !found {
		logger.Fatalf("FATAL: cannot find namespace %q in Kubernetes API response", namespaceName)
	}
	return refreshed
}

// refreshNamespaces re-fetches the namespace list from Kubernetes API using the collector context
// and replaces the cache with it.
//
// The cache is left untouched and an error is returned if the list cannot be fetched.
// On success the number of fetched namespaces is logged.
func (kc *kubernetesCollector) refreshNamespaces() error {
	list, fetchErr := kc.client.getAllNamespaces(kc.ctx)
	if fetchErr != nil {
		return fmt.Errorf("cannot fetch namespaces from Kubernetes API: %w", fetchErr)
	}

	items := list.Items
	kc.setNamespaces(items)
	logger.Infof("refreshed namespace cache: %d namespaces", len(items))

	return nil
}
10 changes: 8 additions & 2 deletions app/vlagent/kubernetescollector/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ var (
"Even this setting is disabled, Node labels are available for filtering via -kubernetes.excludeFilter flag")
includeNodeAnnotations = flag.Bool("kubernetesCollector.includeNodeAnnotations", false, "Include Node annotations as additional fields in the log entries. "+
"Even this setting is disabled, Node annotations are available for filtering via -kubernetes.excludeFilter flag")
includeNamespaceLabels = flag.Bool("kubernetesCollector.includeNamespaceLabels", false, "Include Namespace labels as additional fields in the log entries. "+
"Even this setting is disabled, Namespace labels are available for filtering via -kubernetes.excludeFilter flag")
includeNamespaceAnnotations = flag.Bool("kubernetesCollector.includeNamespaceAnnotations", false, "Include Namespace annotations as additional fields in the log entries. "+
"Even this setting is disabled, Namespace annotations are available for filtering via -kubernetes.excludeFilter flag")
)

type logFileProcessor struct {
Expand Down Expand Up @@ -68,13 +72,15 @@ type logFileProcessor struct {
// commonFields must not be modified as they can be accessed from multiple goroutines.
func newLogFileProcessor(storage insertutil.LogRowsStorage, commonFields []logstorage.Field) *logFileProcessor {
// Exclude labels or annotations if they should not be included.
if !*includePodLabels || !*includePodAnnotations || !*includeNodeLabels || !*includeNodeAnnotations {
if !*includePodLabels || !*includePodAnnotations || !*includeNodeLabels || !*includeNodeAnnotations || !*includeNamespaceLabels || !*includeNamespaceAnnotations {
var fields []logstorage.Field
for _, f := range commonFields {
excludeField := !*includePodLabels && strings.HasPrefix(f.Name, "kubernetes.pod_labels.") ||
!*includePodAnnotations && strings.HasPrefix(f.Name, "kubernetes.pod_annotations.") ||
!*includeNodeLabels && strings.HasPrefix(f.Name, "kubernetes.node_labels.") ||
!*includeNodeAnnotations && strings.HasPrefix(f.Name, "kubernetes.node_annotations.")
!*includeNodeAnnotations && strings.HasPrefix(f.Name, "kubernetes.node_annotations.") ||
!*includeNamespaceLabels && strings.HasPrefix(f.Name, "kubernetes.namespace_labels.") ||
!*includeNamespaceAnnotations && strings.HasPrefix(f.Name, "kubernetes.namespace_annotations.")

if !excludeField {
fields = append(fields, f)
Expand Down
14 changes: 10 additions & 4 deletions docs/victorialogs/vlagent.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,16 @@ Supported metadata fields:
- `kubernetes.container_id` - ID of the container in the runtime.
- `kubernetes.pod_labels.*` - any Pod label (e.g., `kubernetes.pod_labels.app`).
- `kubernetes.pod_annotations.*` - any Pod annotation (e.g., `kubernetes.pod_annotations.logging.vlagent.io/exclude`).
- `kubernetes.namespace_labels.*` - any Namespace label (e.g., `kubernetes.namespace_labels.environment`).
- `kubernetes.namespace_annotations.*` - any Namespace annotation (e.g., `kubernetes.namespace_annotations.owner`).
- `kubernetes.node_labels.*` - any Node label (e.g., `kubernetes.node_labels.kubernetes.io/arch`).
- `kubernetes.node_annotations.*` - any Node annotation (e.g., `kubernetes.node_annotations.disk-type.gke.io/pd-ssd`).

To enable filtering, use the `-kubernetesCollector.excludeFilter` command-line flag with any [LogsQL filter](https://docs.victoriametrics.com/victorialogs/logsql/#filters).
Note that [pipes](https://docs.victoriametrics.com/victorialogs/logsql/#pipes) are not supported in filter expressions.

Note that even if Pod/Node labels and annotations are excluded from logs via `-kubernetesCollector.ignoreFields`
or `-kubernetesCollector.includePodLabels` (the same for Node labels and annotations) flags,
Note that even if Pod/Node/Namespace labels and annotations are excluded from logs via `-kubernetesCollector.ignoreFields`
or `-kubernetesCollector.includePodLabels` (the same for Namespace and Node labels and annotations) flags,
they are still available for filtering via `-kubernetesCollector.excludeFilter` flag.
You don't need to include this information in logs just to be able to filter by it.

Expand All @@ -186,9 +188,13 @@ You can control which metadata fields are attached to every log entry using the
* `-kubernetesCollector.includePodAnnotations` (default: `false`) - attach Pod annotations to every log entry.
* `-kubernetesCollector.includeNodeLabels` (default: `false`) - attach Node labels to every log entry.
* `-kubernetesCollector.includeNodeAnnotations` (default: `false`) - attach Node annotations to every log entry.
* `-kubernetesCollector.includeNamespaceLabels` (default: `false`) - attach Namespace labels to every log entry.
* `-kubernetesCollector.includeNamespaceAnnotations` (default: `false`) - attach Namespace annotations to every log entry.

Note that vlagent does not update node or pod labels during runtime.
Therefore, if node/pod metadata changes, you must restart vlagent to apply those changes.
Note that vlagent does not update node, pod, or namespace labels and annotations during runtime.
vlagent automatically discovers new namespaces created after startup and fetches their metadata.
However, if labels or annotations of existing nodes, pods, or namespaces are changed,
you must restart vlagent to apply those changes.

## Monitoring

Expand Down