Skip to content

Comments

[Feature][seatunnel-api] Integrate Gravitino as metadata service for non-relational connectors#10402

Open
chl-wxp wants to merge 52 commits intoapache:devfrom
chl-wxp:dev-MetaLakeFactory
Open

[Feature][seatunnel-api] Integrate Gravitino as metadata service for non-relational connectors#10402
chl-wxp wants to merge 52 commits intoapache:devfrom
chl-wxp:dev-MetaLakeFactory

Conversation

@chl-wxp
Copy link
Contributor

@chl-wxp chl-wxp commented Jan 27, 2026

What this PR does

Integrates Apache Gravitino as an external metadata service for non-relational connectors (Elasticsearch, vector databases, data lakes, etc.). Users can now query table schemas via Gravitino REST API instead of manually
defining complex schema mappings.

Key Changes

  • Added MetaLakeFactory for creating metadata service clients
  • Added TableSchemaDiscoverer interface for schema discovery
  • Implemented GravitinoClient and GravitinoTableSchemaConvertor
  • Supports all Gravitino column types: boolean, integer, long, float, double, string, decimal, date, time, timestamp, binary, list, map, etc.
  • Schema validation is performed on the client side before job submission

Benefits

  • No manual schema configuration required
  • Single source of truth for table schemas
  • Better consistency between metadata and actual storage

Related Issue

Fixes #10339

@chl-wxp chl-wxp marked this pull request as draft January 27, 2026 02:01
@github-actions github-actions bot added core SeaTunnel core module Zeta api labels Jan 27, 2026
@DanielCarter-stack
Copy link

Issue 1: HttpClient Resource Leak

Location: seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java:54-150

private static volatile CloseableHttpClient httpClient;

private CloseableHttpClient getHttpClient() {
    if (httpClient == null) {
        synchronized (GravitinoClient.class) {
            if (httpClient == null) {
                httpClient = HttpClients.createDefault();
            }
        }
    }
    return httpClient;
}

Related Context:

  • Caller: GravitinoClient.executeGetRequest() (line 95)
  • HTTP request retry loop: lines 98-115
  • CloseableHttpResponse using try-with-resources: line 102

Issue Description:
CloseableHttpClient implements the AutoCloseable interface and manages connection pool and thread pool resources. The current code uses a singleton pattern but never calls the close() method, resulting in:

  1. Connections in the connection pool are never released
  2. Threads in the thread pool are never terminated
  3. In long-running processes, if the classloader reloads, multiple HttpClient instances will accumulate, exacerbating the resource leak

Potential Risks:

  • Risk 1: File descriptor leak. On Linux systems, each TCP connection occupies a file descriptor. Long-running processes may lead to "Too many open files" errors.
  • Risk 2: Thread leak. HttpClient uses a connection pool by default and maintains threads internally. Not closing it causes thread accumulation.
  • Risk 3: Memory leak. Resources such as connection pools and DNS resolution caches will not be garbage collected.

Impact Scope:

  • Direct Impact: All SeaTunnel jobs using schema_url
  • Indirect Impact: Long-running SeaTunnel Engine services
  • Affected Area: Core framework (seatunnel-api)

Severity: BLOCKER

Improvement Suggestions:

// Solution 1: Use shutdown hook (recommended)
private static volatile CloseableHttpClient httpClient;
private static volatile boolean closed = false;

private CloseableHttpClient getHttpClient() {
    if (closed) {
        throw new IllegalStateException("HttpClient has been closed");
    }
    if (httpClient == null) {
        synchronized (GravitinoClient.class) {
            if (httpClient == null && !closed) {
                httpClient = HttpClients.createDefault();
                // Register JVM shutdown hook
                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                    try {
                        if (httpClient != null) {
                            httpClient.close();
                        }
                    } catch (IOException e) {
                        // Ignore shutdown exceptions
                    }
                }));
            }
        }
    }
    return httpClient;
}

// Solution 2: Do not use singleton, create new instance each time (simple but poor performance)
private JsonNode executeGetRequest(String url) throws IOException {
    IOException lastException = null;
    for (int attempt = 1; attempt <= MAX_RETRY_ATTEMPTS; attempt++) {
        HttpGet request = new HttpGet(url);
        request.addHeader(HEADER_ACCEPT, MEDIA_TYPE_GRAVITINO_V1);
        
        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(request)) {
            HttpEntity entity = response.getEntity();
            if (entity == null) {
                throw new RuntimeException(ERROR_NO_RESPONSE_ENTITY);
            }
            try {
                return JsonUtils.readTree(entity.getContent());
            } finally {
                EntityUtils.consume(entity);
            }
        } catch (IOException e) {
            lastException = e;
        }
    }
    throw lastException;
}

// Solution 3: Implement HttpClient lifecycle management (most complete but complex)
public class GravitinoClient implements MetalakeClient, AutoCloseable {
    private CloseableHttpClient httpClient;
    
