Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,7 @@ common/src/main/resources/auron-build-info.properties


.flattened-pom.xml
dependency-reduced-pom.xml
dependency-reduced-pom.xml

#lsp
*.prefs
35 changes: 35 additions & 0 deletions auron-flink-extension/auron-flink-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,41 @@
<artifactId>proto</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Arrow dependencies -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-c-data</artifactId>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-unsafe</artifactId>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.apache.auron</groupId>
<artifactId>auron-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.auron.flink.arrow;

import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.NullType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimeType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;

/**
* Utility class for converting Flink {@link LogicalType} instances to Arrow types, fields and schemas.
*/
public final class FlinkArrowUtils {

/**
* Root allocator for Arrow memory management.
*/
public static final RootAllocator ROOT_ALLOCATOR = new RootAllocator(Long.MAX_VALUE);

static {
Runtime.getRuntime().addShutdownHook(new Thread(ROOT_ALLOCATOR::close));
}

/**
* Creates a child allocator from the root allocator.
*
* @param name Name for the child allocator
* @return A new child allocator
*/
public static BufferAllocator createChildAllocator(String name) {
return ROOT_ALLOCATOR.newChildAllocator(name, 0, Long.MAX_VALUE);
}

/**
* Converts a Flink LogicalType to Arrow ArrowType.
*
* @param logicalType The Flink logical type
* @return The corresponding Arrow type
* @throws UnsupportedOperationException if the type is not supported
*/
public static ArrowType toArrowType(LogicalType logicalType) {
if (logicalType == null) {
throw new IllegalArgumentException("logicalType cannot be null");
}
if (logicalType instanceof NullType) {
return ArrowType.Null.INSTANCE;
} else if (logicalType instanceof BooleanType) {
return ArrowType.Bool.INSTANCE;
} else if (logicalType instanceof TinyIntType) {
return new ArrowType.Int(8, true);
} else if (logicalType instanceof SmallIntType) {
return new ArrowType.Int(16, true);
} else if (logicalType instanceof IntType) {
return new ArrowType.Int(32, true);
} else if (logicalType instanceof BigIntType) {
return new ArrowType.Int(64, true);
} else if (logicalType instanceof FloatType) {
return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
} else if (logicalType instanceof DoubleType) {
return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
} else if (logicalType instanceof VarCharType || logicalType instanceof CharType) {
return ArrowType.Utf8.INSTANCE;
} else if (logicalType instanceof VarBinaryType || logicalType instanceof BinaryType) {
return ArrowType.Binary.INSTANCE;
} else if (logicalType instanceof DecimalType) {
DecimalType decimalType = (DecimalType) logicalType;
// Note: Arrow Java only has DecimalVector (128-bit) and Decimal256Vector (256-bit).
// There's no Decimal64Vector, so we always use 128-bit to match the actual storage.
// Setting bitWidth=64 would cause FFI export issues since the actual data is 128-bit.
return new ArrowType.Decimal(decimalType.getPrecision(), decimalType.getScale(), 128);
} else if (logicalType instanceof DateType) {
return new ArrowType.Date(DateUnit.DAY);
} else if (logicalType instanceof TimeType) {
// Flink TimeType stores time as milliseconds (int), convert to Arrow Time64 (microseconds)
Comment thread
x-tong marked this conversation as resolved.
Outdated
return new ArrowType.Time(TimeUnit.MICROSECOND, 64);
} else if (logicalType instanceof TimestampType) {
return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null);
} else if (logicalType instanceof LocalZonedTimestampType) {
// LocalZonedTimestampType is similar to TimestampType but with UTC timezone
return new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC");
} else {
throw new UnsupportedOperationException("Unsupported Flink type: " + logicalType.asSummaryString());
Comment thread
x-tong marked this conversation as resolved.
}
}

/**
* Converts a Flink LogicalType to an Arrow Field.
*
* @param name The field name
* @param logicalType The Flink logical type
* @param nullable Whether the field is nullable
* @return The corresponding Arrow Field
*/
public static Field toArrowField(String name, LogicalType logicalType, boolean nullable) {
Comment thread
x-tong marked this conversation as resolved.
Outdated
if (logicalType instanceof ArrayType) {
ArrayType arrayType = (ArrayType) logicalType;
LogicalType elementType = arrayType.getElementType();
FieldType fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null);
Comment thread
x-tong marked this conversation as resolved.
Field elementField = toArrowField("element", elementType, elementType.isNullable());
List<Field> children = new ArrayList<>();
children.add(elementField);
return new Field(name, fieldType, children);
} else if (logicalType instanceof RowType) {
RowType rowType = (RowType) logicalType;
FieldType fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null);
List<Field> children = new ArrayList<>();
for (RowType.RowField field : rowType.getFields()) {
children.add(toArrowField(
field.getName(), field.getType(), field.getType().isNullable()));
}
return new Field(name, fieldType, children);
} else if (logicalType instanceof MapType) {
MapType mapType = (MapType) logicalType;
LogicalType keyType = mapType.getKeyType();
LogicalType valueType = mapType.getValueType();

// Create entries field (struct<key, value>)
FieldType entriesFieldType = new FieldType(false, ArrowType.Struct.INSTANCE, null);
List<Field> entriesChildren = new ArrayList<>();
entriesChildren.add(toArrowField(MapVector.KEY_NAME, keyType, false));
entriesChildren.add(toArrowField(MapVector.VALUE_NAME, valueType, valueType.isNullable()));
Field entriesField = new Field(MapVector.DATA_VECTOR_NAME, entriesFieldType, entriesChildren);

// Create map field
FieldType mapFieldType = new FieldType(nullable, new ArrowType.Map(false), null);
List<Field> mapChildren = new ArrayList<>();
mapChildren.add(entriesField);
return new Field(name, mapFieldType, mapChildren);
} else {
ArrowType arrowType = toArrowType(logicalType);
FieldType fieldType = new FieldType(nullable, arrowType, null);
return new Field(name, fieldType, new ArrayList<>());
}
}

/**
* Converts a Flink RowType to an Arrow Schema.
*
* @param rowType The Flink row type
* @return The corresponding Arrow Schema
*/
public static Schema toArrowSchema(RowType rowType) {
List<Field> fields = new ArrayList<>();
for (RowType.RowField field : rowType.getFields()) {
fields.add(toArrowField(
Comment thread
x-tong marked this conversation as resolved.
Outdated
field.getName(), field.getType(), field.getType().isNullable()));
}
return new Schema(fields);
}

private FlinkArrowUtils() {
// Utility class
}
}
Loading
Loading