Skip to content

Commit 873d355

Browse files
CalvinKirsghkang98lik40
authored
[branch-3.0: [Fix](Catalog)Ensure preExecutionAuthenticator is properly initialized (#51280)
#50839 #50623 --------- Co-authored-by: kang <[email protected]> Co-authored-by: lik40 <[email protected]>
1 parent d7caf81 commit 873d355

7 files changed

Lines changed: 67 additions & 16 deletions

File tree

fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.hadoop.security.UserGroupInformation;
2121

2222
import java.io.IOException;
23+
import java.lang.reflect.UndeclaredThrowableException;
2324
import java.security.PrivilegedExceptionAction;
2425

2526
public interface HadoopAuthenticator {
@@ -31,6 +32,12 @@ default <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException {
3132
return getUGI().doAs(action);
3233
} catch (InterruptedException e) {
3334
throw new IOException(e);
35+
} catch (UndeclaredThrowableException e) {
36+
if (e.getCause() instanceof RuntimeException) {
37+
throw (RuntimeException) e.getCause();
38+
} else {
39+
throw new RuntimeException(e.getCause());
40+
}
3441
}
3542
}
3643

fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.doris.common.security.authentication;
1919

20+
import org.apache.hadoop.conf.Configuration;
21+
2022
import java.security.PrivilegedExceptionAction;
2123
import java.util.concurrent.Callable;
2224

@@ -40,6 +42,19 @@ public class PreExecutionAuthenticator {
4042
public PreExecutionAuthenticator() {
4143
}
4244

45+
/**
46+
* Constructor to initialize the PreExecutionAuthenticator object.
47+
* This constructor is responsible for initializing the Hadoop authenticator required for Kerberos authentication
48+
* based on the provided configuration information.
49+
*
50+
* @param configuration Configuration information used to obtain Kerberos authentication settings
51+
*/
52+
public PreExecutionAuthenticator(Configuration configuration) {
53+
AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(configuration);
54+
this.hadoopAuthenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
55+
}
56+
57+
4358
/**
4459
* Executes the specified task with necessary authentication.
4560
* <p>If a HadoopAuthenticator is set, the task will be executed within a

fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,17 @@ public ExternalCatalog(long catalogId, String name, InitCatalogLog.Type logType,
176176
this.comment = Strings.nullToEmpty(comment);
177177
}
178178

179+
/**
180+
* Initializes the PreExecutionAuthenticator instance.
181+
* This method ensures that the authenticator is created only once in a thread-safe manner.
182+
* If additional authentication logic is required, it should be extended and implemented in subclasses.
183+
*/
184+
protected synchronized void initPreExecutionAuthenticator() {
185+
if (preExecutionAuthenticator == null) {
186+
preExecutionAuthenticator = new PreExecutionAuthenticator();
187+
}
188+
}
189+
179190
public Configuration getConfiguration() {
180191
// build configuration is costly, so we cache it.
181192
if (cachedConf != null) {
@@ -213,6 +224,11 @@ protected List<String> listDatabaseNames() {
213224
}
214225
}
215226

227+
public ExternalMetadataOps getMetadataOps() {
228+
makeSureInitialized();
229+
return metadataOps;
230+
}
231+
216232
// Will be called when creating catalog(so when as replaying)
217233
// to add some default properties if missing.
218234
public void setDefaultPropsIfMissing(boolean isReplay) {
@@ -1137,6 +1153,9 @@ public void setAutoAnalyzePolicy(String dbName, String tableName, String policy)
11371153
}
11381154

11391155
public PreExecutionAuthenticator getPreExecutionAuthenticator() {
1156+
if (null == preExecutionAuthenticator) {
1157+
throw new RuntimeException("PreExecutionAuthenticator is null, please confirm it is initialized.");
1158+
}
11401159
return preExecutionAuthenticator;
11411160
}
11421161

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
import org.apache.doris.common.DdlException;
2626
import org.apache.doris.common.Pair;
2727
import org.apache.doris.common.ThreadPoolManager;
28-
import org.apache.doris.common.security.authentication.AuthenticationConfig;
29-
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
3028
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
3129
import org.apache.doris.common.util.Util;
3230
import org.apache.doris.datasource.CatalogProperty;
@@ -53,7 +51,6 @@
5351
import com.google.common.base.Strings;
5452
import com.google.common.collect.Lists;
5553
import com.google.common.collect.Maps;
56-
import lombok.Getter;
5754
import org.apache.commons.lang3.math.NumberUtils;
5855
import org.apache.hadoop.hive.conf.HiveConf;
5956
import org.apache.iceberg.hive.HiveCatalog;
@@ -91,8 +88,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
9188

9289
private static final int FILE_SYSTEM_EXECUTOR_THREAD_NUM = 16;
9390
private ThreadPoolExecutor fileSystemExecutor;
94-
@Getter
95-
private HadoopAuthenticator authenticator;
9691

9792
private int hmsEventsBatchSizePerRpc = -1;
9893
private boolean enableHmsEventsIncrementalSync = false;
@@ -170,14 +165,15 @@ public void checkProperties() throws DdlException {
170165
}
171166

172167
@Override
173-
protected void initLocalObjectsImpl() {
174-
this.preExecutionAuthenticator = new PreExecutionAuthenticator();
175-
if (this.authenticator == null) {
176-
AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration());
177-
this.authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
178-
this.preExecutionAuthenticator.setHadoopAuthenticator(authenticator);
168+
protected synchronized void initPreExecutionAuthenticator() {
169+
if (preExecutionAuthenticator == null) {
170+
preExecutionAuthenticator = new PreExecutionAuthenticator(getConfiguration());
179171
}
172+
}
180173

174+
@Override
175+
protected void initLocalObjectsImpl() {
176+
initPreExecutionAuthenticator();
181177
HiveConf hiveConf = null;
182178
JdbcClientConfig jdbcClientConfig = null;
183179
String hiveMetastoreType = catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_TYPE, "");

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMS
7272
this(catalog, createCachedClient(hiveConf,
7373
Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size),
7474
jdbcClientConfig));
75-
hadoopAuthenticator = catalog.getAuthenticator();
75+
hadoopAuthenticator = catalog.getPreExecutionAuthenticator().getHadoopAuthenticator();
7676
client.setHadoopAuthenticator(hadoopAuthenticator);
7777
}
7878

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,16 @@ public IcebergExternalCatalog(long catalogId, String name, String comment) {
5252
// Create catalog based on catalog type
5353
protected abstract void initCatalog();
5454

55+
@Override
56+
protected synchronized void initPreExecutionAuthenticator() {
57+
if (preExecutionAuthenticator == null) {
58+
preExecutionAuthenticator = new PreExecutionAuthenticator(getConfiguration());
59+
}
60+
}
61+
5562
@Override
5663
protected void initLocalObjectsImpl() {
57-
preExecutionAuthenticator = new PreExecutionAuthenticator();
64+
initPreExecutionAuthenticator();
5865
initCatalog();
5966
IcebergMetadataOps ops = ExternalMetadataOperations.newIcebergMetadataOps(this, catalog);
6067
transactionManager = TransactionManagerFactory.createIcebergTransactionManager(ops);

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,22 @@ public IcebergHadoopExternalCatalog(long catalogId, String name, String resource
5252
@Override
5353
protected void initCatalog() {
5454
icebergCatalogType = ICEBERG_HADOOP;
55-
HadoopCatalog hadoopCatalog = new HadoopCatalog();
55+
5656
Configuration conf = getConfiguration();
5757
initS3Param(conf);
5858
// initialize hadoop catalog
5959
Map<String, String> catalogProperties = catalogProperty.getProperties();
6060
String warehouse = catalogProperty.getHadoopProperties().get(CatalogProperties.WAREHOUSE_LOCATION);
61+
HadoopCatalog hadoopCatalog = new HadoopCatalog();
6162
hadoopCatalog.setConf(conf);
6263
catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
63-
hadoopCatalog.initialize(getName(), catalogProperties);
64-
catalog = hadoopCatalog;
64+
try {
65+
this.catalog = preExecutionAuthenticator.execute(() -> {
66+
hadoopCatalog.initialize(getName(), catalogProperties);
67+
return hadoopCatalog;
68+
});
69+
} catch (Exception e) {
70+
throw new RuntimeException("Hadoop catalog init error!", e);
71+
}
6572
}
6673
}

0 commit comments

Comments
 (0)