Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bolt/connectors/hive/storage_adapters/s3fs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ if(BOLT_ENABLE_S3)
if(${BOLT_BUILD_TESTING})
add_subdirectory(tests)
endif()
if(${BOLT_BUILD_BENCHMARKS})
add_subdirectory(benchmark)
endif()
endif()
38 changes: 38 additions & 0 deletions bolt/connectors/hive/storage_adapters/s3fs/S3Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ class S3Config {
kRetryMode,
kUseProxyFromEnv,
kCredentialsProvider,
kPartUploadAsync,
kPartUploadSize,
kMaxConcurrentUploadNum,
kUploadThreads,
kEnd
};

Expand Down Expand Up @@ -129,6 +133,13 @@ class S3Config {
std::make_pair("use-proxy-from-env", "false")},
{Keys::kCredentialsProvider,
std::make_pair("aws-credentials-provider", std::nullopt)},
{Keys::kPartUploadAsync,
std::make_pair("part-upload-async", "false")},
{Keys::kPartUploadSize,
std::make_pair("part-upload-size", "10485760")},
{Keys::kMaxConcurrentUploadNum,
std::make_pair("max-concurrent-upload-num", "4")},
{Keys::kUploadThreads, std::make_pair("upload-threads", "16")},
};
return config;
}
Expand Down Expand Up @@ -258,6 +269,33 @@ class S3Config {
return config_.find(Keys::kCredentialsProvider)->second;
}

/// Whether parts of an S3 multipart upload are uploaded asynchronously
/// (driven by the "part-upload-async" config entry); false disables the
/// async path entirely.
bool partUploadAsync() const {
  const auto& entry = config_.find(Keys::kPartUploadAsync)->second;
  return folly::to<bool>(entry.value());
}

/// Return the size (in bytes) of each part for S3 multipart uploads
/// (config key "part-upload-size"; default 10 MiB).
int32_t partUploadSize() const {
  auto value = config_.find(Keys::kPartUploadSize)->second.value();
  // Convert directly to the signed return type: folly::to<int32_t> throws on
  // out-of-range input, whereas the previous folly::to<uint32_t> silently
  // wrapped to a negative value through the unsigned -> signed conversion.
  return folly::to<int32_t>(value);
}

/// Return the maximum number of concurrent uploads for S3 multipart uploads
/// (config key "max-concurrent-upload-num"; default 4). Applicable only when
/// asynchronous uploads are enabled.
int32_t maxConcurrentUploadNum() const {
  auto value = config_.find(Keys::kMaxConcurrentUploadNum)->second.value();
  // Convert directly to the signed return type: folly::to<int32_t> throws on
  // out-of-range input instead of wrapping via an unsigned -> signed cast.
  return folly::to<int32_t>(value);
}

/// Return the number of threads to use for S3 multipart uploads
/// (config key "upload-threads"; default 16). Applicable only when
/// asynchronous uploads are enabled.
int32_t uploadThreads() const {
  auto value = config_.find(Keys::kUploadThreads)->second.value();
  // Convert directly to the signed return type: folly::to<int32_t> throws on
  // out-of-range input instead of wrapping via an unsigned -> signed cast.
  return folly::to<int32_t>(value);
}

private:
std::unordered_map<Keys, std::optional<std::string>> config_;
std::string payloadSigningPolicy_;
Expand Down
10 changes: 8 additions & 2 deletions bolt/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ class S3FileSystem::Impl {

client_ = std::make_shared<Aws::S3::S3Client>(
credentialsProvider, nullptr /* endpointProvider */, clientConfig);
s3Config_ = std::make_shared<S3Config>(s3Config);
++fileSystemCount;
}

Expand Down Expand Up @@ -453,6 +454,10 @@ class S3FileSystem::Impl {
return client_.get();
}

// Non-owning pointer to this filesystem's S3 configuration; backed by the
// s3Config_ shared_ptr, so it stays valid for the lifetime of this Impl.
S3Config* s3Config() const {
return s3Config_.get();
}

// Returns the AWS SDK log level name, delegating to the process-wide AWS
// instance.
std::string getLogLevelName() const {
return getAwsInstance()->getLogLevelName();
}
Expand All @@ -463,6 +468,7 @@ class S3FileSystem::Impl {

private:
std::shared_ptr<Aws::S3::S3Client> client_;
std::shared_ptr<S3Config> s3Config_;
};

