Skip to content

Commit f04635d

Browse files
cosmo0920edsiper
authored andcommitted
in_splunk: Plug memory issues
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
1 parent 843000e commit f04635d

File tree

3 files changed

+73
-68
lines changed

3 files changed

+73
-68
lines changed

plugins/in_splunk/splunk.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,6 @@ struct flb_splunk {
7373
struct flb_downstream *downstream; /* Client manager */
7474
struct mk_list connections; /* linked list of connections */
7575
struct mk_server *server;
76-
77-
/* Remote address */
78-
flb_sds_t current_remote_addr;
79-
size_t current_remote_addr_len;
8076
};
8177

8278

plugins/in_splunk/splunk_config.c

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,6 @@ struct flb_splunk *splunk_config_create(struct flb_input_instance *ins)
146146

147147
ctx->ingested_auth_header = NULL;
148148

149-
ctx->current_remote_addr = NULL;
150-
ctx->current_remote_addr_len = 0;
151-
152149
ret = setup_hec_tokens(ctx);
153150
if (ret != 0) {
154151
splunk_config_destroy(ctx);

plugins/in_splunk/splunk_prot.c

Lines changed: 73 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,10 @@ static flb_sds_t tag_key(struct flb_splunk *ctx, msgpack_object *map)
341341
* Process a raw text payload for Splunk HEC requests, uses the delimited character to split records,
342342
* return the number of processed bytes
343343
*/
344-
static int process_raw_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char *buf, size_t size)
344+
static int process_raw_payload_pack(struct flb_splunk *ctx, flb_sds_t tag,
345+
char *buf, size_t size,
346+
const char *remote_addr,
347+
size_t remote_addr_len)
345348
{
346349
int ret = FLB_EVENT_ENCODER_SUCCESS;
347350

@@ -388,8 +391,8 @@ static int process_raw_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char
388391

389392
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
390393
ret = append_remote_addr(ctx,
391-
ctx->current_remote_addr,
392-
ctx->current_remote_addr_len);
394+
remote_addr,
395+
remote_addr_len);
393396
}
394397

395398
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
@@ -481,9 +484,7 @@ static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *recor
481484
}
482485

483486
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
484-
ret = append_remote_addr(ctx,
485-
ctx->current_remote_addr,
486-
ctx->current_remote_addr_len);
487+
ret = append_remote_addr(ctx, remote_addr, remote_addr_len);
487488
}
488489

489490
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
@@ -519,7 +520,10 @@ static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *recor
519520
}
520521
}
521522

522-
static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char *buf, size_t size)
523+
static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag,
524+
char *buf, size_t size,
525+
const char *remote_addr,
526+
size_t remote_addr_len)
523527
{
524528
size_t off = 0;
525529
msgpack_unpacked result;
@@ -540,8 +544,8 @@ static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char
540544
}
541545

542546
process_flb_log_append(ctx, &result.data, tag, tag_from_record, tm,
543-
ctx->current_remote_addr,
544-
ctx->current_remote_addr_len);
547+
remote_addr,
548+
remote_addr_len);
545549

546550
flb_log_event_encoder_reset(&ctx->log_encoder);
547551
}
@@ -557,8 +561,8 @@ static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char
557561
}
558562

559563
process_flb_log_append(ctx, &record, tag, tag_from_record, tm,
560-
ctx->current_remote_addr,
561-
ctx->current_remote_addr_len);
564+
remote_addr,
565+
remote_addr_len);
562566

563567
/* TODO : Optimize this
564568
*
@@ -588,7 +592,9 @@ static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char
588592
}
589593

590594
static ssize_t parse_hec_payload_json(struct flb_splunk *ctx, flb_sds_t tag,
591-
char *payload, size_t size)
595+
char *payload, size_t size,
596+
const char *remote_addr,
597+
size_t remote_addr_len)
592598
{
593599
int ret;
594600
int out_size;
@@ -617,7 +623,8 @@ static ssize_t parse_hec_payload_json(struct flb_splunk *ctx, flb_sds_t tag,
617623
}
618624

619625
/* Process the packaged JSON and return the last byte used */
620-
process_json_payload_pack(ctx, tag, pack, out_size);
626+
process_json_payload_pack(ctx, tag, pack, out_size,
627+
remote_addr, remote_addr_len);
621628
flb_free(pack);
622629

