Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -64,6 +65,7 @@ public class PaimonSnapshotAndLogSplitScanner implements BatchScanner {

private final LogScanner logScanner;
private final long stoppingOffset;
private final RowType paimonRowType;

private boolean logScanFinished;
private SortMergeReader currentSortMergeReader;
Expand All @@ -76,6 +78,7 @@ public PaimonSnapshotAndLogSplitScanner(
PaimonSnapshotAndFlussLogSplit snapshotAndFlussLogSplit,
@Nullable int[] projectedFields) {
this.pkIndexes = flussTable.getTableInfo().getSchema().getPrimaryKeyIndexes();
this.paimonRowType = fileStoreTable.rowType();
int[] newProjectedFields = getNeedProjectFields(flussTable, projectedFields);
this.tableRead =
fileStoreTable.newReadBuilder().withProjection(newProjectedFields).newRead();
Expand Down Expand Up @@ -190,7 +193,7 @@ private SortMergeReader createSortMergeReader() throws IOException {
private void pollLogRecords(Duration timeout) {
ScanRecords scanRecords = logScanner.poll(timeout);
for (ScanRecord scanRecord : scanRecords) {
InternalRow paimonRow = new ScanRecordWrapper(scanRecord);
InternalRow paimonRow = new ScanRecordWrapper(scanRecord, paimonRowType);
boolean isDelete =
scanRecord.getChangeType() == ChangeType.DELETE
|| scanRecord.getChangeType() == ChangeType.UPDATE_BEFORE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,30 @@

import com.alibaba.fluss.client.table.scanner.ScanRecord;
import com.alibaba.fluss.record.ChangeType;
import com.alibaba.fluss.row.TimestampLtz;
import com.alibaba.fluss.row.TimestampNtz;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

/** A wrapper of {@link ScanRecord} which bridges {@link ScanRecord} to Paimon' internal row. */
public class ScanRecordWrapper implements InternalRow {

private final ChangeType changeType;
private final com.alibaba.fluss.row.InternalRow flussRow;
private final RowType rowType;

public ScanRecordWrapper(ScanRecord scanRecord) {
/**
 * Wraps a Fluss {@link ScanRecord} so it can be consumed through Paimon's
 * {@link InternalRow} interface.
 *
 * @param scanRecord the Fluss record to wrap
 * @param rowType the Paimon row type of the target table; consulted by
 *     {@code getTimestamp} to pick the correct timestamp accessor per column
 */
public ScanRecordWrapper(ScanRecord scanRecord, RowType rowType) {
    this.rowType = rowType;
    this.flussRow = scanRecord.getRow();
    this.changeType = scanRecord.getChangeType();
}

@Override
Expand Down Expand Up @@ -117,7 +123,32 @@ public Decimal getDecimal(int pos, int precision, int scale) {

@Override
public Timestamp getTimestamp(int pos, int precision) {
    // Dispatch on the Paimon column type: Fluss keeps TIMESTAMP (no time zone)
    // and TIMESTAMP_LTZ in different row accessors (getTimestampNtz vs
    // getTimestampLtz), so reading with the wrong accessor would be incorrect.
    DataType paimonTimestampType = rowType.getTypeAt(pos);
    switch (paimonTimestampType.getTypeRoot()) {
        case TIMESTAMP_WITHOUT_TIME_ZONE:
            TimestampNtz timestampNtz = flussRow.getTimestampNtz(pos, precision);
            if (TimestampNtz.isCompact(precision)) {
                // Compact precision: no sub-millisecond part to carry over.
                return Timestamp.fromEpochMillis(timestampNtz.getMillisecond());
            }
            return Timestamp.fromEpochMillis(
                    timestampNtz.getMillisecond(), timestampNtz.getNanoOfMillisecond());
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            TimestampLtz timestampLtz = flussRow.getTimestampLtz(pos, precision);
            if (TimestampLtz.isCompact(precision)) {
                return Timestamp.fromEpochMillis(timestampLtz.getEpochMillisecond());
            }
            return Timestamp.fromEpochMillis(
                    timestampLtz.getEpochMillisecond(),
                    timestampLtz.getNanoOfMillisecond());
        default:
            throw new UnsupportedOperationException(
                    "Unsupported data type to get timestamp: " + paimonTimestampType);
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.row.TimestampLtz;
import com.alibaba.fluss.row.TimestampNtz;
import com.alibaba.fluss.server.replica.Replica;
import com.alibaba.fluss.types.DataTypes;

Expand All @@ -31,6 +33,7 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

Expand All @@ -39,6 +42,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -202,6 +206,58 @@ void testPrimaryKeyTable(boolean isPartitioned) throws Exception {
assertThat(rows.toString()).isEqualTo(sortedRows(expectedRows).toString());
}

@Test
void testUnionReadWithTimeStamp() throws Exception {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created #1127 used to improve our test coverage in next version

Copy link
Copy Markdown
Contributor Author

@luoyuxia luoyuxia Jun 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has Verified all supported types in my local, let's test all types in #1127

// first of all, start tiering
JobClient jobClient = buildTieringJob(execEnv);

String tableName = "pk_table_with_timestamp";
TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
long tableId =
createPrimaryKeyTable(
t1,
1,
Arrays.asList(
new Schema.Column("c1", DataTypes.INT()),
new Schema.Column("c2", DataTypes.TIMESTAMP_LTZ()),
new Schema.Column("c3", DataTypes.TIMESTAMP())));
// write some rows;
List<InternalRow> rows =
Arrays.asList(
row(
1,
TimestampLtz.fromEpochMillis(1698235273182L, 5000),
TimestampNtz.fromMillis(1698235273183L, 6000)),
row(
2,
TimestampLtz.fromEpochMillis(1698235273200L, 5000),
TimestampNtz.fromMillis(1698235273201L, 6000)));

writeRows(t1, rows, false);

// wait unit records has has been synced
waitUtilBucketSynced(t1, tableId, 1, false);

// stop lake tiering service
jobClient.cancel().get();

// write a row again
rows =
Collections.singletonList(
row(
2,
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
TimestampNtz.fromMillis(1698235273501L, 8000)));
writeRows(t1, rows, false);

// now, query the result, it must union lake snapshot and log
List<String> result =
toSortedRows(batchTEnv.executeSql("select * from " + tableName), false);
assertThat(result.toString())
.isEqualTo(
"[+I[1, 2023-10-25T12:01:13.182005Z, 2023-10-25T12:01:13.183006], +I[2, 2023-10-25T12:01:13.400007Z, 2023-10-25T12:01:13.501008]]");
}

private List<Row> paddingPartition(TablePath tablePath, List<Row> rows) {
List<Row> paddingPartitionRows = new ArrayList<>();
for (String partition : waitUntilPartitions(tablePath).values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,20 @@ protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPart
return createTable(tablePath, tableBuilder.build());
}

/**
 * Creates a datalake-enabled primary-key table whose primary key is the first
 * column of {@code columns}.
 *
 * @param tablePath path of the table to create
 * @param bucketNum number of buckets to distribute the table by
 * @param columns the table columns; the first column becomes the primary key
 * @return the id of the created table
 */
protected long createPrimaryKeyTable(
        TablePath tablePath, int bucketNum, List<Schema.Column> columns) throws Exception {
    Schema schema =
            Schema.newBuilder()
                    .fromColumns(columns)
                    .primaryKey(columns.get(0).getName())
                    .build();
    TableDescriptor descriptor =
            TableDescriptor.builder()
                    .distributedBy(bucketNum)
                    .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
                    .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500))
                    .schema(schema)
                    .build();
    return createTable(tablePath, descriptor);
}

protected long createPkTable(TablePath tablePath) throws Exception {
return createPkTable(tablePath, 1);
}
Expand Down