Skip to content

Commit 43b45c8

Browse files
support splunk data ingestion
1 parent 9109416 commit 43b45c8

9 files changed

Lines changed: 452 additions & 20 deletions

File tree

app/vlinsert/datadog/datadog.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ var parserPool fastjson.ParserPool
3232
// RequestHandler processes Datadog insert requests
3333
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
3434
switch path {
35-
case "/insert/datadog/api/v1/validate":
35+
case "/api/v1/validate":
3636
fmt.Fprintf(w, `{}`)
3737
return true
38-
case "/insert/datadog/api/v2/logs":
38+
case "/api/v2/logs":
3939
return datadogLogsIngestion(w, r)
4040
default:
4141
return false

app/vlinsert/main.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/VictoriaMetrics/VictoriaLogs/app/vlinsert/loki"
1717
"github.com/VictoriaMetrics/VictoriaLogs/app/vlinsert/nativeinsert"
1818
"github.com/VictoriaMetrics/VictoriaLogs/app/vlinsert/opentelemetry"
19+
"github.com/VictoriaMetrics/VictoriaLogs/app/vlinsert/splunk"
1920
"github.com/VictoriaMetrics/VictoriaLogs/app/vlinsert/syslog"
2021
)
2122

@@ -56,6 +57,11 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
5657
return true
5758
}
5859

60+
switch {
61+
case strings.HasPrefix(path, "/services/collector"):
62+
return splunk.RequestHandler(path, w, r)
63+
}
64+
5965
return false
6066
}
6167

@@ -78,15 +84,18 @@ func insertHandler(w http.ResponseWriter, r *http.Request, path string) bool {
7884
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8353
7985
case strings.HasPrefix(path, "/insert/elasticsearch"):
8086
return elasticsearch.RequestHandler(path, w, r)
81-
87+
case strings.HasPrefix(path, "/insert/splunk"):
88+
idx := len("/insert/splunk")
89+
return splunk.RequestHandler(path[idx:], w, r)
8290
case strings.HasPrefix(path, "/insert/loki/"):
8391
return loki.RequestHandler(path, w, r)
8492
case strings.HasPrefix(path, "/insert/opentelemetry/"):
8593
return opentelemetry.RequestHandler(path, w, r)
8694
case strings.HasPrefix(path, "/insert/journald/"):
8795
return journald.RequestHandler(path, w, r)
8896
case strings.HasPrefix(path, "/insert/datadog/"):
89-
return datadog.RequestHandler(path, w, r)
97+
idx := len("/insert/datadog")
98+
return datadog.RequestHandler(path[idx:], w, r)
9099
}
91100

92101
return false

