[Feature][seatunnel-api] Integrate Gravitino as metadata service for non-relational connectors#10402
[Feature][seatunnel-api] Integrate Gravitino as metadata service for non-relational connectors#10402chl-wxp wants to merge 52 commits intoapache:devfrom
Conversation
Issue 1: HttpClient Resource Leak
Location:
private static volatile CloseableHttpClient httpClient;
private CloseableHttpClient getHttpClient() {
if (httpClient == null) {
synchronized (GravitinoClient.class) {
if (httpClient == null) {
httpClient = HttpClients.createDefault();
}
}
}
return httpClient;
}Related Context:
Issue Description:
Potential Risks:
Impact Scope:
Severity: BLOCKER Improvement Suggestions: // Solution 1: Use shutdown hook (recommended)
private static volatile CloseableHttpClient httpClient;
private static volatile boolean closed = false;

/**
 * Returns the shared HTTP client, creating it on first use.
 *
 * <p>Creation happens at most once, under the {@code GravitinoClient.class} lock. When the
 * client is created, a JVM shutdown hook is registered that flips {@code closed} to
 * {@code true} and closes the client, so that the {@code closed} guard below can actually
 * trigger for callers that arrive during shutdown.
 *
 * @return the shared client; never {@code null}
 * @throws IllegalStateException if the shutdown hook has already closed the client
 */
private CloseableHttpClient getHttpClient() {
    if (closed) {
        throw new IllegalStateException("HttpClient has been closed");
    }
    CloseableHttpClient client = httpClient;
    if (client == null) {
        synchronized (GravitinoClient.class) {
            // Re-check under the lock: the shutdown hook may have run since the check above.
            if (closed) {
                throw new IllegalStateException("HttpClient has been closed");
            }
            client = httpClient;
            if (client == null) {
                client = HttpClients.createDefault();
                httpClient = client;
                // Register JVM shutdown hook (only once, together with the creation).
                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                    synchronized (GravitinoClient.class) {
                        // Mark closed first so later callers fail fast instead of
                        // receiving a client that is being torn down.
                        closed = true;
                        CloseableHttpClient toClose = httpClient;
                        httpClient = null;
                        if (toClose != null) {
                            try {
                                toClose.close();
                            } catch (IOException ignored) {
                                // The JVM is exiting; a failed close cannot be handled usefully.
                            }
                        }
                    }
                }, "gravitino-http-client-shutdown"));
            }
        }
    }
    return client;
}
// Solution 2: Do not use singleton, create new instance each time (simple but poor performance)
/**
 * Fetches {@code url} with the Gravitino Accept header and parses the response body as JSON.
 *
 * <p>Up to {@code MAX_RETRY_ATTEMPTS} attempts are made, each with a freshly created client
 * that is closed when the attempt ends. An {@link IOException} from an attempt is remembered
 * and the next attempt starts immediately.
 *
 * @param url the URL to request
 * @return the parsed JSON tree of the first attempt that succeeds
 * @throws IOException the failure of the last attempt, once all attempts have failed
 * @throws RuntimeException if a response carries no entity (not caught, so not retried)
 */
