2929 */
3030
3131#include " bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.h"
32+ #include < folly/executors/CPUThreadPoolExecutor.h>
33+ #include < folly/executors/thread_factory/NamedThreadFactory.h>
34+ #include < folly/synchronization/ThrottledLifoSem.h>
3235#include " bolt/common/base/StatsReporter.h"
3336#include " bolt/connectors/hive/storage_adapters/s3fs/S3Counters.h"
3437#include " bolt/connectors/hive/storage_adapters/s3fs/S3Util.h"
@@ -52,14 +55,29 @@ class S3WriteFile::Impl {
5255 explicit Impl (
5356 std::string_view path,
5457 Aws::S3::S3Client* client,
55- memory::MemoryPool* pool)
58+ memory::MemoryPool* pool,
59+ S3Config* s3Config)
5660 : client_(client), pool_(pool) {
5761 BOLT_CHECK_NOT_NULL (client);
5862 BOLT_CHECK_NOT_NULL (pool);
63+ BOLT_CHECK_NOT_NULL (s3Config);
64+ partUploadSize_ = s3Config->partUploadSize ();
65+ if (s3Config->partUploadAsync ()) {
66+ maxConcurrentUploadNum_ = std::make_unique<folly::ThrottledLifoSem>(
67+ s3Config->maxConcurrentUploadNum ());
68+ if (!uploadThreadPool_) {
69+ uploadThreadPool_ = std::make_shared<folly::CPUThreadPoolExecutor>(
70+ s3Config->uploadThreads (),
71+ std::make_shared<folly::NamedThreadFactory>(" upload-thread" ));
72+ }
73+ } else {
74+ uploadThreadPool_ = nullptr ;
75+ }
76+
5977 getBucketAndKeyFromPath (path, bucket_, key_);
6078 currentPart_ = std::make_unique<dwio::common::DataBuffer<char >>(*pool_);
61- currentPart_->reserve (kPartUploadSize );
6279 // Check that the object doesn't exist, if it does throw an error.
80+ currentPart_->reserve (partUploadSize_);
6381 {
6482 Aws::S3::Model::HeadObjectRequest request;
6583 request.SetBucket (awsString (bucket_));
@@ -103,6 +121,7 @@ class S3WriteFile::Impl {
103121 // / (https://github.com/apache/arrow/issues/11934). So we instead default
104122 // / to application/octet-stream which is less misleading.
105123 request.SetContentType (kApplicationOctetStream );
124+ request.SetChecksumAlgorithm (Aws::S3::Model::ChecksumAlgorithm::CRC32);
106125 auto outcome = client_->CreateMultipartUpload (request);
107126 BOLT_CHECK_AWS_OUTCOME (
108127 outcome, " Failed initiating multiple part upload" , bucket_, key_);
@@ -115,7 +134,7 @@ class S3WriteFile::Impl {
115134 // Appends data to the end of the file.
116135 void append (std::string_view data) {
117136 BOLT_CHECK (!closed (), " File is closed" );
118- if (data.size () + currentPart_->size () >= kPartUploadSize ) {
137+ if (data.size () + currentPart_->size () >= partUploadSize_ ) {
119138 upload (data);
120139 } else {
121140 // Append to current part.
@@ -129,7 +148,7 @@ class S3WriteFile::Impl {
129148 BOLT_CHECK (!closed (), " File is closed" );
130149 // / currentPartSize must be less than kPartUploadSize since
131150 // / append() would have already flushed after reaching kUploadPartSize.
132- BOLT_CHECK_LT (currentPart_->size (), kPartUploadSize );
151+ BOLT_CHECK_LT (currentPart_->size (), partUploadSize_ );
133152 }
134153
135154 // Complete the multipart upload and close the file.
@@ -139,6 +158,20 @@ class S3WriteFile::Impl {
139158 }
140159 RECORD_METRIC_VALUE (kMetricS3StartedUploads );
141160 uploadPart ({currentPart_->data (), currentPart_->size ()}, true );
161+ if (uploadThreadPool_) {
162+ if (!futures_.empty ()) {
163+ folly::collectAll (std::move (futures_)).get ();
164+ }
165+ // The list of parts should be in ascending order.
166+ std::sort (
167+ uploadState_.completedParts .begin (),
168+ uploadState_.completedParts .end (),
169+ [](const Aws::S3::Model::CompletedPart& a,
170+ const Aws::S3::Model::CompletedPart& b) {
171+ return a.GetPartNumber () < b.GetPartNumber ();
172+ });
173+ }
174+
142175 BOLT_CHECK_EQ (uploadState_.partNumber , uploadState_.completedParts .size ());
143176 // Complete the multipart upload.
144177 {
@@ -172,7 +205,6 @@ class S3WriteFile::Impl {
172205 }
173206
174207 private:
175- static constexpr int64_t kPartUploadSize = 10 * 1024 * 1024 ;
176208 static constexpr const char * kApplicationOctetStream =
177209 " application/octet-stream" ;
178210
@@ -186,10 +218,9 @@ class S3WriteFile::Impl {
186218 int64_t partNumber = 0 ;
187219 Aws::String id;
188220 };
189- UploadState uploadState_;
190221
191- // Data can be smaller or larger than the kPartUploadSize .
192- // Complete the currentPart_ and upload kPartUploadSize chunks of data.
222+ // Data can be smaller or larger than the partUploadSize_ .
223+ // Complete the currentPart_ and upload partUploadSize_ chunks of data.
193224 // Save the remaining into currentPart_.
194225 void upload (const std::string_view data) {
195226 auto dataPtr = data.data ();
@@ -200,44 +231,90 @@ class S3WriteFile::Impl {
200231 uploadPart ({currentPart_->data (), currentPart_->size ()});
201232 dataPtr += remainingBufferSize;
202233 dataSize -= remainingBufferSize;
203- while (dataSize > kPartUploadSize ) {
204- uploadPart ({dataPtr, kPartUploadSize });
205- dataPtr += kPartUploadSize ;
206- dataSize -= kPartUploadSize ;
234+ while (dataSize > partUploadSize_ ) {
235+ uploadPart ({dataPtr, partUploadSize_ });
236+ dataPtr += partUploadSize_ ;
237+ dataSize -= partUploadSize_ ;
207238 }
208239 // Stash the remaining at the beginning of currentPart.
209240 currentPart_->unsafeAppend (0 , dataPtr, dataSize);
210241 }
211242
212243 void uploadPart (const std::string_view part, bool isLast = false ) {
213- // Only the last part can be less than kPartUploadSize.
214- BOLT_CHECK (isLast || (!isLast && (part.size () == kPartUploadSize )));
215- // Upload the part.
216- {
217- Aws::S3::Model::UploadPartRequest request;
218- request.SetBucket (bucket_);
219- request.SetKey (key_);
220- request.SetUploadId (uploadState_.id );
221- request.SetPartNumber (++uploadState_.partNumber );
222- request.SetContentLength (part.size ());
223- request.SetBody (
224- std::make_shared<StringViewStream>(part.data (), part.size ()));
225- auto outcome = client_->UploadPart (request);
226- BOLT_CHECK_AWS_OUTCOME (outcome, " Failed to upload" , bucket_, key_);
227- // Append ETag and part number for this uploaded part.
228- // This will be needed for upload completion in Close().
229- auto result = outcome.GetResult ();
230- Aws::S3::Model::CompletedPart part;
244+ // Only the last part can be less than partUploadSize_.
245+ BOLT_CHECK (isLast || part.size () == partUploadSize_);
246+ auto uploadPartSync = [&](const std::string_view partData) {
247+ Aws::S3::Model::CompletedPart completedPart =
248+ uploadPartSeq (uploadState_.id , ++uploadState_.partNumber , partData);
249+ uploadState_.completedParts .push_back (std::move (completedPart));
250+ };
251+ // If this is the last part and no parts have been uploaded yet,
252+ // use the synchronous upload method.
253+ bool useSyncUpload =
254+ !uploadThreadPool_ || (isLast && uploadState_.partNumber == 0 );
255+ if (useSyncUpload) {
256+ uploadPartSync (part);
257+ } else {
258+ uploadPartAsync (part);
259+ }
260+ }
231261
232- part.SetPartNumber (uploadState_.partNumber );
233- part.SetETag (result.GetETag ());
234- // Don't add the checksum to the part if the checksum is empty.
235- // Some filesystems such as IBM COS require this to be not set.
236- if (!result.GetChecksumCRC32 ().empty ()) {
237- part.SetChecksumCRC32 (result.GetChecksumCRC32 ());
238- }
239- uploadState_.completedParts .push_back (std::move (part));
262+ // Common logic for uploading a part.
263+ Aws::S3::Model::CompletedPart uploadPartSeq (
264+ const Aws::String& uploadId,
265+ const int64_t partNumber,
266+ const std::string_view part) {
267+ Aws::S3::Model::UploadPartRequest request;
268+ request.SetBucket (bucket_);
269+ request.SetKey (key_);
270+ request.SetUploadId (uploadId);
271+ request.SetPartNumber (partNumber);
272+ request.SetContentLength (part.size ());
273+ request.SetBody (
274+ std::make_shared<StringViewStream>(part.data (), part.size ()));
275+ // The default algorithm used is MD5. However, MD5 is not supported with
276+ // fips and can cause a SIGSEGV. Set CRC32 instead which is a standard for
277+ // checksum computation and is not restricted by fips.
278+ request.SetChecksumAlgorithm (Aws::S3::Model::ChecksumAlgorithm::CRC32);
279+ auto outcome = client_->UploadPart (request);
280+ BOLT_CHECK_AWS_OUTCOME (outcome, " Failed to upload" , bucket_, key_);
281+ // Append ETag and part number for this uploaded part.
282+ // This will be needed for upload completion in Close().
283+ auto result = outcome.GetResult ();
284+ Aws::S3::Model::CompletedPart completedPart;
285+ completedPart.SetPartNumber (partNumber);
286+ completedPart.SetETag (result.GetETag ());
287+ // Don't add the checksum to the part if the checksum is empty.
288+ // Some filesystems such as IBM COS require this to be not set.
289+ if (!result.GetChecksumCRC32 ().empty ()) {
290+ completedPart.SetChecksumCRC32 (result.GetChecksumCRC32 ());
240291 }
292+ return completedPart;
293+ }
294+
295+ // Upload the part asynchronously.
296+ void uploadPartAsync (const std::string_view part) {
297+ // NOLINT(readability-convert-member-functions-to-static)
298+ maxConcurrentUploadNum_->wait ();
299+ const int64_t partNumber = ++uploadState_.partNumber ;
300+ std::shared_ptr<std::string> partStr =
301+ std::make_shared<std::string>(part.data (), part.size ());
302+ futures_.emplace_back (
303+ folly::via (uploadThreadPool_.get (), [this , partNumber, partStr]() {
304+ SCOPE_EXIT {
305+ maxConcurrentUploadNum_->post ();
306+ };
307+ try {
308+ Aws::S3::Model::CompletedPart completedPart =
309+ uploadPartSeq (uploadState_.id , partNumber, *partStr);
310+ std::lock_guard<std::mutex> lock (uploadStateMutex_);
311+ uploadState_.completedParts .push_back (std::move (completedPart));
312+ } catch (const std::exception& e) {
313+ LOG (ERROR) << " Exception during async upload: " << e.what ();
314+ } catch (...) {
315+ LOG (ERROR) << " Unknown exception during async upload." ;
316+ }
317+ }));
241318 }
242319
243320 Aws::S3::S3Client* client_;
@@ -246,13 +323,22 @@ class S3WriteFile::Impl {
246323 std::string bucket_;
247324 std::string key_;
248325 size_t fileSize_ = -1 ;
326+ UploadState uploadState_;
327+ std::mutex uploadStateMutex_;
328+ std::vector<folly::Future<folly::Unit>> futures_;
329+ size_t partUploadSize_;
330+ // maxConcurrentUploadNum_ controls the concurrency of asynchronous uploads to
331+ // S3 for each S3WriteFile, preventing excessive memory usage.
332+ std::unique_ptr<folly::ThrottledLifoSem> maxConcurrentUploadNum_;
333+ inline static std::shared_ptr<folly::CPUThreadPoolExecutor> uploadThreadPool_;
249334};
250335
251336S3WriteFile::S3WriteFile (
252337 std::string_view path,
253338 Aws::S3::S3Client* client,
254- memory::MemoryPool* pool) {
255- impl_ = std::make_shared<Impl>(path, client, pool);
339+ memory::MemoryPool* pool,
340+ S3Config* s3Config) {
341+ impl_ = std::make_shared<Impl>(path, client, pool, s3Config);
256342}
257343
258344void S3WriteFile::append (std::string_view data) {
0 commit comments