29 changes: 25 additions & 4 deletions stream-processing/fluent-bit-sql.md
@@ -13,7 +13,7 @@ Use the following SQL statements in Fluent Bit.
```sql
SELECT results_statement
FROM STREAM:stream_name | TAG:match_rule
-[WINDOW TUMBLING (integer SECOND)]
+[WINDOW TUMBLING (integer SECOND) | WINDOW HOPPING (integer SECOND, ADVANCE BY integer SECOND)]
[WHERE condition]
[GROUP BY groupby]
```
@@ -68,7 +68,10 @@ CREATE STREAM hello AS SELECT * FROM TAG:'apache.*';

You can use aggregation functions in the `results_statement` on keys, which lets you perform data calculation on groups of records. These groups are determined by the `WINDOW` key. If `WINDOW` is unspecified, aggregation functions are applied to the current buffer of records received, which might have a non-deterministic number of elements. You can also apply aggregation functions to records in a window of a specific time interval.

-Fluent Bit uses a tumbling window, which is non-overlapping. For example, a window size of `5` performs aggregation computations on records during a five-second interval, then starts new calculations for the next interval.
+Fluent Bit supports two window types:
+
+- **Tumbling window** (`WINDOW TUMBLING`): Non-overlapping windows. A window size of `5` performs aggregation on records during a five-second interval, then starts a fresh window for the next interval.
+- **Hopping window** (`WINDOW HOPPING`): A sliding window with a configurable advance step. For example, `WINDOW HOPPING (10 SECOND, ADVANCE BY 2 SECOND)` maintains a 10-second window that advances every 2 seconds, so consecutive windows share overlapping records.
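The bucketing semantics of the two window types can be sketched in a few lines of Python (an illustration only, not Fluent Bit's implementation; records are modeled as `(timestamp_in_seconds, value)` pairs):

```python
# Illustration of tumbling vs. hopping window semantics (not Fluent Bit code).

def tumbling_windows(records, size):
    """Non-overlapping: each record belongs to exactly one window."""
    windows = {}
    for ts, value in records:
        start = (ts // size) * size
        windows.setdefault(start, []).append(value)
    return windows

def hopping_windows(records, size, advance):
    """Overlapping: a record lands in every window that covers its timestamp."""
    windows = {}
    for ts, value in records:
        start = (ts // advance) * advance          # latest window covering ts
        while start >= 0 and start > ts - size:    # walk back through overlaps
            windows.setdefault(start, []).append(value)
            start -= advance
    return windows
```

With `size=10, advance=2`, a record at `t=11` is counted in the five windows starting at 2, 4, 6, 8, and 10, an overlap a tumbling window never produces.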

Additionally, you can use the `GROUP BY` statement to group results by one or more keys with matching values.

@@ -83,10 +86,10 @@ Calculates the average size of `POST` requests.
### `COUNT`

```sql
-SELECT host, COUNT(*) FROM STREAM:apache WINDOW TUMBLING (X SECOND) GROUP BY host;
+SELECT host, COUNT(*) FROM STREAM:apache WINDOW TUMBLING (5 SECOND) GROUP BY host;
```

-Counts the number of records in a five-second window, grouped by host IP addresses.
+Counts the number of records in a five-second tumbling window, grouped by host IP addresses.

### `MIN`

@@ -112,6 +115,22 @@ SELECT SUM(key) FROM STREAM:apache;

Calculates the sum of all values of a key in a set of records.

+### `TIMESERIES_FORECAST`
+
+```sql
+SELECT TIMESERIES_FORECAST(num, 30) FROM STREAM:apache WINDOW TUMBLING (5 SECOND);
+```
+
+Uses linear regression to predict the future value of a key. The first argument is the key to forecast and the second argument is the number of seconds into the future to project. Requires a `WINDOW` to accumulate the data points used for the regression.

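The idea behind `TIMESERIES_FORECAST` — fit a least-squares line to the window's `(timestamp, value)` points and extrapolate it forward — can be sketched as follows (an illustration of the math, not Fluent Bit's implementation):

```python
# Least-squares linear fit over (timestamp, value) points, extrapolated
# seconds_ahead past the last sample. Illustrative only.

def forecast(points, seconds_ahead):
    n = len(points)
    mean_t = sum(t for t, _ in points) / n
    mean_y = sum(y for _, y in points) / n
    slope = (sum((t - mean_t) * (y - mean_y) for t, y in points)
             / sum((t - mean_t) ** 2 for t, _ in points))
    intercept = mean_y - slope * mean_t
    future_t = points[-1][0] + seconds_ahead
    return slope * future_t + intercept
```

For samples `[(0, 0), (1, 2), (2, 4)]` the fitted slope is 2, so forecasting 3 seconds past the last point yields 10.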
+### `WINDOW HOPPING` example
+
+```sql
+SELECT host, COUNT(*) FROM STREAM:apache WINDOW HOPPING (10 SECOND, ADVANCE BY 2 SECOND) GROUP BY host;
+```
+
+Counts records per host using a 10-second hopping window that advances every 2 seconds. Each output overlaps with the previous window, unlike a tumbling window.

## Time functions

Use time functions to add a new key with time data into a record.
@@ -150,6 +169,8 @@ Append tag string associated to the record as a new key.
SELECT RECORD_TIME() FROM STREAM:apache;
```

+Appends the record's timestamp as a new key in double format (`seconds.nanoseconds`). Output example: `1552196165.705683`.

## `WHERE` condition

Similar to conventional SQL statements, Fluent Bit supports the `WHERE` condition. You can use this condition in both keys and subkeys. For example:
6 changes: 6 additions & 0 deletions stream-processing/overview.md
@@ -14,6 +14,12 @@ Most of the phases in the pipeline are implemented through plugins

Filters can perform specific record modifications like appending or removing a key, enriching with metadata (for example, Kubernetes filter), or discarding records based on specific conditions. After data is stored, no further modifications are made, but records can optionally be redirected to the stream processor.

+## Stream processor versus SQL processor
+
+Fluent Bit provides two SQL-based query interfaces. The stream processor (described on this page) runs as an independent subsystem after records pass through filters. It uses tag and match rules to attach to specific input streams and can re-ingest results back into the pipeline.
+
+The newer [SQL processor](../pipeline/processors/sql.md) runs inline within the pipeline as a standard processor, before records reach the output stage. It's configured in YAML only and doesn't support aggregation or stream creation. Use the SQL processor for per-record field selection; use the stream processor for aggregation, time windows, or routing results as new streams.
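As a rough sketch of the inline style, a YAML pipeline might attach the SQL processor to an input like this (the `dummy` input and the `host`/`code` field names are placeholders for illustration; check the SQL processor page for the exact, current syntax):

```yaml
# Hypothetical example: inline per-record selection with the sql processor.
pipeline:
  inputs:
    - name: dummy
      tag: app.log
      processors:
        logs:
          - name: sql
            query: "SELECT host, code FROM STREAM WHERE code >= 400;"
  outputs:
    - name: stdout
      match: "*"
```

Because the processor runs inline, the selected fields replace each record before it reaches the output stage; no new stream is created.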

## Stream processor

The stream processor is an independent subsystem that checks for new records hitting the storage interface. Based on your configuration settings, the stream processor will attach to records that come from a specific input plugin or by applying tag and matching rules.
14 changes: 7 additions & 7 deletions stream-processing/tutorial.md
@@ -19,9 +19,9 @@ These steps use the official Fluent Bit Docker image.
Run the following command to confirm that Fluent Bit is installed and up-to-date:

```shell
-$ docker run -ti fluent/fluent-bit:1.4 /fluent-bit/bin/fluent-bit --version
+$ docker run -ti fluent/fluent-bit:4.2.3 /fluent-bit/bin/fluent-bit --version

-Fluent Bit v4.0.4
+Fluent Bit v4.2.3
```

### 2. Parse sample files
@@ -30,7 +30,7 @@ The sample file contains JSON records.

```shell
docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
-fluent/fluent-bit:1.8.2 \
+fluent/fluent-bit:4.2.3 \
/fluent-bit/bin/fluent-bit -R /fluent-bit/etc/parsers.conf \
-i tail -p path=/sp-samples-1k.log \
-p parser=json \
@@ -55,7 +55,7 @@ Run the following command to create a stream processor query using the `-T` flag

```shell
docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
-fluent/fluent-bit:1.2 \
+fluent/fluent-bit:4.2.3 \
/fluent-bit/bin/fluent-bit \
-R /fluent-bit/etc/parsers.conf \
-i tail \
@@ -82,7 +82,7 @@ Run the following command to use the `AVG` aggregation function

```shell
docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
-fluent/fluent-bit:1.8.2 \
+fluent/fluent-bit:4.2.3 \
/fluent-bit/bin/fluent-bit \
-R /fluent-bit/etc/parsers.conf \
-i tail \
@@ -115,7 +115,7 @@ Grouping results within a time window simplifies data processing.

```shell
docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
-fluent/fluent-bit:1.8.2 \
+fluent/fluent-bit:4.2.3 \
/fluent-bit/bin/fluent-bit \
-R /fluent-bit/etc/parsers.conf \
-i tail \
Expand Down Expand Up @@ -143,7 +143,7 @@ Run the following command, which uses a `CREATE STREAM` statement to tag results

```shell
docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
-fluent/fluent-bit:1.8.2 \
+fluent/fluent-bit:4.2.3 \
/fluent-bit/bin/fluent-bit \
-R /fluent-bit/etc/parsers.conf \
-i tail \