    private synchronized CloseableHttpClient getHttpClient() {
        if (httpClient == null) {
            httpClient = HttpClients.createDefault();
        }
        return httpClient;
    }
    
    @Override
    public void close() throws IOException {
        if (httpClient != null) {
            httpClient.close();
            httpClient = null;
        }
    }
}

Rationale:

  • Option 1 balances performance (singleton reuse) and resource management (shutdown hook)
  • Option 2 is simple but has performance overhead (creating new client each time)
  • Option 3 is most complete but requires modifying caller code to ensure close() is called

Issue 2: factoryIdentifier Misused as Catalog Name

Location: seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java:48-52

default List<CatalogTable> discoverTableSchemas(TableSourceFactoryContext context) {
    final TableSchemaDiscoverer metaLakeSchemaDiscoverer =
            new TableSchemaDiscoverer(context, factoryIdentifier());
    return metaLakeSchemaDiscoverer.discoverTableSchemas();
}

Related Context:

  • Interface definition: TableSourceFactory.java:24 (String factoryIdentifier())
  • Caller: LocalFileSourceFactory.createSource() (connector-file-local)
  • catalogName usage: TableSchemaDiscoverer.java:122, 136, 147

Issue Description:
factoryIdentifier() returns a connector identifier (such as "LocalFile", "HdfsFile"), which is not the semantic meaning of catalog. In TableSchemaDiscoverer, catalogName is used to:

  1. Construct the catalog name for CatalogTable
  2. As part of TableIdentifier.of(catalogName, tablePath)

This results in:

  • When users configure schema { table = "mydb.mytable" }, the final catalog name will be replaced with "LocalFile"
  • Cannot correctly distinguish between different catalogs (such as production and test environments)

Potential Risks:

  • Risk 1: Catalog name semantic confusion, making it difficult for users to understand why the catalog name became the connector name
  • Risk 2: If users specify a catalog name in the configuration (such as table = "prod_db.table1"), it will be overwritten

Impact Scope:

  • Direct Impact: All scenarios using schema_url or schema.fields
  • Indirect Impact: Downstream functionality that relies on correct catalog names (such as multi-catalog writes)
  • Affected Area: All file connectors

Severity: CRITICAL

Improvement Suggestions:

// Solution 1: Parse catalog name from configuration (recommended)
default List<CatalogTable> discoverTableSchemas(TableSourceFactoryContext context) {
    // Get catalog name from configuration, use factoryIdentifier as default if not available
    String catalogName = context.getOptions()
        .getOptional(ConnectorCommonOptions.CATALOG_NAME)  // Need to add this option
        .orElse(factoryIdentifier());
    
    final TableSchemaDiscoverer metaLakeSchemaDiscoverer =
            new TableSchemaDiscoverer(context, catalogName);
    return metaLakeSchemaDiscoverer.discoverTableSchemas();
}

// Solution 2: Parse catalog name from table path
default List<CatalogTable> discoverTableSchemas(TableSourceFactoryContext context) {
    // Check the table property in configuration, extract catalog if it's a three-part name
    String catalogName = factoryIdentifier();
    
    Optional<String> configuredTable = context.getOptions()
        .getOptional(ConnectorCommonOptions.SCHEMA)
        .flatMap(schema -> schema.get("table"))
        .map(String::valueOf);
    
    if (configuredTable.isPresent()) {
        String tablePath = configuredTable.get();
        String[] parts = tablePath.split("\\.");
        if (parts.length == 3) {
            catalogName = parts[0];  // Use the configured catalog
        }
    }
    
    final TableSchemaDiscoverer metaLakeSchemaDiscoverer =
            new TableSchemaDiscoverer(context, catalogName);
    return metaLakeSchemaDiscoverer.discoverTableSchemas();
}

// Solution 3: Do not use factoryIdentifier, use fixed default value
default List<CatalogTable> discoverTableSchemas(TableSourceFactoryContext context) {
    final TableSchemaDiscoverer metaLakeSchemaDiscoverer =
            new TableSchemaDiscoverer(context, "default");  // Use "default" as the default catalog
    return metaLakeSchemaDiscoverer.discoverTableSchemas();
}

Rationale:

  • Option 1 is most flexible, allowing users to customize catalog names
  • Option 2 has best compatibility, inferring catalog from existing configuration
  • Option 3 is simplest and aligns with most users' expectations (using "default" catalog)

Issue 3: HTTP Retry Mechanism Lacks Backoff Strategy and Exception Classification

Location: seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java:95-118

private JsonNode executeGetRequest(String url) throws IOException {
    IOException lastException = null;

    for (int attempt = 1; attempt <= MAX_RETRY_ATTEMPTS; attempt++) {
        HttpGet request = new HttpGet(url);
        request.addHeader(HEADER_ACCEPT, MEDIA_TYPE_GRAVITINO_V1);

        try (CloseableHttpResponse response = getHttpClient().execute(request)) {
            HttpEntity entity = response.getEntity();
            if (entity == null) {
                throw new RuntimeException(ERROR_NO_RESPONSE_ENTITY);
            }
            try {
                return JsonUtils.readTree(entity.getContent());
            } finally {
                EntityUtils.consume(entity);
            }
        } catch (IOException e) {
            lastException = e;
        }
    }

    throw lastException;
}

