Skip to content

Commit 874a6b7

Browse files
FMXturboFei
authored and committed
[CELEBORN-1838] Interrupt spark task should not report fetch failure
What changes were proposed in this pull request? Do not trigger fetch failure if a spark task attempt is interrupted (speculation enabled). Do not trigger fetch failure if the RPC of getReducerFileGroup times out. This PR is intended for the celeborn-0.5 branch. Why are the changes needed? Avoid unnecessary fetch failures and stage re-runs. Does this PR introduce any user-facing change? NO. How was this patch tested? 1. GA. 2. Manually tested on cluster with spark speculation tasks. Here is the test case ```scala sc.parallelize(1 to 100, 100).flatMap(i => { (1 to 150000).iterator.map(num => num) }).groupBy(i => i, 100) .map(i => { if (i._1 < 5) { Thread.sleep(15000) } i }) .repartition(400).count ``` <img width="1384" alt="截屏2025-01-18 16 16 16" src="https://github.com/user-attachments/assets/adf64857-5773-4081-a7d0-fa3439e751eb" /> <img width="1393" alt="截屏2025-01-18 16 16 22" src="https://github.com/user-attachments/assets/ac9bf172-1ab4-4669-a930-872d009f2530" /> <img width="1258" alt="截屏2025-01-18 16 19 15" src="https://github.com/user-attachments/assets/6a8ff3e1-c1fb-4ef2-84d8-b1fc6eb56fa6" /> <img width="892" alt="截屏2025-01-18 16 17 27" src="https://github.com/user-attachments/assets/f9de3841-f7d4-4445-99a3-873235d4abd0" /> Closes apache#3070 from FMX/branch-0.5-b1838. Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com> Signed-off-by: Wang, Fei <fwang12@ebay.com>
1 parent 39a40dd commit 874a6b7

9 files changed

Lines changed: 292 additions & 27 deletions

File tree

client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import java.util.concurrent.TimeUnit;
2525
import java.util.concurrent.TimeoutException;
2626

27-
import scala.Tuple2;
27+
import scala.Tuple3;
2828
import scala.reflect.ClassTag$;
2929