private JsonNode executeGetRequest(String url) throws IOException {
    IOException failure = null;
    int remaining = MAX_RETRY_ATTEMPTS;
    while (remaining-- > 0) {
        final HttpGet get = new HttpGet(url);
        get.addHeader(HEADER_ACCEPT, MEDIA_TYPE_GRAVITINO_V1);
        try (CloseableHttpClient oneShotClient = HttpClients.createDefault();
                CloseableHttpResponse reply = oneShotClient.execute(get)) {
            final HttpEntity body = reply.getEntity();
            if (body != null) {
                try {
                    return JsonUtils.readTree(body.getContent());
                } finally {
                    // Always drain the entity, whether parsing succeeded or not.
                    EntityUtils.consume(body);
                }
            }
            throw new RuntimeException(ERROR_NO_RESPONSE_ENTITY);
        } catch (IOException ioe) {
            failure = ioe;
        }
    }
    throw failure;
}
// Solution 3: Implement HttpClient lifecycle management (most complete but complex)
public class GravitinoClient implements MetalakeClient, AutoCloseable {
private CloseableHttpClient httpClient;
private synchronized CloseableHttpClient getHttpClient() {
if (httpClient == null) {
httpClient = HttpClients.createDefault();
}
return httpClient;
}
@Override
public void close() throws IOException {
if (httpClient != null) {
httpClient.close();
httpClient = null;
}
}
}Rationale:
Issue 2: factoryIdentifier Misused as Catalog Name
Location:
default List<CatalogTable> discoverTableSchemas(TableSourceFactoryContext context) {
final TableSchemaDiscoverer metaLakeSchemaDiscoverer =
new TableSchemaDiscoverer(context, factoryIdentifier());
return metaLakeSchemaDiscoverer.discoverTableSchemas();
}Related Context:
Issue Description:
This results in:
Potential Risks:
Impact Scope:
Severity: CRITICAL Improvement Suggestions: // Solution 1: Parse catalog name from configuration (recommended)
/**
 * Discovers the table schemas for this source.
 *
 * <p>The catalog name handed to the discoverer is the value of
 * {@code ConnectorCommonOptions.CATALOG_NAME} when that option is present in the context's
 * options, and {@code factoryIdentifier()} otherwise.
 *
 * @param context the source factory context whose options are consulted
 * @return the tables returned by {@code TableSchemaDiscoverer#discoverTableSchemas()}
 */
default List<CatalogTable> discoverTableSchemas(TableSourceFactoryContext context) {
    final String resolvedCatalog =
            context.getOptions()
                    .getOptional(ConnectorCommonOptions.CATALOG_NAME) // Need to add this option
                    .orElse(factoryIdentifier());
    return new TableSchemaDiscoverer(context, resolvedCatalog).discoverTableSchemas();
}
// Solution 2: Parse catalog name from table path
/**
 * Discovers the table schemas for this source, taking the catalog name from the configured
 * table path when that path has exactly three dot-separated parts.
 *
 * <p>In every other case (no schema option, no {@code "table"} entry, or a path with a
 * different number of parts) the catalog name is {@code factoryIdentifier()}.
 *
 * @param context the source factory context whose options are consulted
 * @return the tables returned by {@code TableSchemaDiscoverer#discoverTableSchemas()}
 */