Related Context:

  • Constant definitions: MAX_RETRY_ATTEMPTS = 2 (line 50)
  • Caller: getTableSchema() (line 69)

Issue Description:
The current retry mechanism has the following issues:

  1. No delay: Retries are executed immediately, which may cause a thundering herd effect
  2. No exception classification: All IOExceptions are retried, but certain exceptions should not be retried (such as DNS resolution failures)
  3. Missing exponential backoff: Fixed number of retries, no delay adjustment based on failure count
  4. Exception context loss: The final thrown exception does not include URL and retry count information

Potential Risks:

  • Risk 1: When the Gravitino service is temporarily unavailable, rapid retries may exacerbate service pressure
  • Risk 2: Exceptions such as DNS resolution failures and connection refusals should not be retried, but the current code will retry them
  • Risk 3: Difficult debugging, exception messages lack key information (such as URL, retry count)

Impact Scope:

  • Direct Impact: All scenarios where metadata is obtained through schema_url
  • Indirect Impact: Stability of Gravitino service
  • Affected Area: Core framework

Severity: MAJOR

Improvement Suggestions:

private JsonNode executeGetRequest(String url) throws IOException {
    IOException lastException = null;
    long baseDelayMs = 1000;  // Base delay of 1 second

    for (int attempt = 1; attempt <= MAX_RETRY_ATTEMPTS; attempt++) {
        HttpGet request = new HttpGet(url);
        request.addHeader(HEADER_ACCEPT, MEDIA_TYPE_GRAVITINO_V1);

        try (CloseableHttpResponse response = getHttpClient().execute(request)) {
            HttpEntity entity = response.getEntity();
            if (entity == null) {
                throw new RuntimeException(ERROR_NO_RESPONSE_ENTITY);
            }
            try {
                return JsonUtils.readTree(entity.getContent());
            } finally {
                EntityUtils.consume(entity);
            }
        } catch (IOException e) {
            lastException = e;
            
            // Determine if it should retry
            if (!shouldRetry(e) || attempt == MAX_RETRY_ATTEMPTS) {
                break;
            }
            
            // Exponential backoff
            try {
                long delayMs = baseDelayMs * (1L << (attempt - 1));
                Thread.sleep(delayMs);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IOException("Retry interrupted", ie);
            }
        }
    }

    // Construct detailed exception information
    String errorMsg = String.format(
        "Failed to fetch schema from Gravitino after %d attempts. URL: %s. Last error: %s",
        MAX_RETRY_ATTEMPTS, url, lastException.getMessage()
    );
    throw new IOException(errorMsg, lastException);
}

/**
 * Determine if the exception should be retried
 */
private boolean shouldRetry(IOException e) {
    // Non-retryable exception types
    if (e instanceof java.net.UnknownHostException) {
        return false;  // DNS resolution failed
    }
    if (e instanceof java.net.ConnectException) {
        return false;  // Connection refused
    }
    if (e instanceof org.apache.http.client.HttpResponseException) {
        int statusCode = ((org.apache.http.client.HttpResponseException) e).getStatusCode();
        if (statusCode == 404 || statusCode == 400 || statusCode == 401) {
            return false;  // Client errors should not be retried
        }
    }
    
    // Retryable exceptions: timeout, server errors (5xx)
    return true;
}

Rationale:

  • Exponential backoff prevents thundering herd effect
  • Exception classification avoids meaningless retries
  • Detailed exception information facilitates debugging
  • Aligns with best practices for network request retries

Issue 4: Environment Variable Resolution Lacks Error Handling

Location: seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/TableSchemaDiscoverer.java:165-169

if (StringUtils.isNotEmpty(
        System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()))) {
    return MetaLakeType.valueOf(
            System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()));
}

Related Context:

  • Enum definition: MetaLakeType.java (only one value: GRAVITINO)
  • Configuration option definition: EnvCommonOptions.METALAKE_TYPE

Issue Description:
When the environment variable METALAKE_TYPE is set to an invalid value (such as invalid), MetaLakeType.valueOf() throws IllegalArgumentException, causing job submission to fail. The error message is unfriendly, and users do not know what value to set.

Potential Risks:

  • Risk 1: Users receive obscure error messages (No enum constant MetaLakeType.invalid) when configuration is incorrect
  • Risk 2: Lack of prompt information, users do not know the valid value is gravitino

Impact Scope:

  • Direct Impact: Users using environment variable to configure METALAKE_TYPE
  • Indirect Impact: Difficult for operations personnel to troubleshoot
  • Affected Area: All scenarios using metadata service

Severity: MINOR

Improvement Suggestions:

