Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,21 @@ FROM gcr.io/distroless/static-debian12:nonroot AS distroless-static
############# fluent-bit-plugin #############
FROM distroless-static AS fluent-bit-plugin

COPY --from=builder /go/src/github.com/gardener/logging/build/out_vali.so /source/plugins/out_vali.so
COPY --from=builder /go/src/github.com/gardener/logging/build/output_plugin.so /source/plugins/output_plugin.so
COPY --from=builder /go/src/github.com/gardener/logging/build/copy /bin/cp

WORKDIR /

CMD ["/bin/cp", "/source/plugins/out_vali.so", "/plugins"]
CMD ["/bin/cp", "/source/plugins/output_plugin.so", "/plugins"]

############# fluent-bit-vali #############
FROM ghcr.io/fluent/fluent-operator/fluent-bit:4.1.0 AS fluent-bit-vali
############# fluent-bit-output #############
FROM ghcr.io/fluent/fluent-operator/fluent-bit:4.1.0 AS fluent-bit-output

COPY --from=builder /go/src/github.com/gardener/logging/build/out_vali.so /fluent-bit/plugins/out_vali.so
COPY --from=builder /go/src/github.com/gardener/logging/build/output_plugin.so /fluent-bit/plugins/output_plugin.so

WORKDIR /

CMD ["-e", "/fluent-bit/plugins/out_vali.so", "-c", "/fluent-bit/config/fluent-bit.conf"]
CMD ["-e", "/fluent-bit/plugins/output_plugin.so", "-c", "/fluent-bit/config/fluent-bit.conf"]

############# curator #############
FROM distroless-static AS curator
Expand Down
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ plugin: tidy
@GOOS=$(BUILD_PLATFORM) \
GOARCH=$(BUILD_ARCH) \
go build -buildmode=c-shared \
-o $(REPO_ROOT)/build/out_vali.so \
-o $(REPO_ROOT)/build/output_plugin.so \
-ldflags="$(LD_FLAGS)" \
./cmd/fluent-bit-vali-plugin
./cmd/fluent-bit-output-plugin

.PHONY: curator
curator: tidy
Expand Down Expand Up @@ -90,7 +90,7 @@ docker-images:
$(FLUENT_BIT_TO_VALI_IMAGE_REPOSITORY) $(IMAGE_TAG)

@BUILD_ARCH=$(BUILD_ARCH) \
$(REPO_ROOT)/hack/docker-image-build.sh "fluent-bit-vali" \
$(REPO_ROOT)/hack/docker-image-build.sh "fluent-bit-output" \
$(FLUENT_BIT_VALI_IMAGE_REPOSITORY) $(IMAGE_TAG)

@BUILD_ARCH=$(BUILD_ARCH) \
Expand Down Expand Up @@ -159,7 +159,7 @@ check: tidy
.PHONY: test
test: tidy
@go tool gotestsum $(REPO_ROOT)/pkg/... --v --ginkgo.v --ginkgo.no-color
@go tool gotestsum $(REPO_ROOT)/tests/valiplugin
@go tool gotestsum $(REPO_ROOT)/tests/plugin

.PHONY: e2e-tests
e2e-tests: tidy
Expand Down
4 changes: 3 additions & 1 deletion REUSE.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ SPDX-FileCopyrightText = "2017-2024 SAP SE or an SAP affiliate company and Garde
SPDX-License-Identifier = "CC-BY-4.0"

[[annotations]]
path = ["cmd/fluent-bit-vali-plugin/out_vali.go", "pkg/config/config.go", "pkg/config/plugin_config.go", "pkg/config/client_config.go", "pkg/valiplugin/vali.go", "pkg/valiplugin/utils.go", "pkg/client/buffer_client.go", "pkg/client/dque.go", "pkg/client/buffer.go"]
path = ["cmd/fluent-bit-output-plugin/output_plugin.go", "pkg/config/config.go", "pkg/config/plugin_config.go",
"pkg/config/client_config.go", "pkg/plugin/vali.go", "pkg/plugin/utils.go", "pkg/client/buffer_client.go",
"pkg/client/dque.go", "pkg/client/buffer.go"]
precedence = "aggregate"
SPDX-FileCopyrightText = "2017-2024 SAP SE or an SAP affiliate company and Gardener contributors"
SPDX-License-Identifier = "Apache-2.0"
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ import (
"github.com/gardener/logging/pkg/config"
"github.com/gardener/logging/pkg/healthz"
"github.com/gardener/logging/pkg/metrics"
"github.com/gardener/logging/pkg/valiplugin"
"github.com/gardener/logging/pkg/plugin"
)

