Skip to content

Commit d80178e

Browse files
committed
storage: use minimum of broker_timestamp and max_ts for retention_ms
The motivating case for `broker_time_based_retention` was the fact that records with bad timestamps produced in the future could lead to time-based retention being stuck indefinitely [1]. However, using _only_ the `broker_ts` can lead to unexpected behavior when e.g. replicating data from an existing cluster using MM2, as the timestamps of the Kafka records themselves are correctly preserved, but internally, `redpanda` data structures are not. To avoid the potentially curious behavior of a divergence in retention enforcement, take the minimum of the record's timestamp as written and the current broker time. This will achieve the original goal of preventing future timestamps from blocking retention enforcement, while also avoiding any unexpected behavior with past record timestamps. We also need to deal with the case where a client may have left a batch's `max_timestamp` unset, in which case it is marked with `{-1}` [2]. Clamp any non-positive values for `max_ts` to `broker_ts` in this case. [1]: * #9820 * #12991 [2]: * #25200
1 parent ff26805 commit d80178e

3 files changed

Lines changed: 104 additions & 40 deletions

File tree

src/v/storage/segment_index.h

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,39 +36,51 @@ using broker_timestamp_t = ss::lowres_system_clock::time_point;
3636

3737
// clang-format off
3838
// this truth table shows which timestamp gets used as retention_timestamp
39+
// `use_min_broker_batch_ts` is only relevant if `use_broker_ts` is also `TRUE`.
3940
//
40-
// use_broker_ts has_broker_ts ignore_future_ts has_alternative_to_future_ts retention_ts
41+
// use_broker_ts has_broker_ts ignore_future_ts has_alternative_to_future_ts use_min_broker_batch_ts retention_ts
4142
// (likely false)
42-
// TRUE TRUE TRUE TRUE broker_timestamp
43-
// TRUE TRUE TRUE FALSE broker_timestamp
44-
// TRUE TRUE FALSE TRUE broker_timestamp
45-
// TRUE TRUE FALSE FALSE broker_timestamp // new segment, new cluster
46-
// TRUE FALSE TRUE TRUE segment_index::_retention_ms // buggy old segment, new cluster
47-
// TRUE FALSE TRUE FALSE max_timestamp
48-
// TRUE FALSE FALSE TRUE max_timestamp
49-
// TRUE FALSE FALSE FALSE max_timestamp // old segment, new cluster
50-
// FALSE TRUE TRUE TRUE segment_index::_retention_ms
51-
// FALSE TRUE TRUE FALSE max_timestamp
52-
// FALSE TRUE FALSE TRUE max_timestamp
53-
// FALSE TRUE FALSE FALSE max_timestamp // new segment, upgraded cluster
54-
// FALSE FALSE TRUE TRUE segment_index::_retention_ms // buggy old segments
55-
// FALSE FALSE TRUE FALSE max_timestamp
56-
// FALSE FALSE FALSE TRUE max_timestamp
57-
// FALSE FALSE FALSE FALSE max_timestamp // old segment, upgraded cluster
43+
// TRUE TRUE TRUE TRUE TRUE min(max_timestamp, broker_timestamp)
44+
// TRUE TRUE TRUE FALSE TRUE min(max_timestamp, broker_timestamp)
45+
// TRUE TRUE FALSE TRUE TRUE min(max_timestamp, broker_timestamp)
46+
// TRUE TRUE FALSE FALSE TRUE min(max_timestamp, broker_timestamp) // new segment, new cluster
47+
// TRUE FALSE TRUE TRUE TRUE segment_index::_retention_ms // buggy old segment, new cluster
48+
// TRUE FALSE TRUE FALSE TRUE max_timestamp
49+
// TRUE FALSE FALSE TRUE TRUE max_timestamp
50+
// TRUE FALSE FALSE FALSE TRUE max_timestamp // old segment, new cluster
51+
// TRUE TRUE TRUE TRUE FALSE broker_timestamp
52+
// TRUE TRUE TRUE FALSE FALSE broker_timestamp
53+
// TRUE TRUE FALSE TRUE FALSE broker_timestamp
54+
// TRUE TRUE FALSE FALSE FALSE broker_timestamp // new segment, new cluster
55+
// TRUE FALSE TRUE TRUE FALSE segment_index::_retention_ms // buggy old segment, new cluster
56+
// TRUE FALSE TRUE FALSE FALSE max_timestamp
57+
// TRUE FALSE FALSE TRUE FALSE max_timestamp
58+
// TRUE FALSE FALSE FALSE FALSE max_timestamp // old segment, new cluster
59+
// FALSE TRUE TRUE TRUE ----- segment_index::_retention_ms
60+
// FALSE TRUE TRUE FALSE ----- max_timestamp
61+
// FALSE TRUE FALSE TRUE ----- max_timestamp
62+
// FALSE TRUE FALSE FALSE ----- max_timestamp // new segment, upgraded cluster
63+
// FALSE FALSE TRUE TRUE ----- segment_index::_retention_ms // buggy old segments
64+
// FALSE FALSE TRUE FALSE ----- max_timestamp
65+
// FALSE FALSE FALSE TRUE ----- max_timestamp
66+
// FALSE FALSE FALSE FALSE ----- max_timestamp // old segment, upgraded cluster
5867
// clang-format on
5968

