Skip to content

Commit 1b98ee8

Browse files
authored
Merge 947bc67 into 5fe6486
2 parents 5fe6486 + 947bc67 commit 1b98ee8

5 files changed

Lines changed: 108 additions & 42 deletions

File tree

ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -160,20 +160,20 @@ struct TEvS3FileQueue {
160160
161161
EvEnd
162162
};
163-
static_assert(EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_S3_FILE_QUEUE),
163+
static_assert(EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_S3_FILE_QUEUE),
164164
"expect EvEnd < EventSpaceEnd(TEvents::ES_S3_FILE_QUEUE)");
165-
165+
166166
struct TEvUpdateConsumersCount :
167167
public TEventPB<TEvUpdateConsumersCount, NS3::FileQueue::TEvUpdateConsumersCount, EvUpdateConsumersCount> {
168-
168+
169169
explicit TEvUpdateConsumersCount(ui64 consumersCountDelta = 0) {
170170
Record.SetConsumersCountDelta(consumersCountDelta);
171171
}
172172
};
173173

174174
struct TEvAck :
175175
public TEventPB<TEvAck, NS3::FileQueue::TEvAck, EvAck> {
176-
176+
177177
TEvAck() = default;
178178

179179
explicit TEvAck(const TMessageTransportMeta& transportMeta) {
@@ -388,6 +388,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
388388
TPathList paths,
389389
size_t prefetchSize,
390390
ui64 fileSizeLimit,
391+
ui64 readLimit,
391392
bool useRuntimeListing,
392393
ui64 consumersCount,
393394
ui64 batchSizeLimit,
@@ -401,6 +402,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
401402
: TxId(std::move(txId))
402403
, PrefetchSize(prefetchSize)
403404
, FileSizeLimit(fileSizeLimit)
405+
, ReadLimit(readLimit)
404406
, MaybeIssues(Nothing())
405407
, UseRuntimeListing(useRuntimeListing)
406408
, ConsumersCount(consumersCount)
@@ -510,7 +512,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
510512
// skip 'directories'
511513
continue;
512514
}
513-
if (object.Size > FileSizeLimit) {
515+
if (object.Size > std::min(FileSizeLimit, ReadLimit)) {
514516
auto errorMessage = TStringBuilder()
515517
<< "Size of object " << object.Path << " = "
516518
<< object.Size
@@ -522,10 +524,10 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
522524
LOG_T("TS3FileQueueActor", "SaveRetrievedResults adding path: " << object.Path);
523525
TObjectPath objectPath;
524526
objectPath.SetPath(object.Path);
525-
objectPath.SetSize(object.Size);
527+
objectPath.SetSize(std::min(object.Size, ReadLimit));
526528
objectPath.SetPathIndex(CurrentDirectoryPathIndex);
527529
Objects.emplace_back(std::move(objectPath));
528-
ObjectsTotalSize += object.Size;
530+
ObjectsTotalSize += std::min(object.Size, ReadLimit);
529531
}
530532
return true;
531533
}
@@ -595,7 +597,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
595597
Send(ev->Sender, new TEvS3FileQueue::TEvObjectPathReadError(*MaybeIssues, ev->Get()->Record.GetTransportMeta()));
596598
TryFinish(ev->Sender, ev->Get()->Record.GetTransportMeta().GetSeqNo());
597599
}
598-
600+
599601
void HandleUpdateConsumersCount(TEvS3FileQueue::TEvUpdateConsumersCount::TPtr& ev) {
600602
if (!UpdatedConsumers.contains(ev->Sender)) {
601603
LOG_D(
@@ -649,7 +651,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
649651

650652
LOG_T("TS3FileQueueActor", "SendObjects Sending " << result.size() << " objects to consumer with id " << consumer);
651653
Send(consumer, new TEvS3FileQueue::TEvObjectPathBatch(std::move(result), HasNoMoreItems(), transportMeta));
652-
654+
653655
if (HasNoMoreItems()) {
654656
TryFinish(consumer, transportMeta.GetSeqNo());
655657
}
@@ -671,7 +673,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
671673
}
672674

673675
bool CanSendToConsumer(const TActorId& consumer) {
674-
return !UseRuntimeListing || RoundRobinStageFinished ||
676+
return !UseRuntimeListing || RoundRobinStageFinished ||
675677
(StartedConsumers.size() < ConsumersCount && !StartedConsumers.contains(consumer));
676678
}
677679

@@ -749,7 +751,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
749751
}
750752
});
751753
}
752-
754+
753755
void ScheduleRequest(const TActorId& consumer, const TMessageTransportMeta& transportMeta) {
754756
PendingRequests[consumer].push_back(transportMeta);
755757
HasPendingRequests = true;
@@ -786,7 +788,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
786788
}
787789
}
788790
}
789-
791+
790792
void TryFinish(const TActorId& consumer, ui64 seqNo) {
791793
LOG_T("TS3FileQueueActor", "TryFinish from consumer " << consumer << ", " << FinishedConsumers.size() << " consumers already finished, seqNo=" << seqNo);
792794
if (FinishingConsumerToLastSeqNo.contains(consumer)) {
@@ -810,6 +812,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
810812

811813
size_t PrefetchSize;
812814
ui64 FileSizeLimit;
815+
ui64 ReadLimit;
813816
TMaybe<NS3Lister::IS3Lister::TPtr> MaybeLister = Nothing();
814817
TMaybe<NThreading::TFuture<NS3Lister::TListResult>> ListingFuture;
815818
size_t CurrentDirectoryPathIndex = 0;
@@ -834,7 +837,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
834837
const TString Pattern;
835838
const ES3PatternVariant PatternVariant;
836839
const ES3PatternType PatternType;
837-
840+
838841
static constexpr TDuration PoisonTimeout = TDuration::Hours(3);
839842
static constexpr TDuration RoundRobinStageTimeout = TDuration::Seconds(3);
840843
};
@@ -914,6 +917,7 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
914917
std::move(Paths),
915918
ReadActorFactoryCfg.MaxInflight * 2,
916919
FileSizeLimit,
920+
SizeLimit,
917921
false,
918922
1,
919923
FileQueueBatchSizeLimit,
@@ -1093,7 +1097,7 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
10931097
void HandleAck(TEvS3FileQueue::TEvAck::TPtr& ev) {
10941098
FileQueueEvents.OnEventReceived(ev);
10951099
}
1096-
1100+
10971101
static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, const TString& requestId, IHTTPGateway::TResult&& result, size_t pathInd, const TString path) {
10981102
if (!result.Issues) {
10991103
actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::move(result.Content), requestId, pathInd, path)));
@@ -1205,7 +1209,7 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
12051209
auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while reading file " << path << " with request id [" << requestId << "]", TIssues{result->Get()->Error});
12061210
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
12071211
}
1208-
1212+
12091213
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&) {
12101214
FileQueueEvents.Retry();
12111215
}
@@ -2084,7 +2088,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
20842088
if (isCancelled) {
20852089
LOG_CORO_D("RunCoroBlockArrowParserOverHttp - STOPPED ON SATURATION, downloaded " <<
20862090
QueueBufferCounter->DownloadedBytes << " bytes");
2087-
break;
2091+
break;
20882092
}
20892093
}
20902094
}
@@ -2534,6 +2538,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
25342538
::NMonitoring::TDynamicCounterPtr counters,
25352539
::NMonitoring::TDynamicCounterPtr taskCounters,
25362540
ui64 fileSizeLimit,
2541+
ui64 readLimit,
25372542
std::optional<ui64> rowsLimitHint,
25382543
IMemoryQuotaManager::TPtr memoryQuotaManager,
25392544
bool useRuntimeListing,
@@ -2560,6 +2565,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
25602565
, TaskCounters(std::move(taskCounters))
25612566
, FileQueueActor(fileQueueActor)
25622567
, FileSizeLimit(fileSizeLimit)
2568+
, ReadLimit(readLimit)
25632569
, MemoryQuotaManager(memoryQuotaManager)
25642570
, UseRuntimeListing(useRuntimeListing)
25652571
, FileQueueBatchSizeLimit(fileQueueBatchSizeLimit)
@@ -2618,6 +2624,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
26182624
std::move(Paths),
26192625
ReadActorFactoryCfg.MaxInflight * 2,
26202626
FileSizeLimit,
2627+
ReadLimit,
26212628
false,
26222629
1,
26232630
FileQueueBatchSizeLimit,
@@ -2780,7 +2787,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
27802787
void CommitState(const NDqProto::TCheckpoint&) final {}
27812788