// Solution 1: Use try-catch to provide friendly error messages
if (StringUtils.isNotEmpty(
        System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()))) {
    String metalakeTypeStr = System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase());
    try {
        return MetaLakeType.valueOf(metalakeTypeStr);
    } catch (IllegalArgumentException e) {
        throw new IllegalArgumentException(
            String.format(
                "Invalid metalake type '%s'. Supported values are: %s. " +
                "Please check your environment variable '%s' or configuration.",
                metalakeTypeStr,
                Arrays.stream(MetaLakeType.values())
                    .map(Enum::name)
                    .collect(Collectors.joining(", ")),
                EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()
            ),
            e
        );
    }
}

// Solution 2: Ignore invalid values, use default values and log warnings (more lenient)
if (StringUtils.isNotEmpty(
        System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()))) {
    String metalakeTypeStr = System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase());
    try {
        return MetaLakeType.valueOf(metalakeTypeStr);
    } catch (IllegalArgumentException e) {
        log.warn(
            "Invalid metalake type '{}' from environment variable '{}'. " +
            "Using default value '{}'. Supported values are: {}",
            metalakeTypeStr,
            EnvCommonOptions.METALAKE_TYPE.key().toUpperCase(),
            MetaLakeType.GRAVITINO.getType(),
            Arrays.stream(MetaLakeType.values())
                .map(Enum::name)
                .collect(Collectors.joining(", "))
        );
        return MetaLakeType.GRAVITINO;
    }
}

// Solution 3: Use case-insensitive matching (most robust)
if (StringUtils.isNotEmpty(
        System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()))) {
    String metalakeTypeStr = System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase());
    for (MetaLakeType type : MetaLakeType.values()) {
        if (type.name().equalsIgnoreCase(metalakeTypeStr)) {
            return type;
        }
    }
    // No matching type found
    throw new IllegalArgumentException(
        String.format(
            "Invalid metalake type '%s'. Supported values are: %s",
            metalakeTypeStr,
            Arrays.stream(MetaLakeType.values())
                .map(Enum::name)
                .collect(Collectors.joining(", "))
        )
    );
}

Rationale:

  • Option 1 provides most detailed error information, helping users quickly locate issues
  • Option 2 is most lenient but may hide configuration errors
  • Option 3 is most robust, supporting case-insensitive input

Issue 5: Exception Handling Lacks Context Information

Location: seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/TableSchemaDiscoverer.java:125-140

private CatalogTable discoverTableSchemaFromMetaLake(String schemaUrl, String configTablePath) {
    try {
        JsonNode schemaNode = metalakeClient.getTableSchema(schemaUrl);
        final TablePath tableSchemaPath;
        if (StringUtils.isNotEmpty(configTablePath)) {
            tableSchemaPath = TablePath.of(configTablePath);
        } else {
            tableSchemaPath = metalakeClient.getTableSchemaPath(schemaUrl);
        }
        final TableSchema tableSchema = metaLakeTableSchemaConvertor.convertor(schemaNode);
        return metaLakeTableSchemaConvertor.buildCatalogTable(
                catalogName, tableSchemaPath, tableSchema);
    } catch (IOException e) {
        throw new SeaTunnelRuntimeException(GET_META_LAKE_TABLE_SCHEMA_FAILED, e);
    }
}

Related Context:

  • Error code definition: SchemaEvolutionErrorCode.GET_META_LAKE_TABLE_SCHEMA_FAILED
  • Exception message: "Get meta lake table schema failed"

Issue Description:
When fetching schema from Gravitino fails, the thrown SeaTunnelRuntimeException only contains the original IOException, lacking key context information:

  1. schemaUrl - User-configured URL
  2. tablePath - Parsed table path
  3. catalogName - Catalog name

This makes debugging difficult, and users cannot quickly locate which table and which URL have the problem.

Potential Risks:

  • Risk 1: In production environments, if multiple tables use schema_url, it is impossible to quickly locate which table failed from logs
  • Risk 2: Operations personnel need to manually check configuration files to determine the failed URL

Impact Scope:

  • Direct Impact: All scenarios using schema_url
  • Indirect Impact: Troubleshooting efficiency
  • Affected Area: All file connectors

Severity: MINOR

Improvement Suggestions:

private CatalogTable discoverTableSchemaFromMetaLake(String schemaUrl, String configTablePath) {
    try {
        JsonNode schemaNode = metalakeClient.getTableSchema(schemaUrl);
        final TablePath tableSchemaPath;
        if (StringUtils.isNotEmpty(configTablePath)) {
            tableSchemaPath = TablePath.of(configTablePath);
        } else {
            tableSchemaPath = metalakeClient.getTableSchemaPath(schemaUrl);
        }
        final TableSchema tableSchema = metaLakeTableSchemaConvertor.convertor(schemaNode);
        return metaLakeTableSchemaConvertor.buildCatalogTable(
                catalogName, tableSchemaPath, tableSchema);
    } catch (IOException e) {
        // Construct detailed error message
        String errorMsg = String.format(
            "Failed to get table schema from MetaLake. " +
            "Schema URL: %s, " +
            "Configured table path: %s, " +
            "Catalog name: %s, " +
            "Error: %s",
            schemaUrl,
            configTablePath != null ? configTablePath : "not configured",
            catalogName,
            e.getMessage()
        );
        throw new SeaTunnelRuntimeException(GET_META_LAKE_TABLE_SCHEMA_FAILED, 
            new IOException(errorMsg, e));
    }
}