app/vlinsert/splunk/splunk.go

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
package splunk
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"io"
7+
"net/http"
8+
"strings"
9+
"time"
10+
11+
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
12+
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
13+
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
14+
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
15+
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
16+
"github.com/VictoriaMetrics/metrics"
17+
18+
"github.com/VictoriaMetrics/VictoriaLogs/app/vlinsert/insertutil"
19+
"github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage"
20+
)
21+
22+
var (
23+
splunkStreamFields = flagutil.NewArrayString("splunk.streamFields", "Comma-separated list of fields to use as log stream fields for logs ingested over splunk protocol. "+
24+
"See https://docs.victoriametrics.com/victorialogs/data-ingestion/splunk/#stream-fields")
25+
splunkIgnoreFields = flagutil.NewArrayString("splunk.ignoreFields", "Comma-separated list of fields to ignore for logs ingested over splunk protocol. "+
26+
"See https://docs.victoriametrics.com/victorialogs/data-ingestion/splunk/#dropping-fields")
27+
splunkTimeField = flag.String("splunk.timeField", "time", "Field to use as a log timestamp for logs ingested via splunk protocol. "+
28+
"See https://docs.victoriametrics.com/victorialogs/data-ingestion/splunk/#time-field")
29+
splunkMsgField = flagutil.NewArrayString("splunk.msgField", "Field to use as a log message for logs ingested via splunk protocol. "+
30+
"See https://docs.victoriametrics.com/victorialogs/data-ingestion/splunk/#message-field")
31+
splunkTenantID = flag.String("splunk.tenantID", "0:0", "TenantID for logs ingested via the Journald endpoint. "+
32+
"See https://docs.victoriametrics.com/victorialogs/data-ingestion/splunk/#multitenancy")
33+
)
34+
35+
func getCommonParams(r *http.Request) (*insertutil.CommonParams, error) {
36+
cp, err := insertutil.GetCommonParams(r)
37+
if err != nil {
38+
return nil, err
39+
}
40+
if cp.TenantID.AccountID == 0 && cp.TenantID.ProjectID == 0 {
41+
tenantID, err := logstorage.ParseTenantID(*splunkTenantID)
42+
if err != nil {
43+
return nil, fmt.Errorf("cannot parse -splunk.tenantID=%q for splunk: %w", *splunkTenantID, err)
44+
}
45+
cp.TenantID = tenantID
46+
}
47+
48+
if !cp.IsTimeFieldSet {
49+
cp.TimeFields = []string{*splunkTimeField}
50+
}
51+
if len(cp.StreamFields) == 0 {
52+
cp.StreamFields = getStreamFields()
53+
}
54+
if len(cp.IgnoreFields) == 0 {
55+
cp.IgnoreFields = *splunkIgnoreFields
56+
}
57+
if len(cp.MsgFields) == 0 {
58+
cp.MsgFields = getMsgFields()
59+
}
60+
return cp, nil
61+
}
62+
63+
func getMsgFields() []string {
64+
if len(*splunkMsgField) > 0 {
65+
return *splunkMsgField
66+
}
67+
return []string{"event"}
68+
}
69+
70+
func getStreamFields() []string {
71+
if len(*splunkStreamFields) > 0 {
72+
return *splunkStreamFields
73+
}
74+
return defaultStreamFields
75+
}
76+
77+
var defaultStreamFields = []string{
78+
"sourcetype",
79+
"host",
80+
"source",
81+
}
82+
83+
// RequestHandler processes splunk insert requests
84+
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
85+
if !strings.HasPrefix(path, "/services/collector/event") {
86+
return false
87+
}
88+
switch r.Method {
89+
case http.MethodOptions:
90+
w.WriteHeader(http.StatusOK)
91+
return true
92+
case http.MethodPost:
93+
w.Header().Add("Content-Type", "application/json")
94+
default:
95+
w.WriteHeader(http.StatusMethodNotAllowed)
96+
return true
97+
}
98+
99+
startTime := time.Now()
100+
requestsTotal.Inc()
101+
102+
cp, err := getCommonParams(r)
103+
if err != nil {
104+
httpserver.Errorf(w, r, "%s", err)
105+
return true
106+
}
107+
if err := insertutil.CanWriteData(); err != nil {
108+
httpserver.Errorf(w, r, "%s", err)
109+
return true
110+
}
111+
112+
encoding := r.Header.Get("Content-Encoding")
113+
reader, err := protoparserutil.GetUncompressedReader(r.Body, encoding)
114+
if err != nil {
115+
logger.Errorf("cannot decode splunk request: %s", err)
116+
return true
117+
}
118+
defer protoparserutil.PutUncompressedReader(reader)
119+
120+
lmp := cp.NewLogMessageProcessor("splunk", true)
121+
streamName := fmt.Sprintf("remoteAddr=%s, requestURI=%q", httpserver.GetQuotedRemoteAddr(r), r.RequestURI)
122+
err = processStreamInternal(streamName, reader, cp.TimeFields, cp.MsgFields, lmp)
123+
lmp.MustClose()
124+
if err != nil {
125+
httpserver.Errorf(w, r, "cannot process splunk request; error: %s", err)
126+
return true
127+
}
128+
129+
requestDuration.UpdateDuration(startTime)
130+
return true
131+
}
132+
133+
func processStreamInternal(streamName string, r io.Reader, timeFields, msgFields []string, lmp insertutil.LogMessageProcessor) error {
134+
wcr := writeconcurrencylimiter.GetReader(r)
135+
defer writeconcurrencylimiter.PutReader(wcr)
136+
137+
lr := insertutil.NewLineReader(streamName, wcr)
138+
139+
n := 0
140+
errors := 0
141+
var lastError error
142+
for {
143+
ok, err := readLine(lr, timeFields, msgFields, lmp)
144+
wcr.DecConcurrency()
145+
if err != nil {
146+
lastError = err
147+
errors++
148+
logger.Warnf("splunk: cannot read line #%d in /splunk request: %s", n, err)
149+
}
150+
if !ok {
151+
break
152+
}
153+
n++
154+
}
155+
if errors > 0 {
156+
errorsTotal.Add(errors)
157+
if n == errors {
158+
// Return an error if no logs were processed and there were errors
159+
return lastError
160+
}
161+
}
162+
163+
return nil
164+
}
165+
166+
func readLine(lr *insertutil.LineReader, timeFields, msgFields []string, lmp insertutil.LogMessageProcessor) (bool, error) {
167+
var line []byte
168+
for len(line) == 0 {
169+
if !lr.NextLine() {
170+
err := lr.Err()
171+
return false, err
172+
}
173+
line = lr.Line
174+
}
175+
176+
p := logstorage.GetJSONParser()
177+
defer logstorage.PutJSONParser(p)
178+
179+
p.Init(line)
180+
for p.NextMessage() {
181+
if err := p.Error(); err != nil {
182+
return true, err
183+
}
184+
ts, err := insertutil.ExtractTimestampFromFields(timeFields, p.Fields)
185+
if err != nil {
186+
return true, err
187+
}
188+
logstorage.RenameField(p.Fields, msgFields, "_msg")
189+
lmp.AddRow(ts, p.Fields, -1)
190+
}
191+
if err := p.Error(); err != nil {
192+
return true, err
193+
}
194+
return true, nil
195+
}
196+
197+
var (
198+
requestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/splunk"}`)
199+
errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/splunk"}`)
200+
201+
requestDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/insert/splunk"}`)
202+
)