27822789
ui64 GetInputIndex() const final {
2783-
return InputIndex;
2790+
return InputIndex;
27842791
}
27852792

27862793
const TDqAsyncStats& GetIngressStats() const final {
@@ -3034,7 +3041,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
30343041
}
30353042
}
30363043
}
3037-
3044+
30383045
void Handle(TEvS3FileQueue::TEvAck::TPtr& ev) {
30393046
FileQueueEvents.OnEventReceived(ev);
30403047
}
@@ -3132,6 +3139,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
31323139
std::set<NActors::TActorId> CoroActors;
31333140
NActors::TActorId FileQueueActor;
31343141
const ui64 FileSizeLimit;
3142+
const ui64 ReadLimit;
31353143
bool Bootstrapped = false;
31363144
IMemoryQuotaManager::TPtr MemoryQuotaManager;
31373145
bool UseRuntimeListing;
@@ -3291,6 +3299,7 @@ IActor* CreateS3FileQueueActor(
32913299
TPathList paths,
32923300
size_t prefetchSize,
32933301
ui64 fileSizeLimit,
3302+
ui64 readLimit,
32943303
bool useRuntimeListing,
32953304
ui64 consumersCount,
32963305
ui64 batchSizeLimit,
@@ -3306,6 +3315,7 @@ IActor* CreateS3FileQueueActor(
33063315
paths,
33073316
prefetchSize,
33083317
fileSizeLimit,
3318+
readLimit,
33093319
useRuntimeListing,
33103320
consumersCount,
33113321
batchSizeLimit,
@@ -3390,15 +3400,15 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
33903400
if (params.GetRowsLimitHint() != 0) {
33913401
rowsLimitHint = params.GetRowsLimitHint();
33923402
}
3393-
3403+
33943404
TActorId fileQueueActor;
33953405
if (auto it = settings.find("fileQueueActor"); it != settings.cend()) {
33963406
NActorsProto::TActorId protoId;
33973407
TMemoryInput inputStream(it->second);
33983408
ParseFromTextFormat(inputStream, protoId);
33993409
fileQueueActor = ActorIdFromProto(protoId);
34003410
}
3401-
3411+
34023412
ui64 fileQueueBatchSizeLimit = 0;
34033413
if (auto it = settings.find("fileQueueBatchSizeLimit"); it != settings.cend()) {
34043414
fileQueueBatchSizeLimit = FromString<ui64>(it->second);
@@ -3408,7 +3418,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
34083418
if (auto it = settings.find("fileQueueBatchObjectCountLimit"); it != settings.cend()) {
34093419
fileQueueBatchObjectCountLimit = FromString<ui64>(it->second);
34103420
}
3411-
3421+
34123422
ui64 fileQueueConsumersCountDelta = 0;
34133423
if (readRanges.size() > 1) {
34143424
fileQueueConsumersCountDelta = readRanges.size() - 1;
@@ -3516,9 +3526,13 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
35163526

35173527
#undef SET_FLAG
35183528
#undef SUPPORTED_FLAGS
3529+
ui64 sizeLimit = std::numeric_limits<ui64>::max();
3530+
if (const auto it = settings.find("sizeLimit"); settings.cend() != it)
3531+
sizeLimit = FromString<ui64>(it->second);
3532+
35193533
const auto actor = new TS3StreamReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authInfo, pathPattern, pathPatternVariant,
35203534
std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy,
3521-
cfg, counters, taskCounters, fileSizeLimit, rowsLimitHint, memoryQuotaManager,
3535+
cfg, counters, taskCounters, fileSizeLimit, sizeLimit, rowsLimitHint, memoryQuotaManager,
35223536
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta);
35233537

35243538
return {actor, actor};

ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ NActors::IActor* CreateS3FileQueueActor(
1818
NS3Details::TPathList paths,
1919
size_t prefetchSize,
2020
ui64 fileSizeLimit,
21+
ui64 readLimit,
2122
bool useRuntimeListing,
2223
ui64 consumersCount,
2324
ui64 batchSizeLimit,

ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ class TS3DqIntegration: public TDqIntegrationBase {
410410

411411
auto fileQueueBatchObjectCountLimit = State_->Configuration->FileQueueBatchObjectCountLimit.Get().GetOrElse(1000);
412412
srcDesc.MutableSettings()->insert({"fileQueueBatchObjectCountLimit", ToString(fileQueueBatchObjectCountLimit)});
413-
413+
414414
YQL_CLOG(DEBUG, ProviderS3) << " useRuntimeListing=" << useRuntimeListing;
415415

416416
if (useRuntimeListing) {
@@ -422,8 +422,8 @@ class TS3DqIntegration: public TDqIntegrationBase {
422422
packed.Data().Literal().Value(),
423423
FromString<bool>(packed.IsText().Literal().Value()),
424424
paths);
425-
paths.insert(paths.end(),
426-
std::make_move_iterator(pathsChunk.begin()),
425+
paths.insert(paths.end(),
426+
std::make_move_iterator(pathsChunk.begin()),
427427
std::make_move_iterator(pathsChunk.end()));
428428
}
429429

@@ -434,11 +434,11 @@ class TS3DqIntegration: public TDqIntegrationBase {
434434
builder.AddPath(f.Path, f.Size, f.IsDirectory);
435435
});
436436
builder.Save(&range);
437-
437+
438438
TVector<TString> serialized(1);
439439
TStringOutput out(serialized.front());
440440
range.Save(&out);
441-
441+
442442
paths.clear();
443443
ReadPathsList(srcDesc, {}, serialized, paths);
444444

@@ -485,12 +485,18 @@ class TS3DqIntegration: public TDqIntegrationBase {
485485

486486
YQL_CLOG(DEBUG, ProviderS3) << " hasDirectories=" << hasDirectories << ", consumersCount=" << consumersCount;
487487

488+
ui64 readLimit = std::numeric_limits<ui64>::max();
489+
if (srcDesc.MutableSettings()->find("sizeLimit") != srcDesc.MutableSettings()->cend()) {
490+
readLimit = FromString<ui64>(srcDesc.MutableSettings()->at("sizeLimit"));
491+
}
492+
488493
auto fileQueueActor = NActors::TActivationContext::ActorSystem()->Register(
489494
NDq::CreateS3FileQueueActor(
490495
0ul,
491496
std::move(paths),
492497
fileQueuePrefetchSize,
493498
fileSizeLimit,
499+
readLimit,
494500
useRuntimeListing,
495501
consumersCount,
496502
fileQueueBatchSizeLimit,

ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,15 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase {
233233
fileSizeLimit = it->second;
234234
}
235235
}
236+
237+
ui64 userSizeLimit = std::numeric_limits<ui64>::max();
236238
if (formatName == "parquet") {
237239
fileSizeLimit = State_->Configuration->BlockFileSizeLimit;
240+
} else if (formatName == "raw") {
241+
const auto sizeLimitParam = dqSource.Input().Cast<TS3SourceSettings>().SizeLimit().Maybe<TCoAtom>();
242+
if (sizeLimitParam.IsValid()) {
243+
userSizeLimit = FromString<ui64>(sizeLimitParam.Cast().StringValue());
244+
}
238245
}
239246

240247
for (const TS3Path& batch : maybeS3SourceSettings.Cast().Paths()) {
@@ -245,13 +252,13 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase {
245252
UnpackPathsList(packed, isTextEncoded, paths);
246253

247254
for (auto& entry : paths) {
248-
if (entry.Size > fileSizeLimit) {
255+
if (std::min(entry.Size, userSizeLimit) > fileSizeLimit) {
249256
ctx.AddError(TIssue(ctx.GetPosition(batch.Pos()),
250257
TStringBuilder() << "Size of object " << entry.Path << " = " << entry.Size << " and exceeds limit = " << fileSizeLimit << " specified for format " << formatName));
251258
hasErr = true;
252259
return false;
253260
}
254-
totalSize += entry.Size;
261+
totalSize += std::min(entry.Size, userSizeLimit);
255262
++count;
256263
}
257264
}
@@ -673,7 +680,7 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase {
673680
.RowsLimitHint(count.Literal())
674681
.Build()
675682
.Build()
676-
.Done();
683+
.Done();
677684
}
678685

679686
TMaybeNode<TExprBase> PushDownLimit(TExprBase node, TExprContext& ctx) const {

0 commit comments

Comments
 (0)