Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions app/vlagent/kubernetescollector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,16 @@ type nodeList struct {
Items []node `json:"items"`
}

type namespaceMetadata struct {
Name string `json:"name"`
Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
}

type namespace struct {
Metadata namespaceMetadata `json:"metadata"`
Comment thread
withlin marked this conversation as resolved.
Outdated
}

// getNodes returns the list of node names in the Kubernetes cluster.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#list-node-v1-core
Expand Down Expand Up @@ -341,6 +351,33 @@ func (c *kubeAPIClient) getNodeByName(ctx context.Context, nodeName string) (nod
return n, nil
}

// getNamespace returns the namespace with the given name.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#read-namespace-v1-core
func (c *kubeAPIClient) getNamespace(ctx context.Context, namespaceName string) (namespace, error) {
req := c.mustCreateRequest(ctx, http.MethodGet, "/api/v1/namespaces/"+namespaceName, nil)
resp, err := c.c.Do(req)
if err != nil {
return namespace{}, fmt.Errorf("cannot do %q GET request: %w", req.URL.String(), err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
payload, err := io.ReadAll(resp.Body)
if err != nil {
payload = []byte(err.Error())
}
return namespace{}, fmt.Errorf("unexpected status code %d from %q; response: %q", resp.StatusCode, req.URL.String(), payload)
}

var ns namespace
if err := json.NewDecoder(resp.Body).Decode(&ns); err != nil {
return namespace{}, fmt.Errorf("cannot decode response body: %w", err)
}

return ns, nil
}

func (c *kubeAPIClient) mustCreateRequest(ctx context.Context, method, urlPath string, args url.Values) *http.Request {
req, err := http.NewRequestWithContext(ctx, method, "/", nil)
if err != nil {
Expand Down
44 changes: 42 additions & 2 deletions app/vlagent/kubernetescollector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type kubernetesCollector struct {

currentNode node

// namespaces caches metadata for already requested namespaces to avoid repeated API calls.
namespaces map[string]namespace
namespacesLock sync.Mutex

ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
Expand Down Expand Up @@ -59,6 +63,7 @@ func startKubernetesCollector(client *kubeAPIClient, currentNodeName, logsPath,
ctx: ctx,
cancel: cancel,
logsPath: logsPath,
namespaces: make(map[string]namespace),
}

storage := &remotewrite.Storage{}
Expand Down Expand Up @@ -172,7 +177,8 @@ func (kc *kubernetesCollector) startWatchCluster(ctx context.Context, resourceVe

func (kc *kubernetesCollector) startReadPodLogs(pod pod) {
startRead := func(pc podContainer, cs containerStatus) {
commonFields := getCommonFields(kc.currentNode, pod, cs)
ns := kc.getNamespace(pod.Metadata.Namespace)
Comment thread
withlin marked this conversation as resolved.
Outdated
commonFields := getCommonFields(kc.currentNode, ns, pod, cs)

filePath := kc.getLogFilePath(pod, pc, cs)

Expand Down Expand Up @@ -202,7 +208,7 @@ func (kc *kubernetesCollector) startReadPodLogs(pod pod) {
// Must be synced with getCommonFields.
var streamFieldNames = []string{"kubernetes.container_name", "kubernetes.pod_name", "kubernetes.pod_namespace"}

func getCommonFields(n node, p pod, cs containerStatus) []logstorage.Field {
func getCommonFields(n node, ns namespace, p pod, cs containerStatus) []logstorage.Field {
var fs logstorage.Fields

// Fields should match vector.dev kubernetes_source for easy migration.
Expand All @@ -222,6 +228,15 @@ func getCommonFields(n node, p pod, cs containerStatus) []logstorage.Field {
fs.Add(fieldName, v)
}

for k, v := range ns.Metadata.Labels {
fieldName := "kubernetes.namespace_labels." + k
fs.Add(fieldName, v)
}
for k, v := range ns.Metadata.Annotations {
fieldName := "kubernetes.namespace_annotations." + k
fs.Add(fieldName, v)
}

for k, v := range n.Metadata.Labels {
fieldName := "kubernetes.node_labels." + k
fs.Add(fieldName, v)
Expand Down Expand Up @@ -257,3 +272,28 @@ func (kc *kubernetesCollector) stop() {
kc.wg.Wait()
kc.fileCollector.stop()
}

func (kc *kubernetesCollector) getNamespace(namespaceName string) namespace {
kc.namespacesLock.Lock()
ns, ok := kc.namespaces[namespaceName]
kc.namespacesLock.Unlock()
if ok {
return ns
}

ns, err := kc.client.getNamespace(kc.ctx, namespaceName)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@valyala, can we use updateNeededFields to determine if the excludeFilter relies on namespace metadata? This would help us decide whether fetching this information from the Kubernetes API is necessary.

For example:

pf := &prefixfilter.Filter{}
fc.excludeFilter.UpdateNeededFields(pf)

if *includeNamespaceLabels || *includeNamespaceAnnotations ||
	pf.MatchStringOrWildcard("kubernetes.namespace_labels.*") ||
	pf.MatchStringOrWildcard("kubernetes.namespace_annotations.*") {
	// fetch namespace labels and annotations from Kubernetes API server...
}

If the answer is 'yes' let's address this in a separate PR to keep this contribution simple.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use updateNeededFields to determine if the excludeFilter relies on namespace metadata?

I'm not entirely convinced this is a useful feature, as it won't really save much. There aren't many namespaces, and storing them in memory is practically free.

However it could allow us to avoid requesting permissions for resources we don't actually need and completely avoid synchronous network calls within the startRead function, even when encountering a new namespace

if err != nil {
logger.Errorf("cannot get namespace %q metadata: %s", namespaceName, err)
return namespace{
Comment thread
vadimalekseev marked this conversation as resolved.
Outdated
Metadata: namespaceMetadata{
Name: namespaceName,
},
}
}

kc.namespacesLock.Lock()
kc.namespaces[namespaceName] = ns
kc.namespacesLock.Unlock()

return ns
}
10 changes: 8 additions & 2 deletions app/vlagent/kubernetescollector/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ var (
"Even this setting is disabled, Node labels are available for filtering via -kubernetes.excludeFilter flag")
includeNodeAnnotations = flag.Bool("kubernetesCollector.includeNodeAnnotations", false, "Include Node annotations as additional fields in the log entries. "+
"Even this setting is disabled, Node annotations are available for filtering via -kubernetes.excludeFilter flag")
includeNamespaceLabels = flag.Bool("kubernetesCollector.includeNamespaceLabels", false, "Include Namespace labels as additional fields in the log entries. "+
"Even this setting is disabled, Namespace labels are available for filtering via -kubernetes.excludeFilter flag")
includeNamespaceAnnotations = flag.Bool("kubernetesCollector.includeNamespaceAnnotations", false, "Include Namespace annotations as additional fields in the log entries. "+
"Even this setting is disabled, Namespace annotations are available for filtering via -kubernetes.excludeFilter flag")
)

type logFileProcessor struct {
Expand Down Expand Up @@ -68,13 +72,15 @@ type logFileProcessor struct {
// commonFields must not be modified as they can be accessed from multiple goroutines.
func newLogFileProcessor(storage insertutil.LogRowsStorage, commonFields []logstorage.Field) *logFileProcessor {
// Exclude labels or annotations if they should not be included.
if !*includePodLabels || !*includePodAnnotations || !*includeNodeLabels || !*includeNodeAnnotations {
if !*includePodLabels || !*includePodAnnotations || !*includeNodeLabels || !*includeNodeAnnotations || !*includeNamespaceLabels || !*includeNamespaceAnnotations {
var fields []logstorage.Field
for _, f := range commonFields {
excludeField := !*includePodLabels && strings.HasPrefix(f.Name, "kubernetes.pod_labels.") ||
!*includePodAnnotations && strings.HasPrefix(f.Name, "kubernetes.pod_annotations.") ||
!*includeNodeLabels && strings.HasPrefix(f.Name, "kubernetes.node_labels.") ||
!*includeNodeAnnotations && strings.HasPrefix(f.Name, "kubernetes.node_annotations.")
!*includeNodeAnnotations && strings.HasPrefix(f.Name, "kubernetes.node_annotations.") ||
!*includeNamespaceLabels && strings.HasPrefix(f.Name, "kubernetes.namespace_labels.") ||
!*includeNamespaceAnnotations && strings.HasPrefix(f.Name, "kubernetes.namespace_annotations.")

if !excludeField {
fields = append(fields, f)
Expand Down
8 changes: 6 additions & 2 deletions docs/victorialogs/vlagent.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,16 @@ Supported metadata fields:
- `kubernetes.container_id` - ID of the container in the runtime.
- `kubernetes.pod_labels.*` - any Pod label (e.g., `kubernetes.pod_labels.app`).
- `kubernetes.pod_annotations.*` - any Pod annotation (e.g., `kubernetes.pod_annotation.logging.vlagent.io/exclude`).
- `kubernetes.namespace_labels.*` - any Namespace label (e.g., `kubernetes.namespace_labels.environment`).
- `kubernetes.namespace_annotations.*` - any Namespace annotation (e.g., `kubernetes.namespace_annotations.owner`).
- `kubernetes.node_labels.*` - any Node label (e.g., `kubernetes.io/arch`).
- `kubernetes.node_annotations.*` - any Node annotation (e.g., `disk-type.gke.io/pd-ssd`).

To enable filtering, use the `-kubernetesCollector.excludeFilter` command-line flag with any [LogsQL filter](https://docs.victoriametrics.com/victorialogs/logsql/#filters).
Note that [pipes](https://docs.victoriametrics.com/victorialogs/logsql/#pipes) are not supported in filter expressions.

Note that even if Pod/Node labels and annotations are excluded from logs via `-kubernetesCollector.ignoreFields`
or `-kubernetesCollector.includePodLabels` (the same for Node labels and annotations) flags,
Note that even if Pod/Node/Namespace labels and annotations are excluded from logs via `-kubernetesCollector.ignoreFields`
or `-kubernetesCollector.includePodLabels` (the same for Namespace and Node labels and annotations) flags,
they are still available for filtering via `-kubernetesCollector.excludeFilter` flag.
You don't need to include this information in logs just to be able to filter by it.

Expand All @@ -186,6 +188,8 @@ You can control which metadata fields are attached to every log entry using the
* `-kubernetesCollector.includePodAnnotations` (default: `false`) - attach Pod annotations to every log entry.
* `-kubernetesCollector.includeNodeLabels` (default: `false`) - attach Node labels to every log entry.
* `-kubernetesCollector.includeNodeAnnotations` (default: `false`) - attach Node annotations to every log entry.
* `-kubernetesCollector.includeNamespaceLabels` (default: `false`) - attach Namespace labels to every log entry.
* `-kubernetesCollector.includeNamespaceAnnotations` (default: `false`) - attach Namespace annotations to every log entry.

Note that vlagent does not update node or pod labels during runtime.
Therefore, if node/pod metadata changes, you must restart vlagent to apply those changes.
Expand Down