var (
// registered vali plugin instances, required for disposal during shutdown
plugins []valiplugin.Vali
plugins []plugin.OutputPlugin
pluginsMutex sync.RWMutex
logger log.Logger
informer cache.SharedIndexInformer
Expand Down Expand Up @@ -172,7 +172,7 @@ func (c *pluginConfig) toStringMap() map[string]string {
//
//export FLBPluginRegister
func FLBPluginRegister(ctx unsafe.Pointer) int {
return output.FLBPluginRegister(ctx, "gardenervali", "Ship fluent-bit logs to Credativ Vali")
return output.FLBPluginRegister(ctx, "gardenervali", "Ship fluent-bit logs to Credativ OutputPlugin")
}

// FLBPluginInit is called for each vali plugin instance
Expand All @@ -183,8 +183,8 @@ func FLBPluginRegister(ctx unsafe.Pointer) int {
//export FLBPluginInit
func FLBPluginInit(ctx unsafe.Pointer) int {
// shall create only if not found in the context and in plugins slice
if present := output.FLBPluginGetContext(ctx); present != nil && pluginsContains(present.(valiplugin.Vali)) {
_ = level.Info(logger).Log("msg", "plugin already present")
if present := output.FLBPluginGetContext(ctx); present != nil && pluginsContains(present.(plugin.OutputPlugin)) {
_ = level.Info(logger).Log("msg", "outputPlugin already present")

return output.FLB_OK
}
Expand All @@ -211,23 +211,23 @@ func FLBPluginInit(ctx unsafe.Pointer) int {

dumpConfiguration(_logger, conf)

plugin, err := valiplugin.NewPlugin(informer, conf, _logger)
outputPlugin, err := plugin.NewPlugin(informer, conf, _logger)
if err != nil {
metrics.Errors.WithLabelValues(metrics.ErrorNewPlugin).Inc()
_ = level.Error(_logger).Log("msg", "error creating plugin", "err", err)
_ = level.Error(_logger).Log("msg", "error creating outputPlugin", "err", err)

return output.FLB_ERROR
}

// register plugin instance, to be retrievable when sending logs
output.FLBPluginSetContext(ctx, plugin)
// remember plugin instance, required to cleanly dispose when fluent-bit is shutting down
// register outputPlugin instance, to be retrievable when sending logs
output.FLBPluginSetContext(ctx, outputPlugin)
// remember outputPlugin instance, required to cleanly dispose when fluent-bit is shutting down
pluginsMutex.Lock()
plugins = append(plugins, plugin)
plugins = append(plugins, outputPlugin)
pluginsMutex.Unlock()

_ = level.Info(_logger).Log(
"msg", "plugin initialized",
"msg", "outputPlugin initialized",
"length", len(plugins))

return output.FLB_OK
Expand All @@ -237,10 +237,10 @@ func FLBPluginInit(ctx unsafe.Pointer) int {
//
//export FLBPluginFlushCtx
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
plugin, ok := output.FLBPluginGetContext(ctx).(valiplugin.Vali)
outputPlugin, ok := output.FLBPluginGetContext(ctx).(plugin.OutputPlugin)
if !ok {
metrics.Errors.WithLabelValues(metrics.ErrorFLBPluginFlushCtx).Inc()
_ = level.Error(logger).Log("[flb-go]", "plugin not initialized")
_ = level.Error(logger).Log("[flb-go]", "outputPlugin not initialized")

return output.FLB_ERROR
}
Expand Down Expand Up @@ -268,15 +268,15 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
timestamp = time.Now()
}

err := plugin.SendRecord(record, timestamp)
err := outputPlugin.SendRecord(record, timestamp)
if err != nil {
_ = level.Error(logger).Log(
"msg", "error sending record, retrying...",
"tag", C.GoString(tag),
"err", err.Error(),
)

return output.FLB_RETRY // max retry of the plugin is set to 3, then it shall be discarded by fluent-bit
return output.FLB_RETRY // max retry of the outputPlugin is set to 3, then it shall be discarded by fluent-bit
}
}

Expand All @@ -292,16 +292,16 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
//
//export FLBPluginExitCtx
func FLBPluginExitCtx(ctx unsafe.Pointer) int {
plugin, ok := output.FLBPluginGetContext(ctx).(valiplugin.Vali)
outputPlugin, ok := output.FLBPluginGetContext(ctx).(plugin.OutputPlugin)
if !ok {
_ = level.Error(logger).Log("[flb-go]", "plugin not known")
_ = level.Error(logger).Log("[flb-go]", "outputPlugin not known")

return output.FLB_ERROR
}
plugin.Close()
pluginsRemove(plugin)
outputPlugin.Close()
pluginsRemove(outputPlugin)
_ = level.Info(logger).Log(
"msg", "plugin removed",
"msg", "outputPlugin removed",
"length", len(plugins),
)

Expand All @@ -312,8 +312,8 @@ func FLBPluginExitCtx(ctx unsafe.Pointer) int {
//
//export FLBPluginExit
func FLBPluginExit() int {
for _, plugin := range plugins {
plugin.Close()
for _, outputPlugin := range plugins {
outputPlugin.Close()
}
if informerStopChan != nil {
close(informerStopChan)
Expand Down Expand Up @@ -417,23 +417,23 @@ func dumpConfiguration(_logger log.Logger, conf *config.Config) {
_ = level.Debug(paramLogger).Log("SendLogsToDefaultClientWhenClusterIsInMigrationState", fmt.Sprintf("%+v", conf.ControllerConfig.SeedControllerClientConfig.SendLogsWhenIsInMigrationState))
}

func pluginsContains(present valiplugin.Vali) bool {
func pluginsContains(present plugin.OutputPlugin) bool {
pluginsMutex.RLock()
defer pluginsMutex.Unlock()
for _, plugin := range plugins {
if present == plugin {
for _, outputPlugin := range plugins {
if present == outputPlugin {
return true
}
}

return false
}

func pluginsRemove(plugin valiplugin.Vali) {
func pluginsRemove(outputPlugin plugin.OutputPlugin) {
pluginsMutex.Lock()
defer pluginsMutex.Unlock()
for i, p := range plugins {
if plugin == p {
if outputPlugin == p {
plugins = append(plugins[:i], plugins[i+1:]...)

return
Expand Down
3 changes: 0 additions & 3 deletions pkg/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,3 @@ type Entry struct {

// NewValiClientFunc returns a ValiClient on success.
type NewValiClientFunc func(cfg config.Config, logger log.Logger) (ValiClient, error)

// NewValiClientDecoratorFunc return ValiClient which wraps another ValiClient
type NewValiClientDecoratorFunc func(cfg config.Config, client ValiClient, logger log.Logger) (ValiClient, error)
4 changes: 2 additions & 2 deletions pkg/config/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type PluginConfig struct {
DynamicHostRegex string `mapstructure:"DynamicHostRegex"`
// KubernetesMetadata holds kubernetes metadata extraction configuration
KubernetesMetadata KubernetesMetadataExtraction `mapstructure:",squash"`
// DynamicTenant contains specs for the valiplugin dynamic functionality
// DynamicTenant contains specs for the plugin dynamic functionality
DynamicTenant DynamicTenant `mapstructure:",squash"`
// LabelSetInitCapacity sets the initial capacity for label sets
LabelSetInitCapacity int `mapstructure:"LabelSetInitCapacity"`
Expand All @@ -57,7 +57,7 @@ type KubernetesMetadataExtraction struct {
TagExpression string `mapstructure:"TagExpression"`
}

// DynamicTenant contains specs for the valiplugin dynamic functionality
// DynamicTenant contains specs for the plugin dynamic functionality
type DynamicTenant struct {
Tenant string `mapstructure:"-"`
Field string `mapstructure:"-"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type controllerClient struct {

var _ client.ValiClient = &controllerClient{}

// Client is a Vali client for the valiplugin controller
// Client is a Vali client for the plugin controller
type Client interface {
client.ValiClient
GetState() clusterState
Expand Down
11 changes: 4 additions & 7 deletions pkg/valiplugin/utils.go → pkg/plugin/utils.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
/*
This file was copied from the credativ/vali project
https://github.com/credativ/vali/blob/v2.2.4/cmd/fluent-bit/vali.go
// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and Gardener contributors
//
// SPDX-License-Identifier: Apache-2.0

Modifications Copyright SAP SE or an SAP affiliate company and Gardener contributors
*/

package valiplugin
package plugin

import (
"bytes"
Expand Down
4 changes: 2 additions & 2 deletions pkg/valiplugin/utils_test.go → pkg/plugin/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0

package valiplugin
package plugin

import (
"errors"
Expand Down Expand Up @@ -68,7 +68,7 @@ type fallbackToTagWhenMetadataIsMissing struct {
err error
}

var _ = ginkgov2.Describe("Vali plugin utils", func() {
var _ = ginkgov2.Describe("OutputPlugin plugin utils", func() {
ginkgov2.DescribeTable("#createLine",
func(args createLineArgs) {
got, err := createLine(args.records, args.f)
Expand Down
24 changes: 13 additions & 11 deletions pkg/valiplugin/vali.go → pkg/plugin/vali.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
/*
This file was copied from the credativ/vali project
https://github.com/credativ/vali/blob/v2.2.4/cmd/fluent-bit/vali.go
// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and Gardener contributors
//
// SPDX-License-Identifier: Apache-2.0

Modifications Copyright SAP SE or an SAP affiliate company and Gardener contributors
*/

package valiplugin
package plugin

import (
"fmt"
Expand All @@ -25,8 +22,8 @@ import (
"github.com/gardener/logging/pkg/metrics"
)

// Vali plugin interface
type Vali interface {
// OutputPlugin plugin interface
type OutputPlugin interface {
SendRecord(r map[any]any, ts time.Time) error
Close()
}
Expand All @@ -43,8 +40,8 @@ type vali struct {
logger log.Logger
}

// NewPlugin returns Vali output plugin
func NewPlugin(informer cache.SharedIndexInformer, cfg *config.Config, logger log.Logger) (Vali, error) {
// NewPlugin returns OutputPlugin output plugin
func NewPlugin(informer cache.SharedIndexInformer, cfg *config.Config, logger log.Logger) (OutputPlugin, error) {
var err error
v := &vali{cfg: cfg, logger: logger}

Expand Down Expand Up @@ -169,6 +166,11 @@ func (v *vali) SendRecord(r map[any]any, ts time.Time) error {
return nil
}

// client.ValiClient - actual client chain to send the log to
// valitail or otlp, dynamicHostName is extracted from DynamicHostPath field
// in the record and must match DynamicHostRegex
// example shoot--local--local
// DynamicHostPath is json form "{"kubernetes": {"namespace_name": "namespace"}}"
c := v.getClient(dynamicHostName)

if c == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0

package valiplugin_test
package plugin_test

import (
"testing"
Expand Down
4 changes: 2 additions & 2 deletions pkg/valiplugin/vali_test.go → pkg/plugin/vali_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0

package valiplugin
package plugin

import (
"os"
Expand Down Expand Up @@ -91,7 +91,7 @@ var (
logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
)

var _ = ginkgov2.Describe("Vali plugin", func() {
var _ = ginkgov2.Describe("OutputPlugin plugin", func() {
var (
simpleRecordFixture = map[any]any{
"foo": "bar",
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestMain(m *testing.M) {
envfuncs.CreateNamespace(shootNamespace),
envfuncs.CreateNamespace(seedNamespace),
envfuncs.SetupCRDs("./config", "*-crd.yaml"),
createContainerImage(pluginUnderTest, "fluent-bit-vali"),
createContainerImage(pluginUnderTest, "fluent-bit-output"),
createContainerImage(eventLoggerUnderTest, "event-logger"),
envfuncs.LoadImageToCluster(kindClusterName, pluginUnderTest),
envfuncs.LoadImageToCluster(kindClusterName, eventLoggerUnderTest),
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0

package valiplugin_test
package plugin_test

import (
"testing"
Expand Down
Loading
Loading