S3FileSystem::S3FileSystem(
Expand Down Expand Up @@ -494,8 +500,8 @@ std::unique_ptr<WriteFile> S3FileSystem::openFileForWrite(
std::string_view s3Path,
const FileOptions& options) {
const auto path = getPath(s3Path);
auto s3file =
std::make_unique<S3WriteFile>(path, impl_->s3Client(), options.pool);
auto s3file = std::make_unique<S3WriteFile>(
path, impl_->s3Client(), options.pool, impl_->s3Config());
return s3file;
}

Expand Down
166 changes: 126 additions & 40 deletions bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
*/

#include "bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.h"
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/thread_factory/NamedThreadFactory.h>
#include <folly/synchronization/ThrottledLifoSem.h>
#include "bolt/common/base/StatsReporter.h"
#include "bolt/connectors/hive/storage_adapters/s3fs/S3Counters.h"
#include "bolt/connectors/hive/storage_adapters/s3fs/S3Util.h"
Expand All @@ -52,14 +55,29 @@ class S3WriteFile::Impl {
explicit Impl(
std::string_view path,
Aws::S3::S3Client* client,
memory::MemoryPool* pool)
memory::MemoryPool* pool,
S3Config* s3Config)
: client_(client), pool_(pool) {
BOLT_CHECK_NOT_NULL(client);
BOLT_CHECK_NOT_NULL(pool);
BOLT_CHECK_NOT_NULL(s3Config);
partUploadSize_ = s3Config->partUploadSize();
if (s3Config->partUploadAsync()) {
maxConcurrentUploadNum_ = std::make_unique<folly::ThrottledLifoSem>(
s3Config->maxConcurrentUploadNum());
if (!uploadThreadPool_) {
uploadThreadPool_ = std::make_shared<folly::CPUThreadPoolExecutor>(
s3Config->uploadThreads(),
std::make_shared<folly::NamedThreadFactory>("upload-thread"));
}
} else {
uploadThreadPool_ = nullptr;
}

getBucketAndKeyFromPath(path, bucket_, key_);
currentPart_ = std::make_unique<dwio::common::DataBuffer<char>>(*pool_);
currentPart_->reserve(kPartUploadSize);
// Check that the object doesn't exist, if it does throw an error.
currentPart_->reserve(partUploadSize_);
{
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(awsString(bucket_));
Expand Down Expand Up @@ -103,6 +121,7 @@ class S3WriteFile::Impl {
/// (https://github.com/apache/arrow/issues/11934). So we instead default
/// to application/octet-stream which is less misleading.
request.SetContentType(kApplicationOctetStream);
request.SetChecksumAlgorithm(Aws::S3::Model::ChecksumAlgorithm::CRC32);
auto outcome = client_->CreateMultipartUpload(request);
BOLT_CHECK_AWS_OUTCOME(
outcome, "Failed initiating multiple part upload", bucket_, key_);
Expand All @@ -115,7 +134,7 @@ class S3WriteFile::Impl {
// Appends data to the end of the file.
void append(std::string_view data) {
BOLT_CHECK(!closed(), "File is closed");
if (data.size() + currentPart_->size() >= kPartUploadSize) {
if (data.size() + currentPart_->size() >= partUploadSize_) {
upload(data);
} else {
// Append to current part.
Expand All @@ -129,7 +148,7 @@ class S3WriteFile::Impl {
BOLT_CHECK(!closed(), "File is closed");
/// currentPartSize must be less than kPartUploadSize since
/// append() would have already flushed after reaching kUploadPartSize.
BOLT_CHECK_LT(currentPart_->size(), kPartUploadSize);
BOLT_CHECK_LT(currentPart_->size(), partUploadSize_);
}

// Complete the multipart upload and close the file.
Expand All @@ -139,6 +158,20 @@ class S3WriteFile::Impl {
}
RECORD_METRIC_VALUE(kMetricS3StartedUploads);
uploadPart({currentPart_->data(), currentPart_->size()}, true);
if (uploadThreadPool_) {
if (!futures_.empty()) {
folly::collectAll(std::move(futures_)).get();
}
// The list of parts should be in ascending order.
std::sort(
uploadState_.completedParts.begin(),
uploadState_.completedParts.end(),
[](const Aws::S3::Model::CompletedPart& a,
const Aws::S3::Model::CompletedPart& b) {
return a.GetPartNumber() < b.GetPartNumber();
});
}

BOLT_CHECK_EQ(uploadState_.partNumber, uploadState_.completedParts.size());
// Complete the multipart upload.
{
Expand Down Expand Up @@ -172,7 +205,6 @@ class S3WriteFile::Impl {
}

private:
static constexpr int64_t kPartUploadSize = 10 * 1024 * 1024;
static constexpr const char* kApplicationOctetStream =
"application/octet-stream";

Expand All @@ -186,10 +218,9 @@ class S3WriteFile::Impl {
int64_t partNumber = 0;
Aws::String id;
};
UploadState uploadState_;

// Data can be smaller or larger than the kPartUploadSize.
// Complete the currentPart_ and upload kPartUploadSize chunks of data.
// Data can be smaller or larger than the partUploadSize_.
// Complete the currentPart_ and upload partUploadSize_ chunks of data.
// Save the remaining into currentPart_.
void upload(const std::string_view data) {
auto dataPtr = data.data();
Expand All @@ -200,44 +231,90 @@ class S3WriteFile::Impl {
uploadPart({currentPart_->data(), currentPart_->size()});
dataPtr += remainingBufferSize;
dataSize -= remainingBufferSize;
while (dataSize > kPartUploadSize) {
uploadPart({dataPtr, kPartUploadSize});
dataPtr += kPartUploadSize;
dataSize -= kPartUploadSize;
while (dataSize > partUploadSize_) {
uploadPart({dataPtr, partUploadSize_});
dataPtr += partUploadSize_;
dataSize -= partUploadSize_;
}
// Stash the remaining at the beginning of currentPart.
currentPart_->unsafeAppend(0, dataPtr, dataSize);
}

// Upload one part of the multipart upload. Only the last part (isLast) may
// be smaller than partUploadSize_. Dispatches to the synchronous path when
// async uploads are disabled, or when the very first part is also the last
// (nothing to overlap with); otherwise uploads asynchronously.
void uploadPart(const std::string_view part, bool isLast = false) {
  // Only the last part can be less than partUploadSize_.
  BOLT_CHECK(isLast || part.size() == partUploadSize_);
  // Synchronous path: upload inline and record the completed part. No lock
  // is needed here because this path never runs concurrently with async
  // workers (async is either disabled, or no part has been submitted yet).
  auto uploadPartSync = [&](const std::string_view partData) {
    Aws::S3::Model::CompletedPart completedPart =
        uploadPartSeq(uploadState_.id, ++uploadState_.partNumber, partData);
    uploadState_.completedParts.push_back(std::move(completedPart));
  };
  // If this is the last part and no parts have been uploaded yet,
  // use the synchronous upload method.
  bool useSyncUpload =
      !uploadThreadPool_ || (isLast && uploadState_.partNumber == 0);
  if (useSyncUpload) {
    uploadPartSync(part);
  } else {
    uploadPartAsync(part);
  }
}

part.SetPartNumber(uploadState_.partNumber);
part.SetETag(result.GetETag());
// Don't add the checksum to the part if the checksum is empty.
// Some filesystems such as IBM COS require this to be not set.
if (!result.GetChecksumCRC32().empty()) {
part.SetChecksumCRC32(result.GetChecksumCRC32());
}
uploadState_.completedParts.push_back(std::move(part));
// Shared upload logic: sends a single UploadPart request for the given part
// number and returns the CompletedPart record needed to finish the multipart
// upload in close().
Aws::S3::Model::CompletedPart uploadPartSeq(
    const Aws::String& uploadId,
    const int64_t partNumber,
    const std::string_view part) {
  Aws::S3::Model::UploadPartRequest uploadRequest;
  uploadRequest.SetBucket(bucket_);
  uploadRequest.SetKey(key_);
  uploadRequest.SetUploadId(uploadId);
  uploadRequest.SetPartNumber(partNumber);
  uploadRequest.SetContentLength(part.size());
  uploadRequest.SetBody(
      std::make_shared<StringViewStream>(part.data(), part.size()));
  // The default algorithm used is MD5. However, MD5 is not supported with
  // fips and can cause a SIGSEGV. Set CRC32 instead which is a standard for
  // checksum computation and is not restricted by fips.
  uploadRequest.SetChecksumAlgorithm(Aws::S3::Model::ChecksumAlgorithm::CRC32);
  auto uploadOutcome = client_->UploadPart(uploadRequest);
  BOLT_CHECK_AWS_OUTCOME(uploadOutcome, "Failed to upload", bucket_, key_);
  // Record the ETag and part number for this part; both are required when
  // completing the multipart upload in close().
  auto uploadResult = uploadOutcome.GetResult();
  Aws::S3::Model::CompletedPart done;
  done.SetPartNumber(partNumber);
  done.SetETag(uploadResult.GetETag());
  // Don't add the checksum to the part if the checksum is empty.
  // Some filesystems such as IBM COS require this to be not set.
  if (!uploadResult.GetChecksumCRC32().empty()) {
    done.SetChecksumCRC32(uploadResult.GetChecksumCRC32());
  }
  return done;
}

// Upload the part asynchronously. Blocks on maxConcurrentUploadNum_ for
// backpressure, copies the part data (so the caller's buffer can be reused
// immediately), and schedules the upload on the shared thread pool. The
// resulting future is collected in close().
void uploadPartAsync(const std::string_view part) {
// NOLINT(readability-convert-member-functions-to-static)
// Throttle: blocks until an upload slot is free, bounding the number of
// in-flight part copies and hence peak memory usage.
maxConcurrentUploadNum_->wait();
// Part number is claimed on the caller thread so ordering of claims is
// deterministic even though uploads complete out of order.
const int64_t partNumber = ++uploadState_.partNumber;
// Copy the data into a shared string: `part` views caller-owned memory that
// may be overwritten before the worker runs.
std::shared_ptr<std::string> partStr =
std::make_shared<std::string>(part.data(), part.size());
futures_.emplace_back(
folly::via(uploadThreadPool_.get(), [this, partNumber, partStr]() {
SCOPE_EXIT {
// Release the concurrency slot whether the upload succeeds or throws.
maxConcurrentUploadNum_->post();
};
try {
Aws::S3::Model::CompletedPart completedPart =
uploadPartSeq(uploadState_.id, partNumber, *partStr);
// completedParts is shared with other workers and with close()'s sort;
// guard the push with the mutex.
std::lock_guard<std::mutex> lock(uploadStateMutex_);
uploadState_.completedParts.push_back(std::move(completedPart));
} catch (const std::exception& e) {
// NOTE(review): the failure is only logged here; the part is never added
// to completedParts, so close() will later fail its part-count check
// (BOLT_CHECK_EQ) with a message that hides the root cause. Consider
// propagating the exception through the future and rethrowing in
// close() instead — TODO confirm with the close() implementation.
LOG(ERROR) << "Exception during async upload: " << e.what();
} catch (...) {
LOG(ERROR) << "Unknown exception during async upload.";
}
}));
}

Aws::S3::S3Client* client_;
Expand All @@ -246,13 +323,22 @@ class S3WriteFile::Impl {
std::string bucket_;
std::string key_;
size_t fileSize_ = -1;
UploadState uploadState_;
std::mutex uploadStateMutex_;
std::vector<folly::Future<folly::Unit>> futures_;
size_t partUploadSize_;
// maxConcurrentUploadNum_ controls the concurrency of asynchronous uploads to
// S3 for each S3WriteFile, preventing excessive memory usage.
std::unique_ptr<folly::ThrottledLifoSem> maxConcurrentUploadNum_;
inline static std::shared_ptr<folly::CPUThreadPoolExecutor> uploadThreadPool_;
};

S3WriteFile::S3WriteFile(
std::string_view path,
Aws::S3::S3Client* client,
memory::MemoryPool* pool) {
impl_ = std::make_shared<Impl>(path, client, pool);
memory::MemoryPool* pool,
S3Config* s3Config) {
impl_ = std::make_shared<Impl>(path, client, pool, s3Config);
}

void S3WriteFile::append(std::string_view data) {
Expand Down
4 changes: 3 additions & 1 deletion bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

#include "bolt/common/file/File.h"
#include "bolt/common/memory/MemoryPool.h"
#include "bolt/connectors/hive/storage_adapters/s3fs/S3Config.h"

namespace Aws::S3 {
class S3Client;
Expand Down Expand Up @@ -64,7 +65,8 @@ class S3WriteFile : public WriteFile {
S3WriteFile(
std::string_view path,
Aws::S3::S3Client* client,
memory::MemoryPool* pool);
memory::MemoryPool* pool,
S3Config* s3Config);

/// Appends data to the end of the file.
/// Uploads a part on reaching part size limit.
Expand Down
Loading