Rationale:

  • Include all key context information in exception messages
  • Preserve original exception as cause for viewing complete stack trace
  • Aligns with exception handling best practices (fail-fast with context)

Issue 6: BaseFileSourceConfig Constructor Signature Change Causes Breaking Change

Location: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java:57-60

// Before
public BaseFileSourceConfig(ReadonlyConfig readonlyConfig)

// Now
public BaseFileSourceConfig(ReadonlyConfig readonlyConfig, CatalogTable catalogTableFromConfig)

Related Context:

  • Parent class: BaseFileSourceConfig (abstract class)
  • Subclasses: BaseFileSourceConfig implementations of 6 file connectors
  • Caller: BaseMultipleTableFileSourceConfig

Issue Description:
This is a breaking change that will cause all external code inheriting BaseFileSourceConfig to fail compilation. Although all internal subclasses in the current PR have been updated, there is no:

  1. Deprecation warning (@Deprecated)
  2. Migration guide
  3. Compatible transition solution

Potential Risks:

  • Risk 1: If users have customized file connectors inheriting BaseFileSourceConfig, code will fail to compile
  • Risk 2: After upgrading to a new version, users need to modify code to compile successfully

Impact Scope:

  • Direct Impact: All external code inheriting BaseFileSourceConfig
  • Indirect Impact: Upgrade compatibility
  • Affected Area: All file connectors

Severity: MAJOR

Improvement Suggestions:

// Solution 1: Keep the old constructor, mark as deprecated (recommended)
@Deprecated
public BaseFileSourceConfig(ReadonlyConfig readonlyConfig) {
    this(readonlyConfig, null);  // Call the new constructor
    log.warn(
        "BaseFileSourceConfig(ReadonlyConfig) is deprecated and will be removed in future versions. " +
        "Please use BaseFileSourceConfig(ReadonlyConfig, CatalogTable) instead."
    );
}

public BaseFileSourceConfig(ReadonlyConfig readonlyConfig, CatalogTable catalogTableFromConfig) {
    this.baseFileSourceConfig = readonlyConfig;
    this.fileFormat = readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE);
    this.readStrategy = ReadStrategyFactory.of(readonlyConfig, getHadoopConfig());
    this.filePaths = parseFilePaths(readonlyConfig);
    this.catalogTableFromConfig = catalogTableFromConfig;
    this.catalogTable = parseCatalogTable(readonlyConfig);
}

// Solution 2: Use Builder pattern (more thorough improvement)
public abstract class BaseFileSourceConfig implements Serializable {
    // ... field definitions
    
    protected BaseFileSourceConfig(Builder<?> builder) {
        this.baseFileSourceConfig = builder.readonlyConfig;
        this.fileFormat = builder.fileFormat;
        this.readStrategy = builder.readStrategy;
        this.filePaths = builder.filePaths;
        this.catalogTableFromConfig = builder.catalogTableFromConfig;
        this.catalogTable = parseCatalogTable(builder.readonlyConfig);
    }
    
    protected abstract static class Builder<T extends Builder<T>> {
        private ReadonlyConfig readonlyConfig;
        private FileFormat fileFormat;
        private ReadStrategy readStrategy;
        private List<String> filePaths;
        private CatalogTable catalogTableFromConfig;
        
        public T readonlyConfig(ReadonlyConfig readonlyConfig) {
            this.readonlyConfig = readonlyConfig;
            return self();
        }
        
        public T catalogTableFromConfig(CatalogTable catalogTableFromConfig) {
            this.catalogTableFromConfig = catalogTableFromConfig;
            return self();
        }
        
        protected abstract T self();
        
        public abstract BaseFileSourceConfig build();
    }
}

Rationale:

  • Option 1 provides backward compatibility, giving users time to migrate
  • Option 2 is more thorough, using Builder pattern to avoid the problem of too many constructor parameters

Issue 7: GravitinoClient.getTableSchemaPath Regex Parsing Lacks Error Handling

Location: seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java:74-86

@Override
public TablePath getTableSchemaPath(String schemaHttpUrl) {
    Matcher matcher = TABLE_URL_PATTERN.matcher(schemaHttpUrl);
    if (!matcher.find()) {
        throw new SeaTunnelRuntimeException(
                ERROR_INVALID_TABLE_URL,
                "Invalid table URL format, expected: /catalogs/{catalog}/schemas/{schema}/tables/{table}");
    }
    String catalogName = matcher.group(1);
    String schemaName = matcher.group(2);
    String tableName = matcher.group(3);
    return TablePath.of(catalogName, schemaName, tableName);
}