623630
return 0;
@@ -675,22 +682,28 @@ static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request *
675682
}
676683

677684
static int handle_hec_payload(struct flb_splunk *ctx, int content_type,
678-
flb_sds_t tag, char *buf, size_t size)
685+
flb_sds_t tag, char *buf, size_t size,
686+
const char *remote_addr,
687+
size_t remote_addr_len)
679688
{
680689
int ret = -1;
681690

682691
if (content_type == HTTP_CONTENT_JSON) {
683-
ret = parse_hec_payload_json(ctx, tag, buf, size);
692+
ret = parse_hec_payload_json(ctx, tag, buf, size,
693+
remote_addr, remote_addr_len);
684694
}
685695
else if (content_type == HTTP_CONTENT_TEXT) {
686-
ret = process_raw_payload_pack(ctx, tag, buf, size);
696+
ret = process_raw_payload_pack(ctx, tag, buf, size,
697+
remote_addr, remote_addr_len);
687698
}
688699
else if (content_type == HTTP_CONTENT_UNKNOWN) {
689700
if (buf[0] == '{') {
690-
ret = parse_hec_payload_json(ctx, tag, buf, size);
701+
ret = parse_hec_payload_json(ctx, tag, buf, size,
702+
remote_addr, remote_addr_len);
691703
}
692704
else {
693-
ret = process_raw_payload_pack(ctx, tag, buf, size);
705+
ret = process_raw_payload_pack(ctx, tag, buf, size,
706+
remote_addr, remote_addr_len);
694707
}
695708
}
696709

@@ -700,7 +713,9 @@ static int handle_hec_payload(struct flb_splunk *ctx, int content_type,
700713
static int process_hec_payload(struct flb_splunk *ctx, struct splunk_conn *conn,
701714
flb_sds_t tag,
702715
struct mk_http_session *session,
703-
struct mk_http_request *request)
716+
struct mk_http_request *request,
717+
const char *remote_addr,
718+
size_t remote_addr_len)
704719
{
705720
int i = 0;
706721
int ret = 0;
@@ -768,11 +783,13 @@ static int process_hec_payload(struct flb_splunk *ctx, struct splunk_conn *conn,
768783
return -1;
769784
}
770785

771-
ret = handle_hec_payload(ctx, type, tag, gz_data, gz_size);
786+
ret = handle_hec_payload(ctx, type, tag, gz_data, gz_size,
787+
remote_addr, remote_addr_len);
772788
flb_free(gz_data);
773789
}
774790
else {
775-
ret = handle_hec_payload(ctx, type, tag, request->data.data, request->data.len);
791+
ret = handle_hec_payload(ctx, type, tag, request->data.data, request->data.len,
792+
remote_addr, remote_addr_len);
776793
}
777794

778795
return ret;
@@ -814,7 +831,8 @@ static int process_hec_raw_payload(struct flb_splunk *ctx, struct splunk_conn *c
814831
}
815832

816833
/* Always handle as raw type of payloads here */
817-
ret = process_raw_payload_pack(ctx, tag, request->data.data, request->data.len);
834+
ret = process_raw_payload_pack(ctx, tag, request->data.data, request->data.len,
835+
remote_addr, remote_addr_len);
818836

819837
return ret;
820838
}
@@ -861,6 +879,8 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
861879
char *hval = NULL;
862880
size_t hlen = 0;
863881
const char *peer;
882+
const char *remote_addr = NULL;
883+
size_t remote_addr_len = 0;
864884

865885
if (request->uri.data[0] != '/') {
866886
send_response(conn, 400, "error: invalid request\n");
@@ -1005,22 +1025,17 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
10051025
request->data.len = out_chunked_size;
10061026
}
10071027