app/vlinsert/splunk/splunk_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package splunk
2+
3+
import (
4+
"bytes"
5+
"strings"
6+
"testing"
7+
8+
"github.com/VictoriaMetrics/VictoriaLogs/app/vlinsert/insertutil"
9+
)
10+
11+
func TestProcessStreamInternalSuccess(t *testing.T) {
12+
f := func(data, timeField, msgField string, timestampsExpected []int64, resultExpected string) {
13+
t.Helper()
14+
15+
timeFields := []string{timeField}
16+
msgFields := []string{msgField}
17+
tlp := &insertutil.TestLogMessageProcessor{}
18+
r := bytes.NewBufferString(data)
19+
if err := processStreamInternal("test", r, timeFields, msgFields, tlp); err != nil {
20+
t.Fatalf("unexpected error: %s", err)
21+
}
22+
23+
if err := tlp.Verify(timestampsExpected, resultExpected); err != nil {
24+
t.Fatal(err)
25+
}
26+
}
27+
28+
data := `{"@timestamp":"2023-06-06T04:48:11.735Z","event":"foobar","source":"docker","host":"localhost"}` +
29+
`{"@timestamp":"2023-06-06T04:48:12.735+01:00","event":"baz"}` +
30+
`{"event":"xyz","@timestamp":"2023-06-06 04:48:13.735Z","x":"y"}`
31+
timeField := "@timestamp"
32+
msgField := "event"
33+
timestampsExpected := []int64{1686026891735000000, 1686023292735000000, 1686026893735000000}
34+
resultExpected := `{"_msg":"foobar","source":"docker","host":"localhost"}
35+
{"_msg":"baz"}
36+
{"_msg":"xyz","x":"y"}`
37+
f(data, timeField, msgField, timestampsExpected, resultExpected)
38+
39+
// Non-existing msgField
40+
data = `{"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"}` +
41+
`{"@timestamp":"2023-06-06T04:48:12.735+01:00","message":"baz"}`
42+
timeField = "@timestamp"
43+
msgField = "foobar"
44+
timestampsExpected = []int64{1686026891735000000, 1686023292735000000}
45+
resultExpected = `{"log.offset":"71770","log.file.path":"/var/log/auth.log","message":"foobar"}
46+
{"message":"baz"}`
47+
f(data, timeField, msgField, timestampsExpected, resultExpected)
48+
49+
// invalid lines among valid lines
50+
data = `
51+
dsfodmasd
52+
53+
{"time":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"}
54+
invalid line
55+
{"time":"2023-06-06T04:48:12.735+01:00","message":"baz"}
56+
asbsdf
57+
58+
`
59+
timeField = "time"
60+
msgField = "message"
61+
timestampsExpected = []int64{1686026891735000000, 1686023292735000000}
62+
resultExpected = `{"log.offset":"71770","log.file.path":"/var/log/auth.log","_msg":"foobar"}
63+
{"_msg":"baz"}`
64+
f(data, timeField, msgField, timestampsExpected, resultExpected)
65+
}
66+
67+
func TestProcessStreamInternalFailure(t *testing.T) {
68+
f := func(data string) {
69+
t.Helper()
70+
71+
tlp := &insertutil.TestLogMessageProcessor{}
72+
r := strings.NewReader(data)
73+
if err := processStreamInternal("test", r, []string{"time"}, nil, tlp); err == nil {
74+
t.Fatalf("expected error, got nil")
75+
}
76+
77+
if err := tlp.Verify(nil, ""); err != nil {
78+
t.Fatalf("unexpected error: %s", err)
79+
}
80+
}
81+
82+
// invalid json
83+
f("foobar")
84+
85+
f(`foo
86+
bar`)
87+
88+
f(`
89+
foo
90+
91+
`)
92+
93+
// invalid timestamp field
94+
f(`{"time":"foobar"}`)
95+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
include:
2+
- ../compose-base.yml
3+
name: vector-splunk
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
api:
2+
enabled: true
3+
address: 0.0.0.0:8686
4+
sources:
5+
docker:
6+
type: docker_logs
7+
metrics:
8+
type: internal_metrics
9+
transforms:
10+
msg_parser:
11+
type: remap
12+
inputs:
13+
- docker
14+
source: |
15+
.message = parse_json(.message) ?? .message
16+
sinks:
17+
splunk:
18+
type: splunk_hec_logs
19+
inputs:
20+
- msg_parser
21+
endpoint: http://vlagent:9429/insert/splunk
22+
indexed_fields:
23+
- image
24+
encoding:
25+
codec: json
26+
compression: gzip
27+
default_token: test
28+
healthcheck:
29+
enabled: false
30+
victoriametrics:
31+
type: prometheus_remote_write
32+
endpoint: http://victoriametrics:8428/api/v1/write
33+
inputs:
34+
- metrics
35+
healthcheck:
36+
enabled: false

0 commit comments

Comments
 (0)