6069
// this struct is meant to be a local copy of the feature
6170
// broker_time_based_retention and configuration property
6271
// storage_ignore_timestamps_in_future_secs
6372
struct time_based_retention_cfg {
6473
bool use_broker_time;
74+
bool use_min_broker_batch_time;
6575
bool use_escape_hatch_for_timestamps_in_the_future;
6676

6777
static auto make(const features::feature_table& ft)
6878
-> time_based_retention_cfg {
6979
return {
7080
.use_broker_time = ft.is_active(
7181
features::feature::broker_time_based_retention),
82+
.use_min_broker_batch_time = ft.is_active(
83+
features::feature::min_broker_batch_time_based_retention),
7284
.use_escape_hatch_for_timestamps_in_the_future
7385
= config::shard_local_cfg()
7486
.storage_ignore_timestamps_in_future_sec()
@@ -83,7 +95,36 @@ struct time_based_retention_cfg {
8395
std::optional<model::timestamp> alternative_retention_ts) const noexcept {
8496
// new clusters and new segments should hit this branch
8597
if (likely(use_broker_time && broker_ts.has_value())) {
86-
return *broker_ts;
98+
auto ts = broker_ts.value();
99+
if (use_min_broker_batch_time) {
100+
// Some clients leave `max_timestamp` within a batch empty,
101+
// leaving it marked as {-1} (See:
102+
// https://github.com/redpanda-data/redpanda/pull/25200). Only
103+
// consider positive timestamps as reasonable alternatives to
104+
// `broker_ts`.
105+
if (max_ts > model::timestamp{0}) {
106+
// The motivating case for `broker_time_based_retention` was
107+
// the fact that records with bad timestamps produced in the
108+
// future could lead to time-based retention being stuck
109+
// indefinitely (See:
110+
// https://github.com/redpanda-data/redpanda/issues/9820,
111+
// https://github.com/redpanda-data/redpanda/pull/12991).
112+
// However, using _only_ the `broker_ts` can lead to
113+
// unexpected behavior when e.g. replicating data from an
114+
// existing cluster using MM2, as the timestamps of the
115+
// Kafka records themselves are correctly preserved, but
116+
// internally, `redpanda` data structures are not. To avoid
117+
// the potentially curious behavior of a divergence in
118+
// retention enforcement, take the minimum of the record's
119+
// timestamp as written and the current broker time. This
120+
// will achieve the original goal of preventing future
121+
// timestamps from blocking retention enforcement, while
122+
// also avoiding any unexpected behavior with past record
123+
// timestamps.
124+
ts = std::min(max_ts, ts);
125+
}
126+
}
127+
return ts;
87128
}
88129
// don't use broker time or no broker time available. fallback
89130
if (unlikely(

src/v/storage/tests/storage_e2e_test.cc

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -695,29 +695,8 @@ TEST_F(storage_test_fixture, test_time_based_eviction) {
695695
as);
696696
};
697697

698-
// gc with timestamp -1s, no segments should be evicted
698+
// gc with timestamp -1s, all but the active segment should be evicted
699699
compact_and_prefix_truncate(*disk_log, make_compaction_cfg(broker_t0 - 2s));
700-
ASSERT_EQ(disk_log->segments().size(), 3);
701-
ASSERT_EQ(
702-
disk_log->segments().front()->offsets().get_base_offset(),
703-
model::offset(0));
704-
ASSERT_EQ(
705-
disk_log->segments().back()->offsets().get_dirty_offset(),
706-
model::offset(59));
707-
708-
// gc with timestamp +sep/2, should evict first segment
709-
compact_and_prefix_truncate(
710-
*disk_log, make_compaction_cfg(broker_t0 + (broker_ts_sep / 2)));
711-
ASSERT_EQ(disk_log->segments().size(), 2);
712-
ASSERT_EQ(
713-
disk_log->segments().front()->offsets().get_base_offset(),
714-
model::offset(10));
715-
ASSERT_EQ(
716-
disk_log->segments().back()->offsets().get_dirty_offset(),
717-
model::offset(59));
718-
// gc with timestamp +sep3/2, should evict another segment
719-
compact_and_prefix_truncate(
720-
*disk_log, make_compaction_cfg(broker_t0 + (3 * broker_ts_sep / 2)));
721700
ASSERT_EQ(disk_log->segments().size(), 1);
722701
ASSERT_EQ(
723702
disk_log->segments().front()->offsets().get_base_offset(),

tests/rptest/tests/retention_policy_test.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -972,3 +972,47 @@ def prefix_truncated():
972972
# Segments should be cleaned up now that we've switched on force-correction
973973
# of timestamps in the future
974974
self.redpanda.wait_until(prefix_truncated, timeout_sec=30, backoff_sec=1)
975+
976+
@cluster(num_nodes=2)
977+
def test_past_timestamps(self):
978+
"""
979+
While future record timestamps should be adjusted back to the broker
980+
timestamp to avoid blocking retention enforcement, record timestamps in the past
981+
should still be respected.
982+
"""
983+
984+
# Set `retention.ms` to 23 hours
985+
retention_ms = 23 * 3600 * 1000
986+
self.client().alter_topic_config(self.topic, "retention.ms", retention_ms)
987+
988+
# An artificial timestamp base in milliseconds (one day in the past)
989+
past_timestamp = (int(time.time()) - 24 * 3600) * 1000
990+
991+
# Produce a run of messages with CreateTime-style timestamps, each
992+
# record having a timestamp 1ms greater than the last.
993+
msg_size = 14000
994+
segments_count = 10
995+
msg_count = (self.segment_size // msg_size) * segments_count
996+
997+
# Write msg_count messages with timestamps in the past
998+
producer = KgoVerifierProducer(
999+
context=self.test_context,
1000+
redpanda=self.redpanda,
1001+
topic=self.topic,
1002+
msg_size=msg_size,
1003+
msg_count=(self.segment_size // msg_size) * segments_count,
1004+
fake_timestamp_ms=past_timestamp,
1005+
batch_max_bytes=msg_size * 2,
1006+
)
1007+
producer.start()
1008+
producer.wait()
1009+
1010+
def prefix_truncated():
1011+
segs = self.redpanda.node_storage(self.redpanda.nodes[0]).segments(
1012+
"kafka", self.topic, 0
1013+
)
1014+
self.logger.debug(f"Segments: {segs}")
1015+
return len(segs) <= 1
1016+
1017+
# Expect to see prefix truncation of day-old records with `retention.ms=23h`.
1018+
self.redpanda.wait_until(prefix_truncated, timeout_sec=30, backoff_sec=1)

0 commit comments

Comments
 (0)