Skip to content

Commit d1dfc64

Browse files
committed
parser_json: Handle numeric timestamps
When the JSON parser is used, the `time_format` option may be set to a unit in order to parse a numeric timestamp. The following units are supported: - `SECONDS` - `MILLISECONDS` - `MICROSECONDS` - `NANOSECONDS` Signed-off-by: Hristo Venev <hristo@venev.name>
1 parent 13db2ee commit d1dfc64

File tree

4 files changed

+176
-60
lines changed

4 files changed

+176
-60
lines changed

include/fluent-bit/flb_parser.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ struct flb_parser {
5151
int time_system_timezone; /* use the system timezone as a fallback */
5252
int time_keep; /* keep time field */
5353
int time_strict; /* parse time field strictly */
54+
double time_numeric_unit; /* divisor for numeric time, or <= 0 if not enabled */
5455
int logfmt_no_bare_keys; /* in logfmt parsers, require all keys to have values */
5556
char *time_frac_secs; /* time format have fractional seconds ? */
5657
struct flb_parser_types *types; /* type casting */

src/flb_parser.c

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,26 @@ static void flb_interim_parser_destroy(struct flb_parser *parser)
145145
flb_free(parser);
146146
}
147147

148+
static double flb_parser_time_numeric_unit(const char *time_fmt)
149+
{
150+
if (!time_fmt) {
151+
return 0.0;
152+
}
153+
if (!strcmp(time_fmt, "SECONDS")) {
154+
return 1.0;
155+
}
156+
if (!strcmp(time_fmt, "MILLISECONDS")) {
157+
return 1000.0;
158+
}
159+
if (!strcmp(time_fmt, "MICROSECONDS")) {
160+
return 1000000.0;
161+
}
162+
if (!strcmp(time_fmt, "NANOSECONDS")) {
163+
return 1000000000.0;
164+
}
165+
return 0.0;
166+
}
167+
148168
struct flb_parser *flb_parser_create(const char *name, const char *format,
149169
const char *p_regex,
150170
int skip_empty,
@@ -231,6 +251,12 @@ struct flb_parser *flb_parser_create(const char *name, const char *format,
231251

232252
p->name = flb_strdup(name);
233253

254+
p->time_numeric_unit = flb_parser_time_numeric_unit(time_fmt);
255+
if (p->time_numeric_unit > 0) {
256+
/* Don't try to use the fixed string (SECONDS/...) as a format */
257+
time_fmt = NULL;
258+
}
259+
234260
if (time_fmt) {
235261
p->time_fmt_full = flb_strdup(time_fmt);
236262
if (!p->time_fmt_full) {

src/flb_parser_json.c

Lines changed: 87 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -18,43 +18,89 @@
1818
*/
1919

2020
#define _GNU_SOURCE
21+
#include <math.h>
22+
#include <stdbool.h>
2123
#include <time.h>
2224

2325
#include <fluent-bit/flb_parser.h>
2426
#include <fluent-bit/flb_pack.h>
2527
#include <fluent-bit/flb_mem.h>
2628
#include <fluent-bit/flb_parser_decoder.h>
2729

30+
static bool flb_parser_json_timestamp_str(struct flb_parser *parser,
31+
const char *ptr, size_t len,
32+
struct flb_time *out_time)
33+
{
34+
int ret;
35+
double tmfrac = 0;
36+
struct flb_tm tm = {0};
37+
time_t tmint = 0;
38+
39+
if (!parser->time_fmt) {
40+
return false;
41+
}
42+
43+
/* Lookup time */
44+
ret = flb_parser_time_lookup(ptr, len, 0, parser, &tm, &tmfrac);
45+
if (ret == -1) {
46+
flb_warn("[parser:%s] invalid time format %s for '%.*s'",
47+
parser->name, parser->time_fmt_full, len > 254 ? 254 : (int)len, ptr);
48+
return false;
49+
}
50+
51+
tmint = flb_parser_tm2time(&tm, parser->time_system_timezone);
52+
53+
out_time->tm.tv_sec = tmint;
54+
out_time->tm.tv_nsec = tmfrac * 1000000000;
55+
56+
return true;
57+
}
58+
59+
static bool flb_parser_json_timestamp_f64(struct flb_parser *parser,
60+
double val,
61+
struct flb_time *out_time)
62+
{
63+
double tmfrac = 0;
64+
double tmint = 0;
65+
66+
if (parser->time_numeric_unit <= 0) {
67+
flb_warn("[parser:%s] invalid non-string time", parser->name);
68+
return false;
69+
}
70+
71+
tmfrac = modf(val / parser->time_numeric_unit, &tmint);
72+
73+
out_time->tm.tv_sec = tmint;
74+
out_time->tm.tv_nsec = tmfrac * 1000000000;
75+
76+
return true;
77+
}
78+
2879
int flb_parser_json_do(struct flb_parser *parser,
2980
const char *in_buf, size_t in_size,
3081
void **out_buf, size_t *out_size,
3182
struct flb_time *out_time)
3283
{
3384
int i;
34-
int skip;
85+
int time_index;
3586
int ret;
3687
int slen;
3788
int root_type;
3889
int records;
39-
double tmfrac = 0;
90+
bool time_ok;
4091
char *mp_buf = NULL;
4192
char *time_key;
4293
char *tmp_out_buf = NULL;
43-
char tmp[255];
4494
size_t tmp_out_size = 0;
4595
size_t off = 0;
4696
size_t map_size;
4797
size_t mp_size;
48-
size_t len;
4998
msgpack_sbuffer mp_sbuf;
5099
msgpack_packer mp_pck;
51100
msgpack_unpacked result;
52101
msgpack_object map;
53102
msgpack_object *k = NULL;
54103
msgpack_object *v = NULL;
55-
time_t time_lookup;
56-
struct flb_tm tm = {0};
57-
struct flb_time *t;
58104
size_t consumed;
59105

60106
consumed = 0;
@@ -121,7 +167,7 @@ int flb_parser_json_do(struct flb_parser *parser,
121167
}
122168

123169
/* Do time resolution ? */
124-
if (!parser->time_fmt) {
170+
if (!parser->time_fmt && parser->time_numeric_unit <= 0) {
125171
msgpack_unpacked_destroy(&result);
126172

127173
return (int) consumed;
@@ -137,7 +183,7 @@ int flb_parser_json_do(struct flb_parser *parser,
137183

138184
/* Lookup time field */
139185
map_size = map.via.map.size;
140-
skip = map_size;
186+
time_index = map_size;
141187
for (i = 0; i < map_size; i++) {
142188
k = &map.via.map.ptr[i].key;
143189
v = &map.via.map.ptr[i].val;
@@ -162,14 +208,8 @@ int flb_parser_json_do(struct flb_parser *parser,
162208
}
163209

164210
if (strncmp(k->via.str.ptr, time_key, k->via.str.size) == 0) {
211+
time_index = i;
165212
/* We found the key, break the loop and keep the index */
166-
if (parser->time_keep == FLB_FALSE) {
167-
skip = i;
168-
break;
169-
}
170-
else {
171-
skip = -1;
172-
}
173213
break;
174214
}
175215

@@ -185,60 +225,47 @@ int flb_parser_json_do(struct flb_parser *parser,
185225
}
186226

187227
/* Ensure we have an accurate type */
188-
if (v->type != MSGPACK_OBJECT_STR) {
189-
msgpack_unpacked_destroy(&result);
228+
switch(v->type) {
229+
case MSGPACK_OBJECT_STR:
230+
time_ok = flb_parser_json_timestamp_str(parser, v->via.str.ptr, v->via.str.size, out_time);
231+
break;
190232

191-
return (int) consumed;
192-
}
233+
case MSGPACK_OBJECT_FLOAT32:
234+
case MSGPACK_OBJECT_FLOAT64:
235+
time_ok = flb_parser_json_timestamp_f64(parser, v->via.f64, out_time);
236+
break;
193237

194-
/* Lookup time */
195-
ret = flb_parser_time_lookup(v->via.str.ptr, v->via.str.size,
196-
0, parser, &tm, &tmfrac);
197-
if (ret == -1) {
198-
len = v->via.str.size;
199-
if (len > sizeof(tmp) - 1) {
200-
len = sizeof(tmp) - 1;
201-
}
202-
memcpy(tmp, v->via.str.ptr, len);
203-
tmp[len] = '\0';
204-
flb_warn("[parser:%s] invalid time format %s for '%s'",
205-
parser->name, parser->time_fmt_full, tmp);
206-
time_lookup = 0;
207-
skip = map_size;
208-
}
209-
else {
210-
time_lookup = flb_parser_tm2time(&tm, parser->time_system_timezone);
238+
case MSGPACK_OBJECT_POSITIVE_INTEGER:
239+
time_ok = flb_parser_json_timestamp_f64(parser, v->via.u64, out_time);
240+
break;
241+
242+
default:
243+
time_ok = false;
244+
break;
211245
}
212246

213-
/* Compose a new map without the time_key field */
214-
msgpack_sbuffer_init(&mp_sbuf);
215-
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
247+
if (time_ok && parser->time_keep == FLB_FALSE) {
248+
/* Compose a new map without the time_key field */
249+
msgpack_sbuffer_init(&mp_sbuf);
250+
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
216251

217-
if (parser->time_keep == FLB_FALSE && skip < map_size) {
218252
msgpack_pack_map(&mp_pck, map_size - 1);
219-
}
220-
else {
221-
msgpack_pack_map(&mp_pck, map_size);
222-
}
223253

224-
for (i = 0; i < map_size; i++) {
225-
if (i == skip) {
226-
continue;
227-
}
228-
229-
msgpack_pack_object(&mp_pck, map.via.map.ptr[i].key);
230-
msgpack_pack_object(&mp_pck, map.via.map.ptr[i].val);
231-
}
254+
for (i = 0; i < map_size; i++) {
255+
if (i == time_index) {
256+
continue;
257+
}
232258

233-
/* Export the proper buffer */
234-
flb_free(tmp_out_buf);
259+
msgpack_pack_object(&mp_pck, map.via.map.ptr[i].key);
260+
msgpack_pack_object(&mp_pck, map.via.map.ptr[i].val);
261+
}
235262

236-
*out_buf = mp_sbuf.data;
237-
*out_size = mp_sbuf.size;
263+
/* Export the proper buffer */
264+
flb_free(tmp_out_buf);
238265

239-
t = out_time;
240-
t->tm.tv_sec = time_lookup;
241-
t->tm.tv_nsec = (tmfrac * 1000000000);
266+
*out_buf = mp_sbuf.data;
267+
*out_size = mp_sbuf.size;
268+
}
242269

243270
msgpack_unpacked_destroy(&result);
244271

tests/internal/parser_json.c

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,67 @@ void test_time_keep()
319319
flb_config_exit(config);
320320
}
321321

322+
void test_time_numeric()
323+
{
324+
struct flb_parser *parser = NULL;
325+
struct flb_config *config = NULL;
326+
int ret = 0;
327+
char *input = "{\"str\":\"text\", \"int\":100, \"double\":1.23, \"bool\":true, \"time\":422500}";
328+
void *out_buf = NULL;
329+
size_t out_size = 0;
330+
struct flb_time out_time;
331+
char *expected_strs[] = {"str", "text", "int", "100", "double","1.23", "bool", "true"};
332+
struct str_list expected = {
333+
.size = sizeof(expected_strs)/sizeof(char*),
334+
.lists = &expected_strs[0],
335+
};
336+
337+
out_time.tm.tv_sec = 0;
338+
out_time.tm.tv_nsec = 0;
339+
340+
341+
config = flb_config_init();
342+
if(!TEST_CHECK(config != NULL)) {
343+
TEST_MSG("flb_config_init failed");
344+
exit(1);
345+
}
346+
347+
parser = flb_parser_create("json", "json", NULL, FLB_FALSE, "MILLISECONDS", "time", NULL,
348+
FLB_FALSE /*time_keep */, FLB_FALSE, FLB_FALSE, FLB_FALSE,
349+
NULL, 0, NULL, config);
350+
if (!TEST_CHECK(parser != NULL)) {
351+
TEST_MSG("flb_parser_create failed");
352+
flb_config_exit(config);
353+
exit(1);
354+
}
355+
356+
ret = flb_parser_do(parser, input, strlen(input), &out_buf, &out_size, &out_time);
357+
if (!TEST_CHECK(ret != -1)) {
358+
TEST_MSG("flb_parser_do failed");
359+
flb_parser_destroy(parser);
360+
flb_config_exit(config);
361+
exit(1);
362+
}
363+
364+
ret = compare_msgpack(out_buf, out_size, &expected);
365+
if (!TEST_CHECK(ret == 0)) {
366+
TEST_MSG("compare failed");
367+
flb_free(out_buf);
368+
flb_parser_destroy(parser);
369+
flb_config_exit(config);
370+
exit(1);
371+
}
372+
373+
if (!TEST_CHECK(out_time.tm.tv_sec == 422 && out_time.tm.tv_nsec == 500000000)) {
374+
TEST_MSG("timestamp error. sec Got=%ld Expect=422", out_time.tm.tv_sec);
375+
TEST_MSG("timestamp error. nsec Got=%ld Expect=500000000", out_time.tm.tv_nsec);
376+
}
377+
378+
flb_free(out_buf);
379+
flb_parser_destroy(parser);
380+
flb_config_exit(config);
381+
}
382+
322383
/*
323384
* JSON parser doesn't support 'types' option.
324385
* This test is to check that 'types' doesn't affect output.
@@ -540,6 +601,7 @@ TEST_LIST = {
540601
{ "basic", test_basic},
541602
{ "time_key", test_time_key},
542603
{ "time_keep", test_time_keep},
604+
{ "time_numeric", test_time_numeric},
543605
{ "types_is_not_supported", test_types_is_not_supported},
544606
{ "decode_field_json", test_decode_field_json},
545607
{ "time_key_kept_if_parse_fails", test_time_key_kept_if_parse_fails},

0 commit comments

Comments
 (0)