-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-19736: ABFS. Support for new auth type: User-bound SAS #8051
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
05b52e4
fc9251f
4c181df
0411d9b
a33f682
c9a994e
0a168e9
5c9cfd5
91afee5
58fc8b3
09139dc
ec2c76c
0bbbd36
d3f7108
040b36d
6d523ac
2b1ecc7
8a235f5
7d43417
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1227,7 +1227,7 @@ | |
| return this.isExpectHeaderEnabled; | ||
| } | ||
|
|
||
| public boolean accountThrottlingEnabled() { | ||
|
Check failure on line 1230 in hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
|
||
| return accountThrottlingEnabled; | ||
| } | ||
|
|
||
|
|
@@ -1240,9 +1240,9 @@ | |
| } | ||
|
|
||
| public boolean getCreateRemoteFileSystemDuringInitialization() { | ||
| // we do not support creating the filesystem when AuthType is SAS | ||
| // we do not support creating the filesystem when AuthType is SAS or UserboundSASWithOAuth | ||
| return this.createRemoteFileSystemDuringInitialization | ||
| && this.getAuthType(this.accountName) != AuthType.SAS; | ||
| && !(validateForSASType(this.getAuthType(this.accountName))); | ||
| } | ||
|
|
||
| public boolean getSkipUserGroupMetadataDuringInitialization() { | ||
|
|
@@ -1413,9 +1413,14 @@ | |
| return this.trackLatency; | ||
| } | ||
|
|
||
| public boolean validateForSASType(AuthType authType){ | ||
| return authType == AuthType.SAS | ||
| || authType == AuthType.UserboundSASWithOAuth; | ||
| } | ||
|
|
||
| public AccessTokenProvider getTokenProvider() throws TokenAccessProviderException { | ||
| AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey); | ||
| if (authType == AuthType.OAuth) { | ||
| if (authType == AuthType.OAuth || authType == AuthType.UserboundSASWithOAuth) { | ||
| try { | ||
| Class<? extends AccessTokenProvider> tokenProviderClass = | ||
| getTokenProviderClass(authType, | ||
|
|
@@ -1563,7 +1568,7 @@ | |
| * the AbfsConfiguration with which a filesystem is initialized, and eliminate | ||
| * chances of dynamic modifications and spurious situations.<br> | ||
| * @return sasTokenProvider object based on configurations provided | ||
| * @throws AzureBlobFileSystemException | ||
| * @throws AzureBlobFileSystemException if SAS token provider initialization fails | ||
| */ | ||
| public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemException { | ||
| AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey); | ||
|
|
@@ -1610,6 +1615,70 @@ | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Returns the SASTokenProvider implementation to be used to generate user-bound SAS token. | ||
| * Custom implementation of {@link SASTokenProvider} under th config | ||
| * "fs.azure.sas.token.provider.type" needs to be provided. | ||
| * @param authType authentication type | ||
| * @return sasTokenProvider object based on configurations provided | ||
| * @throws AzureBlobFileSystemException is user-bound SAS token provider initialization fails | ||
| */ | ||
| public SASTokenProvider getUserBoundSASTokenProvider(AuthType authType) | ||
| throws AzureBlobFileSystemException { | ||
|
|
||
| try { | ||
| Class<? extends SASTokenProvider> customSasTokenProviderImplementation = | ||
| getTokenProviderClass(authType, FS_AZURE_SAS_TOKEN_PROVIDER_TYPE, | ||
| null, SASTokenProvider.class); | ||
|
|
||
| if (customSasTokenProviderImplementation == null) { | ||
|
anujmodi2021 marked this conversation as resolved.
|
||
| throw new SASTokenProviderException(String.format( | ||
| "\"%s\" must be set for user-bound SAS auth type.", | ||
| FS_AZURE_SAS_TOKEN_PROVIDER_TYPE)); | ||
| } | ||
|
|
||
| SASTokenProvider sasTokenProvider = ReflectionUtils.newInstance( | ||
| customSasTokenProviderImplementation, rawConfig); | ||
| if (sasTokenProvider == null) { | ||
| throw new SASTokenProviderException(String.format( | ||
| "Failed to initialize %s", customSasTokenProviderImplementation)); | ||
| } | ||
| LOG.trace("Initializing {}", customSasTokenProviderImplementation.getName()); | ||
| sasTokenProvider.initialize(rawConfig, accountName); | ||
| LOG.trace("{} init complete", customSasTokenProviderImplementation.getName()); | ||
| return sasTokenProvider; | ||
| } catch (SASTokenProviderException e) { | ||
| throw e; | ||
| } catch (Exception e) { | ||
| throw new SASTokenProviderException( | ||
| "Unable to load user-bound SAS token provider class: " + e, e); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why e 2 times ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. e is included in the message string and also passed as the cause in the exception constructor |
||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Returns both the AccessTokenProvider and the SASTokenProvider | ||
| * when auth type is UserboundSASWithOAuth. | ||
| * | ||
| * @return Object[] where: | ||
| * [0] = AccessTokenProvider | ||
| * [1] = SASTokenProvider | ||
| * @throws AzureBlobFileSystemException if provider initialization fails | ||
| */ | ||
| public Object[] getUserBoundSASBothTokenProviders() | ||
| throws AzureBlobFileSystemException { | ||
| AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, | ||
| AuthType.SharedKey); | ||
| if (authType != AuthType.UserboundSASWithOAuth) { | ||
| throw new SASTokenProviderException(String.format( | ||
| "Invalid auth type: %s is being used, expecting user-bound SAS.", | ||
| authType)); | ||
| } | ||
|
|
||
| AccessTokenProvider tokenProvider = getTokenProvider(); | ||
| SASTokenProvider sasTokenProvider = getUserBoundSASTokenProvider(authType); | ||
| return new Object[]{tokenProvider, sasTokenProvider}; | ||
| } | ||
|
|
||
| public EncryptionContextProvider createEncryptionContextProvider() { | ||
| try { | ||
| String configKey = FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -344,22 +344,24 @@ private AbfsClient(final URL baseUrl, | |
| LOG.trace("primaryUserGroup is {}", this.primaryUserGroup); | ||
| } | ||
|
|
||
| public AbfsClient(final URL baseUrl, | ||
| final SharedKeyCredentials sharedKeyCredentials, | ||
| final AbfsConfiguration abfsConfiguration, | ||
| final AccessTokenProvider tokenProvider, | ||
| final EncryptionContextProvider encryptionContextProvider, | ||
| final AbfsClientContext abfsClientContext, | ||
| final AbfsServiceType abfsServiceType) | ||
| throws IOException { | ||
| this(baseUrl, sharedKeyCredentials, abfsConfiguration, | ||
| encryptionContextProvider, abfsClientContext, abfsServiceType); | ||
| this.tokenProvider = tokenProvider; | ||
| } | ||
|
|
||
| /** | ||
| * Constructs an AbfsClient instance with all authentication and configuration options. | ||
| * | ||
| * @param baseUrl The base URL for the ABFS endpoint. | ||
| * @param sharedKeyCredentials Shared key credentials for authentication. | ||
| * @param abfsConfiguration The ABFS configuration. | ||
| * @param tokenProvider The access token provider for OAuth authentication. | ||
| * @param sasTokenProvider The SAS token provider for SAS authentication. | ||
| * @param encryptionContextProvider The encryption context provider. | ||
| * @param abfsClientContext The client context | ||
| * @param abfsServiceType The ABFS service type (e.g., Blob, DFS). | ||
| * @throws IOException if initialization fails. | ||
| */ | ||
| public AbfsClient(final URL baseUrl, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Java doc missing
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added
anujmodi2021 marked this conversation as resolved.
|
||
| final SharedKeyCredentials sharedKeyCredentials, | ||
| final AbfsConfiguration abfsConfiguration, | ||
| final AccessTokenProvider tokenProvider, | ||
| final SASTokenProvider sasTokenProvider, | ||
| final EncryptionContextProvider encryptionContextProvider, | ||
| final AbfsClientContext abfsClientContext, | ||
|
|
@@ -368,6 +370,7 @@ public AbfsClient(final URL baseUrl, | |
| this(baseUrl, sharedKeyCredentials, abfsConfiguration, | ||
| encryptionContextProvider, abfsClientContext, abfsServiceType); | ||
| this.sasTokenProvider = sasTokenProvider; | ||
| this.tokenProvider = tokenProvider; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -1179,7 +1182,7 @@ protected String appendSASTokenToQuery(String path, | |
| String cachedSasToken) | ||
| throws SASTokenProviderException { | ||
| String sasToken = null; | ||
| if (this.authType == AuthType.SAS) { | ||
| if (getAbfsConfiguration().validateForSASType(this.authType)) { | ||
| try { | ||
| LOG.trace("Fetch SAS token for {} on {}", operation, path); | ||
| if (cachedSasToken == null) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -47,24 +47,26 @@ public class AbfsClientHandler implements Closeable { | |
| private final AbfsDfsClient dfsAbfsClient; | ||
| private final AbfsBlobClient blobAbfsClient; | ||
|
|
||
| public AbfsClientHandler(final URL baseUrl, | ||
| final SharedKeyCredentials sharedKeyCredentials, | ||
| final AbfsConfiguration abfsConfiguration, | ||
| final AccessTokenProvider tokenProvider, | ||
| final EncryptionContextProvider encryptionContextProvider, | ||
| final AbfsClientContext abfsClientContext) throws IOException { | ||
| this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials, | ||
| abfsConfiguration, tokenProvider, null, encryptionContextProvider, | ||
| abfsClientContext); | ||
| this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials, | ||
| abfsConfiguration, tokenProvider, null, encryptionContextProvider, | ||
| abfsClientContext); | ||
| initServiceType(abfsConfiguration); | ||
| } | ||
|
|
||
| /** | ||
| * Constructs an AbfsClientHandler instance. | ||
| * | ||
| * Initializes the default and ingress service types from the provided configuration, | ||
| * then creates both DFS and Blob clients using the given params | ||
| * | ||
| * @param baseUrl the base URL for the file system. | ||
| * @param sharedKeyCredentials credentials for shared key authentication. | ||
| * @param abfsConfiguration the ABFS configuration. | ||
| * @param tokenProvider the access token provider, may be null. | ||
| * @param sasTokenProvider the SAS token provider, may be null. | ||
| * @param encryptionContextProvider the encryption context provider | ||
| * @param abfsClientContext the ABFS client context. | ||
| * @throws IOException if client creation or URL conversion fails. | ||
| */ | ||
| public AbfsClientHandler(final URL baseUrl, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Java doc missing for the constructor
anujmodi2021 marked this conversation as resolved.
|
||
| final SharedKeyCredentials sharedKeyCredentials, | ||
| final AbfsConfiguration abfsConfiguration, | ||
| final AccessTokenProvider tokenProvider, | ||
| final SASTokenProvider sasTokenProvider, | ||
| final EncryptionContextProvider encryptionContextProvider, | ||
| final AbfsClientContext abfsClientContext) throws IOException { | ||
|
|
@@ -73,10 +75,10 @@ public AbfsClientHandler(final URL baseUrl, | |
| // only for default client. | ||
| initServiceType(abfsConfiguration); | ||
| this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials, | ||
| abfsConfiguration, null, sasTokenProvider, encryptionContextProvider, | ||
| abfsConfiguration, tokenProvider, sasTokenProvider, encryptionContextProvider, | ||
| abfsClientContext); | ||
| this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials, | ||
| abfsConfiguration, null, sasTokenProvider, encryptionContextProvider, | ||
| abfsConfiguration, tokenProvider, sasTokenProvider, encryptionContextProvider, | ||
| abfsClientContext); | ||
| } | ||
|
|
||
|
|
@@ -154,17 +156,13 @@ private AbfsDfsClient createDfsClient(final URL baseUrl, | |
| final EncryptionContextProvider encryptionContextProvider, | ||
| final AbfsClientContext abfsClientContext) throws IOException { | ||
| URL dfsUrl = changeUrlFromBlobToDfs(baseUrl); | ||
| if (tokenProvider != null) { | ||
| LOG.debug("Creating AbfsDfsClient with access token provider using the URL: {}", dfsUrl); | ||
| return new AbfsDfsClient(dfsUrl, creds, abfsConfiguration, | ||
| tokenProvider, encryptionContextProvider, | ||
| abfsClientContext); | ||
| } else { | ||
| LOG.debug("Creating AbfsDfsClient with SAS token provider using the URL: {}", dfsUrl); | ||
| return new AbfsDfsClient(dfsUrl, creds, abfsConfiguration, | ||
| sasTokenProvider, encryptionContextProvider, | ||
| abfsClientContext); | ||
| } | ||
| LOG.debug( | ||
| "Creating AbfsDfsClient with access token provider: %s and " | ||
| + "SAS token provider: %s using the URL: %s", | ||
| tokenProvider, sasTokenProvider, dfsUrl); | ||
| return new AbfsDfsClient(dfsUrl, creds, abfsConfiguration, | ||
| tokenProvider, sasTokenProvider, encryptionContextProvider, | ||
| abfsClientContext); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -188,17 +186,13 @@ private AbfsBlobClient createBlobClient(final URL baseUrl, | |
| final EncryptionContextProvider encryptionContextProvider, | ||
| final AbfsClientContext abfsClientContext) throws IOException { | ||
| URL blobUrl = changeUrlFromDfsToBlob(baseUrl); | ||
| if (tokenProvider != null) { | ||
| LOG.debug("Creating AbfsBlobClient with access token provider using the URL: {}", blobUrl); | ||
| return new AbfsBlobClient(blobUrl, creds, abfsConfiguration, | ||
| tokenProvider, encryptionContextProvider, | ||
| abfsClientContext); | ||
| } else { | ||
| LOG.debug("Creating AbfsBlobClient with SAS token provider using the URL: {}", blobUrl); | ||
| return new AbfsBlobClient(blobUrl, creds, abfsConfiguration, | ||
| sasTokenProvider, encryptionContextProvider, | ||
| abfsClientContext); | ||
| } | ||
| LOG.debug( | ||
| "Creating AbfsBlobClient with access token provider: %s and " | ||
| + "SAS token provider: %s using the URL: %s", | ||
| tokenProvider, sasTokenProvider, blobUrl); | ||
| return new AbfsBlobClient(blobUrl, creds, abfsConfiguration, | ||
| tokenProvider, sasTokenProvider, encryptionContextProvider, | ||
| abfsClientContext); | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is with oauth check