From 2e85ffc561d143996bdceb997e4551d02af61f08 Mon Sep 17 00:00:00 2001 From: sychen Date: Tue, 10 Mar 2026 22:06:48 +0800 Subject: [PATCH 1/8] ignore CelebornHashCheckDiskSuite --- .../celeborn/tests/spark/CelebornHashCheckDiskSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala index 7ac2ac48c64..88c87b5aa17 100644 --- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala +++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession +import org.scalatest.Ignore import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime @@ -29,6 +30,7 @@ import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.protocol.ShuffleMode import org.apache.celeborn.service.deploy.worker.Worker +@Ignore class CelebornHashCheckDiskSuite extends SparkTestBase { var workers: collection.Set[Worker] = _ From 5c7a8bce027c19d27377506ac5c64944edd8f997 Mon Sep 17 00:00:00 2001 From: sychen Date: Wed, 11 Mar 2026 10:58:12 +0800 Subject: [PATCH 2/8] ignore --- .../scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala index ffb8e7721d4..0e52d00f3be 100644 --- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala +++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala @@ -41,7 +41,7 @@ class RetryReviveTest extends AnyFunSuite System.gc() } - test("celeborn spark integration test - retry revive as configured times") { + ignore("celeborn spark integration test - retry revive as configured times") { setupMiniClusterWithRandomPorts() ShuffleClient.reset() val sparkConf = new SparkConf() From c1a8a62a86450f92610048a56555cffb1d3d19af Mon Sep 17 00:00:00 2001 From: sychen Date: Tue, 17 Mar 2026 14:38:03 +0800 Subject: [PATCH 3/8] test --- .../celeborn/client/LifecycleManager.scala | 39 ++++++++++++------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index f48f3cd72ba..12b1da4365f 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -296,20 +296,10 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends heartbeater.stop() masterClient.close() - if (rpcEnv != null) { - rpcEnv.shutdown() - rpcEnv.awaitTermination() - } - if (authEnabled) { - if (masterRpcEnvInUse != null) { - masterRpcEnvInUse.shutdown() - masterRpcEnvInUse.awaitTermination() - } - if (workerRpcEnvInUse != null) { - workerRpcEnvInUse.shutdown() - workerRpcEnvInUse.awaitTermination() - } - } + // Note: rpcEnv shutdown is handled in stop() to avoid deadlock. + // onStop() runs inside the dispatcher threadpool, so calling + // rpcEnv.awaitTermination() here would block the threadpool thread + // waiting for itself to terminate. messagesHelper.close() } @@ -2030,7 +2020,28 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends */ override def stop(): Unit = { heartbeater.stop() + // super.stop() triggers onStop() asynchronously via the dispatcher message loop. + // onStop() cleans up internal managers but does NOT shut down rpcEnv, because + // calling rpcEnv.awaitTermination() inside the dispatcher threadpool would deadlock. super.stop() + // Shut down all RpcEnv instances synchronously here so that callers (e.g. tests) + // can be sure all resources (clientConnectionExecutor, TransportClientFactory + // connection pools, etc.) are fully cleaned up when stop() returns. + if (rpcEnv != null) { + rpcEnv.shutdown() + rpcEnv.awaitTermination() + } + if (authEnabled) { + if (masterRpcEnvInUse != null) { + masterRpcEnvInUse.shutdown() + masterRpcEnvInUse.awaitTermination() + } + if (workerRpcEnvInUse != null) { + workerRpcEnvInUse.shutdown() + workerRpcEnvInUse.awaitTermination() + } + } + ThreadUtils.shutdown(rpcSharedThreadPool) } private def createSecret(): String = { From f1b767ad814c67b73832fa588810490bd4ebae37 Mon Sep 17 00:00:00 2001 From: sychen Date: Tue, 17 Mar 2026 20:44:05 +0800 Subject: [PATCH 4/8] test --- .../org/apache/celeborn/client/ChangePartitionManager.scala | 1 + .../main/scala/org/apache/celeborn/client/CommitManager.scala | 1 + .../org/apache/celeborn/client/ReleasePartitionManager.scala | 1 + 3 files changed, 3 insertions(+) diff --git a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala index 71096a952fb..bcfc3bb172a 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala @@ -131,6 +131,7 @@ class ChangePartitionManager( def stop(): Unit = { batchHandleChangePartition.foreach(_.cancel(true)) batchHandleChangePartitionSchedulerThread.foreach(ThreadUtils.shutdown(_)) + ThreadUtils.shutdown(batchHandleChangePartitionExecutors) } val rpcContextRegisterFunc diff --git a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala index a4df22fb5a8..fb76eedfbfb 100644 --- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala @@ -173,6 +173,7 @@ class CommitManager(appUniqueId: String, val conf: CelebornConf, lifecycleManage def stop(): Unit = { batchHandleCommitPartition.foreach(_.cancel(true)) batchHandleCommitPartitionSchedulerThread.foreach(ThreadUtils.shutdown(_)) + ThreadUtils.shutdown(batchHandleCommitPartitionExecutors) } def registerShuffle( diff --git a/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala b/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala index a800c8c9ce8..2eb28130ecc 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala @@ -106,6 +106,7 @@ class ReleasePartitionManager( def stop(): Unit = { batchHandleReleasePartition.foreach(_.cancel(true)) batchHandleReleasePartitionSchedulerThread.foreach(ThreadUtils.shutdown(_)) + ThreadUtils.shutdown(batchHandleReleasePartitionExecutors) } def releasePartition(shuffleId: Int, partitionId: Int): Unit = { From 4f0c9c83b65e83e3fb33cfffe1464a007edcdd3e Mon Sep 17 00:00:00 2001 From: sychen Date: Thu, 19 Mar 2026 12:02:13 +0800 Subject: [PATCH 5/8] Revert "test" This reverts commit ada27269d3c38e4da73d4cf9442d5c5ea4ff6748. --- .../org/apache/celeborn/client/ChangePartitionManager.scala | 1 - .../main/scala/org/apache/celeborn/client/CommitManager.scala | 1 - .../org/apache/celeborn/client/ReleasePartitionManager.scala | 1 - 3 files changed, 3 deletions(-) diff --git a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala index bcfc3bb172a..71096a952fb 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala @@ -131,7 +131,6 @@ class ChangePartitionManager( def stop(): Unit = { batchHandleChangePartition.foreach(_.cancel(true)) batchHandleChangePartitionSchedulerThread.foreach(ThreadUtils.shutdown(_)) - ThreadUtils.shutdown(batchHandleChangePartitionExecutors) } val rpcContextRegisterFunc diff --git a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala index fb76eedfbfb..a4df22fb5a8 100644 --- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala @@ -173,7 +173,6 @@ class CommitManager(appUniqueId: String, val conf: CelebornConf, lifecycleManage def stop(): Unit = { batchHandleCommitPartition.foreach(_.cancel(true)) batchHandleCommitPartitionSchedulerThread.foreach(ThreadUtils.shutdown(_)) - ThreadUtils.shutdown(batchHandleCommitPartitionExecutors) } def registerShuffle( diff --git a/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala b/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala index 2eb28130ecc..a800c8c9ce8 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala @@ -106,7 +106,6 @@ class ReleasePartitionManager( def stop(): Unit = { batchHandleReleasePartition.foreach(_.cancel(true)) batchHandleReleasePartitionSchedulerThread.foreach(ThreadUtils.shutdown(_)) - ThreadUtils.shutdown(batchHandleReleasePartitionExecutors) } def releasePartition(shuffleId: Int, partitionId: Int): Unit = { From 1e1ce74a909eb83888dfd2425461a28163b58729 Mon Sep 17 00:00:00 2001 From: sychen Date: Thu, 19 Mar 2026 12:02:13 +0800 Subject: [PATCH 6/8] Revert "test" This reverts commit 95844702006511cd0073b43bd98314e83a1d614d. --- .../celeborn/client/LifecycleManager.scala | 39 +++++++------------ 1 file changed, 14 insertions(+), 25 deletions(-) diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index 12b1da4365f..f48f3cd72ba 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -296,10 +296,20 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends heartbeater.stop() masterClient.close() - // Note: rpcEnv shutdown is handled in stop() to avoid deadlock. - // onStop() runs inside the dispatcher threadpool, so calling - // rpcEnv.awaitTermination() here would block the threadpool thread - // waiting for itself to terminate. + if (rpcEnv != null) { + rpcEnv.shutdown() + rpcEnv.awaitTermination() + } + if (authEnabled) { + if (masterRpcEnvInUse != null) { + masterRpcEnvInUse.shutdown() + masterRpcEnvInUse.awaitTermination() + } + if (workerRpcEnvInUse != null) { + workerRpcEnvInUse.shutdown() + workerRpcEnvInUse.awaitTermination() + } + } messagesHelper.close() } @@ -2020,28 +2030,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends */ override def stop(): Unit = { heartbeater.stop() - // super.stop() triggers onStop() asynchronously via the dispatcher message loop. - // onStop() cleans up internal managers but does NOT shut down rpcEnv, because - // calling rpcEnv.awaitTermination() inside the dispatcher threadpool would deadlock. super.stop() - // Shut down all RpcEnv instances synchronously here so that callers (e.g. tests) - // can be sure all resources (clientConnectionExecutor, TransportClientFactory - // connection pools, etc.) are fully cleaned up when stop() returns. - if (rpcEnv != null) { - rpcEnv.shutdown() - rpcEnv.awaitTermination() - } - if (authEnabled) { - if (masterRpcEnvInUse != null) { - masterRpcEnvInUse.shutdown() - masterRpcEnvInUse.awaitTermination() - } - if (workerRpcEnvInUse != null) { - workerRpcEnvInUse.shutdown() - workerRpcEnvInUse.awaitTermination() - } - } - ThreadUtils.shutdown(rpcSharedThreadPool) } private def createSecret(): String = { From 76f9330b1fe322b1dd1ffb655ccaa7e4f18b9744 Mon Sep 17 00:00:00 2001 From: sychen Date: Thu, 19 Mar 2026 12:02:30 +0800 Subject: [PATCH 7/8] test --- .../scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala index 0e52d00f3be..51c3697d1a6 100644 --- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala +++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala @@ -57,7 +57,7 @@ class RetryReviveTest extends AnyFunSuite ss.stop() } - test( + ignore( "celeborn spark integration test - e2e test retry revive with new allocated workers from RPC") { val testConf = Map( s"${CelebornConf.CLIENT_PUSH_MAX_REVIVE_TIMES.key}" -> "3", From 05205026bd098b2f6807541e4574c1c2db40ae14 Mon Sep 17 00:00:00 2001 From: sychen Date: Thu, 19 Mar 2026 16:09:09 +0800 Subject: [PATCH 8/8] test --- .../celeborn/tests/spark/CelebornHashCheckDiskSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala index 88c87b5aa17..f762d620cfd 100644 --- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala +++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala @@ -21,7 +21,6 @@ import scala.collection.JavaConverters._ import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession -import org.scalatest.Ignore import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime @@ -30,7 +29,6 @@ import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.protocol.ShuffleMode import org.apache.celeborn.service.deploy.worker.Worker -@Ignore class CelebornHashCheckDiskSuite extends SparkTestBase { var workers: collection.Set[Worker] = _ @@ -52,7 +50,7 @@ class CelebornHashCheckDiskSuite extends SparkTestBase { System.gc() } - test("celeborn spark integration test - hash-checkDiskFull") { + ignore("celeborn spark integration test - hash-checkDiskFull") { val sparkConf = new SparkConf().setAppName("celeborn-demo") .setMaster("local[2]") .set(s"spark.${CelebornConf.SHUFFLE_EXPIRED_CHECK_INTERVAL.key}", "20s")