Conversation
fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
Outdated
Show resolved
Hide resolved
fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
Outdated
Show resolved
Hide resolved
|
|
||
| public Column(String columnName, DataType dataType) { | ||
| this(columnName, dataType, null); | ||
| this(columnName, dataType, null, UNKNOWN_COLUMN_ID); |
There was a problem hiding this comment.
We should be able to get the column id for the previous schema (use the column order). Using -1 as default column is error-prone.
There was a problem hiding this comment.
Yes, I also want to do it. But as a public api, I cannot remove this constructor Column(String columnName, DataType dataType). If not set -1, I have no idea how to handle it.
| /** Register schema to ZK metadata and return the schema id. */ | ||
| public int registerSchema(TablePath tablePath, Schema schema) throws Exception { | ||
| int currentSchemaId = getCurrentSchemaId(tablePath); | ||
| return registerSchema(tablePath, schema, getCurrentSchemaId(tablePath) + 1); |
There was a problem hiding this comment.
This is creating a new table, we can directly use schema id = 1 here?
279a97d to
2ade0ce
Compare
...client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScanner.java
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java
Outdated
Show resolved
Hide resolved
fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
Outdated
Show resolved
Hide resolved
d490cb1 to
a475aee
Compare
5829700 to
9742f89
Compare
…ration 1. Make lookup requests async. 2. Introduce ServerProjectionCache to share ProjectionInfo in TabletServer level. 3. Revert FlinkAsFlussRow changes, and introduce PaddingRow for the padding null columns for schema changes. 4. Make PutKv zero-copy to persist kv records. 5. Rename SchemaMetadataManager to ServerSchemaCache and use TableId instead of TablePath to track schemas which is safer. 6. Revert source related changes to still include the projectedFieldIndexes parameter.
f4fb30e to
1046abd
Compare
wuchong
left a comment
There was a problem hiding this comment.
I squashed and pushed a commit to improve the implementations. The change details are listed in the commit message.
Waiting for the CI pass.
| Map<Integer, Integer> columnIdPositions = new HashMap<>(); | ||
| List<Integer> columnIds = schema.getColumnIds(); | ||
| for (int i = 0; i < columnIds.size(); i++) { | ||
| columnIdPositions.put(columnIds.get(i), i); | ||
| } | ||
|
|
||
| int prev = -1; | ||
| int[] selectedFieldPositions = new int[projectedFields.length]; | ||
| for (int i = 0; i < projectedFields.length; i++) { | ||
| int fieldId = projectedFields[i]; | ||
| Integer position = columnIdPositions.get(fieldId); | ||
| if (position == null) { | ||
| throw new InvalidColumnProjectionException( | ||
| String.format( | ||
| "Projected field id %s is not contains in %s", fieldId, columnIds)); | ||
| } | ||
|
|
||
| selectedFieldPositions[i] = position; | ||
| if (position < prev) { | ||
| throw new InvalidColumnProjectionException( | ||
| "The projection indexes should be in field order, but is " | ||
| + Arrays.toString(projectedFields)); | ||
| } | ||
|
|
||
| prev = position; | ||
| } | ||
| return selectedFieldPositions; |
There was a problem hiding this comment.
We can remove this for now, as it may affect projection performance, and we don't need the remapping, as we only support add column add end. We can rename the method into toSelectedFieldPositions().
|
|
||
| message PbModifyColumn{ | ||
| required string column_name = 1; | ||
| required bytes data_type_json = 2; |
There was a problem hiding this comment.
should be optional. The following alter column is also valid.
ALTER TABLE prod.db.sample ALTER COLUMN measurement COMMENT 'unit is kilobytes per second';
| int currentSchemaId = zooKeeperClient.getCurrentSchemaId(tablePath); | ||
| SchemaInfo schemaInfo; | ||
| if (schemaId != currentSchemaId) { | ||
| LOG.warn( | ||
| "Schema id {} is not equal to current schema id {}. Skipping schema change processing.", | ||
| schemaId, | ||
| currentSchemaId); | ||
| return; | ||
| } |
There was a problem hiding this comment.
Why check current schema id? This is a heavy operation. Even if this is an old schema, I think it is still fine to process the schema.

Purpose
Linked issue: close #2056
Brief change log
Tests
API and Format
Documentation