13 changes: 13 additions & 0 deletions auron-flink-extension/auron-flink-planner/pom.xml
@@ -243,6 +243,19 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
<version>${hadoopVersion}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
<version>${hadoopVersion}</version>
<scope>test</scope>
</dependency>

<dependency>
<!-- For using the filesystem connector in tests -->
<groupId>org.apache.flink</groupId>
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.auron.flink.table.kafka;

import static org.assertj.core.api.Assertions.assertThat;

import java.time.LocalDateTime;
import java.util.List;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.Test;

/**
* IT case for Auron Flink Kafka Source.
*/
public class AuronKafkaSourceITCase extends AuronKafkaSourceTestBase {

@Test
public void testEventTimeTumbleTvfWindow() {
environment.setParallelism(1);
List<Row> rows = CollectionUtil.iteratorToList(tableEnvironment
.executeSql(
"SELECT `name`, count(1), window_start FROM TABLE("
+ "TUMBLE(TABLE T2, DESCRIPTOR(`ts`), INTERVAL '1' MINUTE)) GROUP BY `name`, window_start, window_end")
.collect());
assertThat(rows.size()).isEqualTo(3);
assertRowsContains(
rows,
new Object[] {"zm1", 1L, LocalDateTime.parse("2026-03-16T12:03:00")},
new Object[] {"zm2", 1L, LocalDateTime.parse("2026-03-16T12:03:00")},
new Object[] {"zm1", 1L, LocalDateTime.parse("2026-03-16T12:05:00")});
}

@Test
public void testEventTimeTumbleGroupWindow() {
environment.setParallelism(1);
List<Row> rows = CollectionUtil.iteratorToList(tableEnvironment
.executeSql("SELECT `name`, count(1), TUMBLE_START(`ts`, INTERVAL '1' MINUTE) "
+ "FROM T2 group by TUMBLE(`ts`, INTERVAL '1' MINUTE), `name`")
.collect());
assertThat(rows.size()).isEqualTo(3);
assertRowsContains(
rows,
new Object[] {"zm1", 1L, LocalDateTime.parse("2026-03-16T12:03:00")},
new Object[] {"zm2", 1L, LocalDateTime.parse("2026-03-16T12:03:00")},
new Object[] {"zm1", 1L, LocalDateTime.parse("2026-03-16T12:05:00")});
}
}
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.auron.flink.table.kafka;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;

/**
* Base class for Auron Flink Kafka Table Tests.
*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class AuronKafkaSourceTestBase {
protected StreamExecutionEnvironment environment;
protected StreamTableEnvironment tableEnvironment;

@BeforeAll
public void before() {
Configuration configuration = new Configuration();
// TODO Resolve the issue where the Flink classloader is closed and CompileUtils.doCompile fails
configuration.setString("classloader.check-leaked-classloader", "false");
// set time zone to UTC
configuration.setString("table.local-time-zone", "UTC");
configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
environment.setRestartStrategy(RestartStrategies.noRestart());
environment.getConfig().setAutoWatermarkInterval(1);
tableEnvironment =
StreamTableEnvironment.create(environment, EnvironmentSettings.fromConfiguration(configuration));
String jsonArray = "["
+ "{\"serialized_kafka_records_partition\": 1, \"serialized_kafka_records_offset\": 100000, "
+ "\"serialized_kafka_records_timestamp\": 1773662603760, \"event_time\": 1773662603760, \"age\": 20, \"name\":\"zm1\"},"
+ "{\"serialized_kafka_records_partition\": 1, \"serialized_kafka_records_offset\": 100001, "
+ "\"serialized_kafka_records_timestamp\": 1773662603761, \"event_time\": 1773662633760, \"age\": 21, \"name\":\"zm2\"},"
+ "{\"serialized_kafka_records_partition\": 1, \"serialized_kafka_records_offset\": 100002, "
+ "\"serialized_kafka_records_timestamp\": 1773662603762, \"event_time\": 1773662703761, \"age\": 22, \"name\":\"zm1\"}"
+ "]";
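// The event_time values above correspond to 2026-03-16T12:03:23.760, 12:03:53.760 and 12:05:03.761 UTC,
// so with 1-minute tumbling windows the first two records fall into [12:03, 12:04) and the third into
// [12:05, 12:06), which is what the window IT cases assert.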
tableEnvironment.executeSql(" CREATE TABLE T2 ( "
+ "\n `event_time` BIGINT, "
+ "\n `age` INT, "
+ "\n `name` STRING,"
+ "\n `ts` AS TO_TIMESTAMP(FROM_UNIXTIME(event_time / 1000)),"
+ "\n WATERMARK FOR `ts` AS `ts` "
+ "\n ) WITH ( "
+ "\n 'connector' = 'auron-kafka',"
+ "\n 'kafka.mock.data' = '" + jsonArray + "',"
+ "\n 'topic' = 'mock_topic',"
+ "\n 'properties.bootstrap.servers' = '127.0.0.1:9092',"
+ "\n 'properties.group.id' = 'flink-test-mock',"
+ "\n 'format' = 'JSON' "
+ "\n )");
}

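/**
 * Asserts that every expected row appears in the actual rows, comparing fields positionally with Objects.equals.
 */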
protected void assertRowsContains(List<Row> actualRows, Object[]... expectedRows) {
for (Object[] expected : expectedRows) {
boolean found = actualRows.stream().anyMatch(row -> {
for (int i = 0; i < expected.length; i++) {
Object actual = row.getField(i);
if (!java.util.Objects.equals(expected[i], actual)) {
return false;
}
}
return true;
});
assertThat(found)
.as("Expected row %s not found in actual rows: %s", Arrays.toString(expected), actualRows)
.isTrue();
}
}
}
@@ -87,6 +87,12 @@ public class AuronKafkaDynamicTableFactory implements DynamicTableSourceFactory
.withDescription(
"offset mode for kafka source, support GROUP_OFFSET, LATEST, EARLIEST, TIMESTAMP will be supported.");

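// Example 'kafka.mock.data' value (hypothetical record; the three reserved metadata columns come first):
// [{"serialized_kafka_records_partition": 0, "serialized_kafka_records_offset": 1,
//   "serialized_kafka_records_timestamp": 1700000000000, "name": "a", "age": 1}]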
public static final ConfigOption<String> KAFKA_MOCK_DATA = ConfigOptions.key("kafka.mock.data")
.stringType()
.noDefaultValue()
.withDescription(
"When mock data is generated, the first three columns of each row must be serialized_kafka_records_partition, serialized_kafka_records_offset, and serialized_kafka_records_timestamp.");

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
@@ -107,7 +113,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
format,
formatConfig,
tableOptions.get(BUFFER_SIZE),
tableOptions.get(START_UP_MODE));
tableOptions.get(START_UP_MODE),
tableOptions.get(KAFKA_MOCK_DATA));
} catch (Exception e) {
throw new FlinkRuntimeException("Could not create Auron Kafka dynamic table source", e);
}
@@ -47,6 +47,7 @@ public class AuronKafkaDynamicTableSource implements ScanTableSource, SupportsWa
private final Map<String, String> formatConfig;
private final int bufferSize;
private final String startupMode;
private final String mockData;
/** Watermark strategy that is used to generate per-partition watermark. */
protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;

@@ -57,7 +58,8 @@ public AuronKafkaDynamicTableSource(
String format,
Map<String, String> formatConfig,
int bufferSize,
String startupMode) {
String startupMode,
String mockData) {
final LogicalType physicalType = physicalDataType.getLogicalType();
Preconditions.checkArgument(physicalType.is(LogicalTypeRoot.ROW), "Row data type expected.");
this.physicalDataType = physicalDataType;
@@ -67,6 +69,7 @@ public AuronKafkaDynamicTableSource(
this.formatConfig = formatConfig;
this.bufferSize = bufferSize;
this.startupMode = startupMode;
this.mockData = mockData;
}

@Override
@@ -91,6 +94,10 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
sourceFunction.setWatermarkStrategy(watermarkStrategy);
}

if (mockData != null) {
sourceFunction.setMockData(mockData);
}

return new DataStreamScanProvider() {

@Override
@@ -109,7 +116,7 @@ public boolean isBounded() {
@Override
public DynamicTableSource copy() {
return new AuronKafkaDynamicTableSource(
physicalDataType, kafkaTopic, kafkaProperties, format, formatConfig, bufferSize, startupMode);
physicalDataType, kafkaTopic, kafkaProperties, format, formatConfig, bufferSize, startupMode, mockData);
}

@Override
@@ -50,6 +50,8 @@
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.curator5.com.google.common.base.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -92,6 +94,7 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
private final Map<String, String> formatConfig;
private final int bufferSize;
private final String startupMode;
private String mockData;
private transient PhysicalPlanNode physicalPlanNode;

// Flink Checkpoint-related, compatible with Flink Kafka Legacy source
@@ -174,53 +177,63 @@ public void open(Configuration config) throws Exception {
this.auronOperatorId + "-" + getRuntimeContext().getIndexOfThisSubtask();
scanExecNode.setAuronOperatorId(auronOperatorIdWithSubtaskIndex);
scanExecNode.setStartupMode(KafkaStartupMode.valueOf(startupMode));
sourcePlan.setKafkaScan(scanExecNode.build());
this.physicalPlanNode = sourcePlan.build();

// 1. Initialize Kafka Consumer for partition metadata discovery only (not for data consumption)
Properties kafkaProps = new Properties();
kafkaProps.putAll(kafkaProperties);
// Override to ensure this consumer does not interfere with actual data consumption
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-auron-fetch-meta");
kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
kafkaProps.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
kafkaProps.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
this.kafkaConsumer = new KafkaConsumer<>(kafkaProps);

StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
// 2. Discover and assign partitions for this subtask
List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(topic);
int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();

this.assignedPartitions = new ArrayList<>();
for (PartitionInfo partitionInfo : partitionInfos) {
int partitionId = partitionInfo.partition();
if (KafkaTopicPartitionAssigner.assign(topic, partitionId, numSubtasks) == subtaskIndex) {
assignedPartitions.add(partitionId);
}
}
boolean enableCheckpoint = runtimeContext.isCheckpointingEnabled();
Map<String, Object> auronRuntimeInfo = new HashMap<>();
auronRuntimeInfo.put("subtask_index", subtaskIndex);
auronRuntimeInfo.put("num_readers", numSubtasks);
auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint);
auronRuntimeInfo.put("restored_offsets", restoredOffsets);
auronRuntimeInfo.put("assigned_partitions", assignedPartitions);
JniBridge.putResource(auronOperatorIdWithSubtaskIndex, mapper.writeValueAsString(auronRuntimeInfo));
currentOffsets = new HashMap<>();
pendingOffsetsToCommit = new LinkedMap();
LOG.info(
"Auron kafka source init successful, Auron operator id: {}, enableCheckpoint is {}, "
+ "subtask {} assigned partitions: {}",
auronOperatorIdWithSubtaskIndex,
enableCheckpoint,
subtaskIndex,
assignedPartitions);
if (mockData != null) {
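// Mock mode (used by tests): skip Kafka metadata discovery and derive the assigned partitions
// directly from the embedded JSON records.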
scanExecNode.setMockDataJsonArray(mockData);
JsonNode mockDataJson = mapper.readTree(mockData);
for (JsonNode data : mockDataJson) {
int partition = data.get("serialized_kafka_records_partition").asInt();
if (!assignedPartitions.contains(partition)) {
assignedPartitions.add(partition);
}
}
LOG.info("Using mock data for Auron Kafka source, assigned partitions: {}", assignedPartitions);
} else {
// 1. Initialize Kafka Consumer for partition metadata discovery only (not for data consumption)
Properties kafkaProps = new Properties();
kafkaProps.putAll(kafkaProperties);
// Override to ensure this consumer does not interfere with actual data consumption
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-auron-fetch-meta");
kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
kafkaProps.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
kafkaProps.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
this.kafkaConsumer = new KafkaConsumer<>(kafkaProps);

// 2. Discover and assign partitions for this subtask
List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(topic);
int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();
for (PartitionInfo partitionInfo : partitionInfos) {
int partitionId = partitionInfo.partition();
if (KafkaTopicPartitionAssigner.assign(topic, partitionId, numSubtasks) == subtaskIndex) {
assignedPartitions.add(partitionId);
}
}
boolean enableCheckpoint = runtimeContext.isCheckpointingEnabled();
Map<String, Object> auronRuntimeInfo = new HashMap<>();
auronRuntimeInfo.put("subtask_index", subtaskIndex);
auronRuntimeInfo.put("num_readers", numSubtasks);
auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint);
auronRuntimeInfo.put("restored_offsets", restoredOffsets);
auronRuntimeInfo.put("assigned_partitions", assignedPartitions);
JniBridge.putResource(auronOperatorIdWithSubtaskIndex, mapper.writeValueAsString(auronRuntimeInfo));
LOG.info(
"Auron kafka source init successful, Auron operator id: {}, enableCheckpoint is {}, "
+ "subtask {} assigned partitions: {}",
auronOperatorIdWithSubtaskIndex,
enableCheckpoint,
subtaskIndex,
assignedPartitions);
}
sourcePlan.setKafkaScan(scanExecNode.build());
this.physicalPlanNode = sourcePlan.build();

// 3. Initialize table-runtime WatermarkGenerator if watermarkStrategy is set
if (watermarkStrategy != null) {
@@ -458,4 +471,9 @@ public void initializeState(FunctionInitializationContext context) throws Except
public void setWatermarkStrategy(WatermarkStrategy<RowData> watermarkStrategy) {
this.watermarkStrategy = watermarkStrategy;
}

public void setMockData(String mockData) {
Preconditions.checkArgument(mockData != null, "Auron kafka source mock data must not be null");
this.mockData = mockData;
}
}
1 change: 1 addition & 0 deletions native-engine/auron-planner/proto/auron.proto
@@ -755,6 +755,7 @@ message KafkaScanExecNode {
string auron_operator_id = 6;
KafkaFormat data_format = 7;
string format_config_json = 8;
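// Optional JSON array of mock records, used by tests in place of consuming from Kafka.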
string mock_data_json_array = 9;
}

enum KafkaFormat {