Skip to content

Commit fbf2a83

Browse files
authored
[AURON #1730] Fix data size calculation in Celeborn shuffle writer (#1731)
<!-- Thanks for sending a pull request! Please keep the following tips in mind: - Start the PR title with the related issue ID, e.g. '[AURON #XXXX] Short summary...'. - Make your PR title clear and descriptive, summarizing what this PR changes. - Provide a concise example to reproduce the issue, if possible. - Keep the PR description up to date with all changes. --> # Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> Closes #1730 # Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> # What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> # Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> # How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible. If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future. If tests were not added, please describe why they were not added and/or why it was difficult to add. -->
1 parent 59261a0 commit fbf2a83

1 file changed

Lines changed: 20 additions & 0 deletions

File tree

thirdparty/auron-celeborn-0.6/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/celeborn/AuronCelebornShuffleWriter.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,24 @@ class AuronCelebornShuffleWriter[K, V](
7676
Shims.get
7777
.getMapStatus(blockManagerId, celebornPartitionWriter.getPartitionLengthMap, mapId))
7878
}
79+
80+
// Override stop to use partition length map directly instead of rssStop's mapStatus
81+
// because celeborn writer doesn't populate partition sizes correctly when using native writer
82+
override def stop(success: Boolean): Option[MapStatus] = {
83+
if (!success) {
84+
celebornShuffleWriter.stop(success)
85+
return None
86+
}
87+
88+
celebornShuffleWriter.write(Iterator.empty)
89+
celebornShuffleWriter.stop(success)
90+
91+
// Always use getPartitionLengthMap for Celeborn to get correct partition sizes
92+
val blockManagerId = SparkEnv.get.blockManager.shuffleServerId
93+
Some(
94+
Shims.get.getMapStatus(
95+
blockManagerId,
96+
celebornPartitionWriter.getPartitionLengthMap,
97+
taskContext.partitionId()))
98+
}
7999
}

0 commit comments

Comments
 (0)