Support Add Column in Fluss. #2010

Merged
wuchong merged 2 commits into apache:main from loserwang1024:poc-schema-change
Dec 2, 2025

@loserwang1024 loserwang1024 commented Nov 24, 2025

Purpose

Linked issue: close #2056

Brief change log

Tests

API and Format

Documentation


  public Column(String columnName, DataType dataType) {
-     this(columnName, dataType, null);
+     this(columnName, dataType, null, UNKNOWN_COLUMN_ID);
Member

We should be able to derive the column ids for a previous schema from the column order. Using -1 as the default column id is error-prone.

Contributor Author

Yes, I would like to do that too. But the constructor Column(String columnName, DataType dataType) is public API, so I cannot remove it. If it does not set -1, I have no idea how to handle it.
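
The idea of deriving ids from column order can be illustrated with a minimal sketch. Everything here is hypothetical and not Fluss API: the class `LegacyColumnIds`, the method `resolve`, and the rule that a mix of known and unknown ids is rejected are assumptions made for illustration. Only the -1 sentinel comes from the discussion above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not Fluss API): derives column ids for a schema written before ids existed. */
final class LegacyColumnIds {
    /** Sentinel for "no id assigned"; the value -1 follows the discussion above. */
    static final int UNKNOWN_COLUMN_ID = -1;

    /**
     * If no column has an id yet, assigns ids by column order (0, 1, 2, ...).
     * If every column already has an id, returns a copy of the ids unchanged.
     * A mix of known and unknown ids is rejected (an assumption of this sketch).
     */
    static List<Integer> resolve(List<Integer> declaredIds) {
        long unknown = declaredIds.stream().filter(id -> id == UNKNOWN_COLUMN_ID).count();
        if (unknown == 0) {
            return new ArrayList<>(declaredIds);
        }
        if (unknown != declaredIds.size()) {
            throw new IllegalArgumentException(
                    "Mixed known and unknown column ids: " + declaredIds);
        }
        List<Integer> resolved = new ArrayList<>(declaredIds.size());
        for (int position = 0; position < declaredIds.size(); position++) {
            resolved.add(position);
        }
        return resolved;
    }
}
```

With this approach the public two-argument constructor could keep passing the sentinel, and the ids would be filled in when the schema is built, so -1 would never need to be persisted.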

/** Register schema to ZK metadata and return the schema id. */
public int registerSchema(TablePath tablePath, Schema schema) throws Exception {
    int currentSchemaId = getCurrentSchemaId(tablePath);
    return registerSchema(tablePath, schema, getCurrentSchemaId(tablePath) + 1);
Member

This path creates a new table, so can we use schema id = 1 directly here?

Contributor Author

[Screenshot]

Sometimes the schema is re-created while the table is being created.
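
The difference between the two approaches can be shown with a small in-memory stand-in for the registry. This is a sketch under stated assumptions: `InMemorySchemaRegistry` is a hypothetical class, table paths are plain strings, and a table with no registered schema is assumed to report id 0. None of this is taken from the Fluss code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the ZK-backed schema registry (not Fluss API). */
final class InMemorySchemaRegistry {
    private final Map<String, Integer> currentSchemaIds = new HashMap<>();

    /** Assumption of this sketch: a table without any registered schema reports id 0. */
    int getCurrentSchemaId(String tablePath) {
        return currentSchemaIds.getOrDefault(tablePath, 0);
    }

    /** Registers the next schema version and returns its id (current id + 1). */
    int registerSchema(String tablePath) {
        int nextSchemaId = getCurrentSchemaId(tablePath) + 1;
        currentSchemaIds.put(tablePath, nextSchemaId);
        return nextSchemaId;
    }
}
```

Under these assumptions the first registration yields id 1, and a second registration for the same path (the re-creation case mentioned above) yields id 2 instead of colliding with the existing entry.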

@loserwang1024 loserwang1024 force-pushed the poc-schema-change branch 3 times, most recently from 279a97d to 2ade0ce, November 27, 2025 02:19
@loserwang1024 loserwang1024 force-pushed the poc-schema-change branch 5 times, most recently from d490cb1 to a475aee, November 30, 2025 07:39
@loserwang1024 loserwang1024 changed the title from "[DRAFT] Fluss Support Schema evolution." to "Support Add Column in Fluss." Nov 30, 2025
loserwang1024 and others added 2 commits December 1, 2025 21:16
…ration

1. Make lookup requests async.
2. Introduce ServerProjectionCache to share ProjectionInfo in TabletServer level.
3. Revert FlinkAsFlussRow changes, and introduce PaddingRow for the padding null columns for schema changes.
4. Make PutKv zero-copy to persist kv records.
5. Rename SchemaMetadataManager to ServerSchemaCache and use TableId instead of TablePath to track schemas which is safer.
6. Revert source related changes to still include the projectedFieldIndexes parameter.
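
Item 3 describes padding null columns for rows written before a column was added. A minimal sketch of that idea follows. `NullPaddedRow` is a hypothetical class for illustration and does not reproduce the actual PaddingRow introduced in the commit; it assumes rows are plain object arrays and that new columns are only appended at the end.

```java
/** Hypothetical sketch (not the Fluss PaddingRow class): exposes an old row under a wider schema. */
final class NullPaddedRow {
    private final Object[] fields;       // values written under the older schema
    private final int targetFieldCount;  // field count of the newer schema

    NullPaddedRow(Object[] fields, int targetFieldCount) {
        if (targetFieldCount < fields.length) {
            throw new IllegalArgumentException(
                    "Target schema must not have fewer fields than the row");
        }
        this.fields = fields;
        this.targetFieldCount = targetFieldCount;
    }

    int getFieldCount() {
        return targetFieldCount;
    }

    /** Columns appended after the row was written read as null. */
    boolean isNullAt(int pos) {
        checkPosition(pos);
        return pos >= fields.length || fields[pos] == null;
    }

    Object getField(int pos) {
        checkPosition(pos);
        return pos < fields.length ? fields[pos] : null;
    }

    private void checkPosition(int pos) {
        if (pos < 0 || pos >= targetFieldCount) {
            throw new IndexOutOfBoundsException("Position " + pos);
        }
    }
}
```

The wrapper leaves the stored row untouched, so old data does not need to be rewritten when a column is added.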
Member

@wuchong wuchong left a comment

I squashed and pushed a commit to improve the implementation. The change details are listed in the commit message.

Waiting for CI to pass.

Comment on lines +463 to +489
Map<Integer, Integer> columnIdPositions = new HashMap<>();
List<Integer> columnIds = schema.getColumnIds();
for (int i = 0; i < columnIds.size(); i++) {
    columnIdPositions.put(columnIds.get(i), i);
}

int prev = -1;
int[] selectedFieldPositions = new int[projectedFields.length];
for (int i = 0; i < projectedFields.length; i++) {
    int fieldId = projectedFields[i];
    Integer position = columnIdPositions.get(fieldId);
    if (position == null) {
        throw new InvalidColumnProjectionException(
                String.format(
                        "Projected field id %s is not contains in %s", fieldId, columnIds));
    }

    selectedFieldPositions[i] = position;
    if (position < prev) {
        throw new InvalidColumnProjectionException(
                "The projection indexes should be in field order, but is "
                        + Arrays.toString(projectedFields));
    }

    prev = position;
}
return selectedFieldPositions;
Member

We can remove this for now: it may affect projection performance, and we don't need the remapping because we only support adding columns at the end. We can rename the method to toSelectedFieldPositions().
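
A rough sketch of what the simplified method could look like is given below. It assumes that columns are only ever appended, so a projected field value is already its position and no id-to-position map is needed. The class `ProjectionSketch`, the `fieldCount` parameter, and the use of `IllegalArgumentException` in place of `InvalidColumnProjectionException` are choices made for this example, not the merged implementation.

```java
import java.util.Arrays;

/** Hypothetical sketch of the simplified projection check (not the merged Fluss code). */
final class ProjectionSketch {
    /**
     * Validates the projection without building an id-to-position map.
     * Assumes columns are only appended, so each projected value is already a position.
     */
    static int[] toSelectedFieldPositions(int[] projectedFields, int fieldCount) {
        int prev = -1;
        for (int position : projectedFields) {
            if (position < 0 || position >= fieldCount) {
                throw new IllegalArgumentException(
                        "Projected field " + position + " is out of range [0, " + fieldCount + ")");
            }
            if (position < prev) {
                throw new IllegalArgumentException(
                        "The projection indexes should be in field order, but is "
                                + Arrays.toString(projectedFields));
            }
            prev = position;
        }
        // Defensive copy so callers cannot mutate the validated projection.
        return projectedFields.clone();
    }
}
```

This keeps the ordering check from the reviewed code while doing a single pass with no map allocation per call.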


message PbModifyColumn {
  required string column_name = 1;
  required bytes data_type_json = 2;
Member

data_type_json should be optional. The following ALTER COLUMN statement, which changes only the comment, is also valid:

ALTER TABLE prod.db.sample ALTER COLUMN measurement COMMENT 'unit is kilobytes per second';
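
To illustrate why the data type needs to be optional, here is a minimal Java sketch of a modify-column change whose type and comment are both optional. `ModifyColumnSketch` and its factory methods are hypothetical names; they do not correspond to generated protobuf classes or to any Fluss type.

```java
import java.util.Optional;

/** Hypothetical model of a modify-column change (not a Fluss or protobuf class). */
final class ModifyColumnSketch {
    final String columnName;
    final Optional<String> dataTypeJson; // absent when the type is unchanged
    final Optional<String> comment;      // absent when the comment is unchanged

    private ModifyColumnSketch(
            String columnName, Optional<String> dataTypeJson, Optional<String> comment) {
        this.columnName = columnName;
        this.dataTypeJson = dataTypeJson;
        this.comment = comment;
    }

    /** Models a statement such as ALTER COLUMN ... COMMENT '...', which carries no data type. */
    static ModifyColumnSketch commentOnly(String columnName, String comment) {
        return new ModifyColumnSketch(columnName, Optional.empty(), Optional.of(comment));
    }

    /** Models a type change that leaves the comment untouched. */
    static ModifyColumnSketch typeOnly(String columnName, String dataTypeJson) {
        return new ModifyColumnSketch(columnName, Optional.of(dataTypeJson), Optional.empty());
    }
}
```

With a required data type, the comment-only case could not be expressed without sending a redundant or placeholder type.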

Comment on lines +249 to +257
int currentSchemaId = zooKeeperClient.getCurrentSchemaId(tablePath);
SchemaInfo schemaInfo;
if (schemaId != currentSchemaId) {
    LOG.warn(
            "Schema id {} is not equal to current schema id {}. Skipping schema change processing.",
            schemaId,
            currentSchemaId);
    return;
}
Member

Why check the current schema id? This is a heavy operation. Even if this is an old schema, I think it is still fine to process it.
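
One way to process every schema change without the extra lookup is to store schemas by id and treat the largest id seen as the latest. The sketch below is hypothetical: `SchemaCacheSketch` and its methods are illustrative names, schemas are represented as plain strings, and -1 as the "no schema yet" value is an assumption of this example.

```java
import java.util.TreeMap;

/** Hypothetical per-table schema cache (not the Fluss ServerSchemaCache). */
final class SchemaCacheSketch {
    private final TreeMap<Integer, String> schemasById = new TreeMap<>();

    /** Accepts any schema id, old or new; no remote lookup of the current id is needed. */
    void onSchemaChange(int schemaId, String schema) {
        schemasById.putIfAbsent(schemaId, schema);
    }

    /** The latest schema is the one with the largest id seen so far; -1 if none (assumption). */
    int latestSchemaId() {
        return schemasById.isEmpty() ? -1 : schemasById.lastKey();
    }

    String getSchema(int schemaId) {
        return schemasById.get(schemaId);
    }
}
```

Because the latest id is derived from the stored keys, a late notification for an older schema is recorded without moving the latest id backward.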

@wuchong wuchong merged commit 07721fe into apache:main Dec 2, 2025
8 of 9 checks passed
wuchong pushed a commit that referenced this pull request Dec 2, 2025

Development

Successfully merging this pull request may close these issues.

Fluss Support Add column