Related Context:

  • Regex definition: TABLE_URL_PATTERN = Pattern.compile("/catalogs/([^/]+)/schemas/([^/]+)/tables/([^/]+)")
  • Error code: ERROR_INVALID_TABLE_URL

Issue Description:
When the URL format is incorrect, the thrown exception message lacks the actual URL, making it impossible for users to know which URL has the wrong format. Additionally, matcher.find() only finds the first match. If the URL contains multiple substrings that meet the condition, incorrect results may be parsed.

Potential Risks:

  • Risk 1: Exception message does not contain the actual URL, making debugging difficult
  • Risk 2: matcher.find() may match the wrong location (although probability is very low)

Impact Scope:

  • Direct Impact: All scenarios using schema_url without configuring table property
  • Indirect Impact: Troubleshooting efficiency
  • Affected Area: All file connectors

Severity: MINOR

Improvement Suggestions:

@Override
public TablePath getTableSchemaPath(String schemaHttpUrl) {
    if (schemaHttpUrl == null || schemaHttpUrl.isEmpty()) {
        throw new SeaTunnelRuntimeException(
                ERROR_INVALID_TABLE_URL,
                "Table URL cannot be null or empty");
    }
    
    Matcher matcher = TABLE_URL_PATTERN.matcher(schemaHttpUrl);
    if (!matcher.matches()) {  // Use matches() instead of find()
        throw new SeaTunnelRuntimeException(
                ERROR_INVALID_TABLE_URL,
                String.format(
                    "Invalid table URL format: '%s'. " +
                    "Expected format: http://host/api/metalakes/{metalake}/catalogs/{catalog}/schemas/{schema}/tables/{table}",
                    schemaHttpUrl
                ));
    }
    String catalogName = matcher.group(1);
    String schemaName = matcher.group(2);
    String tableName = matcher.group(3);
    return TablePath.of(catalogName, schemaName, tableName);
}

Rationale:

  • Use matches() to ensure the entire URL matches, not partial matching
  • Include the actual URL in exception messages
  • Add null checks to provide more friendly error messages

Issue 8: HttpClient Connections May Become Invalid in Concurrent Scenarios

Location: seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java:141-150

private CloseableHttpClient getHttpClient() {
    if (httpClient == null) {
        synchronized (GravitinoClient.class) {
            if (httpClient == null) {
                httpClient = HttpClients.createDefault();
            }
        }
    }
    return httpClient;
}

Issue Description:
Although CloseableHttpClient is thread-safe, the following scenarios may cause issues:

  1. After Gravitino service restarts, old connections in HttpClient may become invalid
  2. After DNS resolution changes, HttpClient may still use the old IP address
  3. In long-running processes, HttpClient's connection pool may encounter various issues (such as idle connections being closed by the server)

Apache HttpClient's createDefault() method creates a client using the default connection manager, which automatically handles connection invalidation, but requires proper configuration.

Potential Risks:

  • Risk 1: After Gravitino service restarts, requests in a short period may fail (until the connection manager detects connection invalidation)
  • Risk 2: Long-running processes may accumulate many invalid connections

Impact Scope:

  • Direct Impact: Long-running SeaTunnel jobs
  • Indirect Impact: Gravitino service operations
  • Affected Area: All scenarios using schema_url

Severity: MINOR

Improvement Suggestions:

private CloseableHttpClient getHttpClient() {
    if (httpClient == null) {
        synchronized (GravitinoClient.class) {
            if (httpClient == null) {
                // Configure connection manager to periodically check connection validity
                PoolingHttpClientConnectionManager connectionManager =
                    PoolingHttpClientConnectionManagerBuilder.create()
                        .setMaxConnTotal(20)  // Maximum number of connections
                        .setMaxConnPerRoute(10)  // Maximum connections per route
                        .build();
                
                // Create HttpClient with reasonable timeout and retry configuration
                RequestConfig requestConfig = RequestConfig.custom()
                    .setConnectTimeout(Timeout.ofMilliseconds(5000))  // Connection timeout of 5 seconds
                    .setConnectionRequestTimeout(Timeout.ofMilliseconds(5000))  // Timeout for getting connection from connection pool
                    .build();
                
                httpClient = HttpClients.custom()
                    .setConnectionManager(connectionManager)
                    .setDefaultRequestConfig(requestConfig)
                    .evictIdleConnections(30, TimeUnit.SECONDS)  // Clean up idle connections every 30 seconds
                    .build();
                    
                // Register JVM shutdown hook
                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                    try {
                        if (httpClient != null) {
                            httpClient.close();
                        }
                    } catch (IOException e) {
                        // Ignore shutdown exceptions
                    }
                }));
            }
        }
    }
    return httpClient;
}

Rationale:

  • Configure connection pool parameters to avoid resource exhaustion
  • Periodically clean up idle connections to avoid using invalid connections
  • Set reasonable timeout values
  • Add shutdown hook to resolve resource leak issues

