Skip to content

Commit 259c469

Browse files
committed
rptest: add timestamp_policy_test
A variety of test cases that produce records with timestamps in the past, future, and present, to assert that `message.timestamp.{before/after}.max.ms` are being correctly enforced.
1 parent d1c1d62 commit 259c469

1 file changed

Lines changed: 134 additions & 0 deletions

File tree

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
# Copyright 2025 Redpanda Data, Inc.
2+
#
3+
# Use of this software is governed by the Business Source License
4+
# included in the file licenses/BSL.md
5+
#
6+
# As of the Change Date specified in that file, in accordance with
7+
# the Business Source License, use of this software will be governed
8+
# by the Apache License, Version 2.0
9+
10+
import time
11+
import datetime
12+
from dataclasses import dataclass
13+
from typing import Callable
14+
15+
from rptest.services.kgo_verifier_services import KgoVerifierProducer
16+
from rptest.clients.types import TopicSpec
17+
from rptest.services.cluster import cluster
18+
from rptest.tests.redpanda_test import RedpandaTest
19+
from rptest.util import expect_exception
20+
21+
from confluent_kafka import Producer
22+
23+
24+
def expect_failure_callback(err, msg):
25+
assert err is not None, "Expected failure"
26+
27+
28+
def expect_success_callback(err, msg):
29+
assert err is None, "Expected success"
30+
31+
32+
@dataclass
33+
class TestCase:
34+
key: str
35+
value: str
36+
timestamp: int
37+
callback: Callable
38+
39+
40+
class TimestampPolicyTest(RedpandaTest):
41+
topics = (
42+
TopicSpec(
43+
partition_count=1,
44+
replication_factor=1,
45+
cleanup_policy=TopicSpec.CLEANUP_DELETE,
46+
),
47+
)
48+
49+
@cluster(num_nodes=1)
50+
def test_produce_timestamps(self):
51+
"""
52+
Test that bounds set by `message.timestamp.{before/after}.max.ms` are respected
53+
when producing.
54+
"""
55+
56+
# Bound valid timestamps by one hour. Expect any records with timestamps
57+
# outside this bound to be rejected at time of production.
58+
hour = 3600 * 1000
59+
self.client().alter_topic_config(
60+
self.topic, "message.timestamp.before.max.ms", hour
61+
)
62+
63+
self.client().alter_topic_config(
64+
self.topic, "message.timestamp.after.max.ms", hour
65+
)
66+
67+
producer = Producer({"bootstrap.servers": self.redpanda.brokers()})
68+
69+
test_cases = [
70+
TestCase(
71+
"Roads?",
72+
"Where we're going, we don't need roads.",
73+
int(datetime.datetime(1955, 11, 5, 0, 0).timestamp() * 1000),
74+
expect_failure_callback,
75+
),
76+
TestCase(
77+
"Do you ever have deja vu?",
78+
"I don't think so, but I could check with the kitchen.",
79+
int(datetime.datetime(1993, 2, 2, 0, 0).timestamp() * 1000),
80+
expect_failure_callback,
81+
),
82+
TestCase(
83+
"Drat.",
84+
"Just barely missed it!",
85+
int((time.time() - 61 * 60) * 1000),
86+
expect_failure_callback,
87+
),
88+
TestCase(
89+
"I'll be back.",
90+
"...in 59 minutes.",
91+
int((time.time() - 59 * 60) * 1000),
92+
expect_success_callback,
93+
),
94+
TestCase(
95+
"There's no time like the present.",
96+
"Unless the present has traffic.",
97+
int(time.time() * 1000),
98+
expect_success_callback,
99+
),
100+
TestCase(
101+
"If I could turn back time...",
102+
"...I’d go back 60 minutes and grab another donut.",
103+
int((time.time() + 59 * 60) * 1000),
104+
expect_success_callback,
105+
),
106+
TestCase(
107+
"Oops.",
108+
"Missed it by a hair!",
109+
int((time.time() + 61 * 60) * 1000),
110+
expect_failure_callback,
111+
),
112+
TestCase(
113+
"Be excellent to each other.",
114+
"Party on, dudes!",
115+
int(datetime.datetime(2688, 1, 1, 0, 0).timestamp() * 1000),
116+
expect_failure_callback,
117+
),
118+
TestCase(
119+
"Year 3000!",
120+
"Here's to another lousy millennium.",
121+
int(datetime.datetime(2999, 12, 31, 0, 0).timestamp() * 1000),
122+
expect_failure_callback,
123+
),
124+
]
125+
126+
for test_case in test_cases:
127+
producer.produce(
128+
topic=self.topic,
129+
key=test_case.key.encode("utf-8"),
130+
value=test_case.value.encode("utf-8"),
131+
timestamp=test_case.timestamp,
132+
callback=test_case.callback,
133+
)
134+
producer.flush()

0 commit comments

Comments
 (0)