Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
Expand Down Expand Up @@ -578,7 +579,7 @@ public void testNonSpillable() throws Exception {
writer.close();
}

/* @Test // TODO this can be enabled after avro supports vector
@Test
public void testVectorStoreSameFormatUsesRowDataWriter() throws Exception {
RowType vectorStoreSchema =
RowType.builder()
Expand All @@ -599,7 +600,7 @@ public void testVectorStoreSameFormatUsesRowDataWriter() throws Exception {
assertThat(increment.newFilesIncrement().newFiles()).hasSize(1);
DataFileMeta meta = increment.newFilesIncrement().newFiles().get(0);
assertThat(meta.fileName()).doesNotContain(".vector");
} */
}

private SimpleColStats initStats(Integer min, Integer max, long nullCount) {
return new SimpleColStats(min, max, nullCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimeType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.VectorType;

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
Expand Down Expand Up @@ -260,8 +261,11 @@ public static Schema convertToSchema(
}
return nullable ? nullableSchema(map) : map;
case ARRAY:
ArrayType arrayType = (ArrayType) dataType;
DataType elementType = arrayType.getElementType();
case VECTOR:
DataType elementType =
dataType.getTypeRoot() == DataTypeRoot.ARRAY
? ((ArrayType) dataType).getElementType()
: ((VectorType) dataType).getElementType();

ArrayBuilder<Schema> arrayBuilder = SchemaBuilder.builder().array();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VectorType;

import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
Expand Down Expand Up @@ -53,6 +54,8 @@ default T visit(Schema schema, DataType type) {
if (type instanceof MapType) {
MapType mapType = (MapType) type;
return visitArrayMap(schema, mapType.getKeyType(), mapType.getValueType());
} else if (type instanceof VectorType) {
return visitArrayVector(schema, ((VectorType) type).getElementType());
} else {
return visitArray(
schema, type == null ? null : ((ArrayType) type).getElementType());
Expand Down Expand Up @@ -155,6 +158,8 @@ default T primitive(Schema primitive, DataType type) {

T visitArray(Schema schema, DataType elementType);

/**
 * Visits an Avro schema that is paired with a {@link VectorType}.
 *
 * @param schema the Avro schema being visited
 * @param elementType the element type taken from the vector type
 * @return the visitor result for this schema
 */
T visitArrayVector(Schema schema, DataType elementType);

T visitArrayMap(Schema schema, DataType keyType, DataType valueType);

T visitMap(Schema schema, DataType valueType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.format.avro;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.BinaryVector;
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.Decimal;
Expand Down Expand Up @@ -167,6 +168,12 @@ public FieldReader visitArray(Schema schema, @Nullable DataType elementType) {
return new ArrayReader(elementReader);
}

/**
 * Creates the reader for a vector-typed field: an element reader is derived from the Avro
 * element schema and wrapped in an {@link ArrayVectorReader}.
 */
@Override
public FieldReader visitArrayVector(Schema schema, @Nullable DataType elementType) {
    Schema elementSchema = schema.getElementType();
    return new ArrayVectorReader(visit(elementSchema, elementType), elementType);
}

@Override
public FieldReader visitArrayMap(Schema schema, DataType keyType, DataType valueType) {
RowReader entryReader =
Expand Down Expand Up @@ -461,6 +468,22 @@ public void skip(Decoder decoder) throws IOException {
}
}

/**
 * An {@link ArrayReader} that converts the decoded array into a {@link BinaryVector} using the
 * configured element type.
 */
private static class ArrayVectorReader extends ArrayReader {

    /** Element type handed to {@link BinaryVector#fromInternalArray} on every read. */
    private final DataType vectorElementType;

    private ArrayVectorReader(FieldReader elementReader, DataType elementType) {
        super(elementReader);
        this.vectorElementType = elementType;
    }

    @Override
    public Object read(Decoder decoder, Object reuse) throws IOException {
        // Let the parent decode the array first, then convert it into the vector form.
        Object decoded = super.read(decoder, reuse);
        return BinaryVector.fromInternalArray((GenericArray) decoded, vectorElementType);
    }
}

private static class ArrayMapReader implements FieldReader {

private final RowReader entryReader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.InternalVector;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeRoot;
Expand Down Expand Up @@ -197,6 +198,22 @@ public FieldWriter visitArray(Schema schema, DataType elementType) {
};
}

/**
 * Creates the writer for a vector-typed field. The vector is encoded as a single Avro array
 * block: the item count is set to the vector size and each element is written by the element
 * writer derived from the Avro element schema.
 */
@Override
public FieldWriter visitArrayVector(Schema schema, DataType elementType) {
    final FieldWriter itemWriter = visit(schema.getElementType(), elementType);
    return (container, index, encoder) -> {
        InternalVector values = container.getVector(index);
        encoder.writeArrayStart();
        int count = values.size();
        encoder.setItemCount(count);
        int position = 0;
        while (position < count) {
            encoder.startItem();
            // The vector itself acts as the container the element writer reads from.
            itemWriter.write(values, position, encoder);
            position++;
        }
        encoder.writeArrayEnd();
    };
}

@Override
public FieldWriter visitArrayMap(Schema schema, DataType keyType, DataType valueType) {
RowWriter entryWriter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,18 @@

package org.apache.paimon.format.avro;

import org.apache.paimon.data.BinaryVector;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FileFormatFactory;
import org.apache.paimon.format.FormatReadWriteTest;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import java.util.ArrayList;
import java.util.List;

/** An avro {@link FormatReadWriteTest}. */
public class AvroFormatReadWriteTest extends FormatReadWriteTest {
Expand All @@ -34,4 +42,25 @@ protected AvroFormatReadWriteTest() {
protected FileFormat fileFormat() {
    // Format context built from empty options; both numeric arguments are 1024.
    FileFormatFactory.FormatContext context =
            new FileFormatFactory.FormatContext(new Options(), 1024, 1024);
    return new AvroFileFormat(context);
}

/**
 * Extends the inherited full-types row type with one extra field named {@code embed}, a
 * 3-element float vector, whose id is one greater than the largest inherited field id.
 */
@Override
protected RowType rowTypeForFullTypesTest() {
    RowType baseType = super.rowTypeForFullTypesTest();
    List<DataField> extendedFields = new ArrayList<>(baseType.getFields());
    int highestId = extendedFields.stream().mapToInt(DataField::id).max().getAsInt();
    DataField embedField =
            new DataField(highestId + 1, "embed", DataTypes.VECTOR(3, DataTypes.FLOAT()));
    extendedFields.add(embedField);
    return new RowType(baseType.isNullable(), extendedFields);
}

/**
 * Copies every field of the inherited expected row into a row that is one field wider, and
 * stores the vector {@code [1.0, 2.0, 3.0]} in the additional last position.
 */
@Override
protected GenericRow expectedRowForFullTypesTest() {
    GenericRow baseRow = super.expectedRowForFullTypesTest();
    int baseArity = baseRow.getFieldCount();
    GenericRow extendedRow = new GenericRow(baseArity + 1);
    for (int pos = 0; pos < baseArity; pos++) {
        extendedRow.setField(pos, baseRow.getField(pos));
    }
    float[] embedding = {1.0f, 2.0f, 3.0f};
    extendedRow.setField(baseArity, BinaryVector.fromPrimitiveArray(embedding));
    return extendedRow;
}
}