Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4377,6 +4377,18 @@ configuration::configuration()
{.needs_restart = needs_restart::no, .visibility = visibility::user},
std::nullopt,
&validate_iceberg_topic_name_dot_replacement)
, iceberg_dlq_table_suffix(
*this,
"iceberg_dlq_table_suffix",
"Suffix appended to the Iceberg table name for the dead-letter queue "
"(DLQ) table that stores invalid records when the invalid record action "
"is set to `dlq_table`. Should be chosen in a way that avoids name "
"collisions with other Iceberg tables. Important for catalogs which do "
"not support ~ (tilde) in table names. Should not be changed after any "
"DLQ tables have been created.",
{.needs_restart = needs_restart::no, .visibility = visibility::user},
"~dlq",
&validate_non_empty_string_opt)
, enable_host_metrics(
*this,
"enable_host_metrics",
Expand Down
1 change: 1 addition & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,7 @@ struct configuration final : public config_store {
property<bool> iceberg_disable_snapshot_tagging;
property<bool> iceberg_disable_automatic_snapshot_expiry;
property<std::optional<ss::sstring>> iceberg_topic_name_dot_replacement;
property<ss::sstring> iceberg_dlq_table_suffix;

property<bool> enable_host_metrics;

Expand Down
5 changes: 4 additions & 1 deletion src/v/datalake/table_id_provider.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ table_id_provider::dlq_table_id(const model::topic& t) {
return {
// TODO: namespace as a topic property? Keep it in the table metadata?
.ns = {"redpanda"},
.table = fmt::format("{}~dlq", sanitize_topic_name(t)),
.table = fmt::format(
"{}{}",
sanitize_topic_name(t),
config::shard_local_cfg().iceberg_dlq_table_suffix()),
};
}

Expand Down
93 changes: 82 additions & 11 deletions tests/rptest/tests/datalake/biglake_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

import json

from confluent_kafka import Producer
from ducktape.mark import matrix
from ducktape.mark._mark import Mark

from rptest.clients.rpk import RpkTool
from rptest.context.gcp import GCPContext
from rptest.services.catalog_service import CatalogType
from rptest.services.cluster import cluster
Expand Down Expand Up @@ -62,6 +66,8 @@ def gcp_only_test(func, /):


class BiglakeTest(RedpandaTest):
dlq_table_suffix = "__panda_dlq"

def __init__(self, test_context, *args, **kwargs):
super().__init__(
test_context,
Expand All @@ -70,6 +76,7 @@ def __init__(self, test_context, *args, **kwargs):
extra_rp_conf={
"iceberg_enabled": "true",
"iceberg_catalog_commit_interval_ms": 5000,
"iceberg_dlq_table_suffix": self.dlq_table_suffix,
},
schema_registry_config=SchemaRegistryConfig(),
pandaproxy_config=PandaproxyConfig(),
Expand All @@ -83,6 +90,32 @@ def setUp(self):
# redpanda will be started by DatalakeServices
pass

def count_rows(self, dl: DatalakeServices, table_name: str) -> int:
    """Return the current number of rows in the Iceberg table
    ``redpanda.<table_name>``, queried through a DuckDB scan."""
    table = dl.catalog_client().load_table(f"redpanda.{table_name}")
    # Materialize the scan as a DuckDB relation named "data" and count it.
    con = table.scan().to_duckdb("data")
    row = con.sql("SELECT count(*) FROM data").fetchone()
    self.logger.info(f"Row count for {table_name}: {row[0]}")
    return row[0]

def wait_rows_count(
    self,
    dl: DatalakeServices,
    table_name: str,
    expected_count: int,
    timeout_sec: int = 60,
):
    """Block until the Iceberg table ``redpanda.<table_name>`` exists in the
    catalog and contains exactly ``expected_count`` rows, polling once per
    second with the given per-phase timeout."""
    qualified_name = f"redpanda.{table_name}"

    def table_created() -> bool:
        return dl.catalog_client().table_exists(qualified_name)

    def rows_match() -> bool:
        return self.count_rows(dl, table_name) == expected_count

    # First wait for the table to appear, then for translation to catch up
    # to the expected row count.
    wait_until(table_created, timeout_sec=timeout_sec, backoff_sec=1)
    wait_until(rows_match, timeout_sec=timeout_sec, backoff_sec=1)

@gcp_only_test
@cluster(num_nodes=2)
@matrix(cloud_storage_type=supported_storage_types())
Expand All @@ -97,17 +130,55 @@ def test_e2e_basic(self, cloud_storage_type):
dl.create_iceberg_enabled_topic(self.topic_name, partitions=10)
dl.produce_to_topic(self.topic_name, 1024, count)

wait_until(
lambda: dl.catalog_client().table_exists(f"redpanda.{self.topic_name}"),
timeout_sec=30,
backoff_sec=1,
self.wait_rows_count(dl, self.topic_name, count, timeout_sec=60)

@gcp_only_test
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

q: this will be cdt-only, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Will trigger a run as soon as everything else passes.

@cluster(num_nodes=2)
@matrix(cloud_storage_type=supported_storage_types())
def test_dlq(self, cloud_storage_type):
    """End-to-end DLQ test against BigLake: invalid (schemaless) records
    produced before a JSON schema is registered should land in the DLQ
    table ``<topic><dlq_table_suffix>``, while valid JSON records land in
    the main table ``<topic>``.
    """
    # Number of invalid records AND number of valid records produced below.
    count = 10
    with DatalakeServices(
        self.test_context,
        redpanda=self.redpanda,
        include_query_engines=[],
        catalog_type=CatalogType.BIGLAKE,
    ) as dl:
        # Single partition keeps row accounting simple; value_schema_latest
        # validates record values against the latest registered schema.
        dl.create_iceberg_enabled_topic(
            self.topic_name, partitions=1, iceberg_mode="value_schema_latest"
        )

        # NOTE(review): this nested helper duplicates the class-level
        # count_rows() / wait_rows_count() helpers — it looks like diff
        # residue from the pre-refactor version of this test; confirm it
        # should be removed.
        def count_rows():
            t = dl.catalog_client().load_table(f"redpanda.{self.topic_name}")
            df = t.scan().to_duckdb("data")
            r = df.sql("SELECT count(*) FROM data").fetchone()
            self.logger.info(f"Row count for {self.topic_name}: {r[0]}")
            return r[0]

        self.logger.info(f"Creating schema for topic {self.topic_name}")
        rpk = RpkTool(self.redpanda)
        # Register a minimal JSON schema for the topic's value subject so
        # subsequently produced JSON objects can validate against it.
        rpk.create_schema_from_str(
            subject=f"{self.topic_name}-value",
            schema="""
            {
                "$schema": "http://json-schema.org/draft-07/schema#",
                "type": "object",
                "properties": {
                    "id": { "type": "integer" }
                }
            }
            """,
            schema_suffix="json",
        )

        self.logger.info(f"Producing {count} invalid messages to {self.topic_name}")
        # Raw 1024-byte payloads — presumably not valid JSON, so they should
        # be routed to the DLQ table (TODO confirm produce_to_topic payload).
        dl.produce_to_topic(self.topic_name, 1024, count)

        self.logger.info(f"Producing {count} valid messages to {self.topic_name}")
        producer = Producer({"bootstrap.servers": self.redpanda.brokers()})
        for i in range(count):
            producer.produce(
                self.topic_name,
                value=json.dumps({"id": i}),
            )
        producer.flush()

        self.logger.info("Waiting for DLQ table to have expected rows")
        # The DLQ table name is the (sanitized) topic name plus the suffix
        # configured via iceberg_dlq_table_suffix in extra_rp_conf.
        self.wait_rows_count(
            dl, f"{self.topic_name}{self.dlq_table_suffix}", count, timeout_sec=60
        )

        # NOTE(review): redundant with the wait_rows_count call below and
        # with the nested count_rows above — likely leftover from the old
        # version of this test; confirm intent.
        wait_until(lambda: count_rows() == count, timeout_sec=60, backoff_sec=1)
        self.logger.info("Waiting for main table to have expected rows")
        self.wait_rows_count(dl, self.topic_name, count, timeout_sec=60)
41 changes: 41 additions & 0 deletions tests/rptest/tests/datalake/datalake_dlq_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,47 @@ def test_dlq_table_for_invalid_records(
verifier.start()
verifier.wait()

@cluster(num_nodes=3)
@matrix(
    cloud_storage_type=supported_storage_types(),
    # Lightweight matrix as we only care about custom suffix behavior here.
    query_engine=[QueryEngineType.DUCKDB_PY],
    catalog_type=[filesystem_catalog_type()],
)
def test_dlq_table_for_invalid_records_custom_suffix(
    self, cloud_storage_type, query_engine, catalog_type
):
    """
    Test DLQ table with custom suffix configured via
    `iceberg_dlq_table_suffix` Redpanda config.

    Invalid records should be translated into a single table named
    ``<topic><custom suffix>`` and no other table should be created.
    """
    dlq_table_suffix = "__panda_dlq"
    num_records = 10

    with DatalakeServices(
        self.test_ctx,
        redpanda=self.redpanda,
        catalog_type=catalog_type,
        include_query_engines=[query_engine],
    ) as dl:
        # Override the default suffix at runtime; the property does not
        # require a restart (needs_restart::no in configuration.cc).
        self.redpanda.set_cluster_config(
            {"iceberg_dlq_table_suffix": dlq_table_suffix}
        )

        dl.create_iceberg_enabled_topic(
            self.topic_name, iceberg_mode="value_schema_id_prefix"
        )

        # 1-byte values lack a schema-id prefix, so under
        # value_schema_id_prefix mode they are presumably treated as
        # invalid and routed to the DLQ table — TODO confirm payload shape.
        dl.produce_to_topic(self.topic_name, 1, num_records)
        dl.wait_for_translation(
            self.topic_name,
            num_records,
            table_override=f"{self.topic_name}{dlq_table_suffix}",
        )

        # No other tables created.
        assert dl.num_tables() == 1, "Expected only 1 table in catalog"

@cluster(num_nodes=4)
@matrix(
cloud_storage_type=supported_storage_types(),
Expand Down