default List<CatalogTable> discoverTableSchemas(TableSourceFactoryContext context) {
    final String fallbackCatalog = factoryIdentifier();
    // Text form of the "table" entry of the schema option, when both are present.
    final Optional<String> tablePathOption =
            context.getOptions()
                    .getOptional(ConnectorCommonOptions.SCHEMA)
                    .flatMap(schema -> schema.get("table"))
                    .map(String::valueOf);
    // Only a three-part name overrides the fallback; its first part is the catalog.
    final String catalogName =
            tablePathOption
                    .map(path -> path.split("\\."))
                    .filter(parts -> parts.length == 3)
                    .map(parts -> parts[0])
                    .orElse(fallbackCatalog);
    return new TableSchemaDiscoverer(context, catalogName).discoverTableSchemas();
}
// Solution 3: Do not use factoryIdentifier, use fixed default value
default List<CatalogTable> discoverTableSchemas(TableSourceFactoryContext context) {
final TableSchemaDiscoverer metaLakeSchemaDiscoverer =
new TableSchemaDiscoverer(context, "default"); // Use "default" as the default catalog
return metaLakeSchemaDiscoverer.discoverTableSchemas();
}Rationale:
Issue 3: HTTP Retry Mechanism Lacks Backoff Strategy and Exception Classification
Location:
private JsonNode executeGetRequest(String url) throws IOException {
IOException lastException = null;
for (int attempt = 1; attempt <= MAX_RETRY_ATTEMPTS; attempt++) {
HttpGet request = new HttpGet(url);
request.addHeader(HEADER_ACCEPT, MEDIA_TYPE_GRAVITINO_V1);
try (CloseableHttpResponse response = getHttpClient().execute(request)) {
HttpEntity entity = response.getEntity();
if (entity == null) {
throw new RuntimeException(ERROR_NO_RESPONSE_ENTITY);
}
try {
return JsonUtils.readTree(entity.getContent());
} finally {
EntityUtils.consume(entity);
}
} catch (IOException e) {
lastException = e;
}
}
throw lastException;
}Related Context:
Issue Description:
Potential Risks:
Impact Scope:
Severity: MAJOR Improvement Suggestions: private JsonNode executeGetRequest(String url) throws IOException {
IOException lastException = null;
long baseDelayMs = 1000; // Base delay of 1 second
for (int attempt = 1; attempt <= MAX_RETRY_ATTEMPTS; attempt++) {
HttpGet request = new HttpGet(url);
request.addHeader(HEADER_ACCEPT, MEDIA_TYPE_GRAVITINO_V1);
try (CloseableHttpResponse response = getHttpClient().execute(request)) {
HttpEntity entity = response.getEntity();
if (entity == null) {
throw new RuntimeException(ERROR_NO_RESPONSE_ENTITY);
}
try {
return JsonUtils.readTree(entity.getContent());
} finally {
EntityUtils.consume(entity);
}
} catch (IOException e) {
lastException = e;
// Determine if it should retry
if (!shouldRetry(e) || attempt == MAX_RETRY_ATTEMPTS) {
break;
}
// Exponential backoff
try {
long delayMs = baseDelayMs * (1L << (attempt - 1));
Thread.sleep(delayMs);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Retry interrupted", ie);
}
}
}
// Construct detailed exception information
String errorMsg = String.format(
"Failed to fetch schema from Gravitino after %d attempts. URL: %s. Last error: %s",
MAX_RETRY_ATTEMPTS, url, lastException.getMessage()
);
throw new IOException(errorMsg, lastException);
}
/**
* Determine if the exception should be retried
*/
private boolean shouldRetry(IOException e) {
// Non-retryable exception types
if (e instanceof java.net.UnknownHostException) {
return false; // DNS resolution failed
}
if (e instanceof java.net.ConnectException) {
return false; // Connection refused
}
if (e instanceof org.apache.http.client.HttpResponseException) {
int statusCode = ((org.apache.http.client.HttpResponseException) e).getStatusCode();
if (statusCode == 404 || statusCode == 400 || statusCode == 401) {
return false; // Client errors should not be retried
}
}
// Retryable exceptions: timeout, server errors (5xx)
return true;
}Rationale:
Issue 4: Environment Variable Resolution Lacks Error Handling
Location:
if (StringUtils.isNotEmpty(
System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()))) {
return MetaLakeType.valueOf(
System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()));
}Related Context:
Issue Description: Potential Risks:
Impact Scope:
Severity: MINOR Improvement Suggestions: // Solution 1: Use try-catch to provide friendly error messages
if (StringUtils.isNotEmpty(
System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()))) {
String metalakeTypeStr = System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase());
try {
return MetaLakeType.valueOf(metalakeTypeStr);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
String.format(
"Invalid metalake type '%s'. Supported values are: %s. " +
"Please check your environment variable '%s' or configuration.",
metalakeTypeStr,
Arrays.stream(MetaLakeType.values())
.map(Enum::name)
.collect(Collectors.joining(", ")),
EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()
),
e
);
}
}
// Solution 2: Ignore invalid values, use default values and log warnings (more lenient)
if (StringUtils.isNotEmpty(
System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()))) {
String metalakeTypeStr = System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase());
try {
return MetaLakeType.valueOf(metalakeTypeStr);
} catch (IllegalArgumentException e) {
log.warn(
"Invalid metalake type '{}' from environment variable '{}'. " +
"Using default value '{}'. Supported values are: {}",
metalakeTypeStr,
EnvCommonOptions.METALAKE_TYPE.key().toUpperCase(),
MetaLakeType.GRAVITINO.getType(),
Arrays.stream(MetaLakeType.values())
.map(Enum::name)
.collect(Collectors.joining(", "))
);
return MetaLakeType.GRAVITINO;
}
}
// Solution 3: Use case-insensitive matching (most robust)
if (StringUtils.isNotEmpty(
System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()))) {
String metalakeTypeStr = System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase());
for (MetaLakeType type : MetaLakeType.values()) {
if (type.name().equalsIgnoreCase(metalakeTypeStr)) {
return type;
}
}
// No matching type found
throw new IllegalArgumentException(
String.format(
"Invalid metalake type '%s'. Supported values are: %s",
metalakeTypeStr,
Arrays.stream(MetaLakeType.values())
.map(Enum::name)
.collect(Collectors.joining(", "))
)
);
}Rationale:
Issue 5: Exception Handling Lacks Context Information
Location:
private CatalogTable discoverTableSchemaFromMetaLake(String schemaUrl, String configTablePath) {
try {
JsonNode schemaNode = metalakeClient.getTableSchema(schemaUrl);
final TablePath tableSchemaPath;
if (StringUtils.isNotEmpty(configTablePath)) {
tableSchemaPath = TablePath.of(configTablePath);
} else {
tableSchemaPath = metalakeClient.getTableSchemaPath(schemaUrl);
}
final TableSchema tableSchema = metaLakeTableSchemaConvertor.convertor(schemaNode);
return metaLakeTableSchemaConvertor.buildCatalogTable(
catalogName, tableSchemaPath, tableSchema);
} catch (IOException e) {
throw new SeaTunnelRuntimeException(GET_META_LAKE_TABLE_SCHEMA_FAILED, e);
}
}Related Context:
Issue Description:
This makes debugging difficult: users cannot quickly tell which table and which URL caused the failure.
Potential Risks:
Impact Scope:
Severity: MINOR Improvement Suggestions: private CatalogTable discoverTableSchemaFromMetaLake(String schemaUrl, String configTablePath) {
try {
JsonNode schemaNode = metalakeClient.getTableSchema(schemaUrl);
final TablePath tableSchemaPath;
if (StringUtils.isNotEmpty(configTablePath)) {
tableSchemaPath = TablePath.of(configTablePath);
} else {
tableSchemaPath = metalakeClient.getTableSchemaPath(schemaUrl);
}
final TableSchema tableSchema = metaLakeTableSchemaConvertor.convertor(schemaNode);
return metaLakeTableSchemaConvertor.buildCatalogTable(
catalogName, tableSchemaPath, tableSchema);
} catch (IOException e) {
// Construct detailed error message
String errorMsg = String.format(
"Failed to get table schema from MetaLake. " +
"Schema URL: %s, " +
"Configured table path: %s, " +
"Catalog name: %s, " +
"Error: %s",
schemaUrl,
configTablePath != null ? configTablePath : "not configured",
catalogName,
e.getMessage()
);
throw new SeaTunnelRuntimeException(GET_META_LAKE_TABLE_SCHEMA_FAILED,
new IOException(errorMsg, e));
}
}Rationale:
Issue 6: BaseFileSourceConfig Constructor Signature Change Causes Breaking Change
Location:
// Before
public BaseFileSourceConfig(ReadonlyConfig readonlyConfig)
// Now
public BaseFileSourceConfig(ReadonlyConfig readonlyConfig, CatalogTable catalogTableFromConfig)Related Context:
Issue Description:
Potential Risks:
Impact Scope:
Severity: MAJOR Improvement Suggestions: // Solution 1: Keep the old constructor, mark as deprecated (recommended)
/**
 * Legacy single-argument constructor, kept so existing callers keep compiling.
 *
 * <p>Delegates to {@code BaseFileSourceConfig(ReadonlyConfig, CatalogTable)} with a
 * {@code null} catalog table and then logs a deprecation warning on every invocation.
 *
 * @param readonlyConfig the source configuration
 * @deprecated use {@code BaseFileSourceConfig(ReadonlyConfig, CatalogTable)} instead
 */
