Skip to content

Commit 1cdba2c

Browse files
wuwenchimorningman
authored andcommitted
[fix](iceberg)Fix the thread pool issue used for commit. (#51508)
### What problem does this PR solve? Problem Summary: When Iceberg generates a new snapshot, it performs a merge operation based on the previous snapshot. This operation reads manifest files, and the file reading process uses a global thread pool. However, users may have their own authentication information, which requires the use of doAs to ensure context. Therefore, the thread pool provided by Iceberg cannot be used. ### Release note None
1 parent 1a4e716 commit 1cdba2c

3 files changed

Lines changed: 11 additions & 2 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,4 +1170,8 @@ public void setAutoAnalyzePolicy(String dbName, String tableName, String policy)
11701170
tableAutoAnalyzePolicy.put(key, policy);
11711171
}
11721172
}
1173+
1174+
public ThreadPoolExecutor getThreadPoolExecutor() {
1175+
return threadPoolWithPreAuth;
1176+
}
11731177
}

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.List;
5050
import java.util.Map;
5151
import java.util.Optional;
52+
import java.util.concurrent.ThreadPoolExecutor;
5253
import java.util.stream.Collectors;
5354

5455
public class IcebergMetadataOps implements ExternalMetadataOps {
@@ -291,4 +292,8 @@ private Namespace getNamespace(String dbName) {
291292
private Namespace getNamespace() {
292293
return externalCatalogName.map(Namespace::of).orElseGet(() -> Namespace.empty());
293294
}
295+
296+
public ThreadPoolExecutor getThreadPoolWithPreAuth() {
297+
return dorisCatalog.getThreadPoolExecutor();
298+
}
294299
}

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ private synchronized Table getNativeTable(SimpleTableInfo tableInfo) {
155155

156156
private void commitAppendTxn(Table table, List<WriteResult> pendingResults) {
157157
// commit append files.
158-
AppendFiles appendFiles = table.newAppend();
158+
AppendFiles appendFiles = table.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth());
159159
for (WriteResult result : pendingResults) {
160160
Preconditions.checkState(result.referencedDataFiles().length == 0,
161161
"Should have no referenced data files for append.");
@@ -171,7 +171,7 @@ private void commitReplaceTxn(Table table, List<WriteResult> pendingResults) {
171171
// 1. if dst_tb is a partitioned table, it will return directly.
172172
// 2. if dst_tb is an unpartitioned table, the `dst_tb` table will be emptied.
173173
if (!table.spec().isPartitioned()) {
174-
OverwriteFiles overwriteFiles = table.newOverwrite();
174+
OverwriteFiles overwriteFiles = table.newOverwrite().scanManifestsWith(ops.getThreadPoolWithPreAuth());
175175
try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
176176
fileScanTasks.forEach(f -> overwriteFiles.deleteFile(f.file()));
177177
} catch (IOException e) {

0 commit comments

Comments
 (0)