Skip to content

Commit be50758

Browse files
committed
Handle exceptions in listing callback
1 parent 012caf5 commit be50758

1 file changed

Lines changed: 28 additions & 8 deletions

File tree

ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -354,21 +354,28 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
354354

355355
EvNextListingChunkReceived = EvBegin,
356356
EvRoundRobinStageTimeout,
357+
EvTransitToErrorState,
357358

358359
EvEnd
359360
};
360361
static_assert(
361362
EvEnd <= EventSpaceEnd(TEvents::ES_PRIVATE),
362363
"expected EvEnd <= EventSpaceEnd(TEvents::ES_PRIVATE)");
363364

364-
struct TEvNextListingChunkReceived :
365-
public TEventLocal<TEvNextListingChunkReceived, EvNextListingChunkReceived> {
365+
struct TEvNextListingChunkReceived : public TEventLocal<TEvNextListingChunkReceived, EvNextListingChunkReceived> {
366366
NS3Lister::TListResult ListingResult;
367367
TEvNextListingChunkReceived(NS3Lister::TListResult listingResult)
368368
: ListingResult(std::move(listingResult)){};
369369
};
370-
struct TEvRoundRobinStageTimeout :
371-
public TEventLocal<TEvRoundRobinStageTimeout, EvRoundRobinStageTimeout> {
370+
371+
struct TEvRoundRobinStageTimeout : public TEventLocal<TEvRoundRobinStageTimeout, EvRoundRobinStageTimeout> {
372+
};
373+
374+
struct TEvTransitToErrorState : public TEventLocal<TEvTransitToErrorState, EvTransitToErrorState> {
375+
explicit TEvTransitToErrorState(TIssues&& issues)
376+
: Issues(issues) {
377+
}
378+
TIssues Issues;
372379
};
373380
};
374381
using TBase = TActorBootstrapped<TS3FileQueueActor>;
@@ -437,6 +444,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
437444
hFunc(TEvS3FileQueue::TEvGetNextBatch, HandleGetNextBatch);
438445
hFunc(TEvPrivatePrivate::TEvNextListingChunkReceived, HandleNextListingChunkReceived);
439446
cFunc(TEvPrivatePrivate::EvRoundRobinStageTimeout, HandleRoundRobinStageTimeout);
447+
hFunc(TEvPrivatePrivate::TEvTransitToErrorState, HandleTransitToErrorState);
440448
cFunc(TEvents::TSystem::Poison, HandlePoison);
441449
default:
442450
MaybeIssues = TIssues{TIssue{TStringBuilder() << "An event with unknown type has been received: '" << etype << "'"}};
@@ -479,6 +487,11 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
479487
}
480488
}
481489

490+
void HandleTransitToErrorState(TEvPrivatePrivate::TEvTransitToErrorState::TPtr& ev) {
491+
MaybeIssues = ev->Get().Issues;
492+
TransitToErrorState();
493+
}
494+
482495
bool SaveRetrievedResults(const NS3Lister::TListResult& listingResult) {
483496
LOG_T("TS3FileQueueActor", "SaveRetrievedResults");
484497
if (std::holds_alternative<NS3Lister::TListError>(listingResult)) {
@@ -717,10 +730,17 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
717730
->Next()
718731
.Subscribe([actorSystem, selfId = SelfId()](
719732
const NThreading::TFuture<NS3Lister::TListResult>& future) {
720-
actorSystem->Send(
721-
selfId,
722-
new TEvPrivatePrivate::TEvNextListingChunkReceived(
723-
future.GetValue()));
733+
try {
734+
actorSystem->Send(
735+
selfId,
736+
new TEvPrivatePrivate::TEvNextListingChunkReceived(
737+
future.GetValue()));
738+
} catch (const std::exception& e) {
739+
actorSystem->Send(
740+
selfId,
741+
new TEvPrivatePrivate::TEvTransitToErrorState(
742+
TIssues{TIssue{TStringBuilder() << "An unknown exception has occurred: '" << e.what() << "'"}}));
743+
}
724744
});
725745
}
726746

0 commit comments

Comments
 (0)