…taLakeFactory

# Conflicts:
#	docs/en/connectors/source/FtpFile.md
#	docs/zh/connectors/source/FtpFile.md
#	seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
#	seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
#	seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
@chl-wxp
Copy link
Contributor Author

chl-wxp commented Feb 6, 2026

Issue 6

Issue 2 No changes, keep the previous usage.
Issue 6 will not be changed. Relevant changes have been completed and no transition is required.
Everything else has been changed.

davidzollo
davidzollo previously approved these changes Feb 6, 2026
Copy link
Contributor

@davidzollo davidzollo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 if CI passed

@Carl-Zhou-CN
Copy link
Member

good job

@chl-wxp
Copy link
Contributor Author

chl-wxp commented Feb 10, 2026

In addition to file conflicts, please help and take a look again @liugddx @zhangshenghang @Carl-Zhou-CN @davidzollo @hawk9821

# Conflicts:
#	seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaEvolutionErrorCode.java
Copy link
Contributor

@hawk9821 hawk9821 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR integrates Apache Gravitino as a MetaLake/metadata service so non-relational connectors (notably file connectors in this PR) can discover table schemas via schema_url (Gravitino REST API) instead of requiring full manual schema definitions, with supporting type conversion, client-side validation, docs, and tests.

Changes:

  • Introduces MetaLake schema discovery + Gravitino REST client + Gravitino-to-SeaTunnel schema/type conversion (TableSchemaDiscoverer, GravitinoClient, GravitinoTableSchemaConvertor, MetaLakeFactory).
  • Plumbs envOptions into FactoryUtil/TableSourceFactoryContext so connectors can resolve metalake_type from env/source options consistently.
  • Updates file source connectors/configs to accept discovered CatalogTable(s), adds E2E coverage + documentation for schema_url and type mappings.

Reviewed changes

