Skip to content

Commit 80a5a9f

Browse files
authored
Handle fields with high-cardinality and big payloads. (netdata#21716)
* Remove verbose logs and fix log-levels. `tracing` maps debug logs to info on systemd; remove verbose logs and set the log level to trace for those we want to keep. * Add configurable journal file indexing limits. Add `max_unique_values_per_field` and `max_field_payload_size` options to protect against memory exhaustion when indexing high-cardinality fields or large payloads. Limits are configurable via journal-viewer.yaml. * Deduplicate incoming facets.
1 parent a6036df commit 80a5a9f

11 files changed

Lines changed: 240 additions & 34 deletions

File tree

src/crates/journal-engine/examples/index.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232

3333
use foundation::Timeout;
3434
use journal_engine::{
35-
Facets, FileIndexCacheBuilder, FileIndexKey, QueryTimeRange, batch_compute_file_indexes,
35+
Facets, FileIndexCacheBuilder, FileIndexKey, IndexingLimits, QueryTimeRange,
36+
batch_compute_file_indexes,
3637
};
3738
use journal_index::FieldName;
3839
use journal_registry::{Monitor, Registry};
@@ -120,8 +121,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
120121

121122
// Run batch indexing
122123
let start = std::time::Instant::now();
123-
let responses =
124-
batch_compute_file_indexes(&cache, &registry, keys, &time_range, timeout).await?;
124+
let responses = batch_compute_file_indexes(
125+
&cache,
126+
&registry,
127+
keys,
128+
&time_range,
129+
timeout,
130+
IndexingLimits::default(),
131+
)
132+
.await?;
125133

126134
let elapsed = start.elapsed();
127135

src/crates/journal-engine/src/facets.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,9 @@ impl Facets {
103103
.collect()
104104
};
105105

106-
// Sort in order to get the same hash for the same set of fields
106+
// Sort and deduplicate to get a canonical set of fields
107107
facets.sort();
108+
facets.dedup();
108109

109110
use std::hash::Hasher;
110111
let mut hasher = std::hash::DefaultHasher::new();

src/crates/journal-engine/src/indexing.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{
1010
query_time_range::QueryTimeRange,
1111
};
1212
use foundation::Timeout;
13-
use journal_index::{FileIndex, FileIndexer};
13+
use journal_index::{FileIndex, FileIndexer, IndexingLimits};
1414
use journal_registry::Registry;
1515
use tracing::{error, trace};
1616

@@ -136,8 +136,9 @@ impl Default for FileIndexCacheBuilder {
136136
/// * `cache` - The file index cache
137137
/// * `registry` - Registry to update with file metadata
138138
/// * `keys` - Vector of (file, facets, source_timestamp_field) to fetch/compute indexes for
139-
/// * `bucket_duration` - Duration of histogram buckets in seconds
139+
/// * `time_range` - Query time range for bucket duration calculation
140140
/// * `timeout` - Timeout for the entire operation (can be extended dynamically)
141+
/// * `indexing_limits` - Configuration limits for indexing (cardinality, payload size)
141142
///
142143
/// # Returns
143144
/// Vector of responses for each key. Successful responses contain the file index.
@@ -148,6 +149,7 @@ pub async fn batch_compute_file_indexes(
148149
keys: Vec<FileIndexKey>,
149150
time_range: &QueryTimeRange,
150151
timeout: Timeout,
152+
indexing_limits: IndexingLimits,
151153
) -> Result<Vec<(FileIndexKey, FileIndex)>> {
152154
let bucket_duration = time_range.bucket_duration_seconds();
153155
// Phase 1: Batch check cache for all keys upfront
@@ -240,7 +242,7 @@ pub async fn batch_compute_file_indexes(
240242
return (key, Err(EngineError::TimeBudgetExceeded));
241243
}
242244

243-
let mut file_indexer = FileIndexer::default();
245+
let mut file_indexer = FileIndexer::new(indexing_limits);
244246
let result = file_indexer
245247
.index(
246248
&key.file,

src/crates/journal-engine/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,6 @@ pub use histogram::{
3434
BucketRequest, BucketResponse, Histogram, HistogramEngine, calculate_bucket_duration,
3535
};
3636
pub use indexing::{FileIndexCacheBuilder, batch_compute_file_indexes};
37+
pub use journal_index::IndexingLimits;
3738
pub use logs::{CellValue, ColumnInfo, LogEntryData, LogQuery, Table, entry_data_to_table};
3839
pub use query_time_range::QueryTimeRange;

src/crates/journal-index/src/file_indexer.rs

Lines changed: 125 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,42 @@ use journal_registry::File;
1717
use std::num::NonZeroU64;
1818
use tracing::{error, warn};
1919

20+
/// Default maximum number of unique values to index per field.
21+
pub const DEFAULT_MAX_UNIQUE_VALUES_PER_FIELD: usize = 500;
22+
23+
/// Default maximum payload size (in bytes) for field values to index.
24+
pub const DEFAULT_MAX_FIELD_PAYLOAD_SIZE: usize = 100;
25+
26+
/// Configuration limits for the indexing process.
27+
///
28+
/// These limits protect against unbounded memory growth when indexing
29+
/// journal files with high-cardinality fields or large payloads.
30+
#[derive(Debug, Clone, Copy)]
31+
pub struct IndexingLimits {
32+
/// Maximum number of unique values to index per field.
33+
///
34+
/// Fields with more unique values than this limit will have their indexing
35+
/// truncated. This protects against high-cardinality fields (e.g., MESSAGE
36+
/// with millions of unique values) causing memory exhaustion.
37+
pub max_unique_values_per_field: usize,
38+
39+
/// Maximum payload size (in bytes) for field values to index.
40+
///
41+
/// Field values with payloads larger than this limit (or compressed values)
42+
/// will be skipped. This prevents large binary data or encoded content
43+
/// from consuming excessive memory.
44+
pub max_field_payload_size: usize,
45+
}
46+
47+
impl Default for IndexingLimits {
48+
fn default() -> Self {
49+
Self {
50+
max_unique_values_per_field: DEFAULT_MAX_UNIQUE_VALUES_PER_FIELD,
51+
max_field_payload_size: DEFAULT_MAX_FIELD_PAYLOAD_SIZE,
52+
}
53+
}
54+
}
55+
2056
/// Reusable indexer for creating searchable indexes from journal files.
2157
///
2258
/// # Indexing Process
@@ -39,9 +75,12 @@ use tracing::{error, warn};
3975
/// The indexer captures the journal file's `tail_object_offset` at the start of indexing
4076
/// to create a consistent snapshot. Any entries written to the file after indexing begins
4177
/// are ignored, preventing race conditions with concurrent writers.
42-
#[derive(Debug, Default)]
78+
#[derive(Debug)]
4379
#[cfg_attr(feature = "allocative", derive(allocative::Allocative))]
4480
pub struct FileIndexer {
81+
/// Configuration limits for the indexing process.
82+
limits: IndexingLimits,
83+
4584
// Associates a source timestamp value with its inlined cursor
4685
source_timestamp_cursor_pairs: Vec<(Microseconds, InlinedCursor)>,
4786

@@ -64,8 +103,35 @@ pub struct FileIndexer {
64103
entry_offset_index: HashMap<NonZeroU64, u64>,
65104
}
66105

106+
impl Default for FileIndexer {
107+
fn default() -> Self {
108+
Self::new(IndexingLimits::default())
109+
}
110+
}
111+
112+
impl FileIndexer {
113+
/// Create a new indexer with the specified configuration limits.
114+
pub fn new(limits: IndexingLimits) -> Self {
115+
Self {
116+
limits,
117+
source_timestamp_cursor_pairs: Vec::new(),
118+
entry_offsets: Vec::new(),
119+
source_timestamp_entry_offset_pairs: Vec::new(),
120+
realtime_entry_offset_pairs: Vec::new(),
121+
entry_indices: Vec::new(),
122+
entry_offset_index: HashMap::default(),
123+
}
124+
}
125+
}
126+
67127
impl FileIndexer {
68128
/// Create a searchable index from a journal file.
129+
///
130+
/// # Arguments
131+
/// * `file` - The journal file to index
132+
/// * `source_timestamp_field` - Optional field to use for timestamps
133+
/// * `field_names` - Fields to create bitmap indexes for
134+
/// * `bucket_duration` - Duration of histogram buckets
69135
pub fn index(
70136
&mut self,
71137
file: &File,
@@ -168,6 +234,9 @@ impl FileIndexer {
168234
///
169235
/// Only entries with offsets <= `tail_object_offset` are included in the
170236
/// bitmaps, ensuring a consistent snapshot.
237+
///
238+
/// Fields with more than `self.limits.max_unique_values_per_field` unique values
239+
/// will have their indexing truncated to prevent unbounded memory growth.
171240
fn build_entries_index(
172241
&mut self,
173242
journal_file: &JournalFile<Mmap>,
@@ -176,6 +245,8 @@ impl FileIndexer {
176245
tail_object_offset: NonZeroU64,
177246
) -> Result<HashMap<FieldValuePair, Bitmap>> {
178247
let mut entries_index = HashMap::default();
248+
let mut truncated_fields: Vec<&FieldName> = Vec::new();
249+
let mut fields_with_large_payloads: Vec<&FieldName> = Vec::new();
179250

180251
for field_name in field_names {
181252
let Some(systemd_field) = field_map.get(field_name.as_str()) else {
@@ -197,13 +268,32 @@ impl FileIndexer {
197268
}
198269
};
199270

271+
// Track the number of unique values indexed for this field
272+
let mut unique_values_count: usize = 0;
273+
let mut ignored_large_payloads: usize = 0;
274+
let mut was_truncated = false;
275+
200276
for data_object in field_data_iterator {
277+
// Check cardinality limit before processing this value
278+
if unique_values_count >= self.limits.max_unique_values_per_field {
279+
was_truncated = true;
280+
break;
281+
}
282+
201283
// Get the payload and the inlined cursor for this data object
202284
let (data_payload, inlined_cursor) = {
203285
let Ok(data_object) = data_object else {
204286
continue;
205287
};
206288

289+
// Do not create indexes with fields that contain large payloads.
290+
if data_object.raw_payload().len() >= self.limits.max_field_payload_size
291+
|| data_object.is_compressed()
292+
{
293+
ignored_large_payloads += 1;
294+
continue;
295+
}
296+
207297
// Skip the remapping value
208298
if data_object.raw_payload().ends_with(field_name.as_bytes()) {
209299
continue;
@@ -262,7 +352,41 @@ impl FileIndexer {
262352
let field_name = FieldName::new_unchecked(field_name);
263353
let k = FieldValuePair::new_unchecked(field_name, String::from(pair.value()));
264354
entries_index.insert(k, bitmap);
355+
356+
unique_values_count += 1;
357+
}
358+
359+
// Track fields that were truncated or had large payloads skipped
360+
if was_truncated {
361+
truncated_fields.push(field_name);
265362
}
363+
if ignored_large_payloads > 0 {
364+
fields_with_large_payloads.push(field_name);
365+
}
366+
}
367+
368+
// Log summary of indexing issues
369+
if !truncated_fields.is_empty() {
370+
let field_names: Vec<&str> = truncated_fields.iter().map(|f| f.as_str()).collect();
371+
warn!(
372+
"File '{}': {} field(s) truncated due to cardinality limit ({}): {:?}",
373+
journal_file.file().path(),
374+
truncated_fields.len(),
375+
self.limits.max_unique_values_per_field,
376+
field_names
377+
);
378+
}
379+
if !fields_with_large_payloads.is_empty() {
380+
let field_names: Vec<&str> = fields_with_large_payloads
381+
.iter()
382+
.map(|f| f.as_str())
383+
.collect();
384+
tracing::info!(
385+
"File '{}': {} field(s) had values skipped due to large payloads: {:?}",
386+
journal_file.file().path(),
387+
fields_with_large_payloads.len(),
388+
field_names
389+
);
266390
}
267391

268392
Ok(entries_index)

src/crates/journal-index/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ pub use file_index::{
2020
};
2121

2222
pub mod file_indexer;
23-
pub use file_indexer::FileIndexer;
23+
pub use file_indexer::{
24+
FileIndexer, IndexingLimits, DEFAULT_MAX_FIELD_PAYLOAD_SIZE, DEFAULT_MAX_UNIQUE_VALUES_PER_FIELD,
25+
};
2426

2527
pub mod bitmap;
2628
pub use bitmap::Bitmap;

src/crates/netdata-log-viewer/journal-function/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ pub mod netdata;
1111
// Re-export types from journal-engine for convenience
1212
pub use journal_engine::{
1313
BucketRequest, BucketResponse, CellValue, ColumnInfo, Facets, FileIndexCache,
14-
FileIndexCacheBuilder, FileIndexKey, Histogram, HistogramEngine, LogEntryData, LogQuery,
15-
QueryTimeRange, Result, Table, batch_compute_file_indexes, calculate_bucket_duration,
16-
entry_data_to_table,
14+
FileIndexCacheBuilder, FileIndexKey, Histogram, HistogramEngine, IndexingLimits,
15+
LogEntryData, LogQuery, QueryTimeRange, Result, Table, batch_compute_file_indexes,
16+
calculate_bucket_duration, entry_data_to_table,
1717
};
1818

1919
// Re-export Timeout from foundation (via rt for backward compatibility)

src/crates/netdata-log-viewer/journal-viewer-plugin/configs/journal-viewer.yaml.in

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,19 @@ cache:
4343
# Controls backpressure on the indexing system
4444
# Default: 100
4545
queue_capacity: 100
46+
47+
indexing:
48+
# Maximum number of unique values to index per field.
49+
# Fields with more unique values than this limit will have their indexing
50+
# truncated to prevent unbounded memory growth. This protects against
51+
# high-cardinality fields (e.g., MESSAGE with millions of unique values)
52+
# causing memory exhaustion during indexing.
53+
# Default: 500
54+
max_unique_values_per_field: 500
55+
56+
# Maximum payload size (in bytes) for field values to index.
57+
# Field values with payloads larger than this limit (or compressed values)
58+
# will be skipped. This prevents large binary data or encoded content
59+
# from consuming excessive memory.
60+
# Default: 100
61+
max_field_payload_size: 100

0 commit comments

Comments
 (0)