Skip to content

Commit 0ab4ce7

Browse files
authored
feat(shuffle): Add codec in bolt to support checksum during shuffle (#180)
1 parent 1dc14e2 commit 0ab4ce7

44 files changed

Lines changed: 4532 additions & 257 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

bolt/shuffle/sparksql/BoltShuffleReader.cpp

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include "bolt/shuffle/sparksql/BoltArrowMemoryPool.h"
4242
#include "bolt/shuffle/sparksql/Payload.h"
4343
#include "bolt/shuffle/sparksql/Utils.h"
44+
#include "bolt/shuffle/sparksql/compression/Codec.h"
4445
#include "bolt/shuffle/sparksql/compression/Compression.h"
4546
#include "bolt/vector/ComplexVector.h"
4647
#include "bolt/vector/FlatVector.h"
@@ -373,7 +374,7 @@ RowVectorPtr makeColumnarBatch(
373374
BoltColumnarBatchDeserializer::BoltColumnarBatchDeserializer(
374375
std::shared_ptr<arrow::io::InputStream> in,
375376
const std::shared_ptr<arrow::Schema>& schema,
376-
const std::shared_ptr<arrow::util::Codec>& codec,
377+
const std::shared_ptr<Codec>& codec,
377378
const bytedance::bolt::RowTypePtr& rowType,
378379
int32_t batchSize,
379380
int32_t shuffleBatchByteSize,
@@ -384,7 +385,7 @@ BoltColumnarBatchDeserializer::BoltColumnarBatchDeserializer(
384385
uint64_t& deserializeTime,
385386
uint64_t& decompressTime,
386387
bool isRowFormat,
387-
ZstdStreamCodec* zstdCodec,
388+
AdaptiveParallelZstdCodec* zstdCodec,
388389
RowBufferPool* rowBufferPool,
389390
ShuffleRowToColumnarConverter* row2ColConverter)
390391
: schema_(schema),
@@ -701,19 +702,21 @@ bool BoltColumnarBatchDeserializer::isCompositeRowVectorLayout(int64_t& bytes) {
701702

702703
BoltColumnarBatchDeserializerFactory::BoltColumnarBatchDeserializerFactory(
703704
const std::shared_ptr<arrow::Schema>& schema,
704-
const std::shared_ptr<arrow::util::Codec>& codec,
705+
const std::shared_ptr<Codec>& codec,
705706
const RowTypePtr& rowType,
706707
int32_t batchSize,
707708
int32_t shuffleBatchByteSize,
708709
arrow::MemoryPool* memoryPool,
709-
bytedance::bolt::memory::MemoryPool* boltPool)
710+
bytedance::bolt::memory::MemoryPool* boltPool,
711+
bool checksumEnabled)
710712
: schema_(schema),
711713
codec_(codec),
712714
rowType_(rowType),
713715
batchSize_(batchSize),
714716
shuffleBatchByteSize_(shuffleBatchByteSize),
715717
memoryPool_(memoryPool),
716-
boltPool_(boltPool) {
718+
boltPool_(boltPool),
719+
checksumEnabled_(checksumEnabled) {
717720
initFromSchema();
718721
}
719722

@@ -729,8 +732,8 @@ BoltColumnarBatchDeserializerFactory::createDeserializer(
729732
schema_->num_fields() >= rowBaseColumnNumThreshold) ||
730733
(shuffleWriterType_ == ShuffleWriterType::RowBased));
731734
if (!zstdCodec_) {
732-
zstdCodec_ =
733-
std::make_shared<ZstdStreamCodec>(1 /*not used*/, false, memoryPool_);
735+
zstdCodec_ = std::make_shared<AdaptiveParallelZstdCodec>(
736+
1 /*not used*/, false, memoryPool_, checksumEnabled_);
734737
rowBufferPool_ = std::make_shared<RowBufferPool>(memoryPool_);
735738
row2ColConverter_ =
736739
std::make_shared<ShuffleRowToColumnarConverter>(rowType_, boltPool_);
@@ -821,14 +824,18 @@ BoltShuffleReader::BoltShuffleReader(
821824
bytedance::bolt::memory::MemoryPool* boltPool)
822825
: factory_(std::make_unique<BoltColumnarBatchDeserializerFactory>(
823826
schema,
824-
createArrowIpcCodec(
827+
createCodec(
825828
options.compressionType,
826-
getCodecBackend(options.codecBackend)),
829+
CodecOptions{
830+
getCodecBackend(options.codecBackend),
831+
kDefaultCompressionLevel,
832+
options.checksumEnabled}),
827833
bytedance::bolt::asRowType(fromBoltTypeToArrowSchema(schema)),
828834
options.batchSize,
829835
options.shuffleBatchByteSize,
830836
pool,
831-
boltPool)) {
837+
boltPool,
838+
options.checksumEnabled)) {
832839
factory_->setNumPartitions(options.numPartitions);
833840
factory_->setShuffleWriterType(options.forceShuffleWriterType);
834841
factory_->setpartitioningShortName(options.partitionShortName);

bolt/shuffle/sparksql/BoltShuffleReader.h

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ class BoltColumnarBatchDeserializer {
100100
BoltColumnarBatchDeserializer(
101101
std::shared_ptr<arrow::io::InputStream> in,
102102
const std::shared_ptr<arrow::Schema>& schema,
103-
const std::shared_ptr<arrow::util::Codec>& codec,
103+
const std::shared_ptr<Codec>& codec,
104104
const bytedance::bolt::RowTypePtr& rowType,
105105
int32_t batchSize,
106106
int32_t shuffleBatchByteSize,
@@ -111,7 +111,7 @@ class BoltColumnarBatchDeserializer {
111111
uint64_t& deserializeTime,
112112
uint64_t& decompressTime,
113113
bool isRowFormat = false,
114-
ZstdStreamCodec* zstdCodec = nullptr,
114+
AdaptiveParallelZstdCodec* zstdCodec = nullptr,
115115
RowBufferPool* rowBufferPool = nullptr,
116116
ShuffleRowToColumnarConverter* row2ColConverter = nullptr);
117117

@@ -123,7 +123,7 @@ class BoltColumnarBatchDeserializer {
123123

124124
std::shared_ptr<arrow::io::BufferedInputStream> in_;
125125
std::shared_ptr<arrow::Schema> schema_;
126-
std::shared_ptr<arrow::util::Codec> codec_;
126+
std::shared_ptr<Codec> codec_;
127127
bytedance::bolt::RowTypePtr rowType_;
128128
int32_t batchSize_;
129129
int32_t shuffleBatchByteSize_;
@@ -140,7 +140,7 @@ class BoltColumnarBatchDeserializer {
140140

141141
// for row format shuffle read
142142
const bool isRowFormat_{false};
143-
ZstdStreamCodec* zstdCodec_{nullptr};
143+
AdaptiveParallelZstdCodec* zstdCodec_{nullptr};
144144
RowBufferPool* rowBufferPool_{nullptr};
145145
uint8_t* partialRow_{nullptr};
146146
int32_t partialRowSize_{0};
@@ -157,12 +157,13 @@ class BoltColumnarBatchDeserializerFactory {
157157
public:
158158
BoltColumnarBatchDeserializerFactory(
159159
const std::shared_ptr<arrow::Schema>& schema,
160-
const std::shared_ptr<arrow::util::Codec>& codec,
160+
const std::shared_ptr<Codec>& codec,
161161
const bytedance::bolt::RowTypePtr& rowType,
162162
int32_t batchSize,
163163
int32_t shuffleBatchByteSize,
164164
arrow::MemoryPool* memoryPool,
165-
bytedance::bolt::memory::MemoryPool* boltPool);
165+
bytedance::bolt::memory::MemoryPool* boltPool,
166+
bool checksumEnabled = true);
166167

167168
std::unique_ptr<BoltColumnarBatchDeserializer> createDeserializer(
168169
std::shared_ptr<arrow::io::InputStream> in);
@@ -205,7 +206,7 @@ class BoltColumnarBatchDeserializerFactory {
205206

206207
private:
207208
std::shared_ptr<arrow::Schema> schema_;
208-
std::shared_ptr<arrow::util::Codec> codec_;
209+
std::shared_ptr<Codec> codec_;
209210
bytedance::bolt::RowTypePtr rowType_;
210211
int32_t batchSize_;
211212
int32_t shuffleBatchByteSize_;
@@ -223,9 +224,10 @@ class BoltColumnarBatchDeserializerFactory {
223224

224225
void initFromSchema();
225226
// for rowbased shuffle
226-
std::shared_ptr<ZstdStreamCodec> zstdCodec_{nullptr};
227+
std::shared_ptr<AdaptiveParallelZstdCodec> zstdCodec_{nullptr};
227228
std::shared_ptr<RowBufferPool> rowBufferPool_{nullptr};
228229
std::shared_ptr<ShuffleRowToColumnarConverter> row2ColConverter_{nullptr};
230+
bool checksumEnabled_{true};
229231
};
230232

231233
class BoltShuffleReader {

bolt/shuffle/sparksql/CMakeLists.txt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@ bolt_add_library(
3434
BoltShuffleWriterV2.cpp
3535
CelebornReaderStreamIterator.cpp
3636
compression/Compression.cpp
37+
compression/Codec.cpp
38+
compression/GzipCodec.cpp
39+
compression/Lz4Codec.cpp
40+
compression/SnappyCodec.cpp
41+
compression/ZstdCodec.cpp
42+
compression/StreamCodec.cpp
43+
compression/ZstdStreamCodec.cpp
44+
compression/GzipStreamCodec.cpp
45+
compression/Lz4FrameStreamCodec.cpp
3746
partition_writer/LocalPartitionWriter.cpp
3847
partition_writer/PartitionWriter.cpp
3948
partition_writer/rss/CelebornPartitionWriter.cpp
@@ -84,3 +93,7 @@ endif()
8493
if(${BOLT_BUILD_TESTING})
8594
add_subdirectory(tests)
8695
endif()
96+
97+
if(${BOLT_ENABLE_BENCHMARKS})
98+
add_subdirectory(benchmarks)
99+
endif()

0 commit comments

Comments
 (0)