Skip to content

Commit 045ea4d

Browse files
committed
storage: use minimum of broker_timestamp and max_ts for retention_ms
The motivating case for `broker_time_based_retention` was the fact that records with bad timestamps produced in the future could lead to time-based retention being stuck indefinitely [1]. However, using _only_ the `broker_ts` can lead to unexpected behavior when e.g. replicating data from an existing cluster using MM2, as the timestamps of the Kafka records themselves are correctly preserved, but internally, `redpanda` data structures are not. To avoid the potentially curious behavior of a divergence in retention enforcement, take the minimum of the record's timestamp as written and the current broker time. This will achieve the original goal of preventing future timestamps from blocking retention enforcement, while also avoiding any unexpected behavior with past record timestamps. [1]: * redpanda-data#9820 * redpanda-data#12991
1 parent f7fb365 commit 045ea4d

3 files changed

Lines changed: 63 additions & 23 deletions

File tree

src/v/storage/segment_index.h

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,23 @@ struct time_based_retention_cfg {
8383
std::optional<model::timestamp> alternative_retention_ts) const noexcept {
8484
// new clusters and new segments should hit this branch
8585
if (likely(use_broker_time && broker_ts.has_value())) {
86-
return *broker_ts;
86+
// The motivating case for `broker_time_based_retention` was the
87+
// fact that records with bad timestamps produced in the future
88+
// could lead to time-based retention being stuck indefinitely (See:
89+
// https://github.com/redpanda-data/redpanda/issues/9820,
90+
// https://github.com/redpanda-data/redpanda/pull/12991).
91+
// However, using _only_ the `broker_ts` can lead to unexpected
92+
// behavior when e.g. replicating data from an existing cluster
93+
// using MM2, as the timestamps of the Kafka records themselves are
94+
// correctly preserved, but internally, `redpanda` data structures
95+
// are not.
96+
// To avoid the potentially curious behavior of a divergence in
97+
// retention enforcement, take the minimum of the record's timestamp
98+
// as written and the current broker time. This will achieve the
99+
// original goal of preventing future timestamps from blocking
100+
// retention enforcement, while also avoiding any unexpected
101+
// behavior with past record timestamps.
102+
return std::min(max_ts, broker_ts.value());
87103
}
88104
// don't use broker time or no broker time available. fallback
89105
if (unlikely(

src/v/storage/tests/storage_e2e_test.cc

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -695,29 +695,8 @@ TEST_F(storage_test_fixture, test_time_based_eviction) {
695695
as);
696696
};
697697

698-
// gc with timestamp -1s, no segments should be evicted
698+
// gc with timestamp -1s, all segments should be evicted
699699
compact_and_prefix_truncate(*disk_log, make_compaction_cfg(broker_t0 - 2s));
700-
ASSERT_EQ(disk_log->segments().size(), 3);
701-
ASSERT_EQ(
702-
disk_log->segments().front()->offsets().get_base_offset(),
703-
model::offset(0));
704-
ASSERT_EQ(
705-
disk_log->segments().back()->offsets().get_dirty_offset(),
706-
model::offset(59));
707-
708-
// gc with timestamp +sep/2, should evict first segment
709-
compact_and_prefix_truncate(
710-
*disk_log, make_compaction_cfg(broker_t0 + (broker_ts_sep / 2)));
711-
ASSERT_EQ(disk_log->segments().size(), 2);
712-
ASSERT_EQ(
713-
disk_log->segments().front()->offsets().get_base_offset(),
714-
model::offset(10));
715-
ASSERT_EQ(
716-
disk_log->segments().back()->offsets().get_dirty_offset(),
717-
model::offset(59));
718-
// gc with timestamp +sep3/2, should evict another segment
719-
compact_and_prefix_truncate(
720-
*disk_log, make_compaction_cfg(broker_t0 + (3 * broker_ts_sep / 2)));
721700
ASSERT_EQ(disk_log->segments().size(), 1);
722701
ASSERT_EQ(
723702
disk_log->segments().front()->offsets().get_base_offset(),

tests/rptest/tests/retention_policy_test.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -863,3 +863,48 @@ def prefix_truncated():
863863
self.redpanda.wait_until(prefix_truncated,
864864
timeout_sec=30,
865865
backoff_sec=1)
866+
867+
@cluster(num_nodes=2)
def test_past_timestamps(self):
    """
    While future record timestamps should be adjusted back to the broker
    timestamp to avoid blocking retention enforcement, record timestamps in the past
    should still be respected.
    """

    # Set `retention.ms` to 23 hours
    retention_ms = 23 * 3600 * 1000
    self.client().alter_topic_config(self.topic, 'retention.ms',
                                     retention_ms)

    # A fictional artificial timestamp base in milliseconds (one day previous)
    past_timestamp = (int(time.time()) - 24 * 3600) * 1000

    # Produce a run of messages with CreateTime-style timestamps, each
    # record having a timestamp 1ms greater than the last.
    msg_size = 14000
    segments_count = 10
    msg_count = (self.segment_size // msg_size) * segments_count

    # Write msg_count messages with timestamps in the past.
    # NOTE: reuse the precomputed msg_count rather than recomputing the
    # same expression inline, so the count written matches the count
    # documented above.
    producer = KgoVerifierProducer(
        context=self.test_context,
        redpanda=self.redpanda,
        topic=self.topic,
        msg_size=msg_size,
        msg_count=msg_count,
        fake_timestamp_ms=past_timestamp,
        batch_max_bytes=msg_size * 2)
    producer.start()
    producer.wait()

    def prefix_truncated():
        # Inspect on-disk segments for partition 0 on the first node;
        # retention should reduce the log to at most one open segment.
        segs = self.redpanda.node_storage(self.redpanda.nodes[0]).segments(
            "kafka", self.topic, 0)
        self.logger.debug(f"Segments: {segs}")
        return len(segs) <= 1

    # Expect to see prefix truncation of day old records with `retention.ms=23h`.
    self.redpanda.wait_until(prefix_truncated,
                             timeout_sec=30,
                             backoff_sec=1)

0 commit comments

Comments (0)