Copilot reviewed 79 out of 79 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java Passes envOptions into source factory creation so schema discovery can read env-level settings.
seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java Passes env options to FactoryUtil.createAndPrepareSource for schema discovery.
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java Passes env options to FactoryUtil.createAndPrepareSource for schema discovery.
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactoryContext.java Adds envOptions to the factory context so plugins can access env configuration.
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java Adds default discoverTableSchemas(...) hook to build CatalogTable list from config/MetaLake.
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java Extends source creation pipeline to carry envOptions into TableSourceFactoryContext.
seatunnel-api/src/main/java/org/apache/seatunnel/api/options/table/TableSchemaOptions.java Adds metalake_type option for schema discovery.
seatunnel-api/src/main/java/org/apache/seatunnel/api/options/table/ColumnOptions.java Adds schema_url option for schema discovery via REST API.
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaEvolutionErrorCode.java Adds error codes related to MetaLake schema retrieval and URL parsing.
seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetalakeClient.java Expands client contract to support schema fetching + path parsing + close lifecycle.
seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetaLakeFactory.java New factory for creating MetaLake clients and schema convertors by type.
seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetaLakeTableSchemaConvertor.java New SPI for converting MetaLake JSON into TableSchema/CatalogTable.
seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/TableSchemaDiscoverer.java Implements discovery from fields/columns, schema_url, multi-table configs, and fallback behavior.
seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java Implements Gravitino REST calls, retry logic, and schema URL parsing into TablePath.
seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoTableSchemaConvertor.java Converts Gravitino column/index metadata (incl. complex types) to SeaTunnel schema types.
seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetalakeConfigUtils.java Updates MetaLake config replacement flow to new client factory + new client method signature.
seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetalakeClientFactory.java Removes old factory in favor of MetaLakeFactory.
seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/GravitinoClient.java Removes old Gravitino client implementation (replaced by ...metalake.gravitino.GravitinoClient).
seatunnel-api/src/test/java/org/apache/seatunnel/api/metalake/TableSchemaDiscovererTest.java Adds unit tests for schema discovery across fields/schema_url/multi-table scenarios.
seatunnel-api/src/test/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClientTest.java Adds retry/status handling tests and URL-to-TablePath parsing tests.
seatunnel-api/src/test/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoTableSchemaConvertorTest.java Adds extensive type mapping and constraints/index conversion coverage.
seatunnel-api/src/test/resources/conf/table_schema_discoverer/single_schema_url.conf Test config for schema_url single-table discovery.
seatunnel-api/src/test/resources/conf/table_schema_discoverer/single_schema_field.conf Test config for fields single-table schema in config.
seatunnel-api/src/test/resources/conf/table_schema_discoverer/single_no_schema.conf Test config for “no schema” fallback behavior.
seatunnel-api/src/test/resources/conf/table_schema_discoverer/multiple_tables_schema_url.conf Test config for multi-table schema discovery using schema_url.
seatunnel-api/src/test/resources/conf/table_schema_discoverer/multiple_tables_no_schema_mixed_format.conf Test config for multi-table fallback behavior across file formats.
seatunnel-api/src/test/resources/conf/table_schema_discoverer/multiple_tables_mixed.conf Test config for mixing config schema + schema_url.
seatunnel-api/src/test/resources/conf/table_schema_discoverer/multiple_tables_fields.conf Test config for multi-table schema via columns/fields in config.
seatunnel-api/src/test/resources/conf/json/metadata_json_from_meta_lake_pgsql.json Sample metadata JSON used for converter tests.
seatunnel-api/src/test/resources/conf/json/metadata_json_from_meta_lake_hive.json Sample metadata JSON used for converter tests.
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseMultipleTableFileSourceConfig.java Wires discovered CatalogTable list into per-table file source configs.
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java Uses injected CatalogTable rather than rebuilding it internally from config.
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java Enables schema discovery and adds metalake_type option.
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java Accepts discovered CatalogTable list for multi-table operation.
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/MultipleTableLocalFileSourceConfig.java Accepts discovered CatalogTable list and passes per-table table metadata down.
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceConfig.java Accepts CatalogTable for file source schema behavior.
seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java Enables schema discovery and adds metalake_type option.
seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java Accepts discovered CatalogTable list.
seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/MultipleTableHdfsFileSourceConfig.java Accepts discovered CatalogTable list and per-table metadata injection.
seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/HdfsFileSourceConfig.java Accepts CatalogTable injection.
seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSourceConfigTest.java Updates tests for new HdfsFileSourceConfig constructor signature and schema injection.
seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java Enables schema discovery and adds metalake_type option.
seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java Accepts discovered CatalogTable list.
seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/config/MultipleTableS3FileSourceConfig.java Accepts discovered CatalogTable list and per-table schema injection.
seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/config/S3FileSourceConfig.java Accepts CatalogTable injection.
seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java Enables schema discovery and adds metalake_type option.
seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java Accepts discovered CatalogTable list.
seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/config/MultipleTableOssFileSourceConfig.java Accepts discovered CatalogTable list and per-table schema injection.
seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/config/OssFileSourceConfig.java Accepts CatalogTable injection.
seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java Enables schema discovery and adds metalake_type option.
seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java Accepts discovered CatalogTable list.
seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/MultipleTableFTPFileSourceConfig.java Accepts discovered CatalogTable list and per-table schema injection.
seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FTPFileSourceConfig.java Accepts CatalogTable injection.
seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java Enables schema discovery and adds metalake_type option.
seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java Accepts discovered CatalogTable list.
seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/MultipleTableSFTPFileSourceConfig.java Accepts discovered CatalogTable list and per-table schema injection.
seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SFTPFileSourceConfig.java Accepts CatalogTable injection.
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMetaLakeIT.java Adds E2E integration test standing up MySQL + Gravitino + SeaTunnel to validate schema_url behavior.
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/local_file_csv_to_local_file_csv_with_metalake.conf Adds E2E job config demonstrating mixed fields and schema_url.
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_to_local_file_with_metalake.conf Adds E2E job config variant for text/csv schema setups with MetaLake.
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/data/table1.csv Adds test input data for E2E.
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/data/table2.csv Adds test input data for E2E.
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/pom.xml Adds MySQL testcontainers + MySQL JDBC driver for E2E MetaLake test.
docs/zh/introduction/concepts/schema-feature.md Documents schema_url usage and examples (ZH).
docs/en/introduction/concepts/schema-feature.md Documents schema_url usage and examples (EN).
docs/zh/introduction/concepts/gravitino-type-mapping.md Adds Gravitino type mapping doc (ZH).
docs/en/introduction/concepts/gravitino-type-mapping.md Adds Gravitino type mapping doc (EN).
docs/zh/connectors/source/LocalFile.md Documents schema_url/metalake_type for LocalFile (ZH).
docs/en/connectors/source/LocalFile.md Documents schema_url/metalake_type for LocalFile (EN).
docs/zh/connectors/source/HdfsFile.md Documents schema_url/metalake_type for HdfsFile (ZH).
docs/en/connectors/source/HdfsFile.md Documents schema_url/metalake_type for HdfsFile (EN).
docs/zh/connectors/source/S3File.md Documents schema_url/metalake_type for S3File (ZH).
docs/en/connectors/source/S3File.md Documents schema_url/metalake_type for S3File (EN).
docs/zh/connectors/source/OssFile.md Documents schema_url/metalake_type for OssFile (ZH).
docs/en/connectors/source/OssFile.md Documents schema_url/metalake_type for OssFile (EN).
docs/zh/connectors/source/FtpFile.md Documents schema_url/metalake_type for FtpFile (ZH).
docs/en/connectors/source/FtpFile.md Documents schema_url/metalake_type for FtpFile (EN).
docs/zh/connectors/source/SftpFile.md Documents schema_url/metalake_type for SftpFile (ZH).
docs/en/connectors/source/SftpFile.md Documents schema_url/metalake_type for SftpFile (EN).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature][seatunnel-api] Non-relational databases use Gravitino's metadata restApi instead of schema configuration

6 participants