Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1948,4 +1948,91 @@ private Constants() {
* AWS S3 PutObject API Documentation</a>
*/
public static final String IF_NONE_MATCH_STAR = "*";

// ==================== AWS Client Shared Thread Pool Configuration ===========
// These settings control whether AWS SDK clients share a thread pool
// instead of each client creating its own. This prevents thread leaks
// when many S3A filesystem instances are created.

/**
* Default thread pool size for AWS client shared thread pools: {@value}.
*/
public static final int AWS_CLIENT_SHARED_THREADPOOL_SIZE_DEFAULT = 5;

/**
* Default keepalive in seconds for AWS client shared thread pool: {@value}.
*/
public static final int AWS_CLIENT_SHARED_THREADPOOL_KEEPALIVE_DEFAULT = 60;

/**
* Enable shared thread pool for AWS S3 sync client: {@value}.
*/
public static final String AWS_S3_CLIENT_SHARED_THREADPOOL_ENABLED =
"fs.s3a.aws.s3.client.shared.threadpool.enabled";

/**
* Thread pool size for AWS S3 sync client shared thread pool: {@value}.
*/
public static final String AWS_S3_CLIENT_SHARED_THREADPOOL_SIZE =
"fs.s3a.aws.s3.client.shared.threadpool.size";

/**
* Keepalive in seconds for AWS S3 sync client shared thread pool: {@value}.
*/
public static final String AWS_S3_CLIENT_SHARED_THREADPOOL_KEEPALIVE =
"fs.s3a.aws.s3.client.shared.threadpool.keepalive.seconds";

/**
* Enable shared thread pool for AWS S3 async client: {@value}.
*/
public static final String AWS_S3_ASYNC_CLIENT_SHARED_THREADPOOL_ENABLED =
"fs.s3a.aws.s3.async.client.shared.threadpool.enabled";

/**
* Thread pool size for AWS S3 async client shared thread pool: {@value}.
*/
public static final String AWS_S3_ASYNC_CLIENT_SHARED_THREADPOOL_SIZE =
"fs.s3a.aws.s3.async.client.shared.threadpool.size";

/**
* Keepalive in seconds for AWS S3 async client shared thread pool: {@value}.
*/
public static final String AWS_S3_ASYNC_CLIENT_SHARED_THREADPOOL_KEEPALIVE =
"fs.s3a.aws.s3.async.client.shared.threadpool.keepalive.seconds";

/**
* Enable shared thread pool for AWS STS client: {@value}.
*/
public static final String AWS_STS_CLIENT_SHARED_THREADPOOL_ENABLED =
"fs.s3a.aws.sts.client.shared.threadpool.enabled";

/**
* Thread pool size for AWS STS client shared thread pool: {@value}.
*/
public static final String AWS_STS_CLIENT_SHARED_THREADPOOL_SIZE =
"fs.s3a.aws.sts.client.shared.threadpool.size";

/**
* Keepalive in seconds for AWS STS client shared thread pool: {@value}.
*/
public static final String AWS_STS_CLIENT_SHARED_THREADPOOL_KEEPALIVE =
"fs.s3a.aws.sts.client.shared.threadpool.keepalive.seconds";

/**
* Enable shared thread pool for AWS KMS client: {@value}.
*/
public static final String AWS_KMS_CLIENT_SHARED_THREADPOOL_ENABLED =
"fs.s3a.aws.kms.client.shared.threadpool.enabled";

/**
* Thread pool size for AWS KMS client shared thread pool: {@value}.
*/
public static final String AWS_KMS_CLIENT_SHARED_THREADPOOL_SIZE =
"fs.s3a.aws.kms.client.shared.threadpool.size";

/**
* Keepalive in seconds for AWS KMS client shared thread pool: {@value}.
*/
public static final String AWS_KMS_CLIENT_SHARED_THREADPOOL_KEEPALIVE =
"fs.s3a.aws.kms.client.shared.threadpool.keepalive.seconds";
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.s3a.impl.AWSClientConfig;
import org.apache.hadoop.fs.s3a.impl.LazySharedThreadPoolHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -63,17 +65,23 @@
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ACCESS_GRANTS_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ASYNC_CLIENT_SHARED_THREADPOOL_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ASYNC_CLIENT_SHARED_THREADPOOL_KEEPALIVE;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ASYNC_CLIENT_SHARED_THREADPOOL_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CLIENT_SHARED_THREADPOOL_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CLIENT_SHARED_THREADPOOL_KEEPALIVE;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CLIENT_SHARED_THREADPOOL_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_DEFAULT_REGION;
import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_S3;
import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_SECURE_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_CLASS_NAME;
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_SECURE_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.SECURE_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_S3;
import static org.apache.hadoop.fs.s3a.auth.SignerFactory.createHttpSigner;
import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.REQUESTER_PAYS_HEADER;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AUTH_SCHEME_AWS_SIGV_4;
Expand All @@ -97,6 +105,26 @@ public class DefaultS3ClientFactory extends Configured
private static final Pattern VPC_ENDPOINT_PATTERN =
Pattern.compile("^(?:.+\\.)?([a-z0-9-]+)\\.vpce\\.amazonaws\\.(?:com|com\\.cn)$");

/**
* Shared executor for S3 sync clients.
*/
private static final LazySharedThreadPoolHolder S3_SYNC_EXECUTOR =
new LazySharedThreadPoolHolder(
AWS_S3_CLIENT_SHARED_THREADPOOL_ENABLED,
AWS_S3_CLIENT_SHARED_THREADPOOL_SIZE,
AWS_S3_CLIENT_SHARED_THREADPOOL_KEEPALIVE,
"s3a-s3-sync-scheduler");

/**
* Shared executor for S3 async clients.
*/
private static final LazySharedThreadPoolHolder S3_ASYNC_EXECUTOR =
new LazySharedThreadPoolHolder(
AWS_S3_ASYNC_CLIENT_SHARED_THREADPOOL_ENABLED,
AWS_S3_ASYNC_CLIENT_SHARED_THREADPOOL_SIZE,
AWS_S3_ASYNC_CLIENT_SHARED_THREADPOOL_KEEPALIVE,
"s3a-s3-async-scheduler");

/**
* Subclasses refer to this.
*/
Expand Down Expand Up @@ -235,8 +263,10 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build
.pathStyleAccessEnabled(parameters.isPathStyleAccess())
.build();

final ClientOverrideConfiguration.Builder override =
createClientOverrideConfiguration(parameters, conf);
final ClientOverrideConfiguration.Builder override = createClientOverrideConfiguration(
parameters,
conf,
builder instanceof S3AsyncClientBuilder);

S3BaseClientBuilder<BuilderT, ClientT> s3BaseClientBuilder = builder
.overrideConfiguration(override.build())
Expand Down Expand Up @@ -265,13 +295,14 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build
* Create an override configuration for an S3 client.
* @param parameters parameter object
* @param conf configuration object
* @throws IOException any IOE raised, or translated exception
* @throws RuntimeException some failures creating an http signer
* @param isAsync true for async client, false for sync client
* @return the override configuration
* @throws IOException any IOE raised, or translated exception
* @throws RuntimeException some failures creating an http signer
*/
protected ClientOverrideConfiguration.Builder createClientOverrideConfiguration(
S3ClientCreationParameters parameters, Configuration conf) throws IOException {
S3ClientCreationParameters parameters, Configuration conf, boolean isAsync)
throws IOException {
final ClientOverrideConfiguration.Builder clientOverrideConfigBuilder =
AWSClientConfig.createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3);

Expand Down Expand Up @@ -302,6 +333,13 @@ protected ClientOverrideConfiguration.Builder createClientOverrideConfiguration(
final RetryPolicy.Builder retryPolicyBuilder = AWSClientConfig.createRetryPolicyBuilder(conf);
clientOverrideConfigBuilder.retryPolicy(retryPolicyBuilder.build());

ScheduledExecutorService executor = isAsync
? S3_ASYNC_EXECUTOR.get(conf)
: S3_SYNC_EXECUTOR.get(conf);
if (executor != null) {
clientOverrideConfigBuilder.scheduledExecutorService(executor);
}

return clientOverrideConfigBuilder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
Expand All @@ -35,6 +36,7 @@
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.awssdk.services.sts.model.Credentials;
import software.amazon.awssdk.services.sts.model.GetSessionTokenRequest;
import org.apache.hadoop.fs.s3a.impl.LazySharedThreadPoolHolder;
import org.apache.hadoop.fs.s3a.impl.AWSClientConfig;
import org.apache.hadoop.util.Preconditions;

Expand All @@ -51,6 +53,9 @@
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_STS;
import static org.apache.hadoop.fs.s3a.Constants.AWS_STS_CLIENT_SHARED_THREADPOOL_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.AWS_STS_CLIENT_SHARED_THREADPOOL_KEEPALIVE;
import static org.apache.hadoop.fs.s3a.Constants.AWS_STS_CLIENT_SHARED_THREADPOOL_SIZE;
import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.*;

/**
Expand All @@ -63,6 +68,16 @@ public class STSClientFactory {
private static final Logger LOG =
LoggerFactory.getLogger(STSClientFactory.class);

/**
* Shared executor for STS clients.
*/
private static final LazySharedThreadPoolHolder STS_EXECUTOR =
new LazySharedThreadPoolHolder(
AWS_STS_CLIENT_SHARED_THREADPOOL_ENABLED,
AWS_STS_CLIENT_SHARED_THREADPOOL_SIZE,
AWS_STS_CLIENT_SHARED_THREADPOOL_KEEPALIVE,
"s3a-sts-scheduler");

/**
* Create the builder ready for any final configuration options.
* Picks up connection settings from the Hadoop configuration, including
Expand Down Expand Up @@ -139,6 +154,10 @@ public static StsClientBuilder builder(final AwsCredentialsProvider credentials,
final ProxyConfiguration proxyConfig = AWSClientConfig.createProxyConfiguration(conf, bucket);

clientOverrideConfigBuilder.retryPolicy(retryPolicyBuilder.build());
ScheduledExecutorService executor = STS_EXECUTOR.get(conf);
if (executor != null) {
clientOverrideConfigBuilder.scheduledExecutorService(executor);
}
httpClientBuilder.proxyConfiguration(proxyConfig);

stsClientBuilder.httpClientBuilder(httpClientBuilder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.ScheduledExecutorService;

import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.kms.KmsClientBuilder;
Expand All @@ -39,13 +41,26 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.functional.LazyAtomicReference;

import static org.apache.hadoop.fs.s3a.Constants.AWS_KMS_CLIENT_SHARED_THREADPOOL_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.AWS_KMS_CLIENT_SHARED_THREADPOOL_KEEPALIVE;
import static org.apache.hadoop.fs.s3a.Constants.AWS_KMS_CLIENT_SHARED_THREADPOOL_SIZE;
import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;

/**
* Factory class to create encrypted s3 client and encrypted async s3 client.
*/
public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {

/**
* Shared executor for KMS clients.
*/
private static final LazySharedThreadPoolHolder KMS_EXECUTOR =
new LazySharedThreadPoolHolder(
AWS_KMS_CLIENT_SHARED_THREADPOOL_ENABLED,
AWS_KMS_CLIENT_SHARED_THREADPOOL_SIZE,
AWS_KMS_CLIENT_SHARED_THREADPOOL_KEEPALIVE,
"s3a-kms-scheduler");

/**
* Encryption client class name.
* value: {@value}
Expand Down Expand Up @@ -201,6 +216,13 @@ private S3Client createS3EncryptionClient(S3ClientCreationParameters parameters)
private Keyring createKmsKeyring(S3ClientCreationParameters parameters,
CSEMaterials cseMaterials) {
KmsClientBuilder kmsClientBuilder = KmsClient.builder();
ScheduledExecutorService executor = KMS_EXECUTOR.get(cseMaterials.getConf());
if (executor != null) {
kmsClientBuilder.overrideConfiguration(
ClientOverrideConfiguration.builder()
.scheduledExecutorService(executor)
.build());
}
if (parameters.getCredentialSet() != null) {
kmsClientBuilder.credentialsProvider(parameters.getCredentialSet());
}
Expand Down
Loading
Loading