Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1227,7 +1227,7 @@
return this.isExpectHeaderEnabled;
}

public boolean accountThrottlingEnabled() {

Check failure on line 1230 in hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java#L1230

javadoc: warning: no comment
return accountThrottlingEnabled;
}

Expand All @@ -1240,9 +1240,9 @@
}

public boolean getCreateRemoteFileSystemDuringInitialization() {
// we do not support creating the filesystem when AuthType is SAS
// we do not support creating the filesystem when AuthType is SAS or UserboundSASWithOAuth
return this.createRemoteFileSystemDuringInitialization
&& this.getAuthType(this.accountName) != AuthType.SAS;
&& !(validateForSASType(this.getAuthType(this.accountName)));
}

public boolean getSkipUserGroupMetadataDuringInitialization() {
Expand Down Expand Up @@ -1413,9 +1413,14 @@
return this.trackLatency;
}

public boolean validateForSASType(AuthType authType){
return authType == AuthType.SAS
|| authType == AuthType.UserboundSASWithOAuth;
}

public AccessTokenProvider getTokenProvider() throws TokenAccessProviderException {
AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
if (authType == AuthType.OAuth) {
if (authType == AuthType.OAuth || authType == AuthType.UserboundSASWithOAuth) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is with oauth check

try {
Class<? extends AccessTokenProvider> tokenProviderClass =
getTokenProviderClass(authType,
Expand Down Expand Up @@ -1563,7 +1568,7 @@
* the AbfsConfiguration with which a filesystem is initialized, and eliminate
* chances of dynamic modifications and spurious situations.<br>
* @return sasTokenProvider object based on configurations provided
* @throws AzureBlobFileSystemException
* @throws AzureBlobFileSystemException if SAS token provider initialization fails
*/
public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemException {
AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
Expand Down Expand Up @@ -1610,6 +1615,70 @@
}
}

/**
* Returns the SASTokenProvider implementation to be used to generate user-bound SAS token.
* Custom implementation of {@link SASTokenProvider} under th config
* "fs.azure.sas.token.provider.type" needs to be provided.
* @param authType authentication type
* @return sasTokenProvider object based on configurations provided
* @throws AzureBlobFileSystemException is user-bound SAS token provider initialization fails
*/
public SASTokenProvider getUserBoundSASTokenProvider(AuthType authType)
throws AzureBlobFileSystemException {

try {
Class<? extends SASTokenProvider> customSasTokenProviderImplementation =
getTokenProviderClass(authType, FS_AZURE_SAS_TOKEN_PROVIDER_TYPE,
null, SASTokenProvider.class);

if (customSasTokenProviderImplementation == null) {
Comment thread
anujmodi2021 marked this conversation as resolved.
throw new SASTokenProviderException(String.format(
"\"%s\" must be set for user-bound SAS auth type.",
FS_AZURE_SAS_TOKEN_PROVIDER_TYPE));
}

SASTokenProvider sasTokenProvider = ReflectionUtils.newInstance(
customSasTokenProviderImplementation, rawConfig);
if (sasTokenProvider == null) {
throw new SASTokenProviderException(String.format(
"Failed to initialize %s", customSasTokenProviderImplementation));
}
LOG.trace("Initializing {}", customSasTokenProviderImplementation.getName());
sasTokenProvider.initialize(rawConfig, accountName);
LOG.trace("{} init complete", customSasTokenProviderImplementation.getName());
return sasTokenProvider;
} catch (SASTokenProviderException e) {
throw e;
} catch (Exception e) {
throw new SASTokenProviderException(
"Unable to load user-bound SAS token provider class: " + e, e);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why e 2 times ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e is included in the message string and also passed as the cause in the exception constructor

}
}

/**
* Returns both the AccessTokenProvider and the SASTokenProvider
* when auth type is UserboundSASWithOAuth.
*
* @return Object[] where:
* [0] = AccessTokenProvider
* [1] = SASTokenProvider
* @throws AzureBlobFileSystemException if provider initialization fails
*/
public Object[] getUserBoundSASBothTokenProviders()
throws AzureBlobFileSystemException {
AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME,
AuthType.SharedKey);
if (authType != AuthType.UserboundSASWithOAuth) {
throw new SASTokenProviderException(String.format(
"Invalid auth type: %s is being used, expecting user-bound SAS.",
authType));
}

AccessTokenProvider tokenProvider = getTokenProvider();
SASTokenProvider sasTokenProvider = getUserBoundSASTokenProvider(authType);
return new Object[]{tokenProvider, sasTokenProvider};
}

public EncryptionContextProvider createEncryptionContextProvider() {
try {
String configKey = FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_CREATE_ON_ROOT;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_INVALID_ABFS_STATE;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.UNAUTHORIZED_SAS;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.UNAUTHORIZED_USER_BOUND_SAS;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
Expand Down Expand Up @@ -271,6 +272,29 @@ public void initialize(URI uri, Configuration configuration)
throw new InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
}

/**
* Validates that User-bound SAS with OAuth is not used for FNS (non-hierarchical namespace) accounts.
* Throws an InvalidConfigurationValueException if this configuration is detected.
*
* @throws InvalidConfigurationValueException if User-bound SAS with OAuth is configured for FNS accounts
*/
try {
if (abfsConfiguration.getAuthType(abfsConfiguration.getAccountName())
== AuthType.UserboundSASWithOAuth && // Auth type is User-bound SAS
!tryGetIsNamespaceEnabled(
new TracingContext(initFSTracingContext))) { // Account is FNS
throw new InvalidConfigurationValueException(UNAUTHORIZED_USER_BOUND_SAS);
}
} catch (InvalidConfigurationValueException ex) {
LOG.error("User-bound SAS not supported for FNS Accounts", ex);
throw ex;
} catch (AzureBlobFileSystemException ex) {
LOG.error("Failed to determine account type for auth type validation",
ex);
throw new InvalidConfigurationValueException(
FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
}

/*
* Non-hierarchical-namespace account can not have a customer-provided-key(CPK).
* Fail initialization of filesystem if the configs are provided. CPK is of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1754,11 +1754,20 @@ private void initializeClient(URI uri, String fileSystemName,
} else if (authType == AuthType.SAS) {
LOG.trace("Fetching SAS Token Provider");
sasTokenProvider = abfsConfiguration.getSASTokenProvider();
} else if (authType == AuthType.UserboundSASWithOAuth) {
LOG.trace("Fetching SAS and OAuth Token Provider for user bound SAS");
AzureADAuthenticator.init(abfsConfiguration);
Object[] providers
= abfsConfiguration.getUserBoundSASBothTokenProviders();
tokenProvider = (AccessTokenProvider) providers[0];
sasTokenProvider = (SASTokenProvider) providers[1];
ExtensionHelper.bind(tokenProvider, uri,
abfsConfiguration.getRawConfiguration());
} else {
LOG.trace("Fetching token provider");
tokenProvider = abfsConfiguration.getTokenProvider();
ExtensionHelper.bind(tokenProvider, uri,
abfsConfiguration.getRawConfiguration());
abfsConfiguration.getRawConfiguration());
}

// Encryption setup
Expand All @@ -1782,16 +1791,11 @@ private void initializeClient(URI uri, String fileSystemName,
}
}

LOG.trace("Initializing AbfsClient for {}", baseUrl);
if (tokenProvider != null) {
this.clientHandler = new AbfsClientHandler(baseUrl, creds, abfsConfiguration,
tokenProvider, encryptionContextProvider,
populateAbfsClientContext());
} else {
this.clientHandler = new AbfsClientHandler(baseUrl, creds, abfsConfiguration,
sasTokenProvider, encryptionContextProvider,
populateAbfsClientContext());
}
LOG.trace("Initializing AbfsClientHandler for {}", baseUrl);
this.clientHandler = new AbfsClientHandler(baseUrl, creds,
abfsConfiguration,
tokenProvider, sasTokenProvider, encryptionContextProvider,
populateAbfsClientContext());

this.setClient(getClientHandler().getClient());
LOG.trace("AbfsClient init complete");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public final class AbfsHttpConstants {
public static final String APPLICATION_JSON = "application/json";
public static final String APPLICATION_OCTET_STREAM = "application/octet-stream";
public static final String APPLICATION_XML = "application/xml";
public static final String APPLICATION_X_WWW_FORM_URLENCODED = "application/x-www-form-urlencoded";
public static final String XMS_PROPERTIES_ENCODING_ASCII = "ISO-8859-1";
public static final String XMS_PROPERTIES_ENCODING_UNICODE = "UTF-8";

Expand Down Expand Up @@ -189,7 +190,8 @@ public enum ApiVersion {
DEC_12_2019("2019-12-12"),
APR_10_2021("2021-04-10"),
AUG_03_2023("2023-08-03"),
NOV_04_2024("2024-11-04");
NOV_04_2024("2024-11-04"),
JUL_05_2025("2025-07-05");

private final String xMsApiVersion;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,22 +186,10 @@ public AbfsBlobClient(final URL baseUrl,
final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AccessTokenProvider tokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider,
encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB);
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAzureAtomicRenameDirs()
.split(AbfsHttpConstants.COMMA)));
}

public AbfsBlobClient(final URL baseUrl,
final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final SASTokenProvider sasTokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider,
super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider, sasTokenProvider,
encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB);
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAzureAtomicRenameDirs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,22 +344,24 @@ private AbfsClient(final URL baseUrl,
LOG.trace("primaryUserGroup is {}", this.primaryUserGroup);
}

public AbfsClient(final URL baseUrl,
final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AccessTokenProvider tokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext,
final AbfsServiceType abfsServiceType)
throws IOException {
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
encryptionContextProvider, abfsClientContext, abfsServiceType);
this.tokenProvider = tokenProvider;
}

/**
* Constructs an AbfsClient instance with all authentication and configuration options.
*
* @param baseUrl The base URL for the ABFS endpoint.
* @param sharedKeyCredentials Shared key credentials for authentication.
* @param abfsConfiguration The ABFS configuration.
* @param tokenProvider The access token provider for OAuth authentication.
* @param sasTokenProvider The SAS token provider for SAS authentication.
* @param encryptionContextProvider The encryption context provider.
* @param abfsClientContext The client context
* @param abfsServiceType The ABFS service type (e.g., Blob, DFS).
* @throws IOException if initialization fails.
*/
public AbfsClient(final URL baseUrl,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java doc missing

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

Comment thread
anujmodi2021 marked this conversation as resolved.
final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AccessTokenProvider tokenProvider,
final SASTokenProvider sasTokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext,
Expand All @@ -368,6 +370,7 @@ public AbfsClient(final URL baseUrl,
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
encryptionContextProvider, abfsClientContext, abfsServiceType);
this.sasTokenProvider = sasTokenProvider;
this.tokenProvider = tokenProvider;
}

@Override
Expand Down Expand Up @@ -1179,7 +1182,7 @@ protected String appendSASTokenToQuery(String path,
String cachedSasToken)
throws SASTokenProviderException {
String sasToken = null;
if (this.authType == AuthType.SAS) {
if (getAbfsConfiguration().validateForSASType(this.authType)) {
try {
LOG.trace("Fetch SAS token for {} on {}", operation, path);
if (cachedSasToken == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,26 @@ public class AbfsClientHandler implements Closeable {
private final AbfsDfsClient dfsAbfsClient;
private final AbfsBlobClient blobAbfsClient;

public AbfsClientHandler(final URL baseUrl,
final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AccessTokenProvider tokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,
abfsConfiguration, tokenProvider, null, encryptionContextProvider,
abfsClientContext);
this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials,
abfsConfiguration, tokenProvider, null, encryptionContextProvider,
abfsClientContext);
initServiceType(abfsConfiguration);
}

/**
* Constructs an AbfsClientHandler instance.
*
* Initializes the default and ingress service types from the provided configuration,
* then creates both DFS and Blob clients using the given params
*
* @param baseUrl the base URL for the file system.
* @param sharedKeyCredentials credentials for shared key authentication.
* @param abfsConfiguration the ABFS configuration.
* @param tokenProvider the access token provider, may be null.
* @param sasTokenProvider the SAS token provider, may be null.
* @param encryptionContextProvider the encryption context provider
* @param abfsClientContext the ABFS client context.
* @throws IOException if client creation or URL conversion fails.
*/
public AbfsClientHandler(final URL baseUrl,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java doc missing for the constructor

Comment thread
anujmodi2021 marked this conversation as resolved.
final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AccessTokenProvider tokenProvider,
final SASTokenProvider sasTokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
Expand All @@ -73,10 +75,10 @@ public AbfsClientHandler(final URL baseUrl,
// only for default client.
initServiceType(abfsConfiguration);
this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,
abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
abfsConfiguration, tokenProvider, sasTokenProvider, encryptionContextProvider,
abfsClientContext);
this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials,
abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
abfsConfiguration, tokenProvider, sasTokenProvider, encryptionContextProvider,
abfsClientContext);
}

Expand Down Expand Up @@ -154,17 +156,13 @@ private AbfsDfsClient createDfsClient(final URL baseUrl,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
URL dfsUrl = changeUrlFromBlobToDfs(baseUrl);
if (tokenProvider != null) {
LOG.debug("Creating AbfsDfsClient with access token provider using the URL: {}", dfsUrl);
return new AbfsDfsClient(dfsUrl, creds, abfsConfiguration,
tokenProvider, encryptionContextProvider,
abfsClientContext);
} else {
LOG.debug("Creating AbfsDfsClient with SAS token provider using the URL: {}", dfsUrl);
return new AbfsDfsClient(dfsUrl, creds, abfsConfiguration,
sasTokenProvider, encryptionContextProvider,
abfsClientContext);
}
LOG.debug(
"Creating AbfsDfsClient with access token provider: %s and "
+ "SAS token provider: %s using the URL: %s",
tokenProvider, sasTokenProvider, dfsUrl);
return new AbfsDfsClient(dfsUrl, creds, abfsConfiguration,
tokenProvider, sasTokenProvider, encryptionContextProvider,
abfsClientContext);
}

/**
Expand All @@ -188,17 +186,13 @@ private AbfsBlobClient createBlobClient(final URL baseUrl,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
URL blobUrl = changeUrlFromDfsToBlob(baseUrl);
if (tokenProvider != null) {
LOG.debug("Creating AbfsBlobClient with access token provider using the URL: {}", blobUrl);
return new AbfsBlobClient(blobUrl, creds, abfsConfiguration,
tokenProvider, encryptionContextProvider,
abfsClientContext);
} else {
LOG.debug("Creating AbfsBlobClient with SAS token provider using the URL: {}", blobUrl);
return new AbfsBlobClient(blobUrl, creds, abfsConfiguration,
sasTokenProvider, encryptionContextProvider,
abfsClientContext);
}
LOG.debug(
"Creating AbfsBlobClient with access token provider: %s and "
+ "SAS token provider: %s using the URL: %s",
tokenProvider, sasTokenProvider, blobUrl);
return new AbfsBlobClient(blobUrl, creds, abfsConfiguration,
tokenProvider, sasTokenProvider, encryptionContextProvider,
abfsClientContext);
}

@Override
Expand Down
Loading
Loading