1008-
/* Resolve per-request remote address */
1009-
ctx->current_remote_addr = NULL;
1010-
ctx->current_remote_addr_len = 0;
1011-
10121028
if (http_header_lookup(HTTP_PROTOCOL_VERSION_11, request,
10131029
SPLUNK_XFF_HEADER, &hval, &hlen) == 0) {
10141030
extract_remote_address(hval, hlen, conn->connection,
1015-
&ctx->current_remote_addr,
1016-
&ctx->current_remote_addr_len);
1031+
(char **) &remote_addr,
1032+
&remote_addr_len);
10171033
}
1018-
else {
1019-
/* fallback to peer addr */
1034+
if (remote_addr == NULL || remote_addr_len == 0) {
10201035
peer = flb_connection_get_remote_address(conn->connection);
1021-
if (peer) {
1022-
ctx->current_remote_addr = peer;
1023-
ctx->current_remote_addr_len = strlen(peer);
1036+
if (peer != NULL) {
1037+
remote_addr = peer;
1038+
remote_addr_len = strlen(peer);
10241039
}
10251040
}
10261041

@@ -1031,8 +1046,7 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
10311046
if (strcasecmp(uri, "/services/collector/raw/1.0") == 0 ||
10321047
strcasecmp(uri, "/services/collector/raw") == 0) {
10331048
ret = process_hec_raw_payload(ctx, conn, tag, session, request,
1034-
ctx->current_remote_addr,
1035-
ctx->current_remote_addr_len);
1049+
remote_addr, remote_addr_len);
10361050

10371051
if (ret == -2) {
10381052
/* Response already sent, skip further response */
@@ -1057,7 +1071,8 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
10571071
strcasecmp(uri, "/services/collector/event") == 0 ||
10581072
strcasecmp(uri, "/services/collector") == 0) {
10591073

1060-
ret = process_hec_payload(ctx, conn, tag, session, request);
1074+
ret = process_hec_payload(ctx, conn, tag, session, request,
1075+
remote_addr, remote_addr_len);
10611076
if (ret == -2) {
10621077
flb_sds_destroy(tag);
10631078
mk_mem_free(uri);
@@ -1118,10 +1133,6 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
11181133
request->data.data = original_data;
11191134
request->data.len = original_data_size;
11201135

1121-
/* Clear per-request remote address to avoid leakage across keep-alive/pipeline */
1122-
ctx->current_remote_addr = NULL;
1123-
ctx->current_remote_addr_len = 0;
1124-
11251136
return ret;
11261137
}
11271138

@@ -1251,7 +1262,9 @@ static int validate_auth_header_ng(struct flb_splunk *ctx, struct flb_http_reque
12511262
static int process_hec_payload_ng(struct flb_http_request *request,
12521263
struct flb_http_response *response,
12531264
flb_sds_t tag,
1254-
struct flb_splunk *ctx)
1265+
struct flb_splunk *ctx,
1266+
const char *remote_addr,
1267+
size_t remote_addr_len)
12551268
{
12561269
int type = -1;
12571270
int ret = 0;
@@ -1287,13 +1300,16 @@ static int process_hec_payload_ng(struct flb_http_request *request,
12871300
return -2;
12881301
}
12891302

1290-
return handle_hec_payload(ctx, type, tag, request->body, cfl_sds_len(request->body));
1303+
return handle_hec_payload(ctx, type, tag, request->body, cfl_sds_len(request->body),
1304+
remote_addr, remote_addr_len);
12911305
}
12921306

12931307
static int process_hec_raw_payload_ng(struct flb_http_request *request,
12941308
struct flb_http_response *response,
12951309
flb_sds_t tag,
1296-
struct flb_splunk *ctx)
1310+
struct flb_splunk *ctx,
1311+
const char *remote_addr,
1312+
size_t remote_addr_len)
12971313
{
12981314
int ret = 0;
12991315
size_t size = 0;
@@ -1324,7 +1340,8 @@ static int process_hec_raw_payload_ng(struct flb_http_request *request,
13241340
}
13251341

13261342
/* Always handle as raw type of payloads here */
1327-
return process_raw_payload_pack(ctx, tag, request->body, cfl_sds_len(request->body));
1343+
return process_raw_payload_pack(ctx, tag, request->body, cfl_sds_len(request->body),
1344+
remote_addr, remote_addr_len);
13281345
}
13291346

13301347
int splunk_prot_handle_ng(struct flb_http_request *request,
@@ -1337,6 +1354,8 @@ int splunk_prot_handle_ng(struct flb_http_request *request,
13371354
char *hval = NULL;
13381355
size_t hlen = 0;
13391356
const char *peer;
1357+
const char *remote_addr = NULL;
1358+
size_t remote_addr_len = 0;
13401359

13411360
context = (struct flb_splunk *) response->stream->user_data;
13421361

@@ -1384,24 +1403,19 @@ int splunk_prot_handle_ng(struct flb_http_request *request,
13841403
/* Handle every ingested payload cleanly */
13851404
flb_log_event_encoder_reset(&context->log_encoder);
13861405

1387-
/* Resolve per-request remote address */
1388-
context->current_remote_addr = NULL;
1389-
context->current_remote_addr_len = 0;
1390-
13911406
parent_session = (struct flb_http_server_session *) request->stream->parent;
13921407
if (parent_session != NULL) {
13931408
if (http_header_lookup(HTTP_PROTOCOL_VERSION_20, request,
13941409
SPLUNK_XFF_HEADER, &hval, &hlen) == 0) {
13951410
extract_remote_address(hval, hlen, parent_session->connection,
1396-
&context->current_remote_addr,
1397-
&context->current_remote_addr_len);
1411+
(char **) &remote_addr,
1412+
&remote_addr_len);
13981413
}
1399-
else {
1400-
/* fallback to peer addr */
1414+
if (remote_addr == NULL || remote_addr_len == 0) {
14011415
peer = flb_connection_get_remote_address(parent_session->connection);
1402-
if (peer) {
1403-
context->current_remote_addr = peer;
1404-
context->current_remote_addr_len = strlen(peer);
1416+
if (peer != NULL) {
1417+
remote_addr = peer;
1418+
remote_addr_len = strlen(peer);
14051419
}
14061420
}
14071421
}
@@ -1421,7 +1435,8 @@ int splunk_prot_handle_ng(struct flb_http_request *request,
14211435

14221436
if (strcasecmp(request->path, "/services/collector/raw/1.0") == 0 ||
14231437
strcasecmp(request->path, "/services/collector/raw") == 0) {
1424-
ret = process_hec_raw_payload_ng(request, response, tag, context);
1438+
ret = process_hec_raw_payload_ng(request, response, tag, context,
1439+
remote_addr, remote_addr_len);
14251440
if (ret == -2) {
14261441
/* Response already sent, skip further response */
14271442
flb_sds_destroy(tag);
@@ -1439,7 +1454,8 @@ int splunk_prot_handle_ng(struct flb_http_request *request,
14391454
else if (strcasecmp(request->path, "/services/collector/event/1.0") == 0 ||
14401455
strcasecmp(request->path, "/services/collector/event") == 0 ||
14411456
strcasecmp(request->path, "/services/collector") == 0) {
1442-
ret = process_hec_payload_ng(request, response, tag, context);
1457+
ret = process_hec_payload_ng(request, response, tag, context,
1458+
remote_addr, remote_addr_len);
14431459
if (ret == -2) {
14441460
/* Response already sent, skip further response */
14451461
flb_sds_destroy(tag);
@@ -1461,9 +1477,5 @@ int splunk_prot_handle_ng(struct flb_http_request *request,
14611477

14621478
flb_sds_destroy(tag);
14631479

1464-
/* Clear per-request remote address to avoid leakage across keep-alive/pipeline */
1465-
context->current_remote_addr = NULL;
1466-
context->current_remote_addr_len = 0;
1467-
14681480
return ret;
14691481
}

0 commit comments

Comments
 (0)