3030
import com.google.common.annotations.VisibleForTesting;
@@ -265,9 +265,9 @@ public ReduceFileGroups updateFileGroup(
265265
int shuffleId, int partitionId, boolean isSegmentGranularityVisible)
266266
throws CelebornIOException {
267267
ReduceFileGroups reduceFileGroups =
268-
reduceFileGroupsMap.computeIfAbsent(
269-
shuffleId, (id) -> Tuple2.apply(new ReduceFileGroups(), null))
270-
._1;
268+
reduceFileGroupsMap
269+
.computeIfAbsent(shuffleId, (id) -> Tuple3.apply(new ReduceFileGroups(), null, null))
270+
._1();
271271
if (reduceFileGroups.partitionIds != null
272272
&& reduceFileGroups.partitionIds.contains(partitionId)) {
273273
logger.debug(
@@ -281,12 +281,12 @@ public ReduceFileGroups updateFileGroup(
281281
Utils.makeReducerKey(shuffleId, partitionId));
282282
} else {
283283
// refresh file groups
284-
Tuple2<ReduceFileGroups, String> fileGroups =
284+
Tuple3<ReduceFileGroups, String, Exception> fileGroups =
285285
loadFileGroupInternal(shuffleId, isSegmentGranularityVisible);
286286
ReduceFileGroups newGroups = fileGroups._1;
287287
if (newGroups == null) {
288288
throw new CelebornIOException(
289-
loadFileGroupException(shuffleId, partitionId, fileGroups._2));
289+
loadFileGroupException(shuffleId, partitionId, fileGroups._2()));
290290
} else if (!newGroups.partitionIds.contains(partitionId)) {
291291
throw new CelebornIOException(
292292
String.format(

client-spark/spark-3-4/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,5 +91,10 @@
9191
<artifactId>mockito-core</artifactId>
9292
<scope>test</scope>
9393
</dependency>
94+
<dependency>
95+
<groupId>org.mockito</groupId>
96+
<artifactId>mockito-inline</artifactId>
97+
<scope>test</scope>
98+
</dependency>
9499
</dependencies>
95100
</project>

client-spark/spark-3-4/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
package org.apache.spark.shuffle.celeborn
1919

2020
import java.io.IOException
21+
import java.nio.file.Files
2122
import java.util
22-
import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor, TimeUnit}
23+
import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor, TimeoutException, TimeUnit}
2324
import java.util.concurrent.atomic.AtomicReference
2425

2526
import scala.collection.JavaConverters._
2627

28+
import com.google.common.annotations.VisibleForTesting
2729
import org.apache.spark.{Aggregator, InterruptibleIterator, ShuffleDependency, TaskContext}
2830
import org.apache.spark.celeborn.ExceptionMakerHelper
2931
import org.apache.spark.internal.Logging
@@ -33,14 +35,14 @@ import org.apache.spark.shuffle.celeborn.CelebornShuffleReader.streamCreatorPool
3335
import org.apache.spark.util.CompletionIterator
3436
import org.apache.spark.util.collection.ExternalSorter
3537

36-
import org.apache.celeborn.client.ShuffleClient
38+
import org.apache.celeborn.client.{DummyShuffleClient, ShuffleClient}
3739
import org.apache.celeborn.client.ShuffleClientImpl.ReduceFileGroups
3840
import org.apache.celeborn.client.read.{CelebornInputStream, MetricsCallback}
3941
import org.apache.celeborn.common.CelebornConf
4042
import org.apache.celeborn.common.exception.{CelebornIOException, PartitionUnRetryAbleException}
4143
import org.apache.celeborn.common.network.client.TransportClient
4244
import org.apache.celeborn.common.network.protocol.TransportMessage
43-
import org.apache.celeborn.common.protocol.{MessageType, PartitionLocation, PbOpenStreamList, PbOpenStreamListResponse, PbStreamHandler}
45+
import org.apache.celeborn.common.protocol._
4446
import org.apache.celeborn.common.protocol.message.StatusCode
4547
import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils, Utils}
4648

@@ -57,7 +59,9 @@ class CelebornShuffleReader[K, C](
5759
extends ShuffleReader[K, C] with Logging {
5860

5961
private val dep = handle.dependency
60-
private val shuffleClient = ShuffleClient.get(
62+
63+
@VisibleForTesting
64+
val shuffleClient = ShuffleClient.get(
6165
handle.appUniqueId,
6266
handle.lifecycleManagerHost,
6367
handle.lifecycleManagerPort,
@@ -111,7 +115,9 @@ class CelebornShuffleReader[K, C](
111115
fileGroups = shuffleClient.updateFileGroup(shuffleId, startPartition)
112116
} catch {
113117
case ce @ (_: CelebornIOException | _: PartitionUnRetryAbleException) =>
114-
handleFetchExceptions(handle.shuffleId, shuffleId, 0, ce)
118+
// if a task is interrupted, should not report fetch failure
119+
// if a task update file group timeout, should not report fetch failure
120+
checkAndReportFetchFailureForUpdateFileGroupFailure(shuffleId, ce)
115121
case e: Throwable => throw e
116122
}
117123

@@ -370,7 +376,22 @@ class CelebornShuffleReader[K, C](
370376
}
371377
}
372378

373-
private def handleFetchExceptions(
379+
@VisibleForTesting
380+
def checkAndReportFetchFailureForUpdateFileGroupFailure(
381+
celebornShuffleId: Int,
382+
ce: Throwable): Unit = {
383+
if (ce.getCause != null &&
384+
(ce.getCause.isInstanceOf[InterruptedException] || ce.getCause.isInstanceOf[
385+
TimeoutException])) {
386+
logWarning(s"fetch shuffle ${celebornShuffleId} timeout or interrupt", ce)
387+
throw ce
388+
} else {
389+
handleFetchExceptions(handle.shuffleId, celebornShuffleId, 0, ce)
390+
}
391+
}
392+
393+
@VisibleForTesting
394+
def handleFetchExceptions(
374395
appShuffleId: Int,
375396
shuffleId: Int,
376397
partitionId: Int,
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.shuffle.celeborn
19+
20+
import java.nio.file.Files
21+
import java.util.concurrent.TimeoutException
22+
23+
import org.apache.spark.{Dependency, ShuffleDependency, TaskContext}
24+
import org.apache.spark.shuffle.ShuffleReadMetricsReporter
25+
import org.mockito.ArgumentMatchers.any
26+
import org.mockito.Mockito
27+
import org.mockito.Mockito._
28+
import org.scalatest.funsuite.AnyFunSuite
29+
30+
import org.apache.celeborn.client.{DummyShuffleClient, ShuffleClient}
31+
import org.apache.celeborn.common.CelebornConf
32+
import org.apache.celeborn.common.exception.CelebornIOException
33+
import org.apache.celeborn.common.identity.UserIdentifier
34+
35+
class CelebornShuffleReaderSuite extends AnyFunSuite {
36+
37+
/**
38+
* Due to spark limitations, spark local mode can not test speculation tasks ,
39+
* test the method `checkAndReportFetchFailureForUpdateFileGroupFailure`
40+
*/
41+
test("CELEBORN-1838 test check report fetch failure exceptions ") {
42+
val dependency = Mockito.mock(classOf[ShuffleDependency[Int, Int, Int]])
43+
val handler = new CelebornShuffleHandle[Int, Int, Int](
44+
"APP",
45+
"HOST1",
46+
1,
47+
UserIdentifier.apply("a", "b"),
48+
0,
49+
true,
50+
1,
51+
dependency)
52+
val context = Mockito.mock(classOf[TaskContext])
53+
val metricReporter = Mockito.mock(classOf[ShuffleReadMetricsReporter])
54+
val conf = new CelebornConf()
55+
56+
val tmpFile = Files.createTempFile("test", ".tmp").toFile
57+
mockStatic(classOf[ShuffleClient]).when(() =>
58+
ShuffleClient.get(any(), any(), any(), any(), any(), any())).thenReturn(
59+
new DummyShuffleClient(conf, tmpFile))
60+
61+
val shuffleReader =
62+
new CelebornShuffleReader[Int, Int](handler, 0, 0, 0, 0, context, conf, metricReporter, null)
63+
64+
val exception1: Throwable = new CelebornIOException("test1", new InterruptedException("test1"))
65+
val exception2: Throwable = new CelebornIOException("test2", new TimeoutException("test2"))
66+
val exception3: Throwable = new CelebornIOException("test3")
67+
val exception4: Throwable = new CelebornIOException("test4")
68+
69+
try {
70+
shuffleReader.checkAndReportFetchFailureForUpdateFileGroupFailure(0, exception1)
71+
} catch {
72+
case _: Throwable =>
73+
}
74+
try {
75+
shuffleReader.checkAndReportFetchFailureForUpdateFileGroupFailure(0, exception2)
76+
} catch {
77+
case _: Throwable =>
78+
}
79+
try {
80+
shuffleReader.checkAndReportFetchFailureForUpdateFileGroupFailure(0, exception3)
81+
} catch {
82+
case _: Throwable =>
83+
}
84+
assert(
85+
shuffleReader.shuffleClient.asInstanceOf[DummyShuffleClient].fetchFailureCount.get() === 1)
86+
try {
87+
shuffleReader.checkAndReportFetchFailureForUpdateFileGroupFailure(0, exception4)
88+
} catch {
89+
case _: Throwable =>
90+
}
91+
assert(
92+
shuffleReader.shuffleClient.asInstanceOf[DummyShuffleClient].fetchFailureCount.get() === 2)
93+
94+
}
95+
}

client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java renamed to client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.List;
3030
import java.util.Map;
3131
import java.util.concurrent.ConcurrentHashMap;
32+
import java.util.concurrent.atomic.AtomicInteger;
3233

3334
import org.slf4j.Logger;
3435
import org.slf4j.LoggerFactory;
@@ -55,6 +56,8 @@ public class DummyShuffleClient extends ShuffleClient {
5556
private final Map<Integer, ConcurrentHashMap<Integer, PartitionLocation>> reducePartitionMap =
5657
new HashMap<>();
5758

59+
public AtomicInteger fetchFailureCount = new AtomicInteger();
60+
5861
public DummyShuffleClient(CelebornConf conf, File file) throws Exception {
5962
this.os = new BufferedOutputStream(new FileOutputStream(file));
6063
this.conf = conf;
@@ -181,6 +184,7 @@ public int getShuffleId(
181184

182185
@Override
183186
public boolean reportShuffleFetchFailure(int appShuffleId, int shuffleId, long taskId) {
187+
fetchFailureCount.incrementAndGet();
184188
return true;
185189
}
186190

client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.TimeUnit;
2727

2828
import scala.Tuple2;
29+
import scala.Tuple3;
2930
import scala.reflect.ClassTag$;
3031

3132
import com.google.common.annotations.VisibleForTesting;
@@ -170,7 +171,7 @@ public void update(ReduceFileGroups fileGroups) {
170171
}
171172

172173
// key: shuffleId
173-
protected final Map<Integer, Tuple2<ReduceFileGroups, String>> reduceFileGroupsMap =
174+
protected final Map<Integer, Tuple3<ReduceFileGroups, String, Exception>> reduceFileGroupsMap =
174175
JavaUtils.newConcurrentHashMap();
175176

176177
public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier userIdentifier) {
@@ -1742,11 +1743,12 @@ public boolean cleanupShuffle(int shuffleId) {
17421743
return true;
17431744
}
17441745

1745-
protected Tuple2<ReduceFileGroups, String> loadFileGroupInternal(
1746+
protected Tuple3<ReduceFileGroups, String, Exception> loadFileGroupInternal(
17461747
int shuffleId, boolean isSegmentGranularityVisible) {
17471748
{
17481749
long getReducerFileGroupStartTime = System.nanoTime();
17491750
String exceptionMsg = null;
1751+
Exception exception = null;
17501752
try {
17511753
if (lifecycleManagerRef == null) {
17521754
exceptionMsg = "Driver endpoint is null!";
@@ -1768,9 +1770,10 @@ protected Tuple2<ReduceFileGroups, String> loadFileGroupInternal(
17681770
shuffleId,
17691771
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - getReducerFileGroupStartTime),
17701772
response.fileGroup().size());
1771-
return Tuple2.apply(
1773+
return Tuple3.apply(
17721774
new ReduceFileGroups(
17731775
response.fileGroup(), response.attempts(), response.partitionIds()),
1776+
null,
17741777
null);
17751778
case SHUFFLE_NOT_REGISTERED:
17761779
logger.warn(
@@ -1779,9 +1782,10 @@ protected Tuple2<ReduceFileGroups, String> loadFileGroupInternal(
17791782
response.status(),
17801783
shuffleId);
17811784
// return empty result
1782-
return Tuple2.apply(
1785+
return Tuple3.apply(
17831786
new ReduceFileGroups(
17841787
response.fileGroup(), response.attempts(), response.partitionIds()),
1788+
null,
17851789
null);
17861790
case STAGE_END_TIME_OUT:
17871791
case SHUFFLE_DATA_LOST:
@@ -1800,8 +1804,9 @@ protected Tuple2<ReduceFileGroups, String> loadFileGroupInternal(
18001804
}
18011805
logger.error("Exception raised while call GetReducerFileGroup for {}.", shuffleId, e);
18021806
exceptionMsg = e.getMessage();
1807+
exception = e;
18031808
}
1804-
return Tuple2.apply(null, exceptionMsg);
1809+
return Tuple3.apply(null, exceptionMsg, exception);
18051810
}
18061811
}
18071812

@@ -1814,21 +1819,22 @@ public ReduceFileGroups updateFileGroup(int shuffleId, int partitionId)
18141819
public ReduceFileGroups updateFileGroup(
18151820
int shuffleId, int partitionId, boolean isSegmentGranularityVisible)
18161821
throws CelebornIOException {
1817-
Tuple2<ReduceFileGroups, String> fileGroupTuple =
1822+
Tuple3<ReduceFileGroups, String, Exception> fileGroupTuple =
18181823
reduceFileGroupsMap.compute(
18191824
shuffleId,
18201825
(id, existsTuple) -> {
1821-
if (existsTuple == null || existsTuple._1 == null) {
1826+
if (existsTuple == null || existsTuple._1() == null) {
18221827
return loadFileGroupInternal(shuffleId, isSegmentGranularityVisible);
18231828
} else {
18241829
return existsTuple;
18251830
}
18261831
});
1827-
if (fileGroupTuple._1 == null) {
1832+
if (fileGroupTuple._1() == null) {
18281833
throw new CelebornIOException(
1829-
loadFileGroupException(shuffleId, partitionId, (fileGroupTuple._2)));
1834+
loadFileGroupException(shuffleId, partitionId, (fileGroupTuple._2())),
1835+
fileGroupTuple._3());
18301836
} else {
1831-
return fileGroupTuple._1;
1837+
return fileGroupTuple._1();
18321838
}
18331839
}
18341840

@@ -1899,7 +1905,7 @@ public CelebornInputStream readPartition(
18991905
}
19001906

19011907
@VisibleForTesting
1902-
public Map<Integer, Tuple2<ReduceFileGroups, String>> getReduceFileGroupsMap() {
1908+
public Map<Integer, Tuple3<ReduceFileGroups, String, Exception>> getReduceFileGroupsMap() {
19031909
return reduceFileGroupsMap;
19041910
}
19051911

0 commit comments

Comments
 (0)