@Deprecated
public BaseFileSourceConfig(ReadonlyConfig readonlyConfig) {
this(readonlyConfig, null); // Call the new constructor
log.warn(
"BaseFileSourceConfig(ReadonlyConfig) is deprecated and will be removed in future versions. " +
"Please use BaseFileSourceConfig(ReadonlyConfig, CatalogTable) instead."
);
}
/**
 * Builds the file source configuration from the given options and an externally supplied
 * catalog table.
 *
 * @param readonlyConfig the source configuration; the file format, read strategy and file
 *     paths are derived from it
 * @param catalogTableFromConfig the catalog table to store alongside the configuration; the
 *     deprecated single-argument constructor passes {@code null} here
 */
public BaseFileSourceConfig(ReadonlyConfig readonlyConfig, CatalogTable catalogTableFromConfig) {
this.baseFileSourceConfig = readonlyConfig;
this.fileFormat = readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE);
// Calls the instance method getHadoopConfig() while the object is still being constructed.
this.readStrategy = ReadStrategyFactory.of(readonlyConfig, getHadoopConfig());
this.filePaths = parseFilePaths(readonlyConfig);
// NOTE(review): this is assigned before parseCatalogTable(...) runs; presumably
// parseCatalogTable reads it — confirm before reordering these two statements.
this.catalogTableFromConfig = catalogTableFromConfig;
this.catalogTable = parseCatalogTable(readonlyConfig);
}
// Solution 2: Use Builder pattern (more thorough improvement)
public abstract class BaseFileSourceConfig implements Serializable {
// ... field definitions
protected BaseFileSourceConfig(Builder<?> builder) {
this.baseFileSourceConfig = builder.readonlyConfig;
this.fileFormat = builder.fileFormat;
this.readStrategy = builder.readStrategy;
this.filePaths = builder.filePaths;
this.catalogTableFromConfig = builder.catalogTableFromConfig;
this.catalogTable = parseCatalogTable(builder.readonlyConfig);
}
protected abstract static class Builder<T extends Builder<T>> {
private ReadonlyConfig readonlyConfig;
private FileFormat fileFormat;
private ReadStrategy readStrategy;
private List<String> filePaths;
private CatalogTable catalogTableFromConfig;
public T readonlyConfig(ReadonlyConfig readonlyConfig) {
this.readonlyConfig = readonlyConfig;
return self();
}
public T catalogTableFromConfig(CatalogTable catalogTableFromConfig) {
this.catalogTableFromConfig = catalogTableFromConfig;
return self();
}
protected abstract T self();
public abstract BaseFileSourceConfig build();
}
}Rationale:
Issue 7: GravitinoClient.getTableSchemaPath Regex Parsing Lacks Error Handling
Location:
@Override
public TablePath getTableSchemaPath(String schemaHttpUrl) {
Matcher matcher = TABLE_URL_PATTERN.matcher(schemaHttpUrl);
if (!matcher.find()) {
throw new SeaTunnelRuntimeException(
ERROR_INVALID_TABLE_URL,
"Invalid table URL format, expected: /catalogs/{catalog}/schemas/{schema}/tables/{table}");
}
String catalogName = matcher.group(1);
String schemaName = matcher.group(2);
String tableName = matcher.group(3);
return TablePath.of(catalogName, schemaName, tableName);
}Related Context:
Issue Description: Potential Risks:
Impact Scope:
Severity: MINOR Improvement Suggestions: @Override
public TablePath getTableSchemaPath(String schemaHttpUrl) {
if (schemaHttpUrl == null || schemaHttpUrl.isEmpty()) {
throw new SeaTunnelRuntimeException(
ERROR_INVALID_TABLE_URL,
"Table URL cannot be null or empty");
}
Matcher matcher = TABLE_URL_PATTERN.matcher(schemaHttpUrl);
if (!matcher.matches()) { // Use matches() instead of find()
throw new SeaTunnelRuntimeException(
ERROR_INVALID_TABLE_URL,
String.format(
"Invalid table URL format: '%s'. " +
"Expected format: http://host/api/metalakes/{metalake}/catalogs/{catalog}/schemas/{schema}/tables/{table}",
schemaHttpUrl
));
}
String catalogName = matcher.group(1);
String schemaName = matcher.group(2);
String tableName = matcher.group(3);
return TablePath.of(catalogName, schemaName, tableName);
}Rationale:
Issue 8: HttpClient Connections May Become Invalid in Concurrent Scenarios
Location:
private CloseableHttpClient getHttpClient() {
if (httpClient == null) {
synchronized (GravitinoClient.class) {
if (httpClient == null) {
httpClient = HttpClients.createDefault();
}
}
}
return httpClient;
}Issue Description:
Apache HttpClient's Potential Risks:
Impact Scope:
Severity: MINOR Improvement Suggestions: private CloseableHttpClient getHttpClient() {
if (httpClient == null) {
synchronized (GravitinoClient.class) {
if (httpClient == null) {
// Configure connection manager to periodically check connection validity
PoolingHttpClientConnectionManager connectionManager =
PoolingHttpClientConnectionManagerBuilder.create()
.setMaxConnTotal(20) // Maximum number of connections
.setMaxConnPerRoute(10) // Maximum connections per route
.build();
// Create HttpClient with reasonable timeout and retry configuration
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(Timeout.ofMilliseconds(5000)) // Connection timeout of 5 seconds
.setConnectionRequestTimeout(Timeout.ofMilliseconds(5000)) // Timeout for getting connection from connection pool
.build();
httpClient = HttpClients.custom()
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(requestConfig)
.evictIdleConnections(30, TimeUnit.SECONDS) // Clean up idle connections every 30 seconds
.build();
// Register JVM shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
if (httpClient != null) {
httpClient.close();
}
} catch (IOException e) {
// Ignore shutdown exceptions
}
}));
}
}
}
return httpClient;
}Rationale:
|
…taLakeFactory # Conflicts: # docs/en/connectors/source/FtpFile.md # docs/zh/connectors/source/FtpFile.md # seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java # seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java # seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
Issue 2 No changes, keep the previous usage. |
seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java
Show resolved
Hide resolved
seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java
Outdated
Show resolved
Hide resolved
...src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoTableSchemaConvertor.java
Outdated
Show resolved
Hide resolved
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
Show resolved
Hide resolved
|
good job |
|
Apart from the file conflicts, please help take another look: @liugddx @zhangshenghang @Carl-Zhou-CN @davidzollo @hawk9821 |
# Conflicts: # seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaEvolutionErrorCode.java
...-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
Outdated
Show resolved
Hide resolved
...e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMetaLakeIT.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
This PR integrates Apache Gravitino as a MetaLake/metadata service so non-relational connectors (notably file connectors in this PR) can discover table schemas via schema_url (Gravitino REST API) instead of requiring full manual schema definitions, with supporting type conversion, client-side validation, docs, and tests.
Changes:
- Introduces MetaLake schema discovery + Gravitino REST client + Gravitino-to-SeaTunnel schema/type conversion (`TableSchemaDiscoverer`, `GravitinoClient`, `GravitinoTableSchemaConvertor`, `MetaLakeFactory`).
- Plumbs `envOptions` into `FactoryUtil`/`TableSourceFactoryContext` so connectors can resolve `metalake_type` from env/source options consistently.
- Updates file source connectors/configs to accept discovered `CatalogTable`(s), adds E2E coverage + documentation for `schema_url` and type mappings.
Reviewed changes
Copilot reviewed 79 out of 79 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java | Passes envOptions into source factory creation so schema discovery can read env-level settings. |
| seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java | Passes env options to FactoryUtil.createAndPrepareSource for schema discovery. |
| seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java | Passes env options to FactoryUtil.createAndPrepareSource for schema discovery. |
| seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactoryContext.java | Adds envOptions to the factory context so plugins can access env configuration. |
| seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java | Adds default discoverTableSchemas(...) hook to build CatalogTable list from config/MetaLake. |
| seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java | Extends source creation pipeline to carry envOptions into TableSourceFactoryContext. |
| seatunnel-api/src/main/java/org/apache/seatunnel/api/options/table/TableSchemaOptions.java | Adds metalake_type option for schema discovery. |
| seatunnel-api/src/main/java/org/apache/seatunnel/api/options/table/ColumnOptions.java | Adds schema_url option for schema discovery via REST API. |
| seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaEvolutionErrorCode.java | Adds error codes related to MetaLake schema retrieval and URL parsing. |
| seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetalakeClient.java | Expands client contract to support schema fetching + path parsing + close lifecycle. |
| seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetaLakeFactory.java | New factory for creating MetaLake clients and schema convertors by type. |
| seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetaLakeTableSchemaConvertor.java | New SPI for converting MetaLake JSON into TableSchema/CatalogTable. |
| seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/TableSchemaDiscoverer.java | Implements discovery from fields/columns, schema_url, multi-table configs, and fallback behavior. |
| seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java | Implements Gravitino REST calls, retry logic, and schema URL parsing into TablePath. |
| seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoTableSchemaConvertor.java | Converts Gravitino column/index metadata (incl. complex types) to SeaTunnel schema types. |
| seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetalakeConfigUtils.java | Updates MetaLake config replacement flow to new client factory + new client method signature. |
| seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetalakeClientFactory.java | Removes old factory in favor of MetaLakeFactory. |
| seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/GravitinoClient.java | Removes old Gravitino client implementation (replaced by ...metalake.gravitino.GravitinoClient). |
| seatunnel-api/src/test/java/org/apache/seatunnel/api/metalake/TableSchemaDiscovererTest.java | Adds unit tests for schema discovery across fields/schema_url/multi-table scenarios. |
| seatunnel-api/src/test/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClientTest.java | Adds retry/status handling tests and URL-to-TablePath parsing tests. |
| seatunnel-api/src/test/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoTableSchemaConvertorTest.java | Adds extensive type mapping and constraints/index conversion coverage. |
| seatunnel-api/src/test/resources/conf/table_schema_discoverer/single_schema_url.conf | Test config for schema_url single-table discovery. |
| seatunnel-api/src/test/resources/conf/table_schema_discoverer/single_schema_field.conf | Test config for fields single-table schema in config. |
| seatunnel-api/src/test/resources/conf/table_schema_discoverer/single_no_schema.conf | Test config for “no schema” fallback behavior. |
| seatunnel-api/src/test/resources/conf/table_schema_discoverer/multiple_tables_schema_url.conf | Test config for multi-table schema discovery using schema_url. |
| seatunnel-api/src/test/resources/conf/table_schema_discoverer/multiple_tables_no_schema_mixed_format.conf | Test config for multi-table fallback behavior across file formats. |
| seatunnel-api/src/test/resources/conf/table_schema_discoverer/multiple_tables_mixed.conf | Test config for mixing config schema + schema_url. |
| seatunnel-api/src/test/resources/conf/table_schema_discoverer/multiple_tables_fields.conf | Test config for multi-table schema via columns/fields in config. |
| seatunnel-api/src/test/resources/conf/json/metadata_json_from_meta_lake_pgsql.json | Sample metadata JSON used for converter tests. |
| seatunnel-api/src/test/resources/conf/json/metadata_json_from_meta_lake_hive.json | Sample metadata JSON used for converter tests. |
| seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseMultipleTableFileSourceConfig.java | Wires discovered CatalogTable list into per-table file source configs. |
| seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java | Uses injected CatalogTable rather than rebuilding it internally from config. |
| seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java | Enables schema discovery and adds metalake_type option. |
| seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java | Accepts discovered CatalogTable list for multi-table operation. |
| seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/MultipleTableLocalFileSourceConfig.java | Accepts discovered CatalogTable list and passes per-table table metadata down. |
| seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceConfig.java | Accepts CatalogTable for file source schema behavior. |
| seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java | Enables schema discovery and adds metalake_type option. |
| seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java | Accepts discovered CatalogTable list. |
| seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/MultipleTableHdfsFileSourceConfig.java | Accepts discovered CatalogTable list and per-table metadata injection. |
| seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/HdfsFileSourceConfig.java | Accepts CatalogTable injection. |
| seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSourceConfigTest.java | Updates tests for new HdfsFileSourceConfig constructor signature and schema injection. |
| seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java | Enables schema discovery and adds metalake_type option. |
| seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java | Accepts discovered CatalogTable list. |
| seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/config/MultipleTableS3FileSourceConfig.java | Accepts discovered CatalogTable list and per-table schema injection. |
| seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/config/S3FileSourceConfig.java | Accepts CatalogTable injection. |
| seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java | Enables schema discovery and adds metalake_type option. |
| seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java | Accepts discovered CatalogTable list. |
| seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/config/MultipleTableOssFileSourceConfig.java | Accepts discovered CatalogTable list and per-table schema injection. |
| seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/config/OssFileSourceConfig.java | Accepts CatalogTable injection. |
| seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java | Enables schema discovery and adds metalake_type option. |
| seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java | Accepts discovered CatalogTable list. |
| seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/MultipleTableFTPFileSourceConfig.java | Accepts discovered CatalogTable list and per-table schema injection. |
| seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FTPFileSourceConfig.java | Accepts CatalogTable injection. |
| seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java | Enables schema discovery and adds metalake_type option. |
| seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java | Accepts discovered CatalogTable list. |
| seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/MultipleTableSFTPFileSourceConfig.java | Accepts discovered CatalogTable list and per-table schema injection. |
| seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SFTPFileSourceConfig.java | Accepts CatalogTable injection. |
| seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMetaLakeIT.java | Adds E2E integration test standing up MySQL + Gravitino + SeaTunnel to validate schema_url behavior. |
| seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/local_file_csv_to_local_file_csv_with_metalake.conf | Adds E2E job config demonstrating mixed fields and schema_url. |
| seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_to_local_file_with_metalake.conf | Adds E2E job config variant for text/csv schema setups with MetaLake. |
| seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/data/table1.csv | Adds test input data for E2E. |
| seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/data/table2.csv | Adds test input data for E2E. |
| seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/pom.xml | Adds MySQL testcontainers + MySQL JDBC driver for E2E MetaLake test. |
| docs/zh/introduction/concepts/schema-feature.md | Documents schema_url usage and examples (ZH). |
| docs/en/introduction/concepts/schema-feature.md | Documents schema_url usage and examples (EN). |
| docs/zh/introduction/concepts/gravitino-type-mapping.md | Adds Gravitino type mapping doc (ZH). |
| docs/en/introduction/concepts/gravitino-type-mapping.md | Adds Gravitino type mapping doc (EN). |
| docs/zh/connectors/source/LocalFile.md | Documents schema_url/metalake_type for LocalFile (ZH). |
| docs/en/connectors/source/LocalFile.md | Documents schema_url/metalake_type for LocalFile (EN). |
| docs/zh/connectors/source/HdfsFile.md | Documents schema_url/metalake_type for HdfsFile (ZH). |
| docs/en/connectors/source/HdfsFile.md | Documents schema_url/metalake_type for HdfsFile (EN). |
| docs/zh/connectors/source/S3File.md | Documents schema_url/metalake_type for S3File (ZH). |
| docs/en/connectors/source/S3File.md | Documents schema_url/metalake_type for S3File (EN). |
| docs/zh/connectors/source/OssFile.md | Documents schema_url/metalake_type for OssFile (ZH). |
| docs/en/connectors/source/OssFile.md | Documents schema_url/metalake_type for OssFile (EN). |
| docs/zh/connectors/source/FtpFile.md | Documents schema_url/metalake_type for FtpFile (ZH). |
| docs/en/connectors/source/FtpFile.md | Documents schema_url/metalake_type for FtpFile (EN). |
| docs/zh/connectors/source/SftpFile.md | Documents schema_url/metalake_type for SftpFile (ZH). |
| docs/en/connectors/source/SftpFile.md | Documents schema_url/metalake_type for SftpFile (EN). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
What this PR does
Integrates Apache Gravitino as an external metadata service for non-relational connectors (Elasticsearch, vector databases, data lakes, etc.). Users can now query table schemas via Gravitino REST API instead of manually
defining complex schema mappings.
Key Changes
- `MetaLakeFactory` for creating metadata service clients
- `TableSchemaDiscoverer` interface for schema discovery
- `GravitinoClient` and `GravitinoTableSchemaConvertor`
Benefits
Related Issue
Fixes #10339