6 changes: 6 additions & 0 deletions auron-flink-extension/auron-flink-runtime/pom.xml
@@ -74,6 +74,12 @@
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>

         <!-- Kafka client for partition metadata discovery -->
         <dependency>
@@ -88,7 +88,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                 startupMode);

         if (watermarkStrategy != null) {
-            sourceFunction.assignTimestampsAndWatermarks(watermarkStrategy);
+            sourceFunction.setWatermarkStrategy(watermarkStrategy);
         }

         return new DataStreamScanProvider() {
@@ -17,10 +17,10 @@
 package org.apache.auron.flink.connector.kafka;

 import static org.apache.auron.flink.connector.kafka.KafkaConstants.*;
-import static org.apache.flink.util.Preconditions.checkNotNull;

 import java.io.File;
 import java.io.InputStream;
+import java.lang.reflect.Field;
 import java.util.*;
 import org.apache.auron.flink.arrow.FlinkArrowReader;
 import org.apache.auron.flink.arrow.FlinkArrowUtils;
@@ -38,19 +38,13 @@
 import org.apache.auron.protobuf.PhysicalPlanNode;
 import org.apache.commons.collections.map.LinkedMap;
 import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.eventtime.TimestampAssigner;
-import org.apache.flink.api.common.eventtime.WatermarkGenerator;
-import org.apache.flink.api.common.eventtime.WatermarkOutput;
-import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
@@ -60,16 +54,17 @@
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
-import org.apache.flink.streaming.connectors.kafka.internals.SourceContextWatermarkOutputAdapter;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier;
+import org.apache.flink.table.runtime.generated.WatermarkGenerator;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.SerializableObject;
-import org.apache.flink.util.SerializedValue;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
@@ -82,9 +77,9 @@
  * If checkpoints are enabled, Kafka offsets are committed via Auron after a successful checkpoint.
  * If checkpoints are disabled, Kafka offsets are committed periodically via Auron.
  *
- * <p>Watermark support is implemented via {@link WatermarkOutputMultiplexer} with per-partition
- * watermark generation. Partition expansion is detected periodically using a lightweight
- * {@link KafkaConsumer} (metadata queries only, no data consumption).
+ * <p>Watermark support uses the table-runtime {@link WatermarkGenerator} directly
+ * (from {@code WatermarkPushDownSpec}) with per-partition watermark tracking.
+ * The combined watermark emitted downstream is the minimum across all assigned partitions.
  */
 public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData>
         implements FlinkAuronFunction, CheckpointListener, CheckpointedFunction {
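
The combining rule the new javadoc describes fits in a few lines; a minimal self-contained sketch (hypothetical names, not part of this change): each partition keeps the highest watermark it has produced, and the value that may be emitted downstream is the minimum across partitions, advancing monotonically.

    // Minimal sketch of the per-partition combining rule (hypothetical class).
    class CombinedWatermarkSketch {
        private final java.util.Map<Integer, Long> partitionWatermarks = new java.util.HashMap<>();
        private long combined = Long.MIN_VALUE;

        // A partition's watermark only moves forward.
        void onPartitionWatermark(int partition, long watermark) {
            partitionWatermarks.merge(partition, watermark, Math::max);
        }

        // Safe to emit downstream: the minimum across all assigned partitions,
        // returned only when it strictly advances.
        Long combinedIfAdvanced() {
            if (partitionWatermarks.isEmpty()) {
                return null;
            }
            long min = java.util.Collections.min(partitionWatermarks.values());
            if (min > combined) {
                combined = min;
                return min;
            }
            return null;
        }
    }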
@@ -110,7 +105,6 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData>
     private transient Map<Integer, Long> restoredOffsets;
     private transient Map<Integer, Long> currentOffsets;
     private final SerializableObject lock = new SerializableObject();
-    private SerializedValue<WatermarkStrategy<RowData>> watermarkStrategy;
     private volatile boolean isRunning;
     private transient String auronOperatorIdWithSubtaskIndex;
     private transient MetricNode nativeMetric;
@@ -120,14 +114,11 @@
     private transient KafkaConsumer<byte[], byte[]> kafkaConsumer;
     private transient List<Integer> assignedPartitions;

-    // Watermark related
-    private transient WatermarkOutputMultiplexer watermarkOutputMultiplexer;
-    private transient Map<Integer, String> partitionIdToOutputIdMap;
-    private transient WatermarkGenerator<RowData> watermarkGenerator;
-    private transient TimestampAssigner<RowData> timestampAssigner;
-    // Periodic watermark control: autoWatermarkInterval > 0 means enabled
-    private transient long autoWatermarkInterval;
-    private transient long lastPeriodicWatermarkTime;
+    // Watermark related: uses table-runtime WatermarkGenerator directly
+    private WatermarkStrategy<RowData> watermarkStrategy;
+    private transient WatermarkGenerator tableWatermarkGenerator;
+    private transient Map<Integer, Long> partitionWatermarks;
+    private transient long currentCombinedWatermark;

     public AuronKafkaSourceFunction(
             LogicalType outputType,
@@ -231,22 +222,24 @@ public void open(Configuration config) throws Exception {
                 subtaskIndex,
                 assignedPartitions);

-        // 3. Initialize Watermark components if watermarkStrategy is set
+        // 3. Initialize table-runtime WatermarkGenerator if watermarkStrategy is set
         if (watermarkStrategy != null) {
-            ClassLoader userCodeClassLoader = runtimeContext.getUserCodeClassLoader();
-            WatermarkStrategy<RowData> deserializedWatermarkStrategy =
-                    watermarkStrategy.deserializeValue(userCodeClassLoader);
-
             MetricGroup metricGroup = runtimeContext.getMetricGroup();
-
-            this.timestampAssigner = deserializedWatermarkStrategy.createTimestampAssigner(() -> metricGroup);
-
-            this.watermarkGenerator = deserializedWatermarkStrategy.createWatermarkGenerator(() -> metricGroup);
-
-            // 4. Determine periodic watermark interval
-            // autoWatermarkInterval > 0 means periodic watermark is enabled
-            this.autoWatermarkInterval = runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
-            this.lastPeriodicWatermarkTime = 0L; // Initialize to 0 so first emit triggers immediately
+            // Create DataStream API WatermarkGenerator via the strategy
+            org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> dsGenerator =
+                    watermarkStrategy.createWatermarkGenerator(() -> metricGroup);
+            // Extract inner table-runtime WatermarkGenerator from DefaultWatermarkGenerator
+            if (dsGenerator instanceof GeneratedWatermarkGeneratorSupplier.DefaultWatermarkGenerator) {
+                Field field = GeneratedWatermarkGeneratorSupplier.DefaultWatermarkGenerator.class.getDeclaredField(
+                        "innerWatermarkGenerator");
+                field.setAccessible(true);
+                this.tableWatermarkGenerator = (WatermarkGenerator) field.get(dsGenerator);
+            } else {
+                throw new IllegalStateException("Expected DefaultWatermarkGenerator from WatermarkPushDownSpec, got: "
+                        + dsGenerator.getClass().getName());
+            }
+            this.partitionWatermarks = new HashMap<>();
+            this.currentCombinedWatermark = Long.MIN_VALUE;
        }
         this.isRunning = true;
     }
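
The reflective lookup above reaches into Flink internals: innerWatermarkGenerator is a private field of GeneratedWatermarkGeneratorSupplier.DefaultWatermarkGenerator with no public accessor. A defensive variant of the same lookup (a hypothetical helper, not in this change) would fail fast with a clear message if that internal layout ever changes:

    // Hypothetical helper; same reflection as above, with explicit failure handling.
    private static org.apache.flink.table.runtime.generated.WatermarkGenerator extractInnerGenerator(
            org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> dsGenerator) {
        try {
            java.lang.reflect.Field field = GeneratedWatermarkGeneratorSupplier.DefaultWatermarkGenerator.class
                    .getDeclaredField("innerWatermarkGenerator");
            field.setAccessible(true);
            return (org.apache.flink.table.runtime.generated.WatermarkGenerator) field.get(dsGenerator);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Cannot access innerWatermarkGenerator; Flink table-runtime internals may have changed", e);
        }
    }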
Expand All @@ -267,97 +260,76 @@ public void add(String name, long value) {
         fieldList.addAll(((RowType) outputType).getFields());
         RowType auronOutputRowType = new RowType(fieldList);

-        // Initialize WatermarkOutputMultiplexer here because sourceContext is available
-        if (watermarkGenerator != null) {
-            this.watermarkOutputMultiplexer =
-                    new WatermarkOutputMultiplexer(new SourceContextWatermarkOutputAdapter<>(sourceContext));
-            this.partitionIdToOutputIdMap = new HashMap<>();
-            for (Integer partition : assignedPartitions) {
-                String outputId = createOutputId(partition);
-                partitionIdToOutputIdMap.put(partition, outputId);
-                watermarkOutputMultiplexer.registerNewOutput(outputId, watermark -> {});
-            }
-        }
-
-        // Pre-check watermark flag to avoid per-record null checks in the hot path
-        final boolean enableWatermark = watermarkGenerator != null;
-
-        while (this.isRunning) {
-            AuronCallNativeWrapper wrapper = new AuronCallNativeWrapper(
-                    FlinkArrowUtils.getRootAllocator(),
-                    physicalPlanNode,
-                    nativeMetric,
-                    0,
-                    0,
-                    0,
-                    AuronAdaptor.getInstance()
-                            .getAuronConfiguration()
-                            .getLong(FlinkAuronConfiguration.NATIVE_MEMORY_SIZE));
-
-            if (enableWatermark) {
-                // Watermark-enabled path
-                while (wrapper.loadNextBatch(batch -> {
-                    Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
-                    FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3);
-
-                    for (int i = 0; i < batch.getRowCount(); i++) {
-                        AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i);
-                        // Extract kafka meta fields
-                        int partitionId = tmpRowData.getInt(-3);
-                        long offset = tmpRowData.getLong(-2);
-                        long kafkaTimestamp = tmpRowData.getLong(-1);
-                        tmpOffsets.put(partitionId, offset);
-
-                        // Extract event timestamp via user-defined TimestampAssigner
-                        long timestamp = timestampAssigner.extractTimestamp(tmpRowData, kafkaTimestamp);
-
-                        // Route to the per-partition WatermarkOutput and trigger onEvent
-                        // outputId must not null, else is a bug
-                        String outputId = partitionIdToOutputIdMap.get(partitionId);
-                        WatermarkOutput partitionOutput = watermarkOutputMultiplexer.getImmediateOutput(outputId);
-                        watermarkGenerator.onEvent(tmpRowData, timestamp, partitionOutput);
-                        // Emit record with event timestamp
-                        sourceContext.collectWithTimestamp(arrowReader.read(i), timestamp);
-                    }
-
-                    // Periodic watermark: only emit if enough time has elapsed since last emit
-                    // Controlled by ExecutionConfig.getAutoWatermarkInterval()
-                    long currentTime = System.currentTimeMillis();
-                    if (autoWatermarkInterval > 0
-                            && (currentTime - lastPeriodicWatermarkTime) >= autoWatermarkInterval) {
-                        for (Map.Entry<Integer, String> entry : partitionIdToOutputIdMap.entrySet()) {
-                            // Use getDeferredOutput for periodic emit: all partitions update first,
-                            // then multiplexer merges and emits once via onPeriodicEmit()
-                            WatermarkOutput output = watermarkOutputMultiplexer.getDeferredOutput(entry.getValue());
-                            watermarkGenerator.onPeriodicEmit(output);
-                        }
-                        // Merge all deferred updates and emit the combined watermark downstream
-                        watermarkOutputMultiplexer.onPeriodicEmit();
-                        lastPeriodicWatermarkTime = currentTime;
-                    }
-
-                    synchronized (lock) {
-                        currentOffsets = tmpOffsets;
-                    }
-                })) {}
-            } else {
-                // No-watermark path: still use collectWithTimestamp with kafka timestamp
-                while (wrapper.loadNextBatch(batch -> {
-                    Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
-                    FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3);
-                    for (int i = 0; i < batch.getRowCount(); i++) {
-                        AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i);
-                        int partitionId = tmpRowData.getInt(-3);
-                        long offset = tmpRowData.getLong(-2);
-                        long kafkaTimestamp = tmpRowData.getLong(-1);
-                        tmpOffsets.put(partitionId, offset);
-                        sourceContext.collectWithTimestamp(arrowReader.read(i), kafkaTimestamp);
-                    }
-                    synchronized (lock) {
-                        currentOffsets = tmpOffsets;
-                    }
-                })) {}
-            }
-        }
+        final boolean enableWatermark = tableWatermarkGenerator != null;
+
+        AuronCallNativeWrapper wrapper = new AuronCallNativeWrapper(
+                FlinkArrowUtils.getRootAllocator(),
+                physicalPlanNode,
+                nativeMetric,
+                0,
+                0,
+                0,
+                AuronAdaptor.getInstance().getAuronConfiguration().getLong(FlinkAuronConfiguration.NATIVE_MEMORY_SIZE));
+
+        if (enableWatermark) {
+            // Watermark-enabled path: use table-runtime WatermarkGenerator directly
+            while (wrapper.loadNextBatch(batch -> {
+                Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
+                FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3);
+
+                for (int i = 0; i < batch.getRowCount(); i++) {
+                    AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i);
+                    // Extract kafka meta fields
+                    int partitionId = tmpRowData.getInt(-3);
+                    long offset = tmpRowData.getLong(-2);
+                    long kafkaTimestamp = tmpRowData.getLong(-1);
+                    tmpOffsets.put(partitionId, offset);
+
+                    try {
+                        // Compute watermark using the table-runtime WatermarkGenerator
+                        // (a stateless pure function, evaluated with the local time zone)
+                        Long watermark = tableWatermarkGenerator.currentWatermark(tmpRowData);
+                        // Update per-partition watermark tracking
+                        if (watermark != null) {
+                            partitionWatermarks.merge(partitionId, watermark, Math::max);
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException("Generated WatermarkGenerator failed to generate watermark", e);
+                    }
+                    // Emit record with kafka timestamp
+                    sourceContext.collectWithTimestamp(tmpRowData, kafkaTimestamp);
+                }
+
+                // After each batch, compute combined watermark (min across all partitions) and emit
+                if (!partitionWatermarks.isEmpty()) {
+                    long minWatermark = Collections.min(partitionWatermarks.values());
+                    if (minWatermark > currentCombinedWatermark) {
+                        currentCombinedWatermark = minWatermark;
+                        sourceContext.emitWatermark(new Watermark(minWatermark));
+                    }
+                }
+
+                synchronized (lock) {
+                    currentOffsets = tmpOffsets;
+                }
+            })) {}
+        } else {
+            // No-watermark path: still use collectWithTimestamp with kafka timestamp
+            while (wrapper.loadNextBatch(batch -> {
+                Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
+                FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3);
+                for (int i = 0; i < batch.getRowCount(); i++) {
+                    AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i);
+                    int partitionId = tmpRowData.getInt(-3);
+                    long offset = tmpRowData.getLong(-2);
+                    long kafkaTimestamp = tmpRowData.getLong(-1);
+                    tmpOffsets.put(partitionId, offset);
+                    sourceContext.collectWithTimestamp(tmpRowData, kafkaTimestamp);
+                }
+                synchronized (lock) {
+                    currentOffsets = tmpOffsets;
+                }
+            })) {}
+        }
         LOG.info("Auron kafka source run end");
     }
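
Note the cadence change relative to the removed code: the old path emitted watermarks on ExecutionConfig's autoWatermarkInterval, while the new path evaluates the combined watermark once per Arrow batch. The guard keeps the downstream watermark monotonic; extracted as a hypothetical helper (not in this change, field names as in the class above):

    // Hypothetical extraction of the per-batch emit guard used in run().
    private void maybeEmitCombinedWatermark(SourceContext<RowData> ctx) {
        if (partitionWatermarks.isEmpty()) {
            return;
        }
        long min = Collections.min(partitionWatermarks.values());
        if (min > currentCombinedWatermark) {
            currentCombinedWatermark = min;
            ctx.emitWatermark(new Watermark(min));
        }
    }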
@@ -376,6 +348,11 @@ public void close() throws Exception {
             kafkaConsumer.close();
         }

+        // Close table-runtime WatermarkGenerator
+        if (tableWatermarkGenerator != null) {
+            tableWatermarkGenerator.close();
+        }
+
         super.close();
     }

@@ -478,22 +455,7 @@ public void initializeState(FunctionInitializationContext context) throws Exception {
         }
     }

-    public AuronKafkaSourceFunction assignTimestampsAndWatermarks(WatermarkStrategy<RowData> watermarkStrategy) {
-        checkNotNull(watermarkStrategy);
-        try {
-            ClosureCleaner.clean(watermarkStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
-            this.watermarkStrategy = new SerializedValue<>(watermarkStrategy);
-        } catch (Exception e) {
-            throw new IllegalArgumentException("The given WatermarkStrategy is not serializable", e);
-        }
-        return this;
-    }
-
-    // -------------------------------------------------------------------------
-    // Internal helpers
-    // -------------------------------------------------------------------------
-
-    private String createOutputId(int partitionId) {
-        return topic + "-" + partitionId;
+    public void setWatermarkStrategy(WatermarkStrategy<RowData> watermarkStrategy) {
+        this.watermarkStrategy = watermarkStrategy;
     }
 }
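
Unlike the removed assignTimestampsAndWatermarks, the new setter stores a plain reference with no ClosureCleaner pass or SerializedValue wrapping; since the source function itself is serialized into the job graph, this relies on the strategy being Serializable, which the WatermarkStrategy interface contract provides. The intended call path is the table source shown in the first Java hunk above; roughly (hypothetical surrounding code, assuming the source implements SupportsWatermarkPushDown):

    // Hypothetical call path: the planner's WatermarkPushDownSpec supplies the
    // strategy via applyWatermark(...), and the scan provider forwards it.
    @Override
    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
    }

    // ... later, in getScanRuntimeProvider(...):
    if (watermarkStrategy != null) {
        sourceFunction.setWatermarkStrategy(watermarkStrategy);
    }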