Skip to content

Commit 5729aa8

Browse files
authored
Handle NOT Ready GetOperationResponse for Scripts (#1479)
1 parent b4dec16 commit 5729aa8

3 files changed

Lines changed: 48 additions & 18 deletions

File tree

ydb/core/fq/libs/compute/ydb/events/events.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,10 @@ struct TEvYdbCompute {
117117
};
118118

119119
struct TEvGetOperationResponse : public NActors::TEventLocal<TEvGetOperationResponse, EvGetOperationResponse> {
120-
TEvGetOperationResponse(NYql::TIssues issues, NYdb::EStatus status)
120+
TEvGetOperationResponse(NYql::TIssues issues, NYdb::EStatus status, bool ready)
121121
: Issues(std::move(issues))
122122
, Status(status)
123+
, Ready(ready)
123124
{}
124125

125126
TEvGetOperationResponse(NYdb::NQuery::EExecStatus execStatus, Ydb::StatusIds::StatusCode statusCode, const TVector<Ydb::Query::ResultSetMeta>& resultSetsMeta, const Ydb::TableStats::QueryStats& queryStats, NYql::TIssues issues)
@@ -129,6 +130,7 @@ struct TEvYdbCompute {
129130
, QueryStats(queryStats)
130131
, Issues(std::move(issues))
131132
, Status(NYdb::EStatus::SUCCESS)
133+
, Ready(true)
132134
{}
133135

134136
NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified;
@@ -137,6 +139,7 @@ struct TEvYdbCompute {
137139
Ydb::TableStats::QueryStats QueryStats;
138140
NYql::TIssues Issues;
139141
NYdb::EStatus Status;
142+
bool Ready;
140143
};
141144

142145
struct TEvFetchScriptResultRequest : public NActors::TEventLocal<TEvFetchScriptResultRequest, EvFetchScriptResultRequest> {

ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,12 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
119119
void Handle(const TEvYdbCompute::TEvGetOperationResponse::TPtr& ev) {
120120
const auto& response = *ev.Get()->Get();
121121

122+
if (response.Status == NYdb::EStatus::SUCCESS && !response.Ready) {
123+
LOG_D("GetOperation IS NOT READY, repeating");
124+
SendGetOperation(TDuration::MilliSeconds(BackoffTimer.NextBackoffMs()));
125+
return;
126+
}
127+
122128
if (response.Status == NYdb::EStatus::NOT_FOUND) { // FAILING / ABORTING_BY_USER / ABORTING_BY_SYSTEM
123129
LOG_I("Operation has been already removed");
124130
Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(response.Issues, response.Status, ExecStatus, ComputeStatus));

ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -68,16 +68,18 @@ class TYdbConnectorActor : public NActors::TActorBootstrapped<TYdbConnectorActor
6868
actorSystem->Send(
6969
recipient,
7070
MakeResponse<TEvYdbCompute::TEvExecuteScriptResponse>(
71+
database,
7172
response.Status().GetIssues(),
72-
response.Status().GetStatus(), database),
73+
response.Status().GetStatus()),
7374
0, cookie);
7475
}
7576
} catch (...) {
7677
actorSystem->Send(
7778
recipient,
7879
MakeResponse<TEvYdbCompute::TEvExecuteScriptResponse>(
80+
database,
7981
CurrentExceptionMessage(),
80-
NYdb::EStatus::GENERIC_ERROR, database),
82+
NYdb::EStatus::GENERIC_ERROR),
8183
0, cookie);
8284
}
8385
});
@@ -89,22 +91,35 @@ class TYdbConnectorActor : public NActors::TActorBootstrapped<TYdbConnectorActor
8991
.Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie, database = ComputeConnection.database()](auto future) {
9092
try {
9193
auto response = future.ExtractValueSync();
92-
if (response.Id().GetKind() != Ydb::TOperationId::UNUSED) {
94+
if (!response.Ready()) {
95+
actorSystem->Send(
96+
recipient,
97+
MakeResponse<TEvYdbCompute::TEvGetOperationResponse>(
98+
database,
99+
response.Status().GetIssues(),
100+
response.Status().GetStatus(),
101+
false),
102+
0, cookie);
103+
} else if (response.Id().GetKind() != Ydb::TOperationId::UNUSED) {
93104
actorSystem->Send(recipient, new TEvYdbCompute::TEvGetOperationResponse(response.Metadata().ExecStatus, static_cast<Ydb::StatusIds::StatusCode>(response.Status().GetStatus()), response.Metadata().ResultSetsMeta, response.Metadata().ExecStats, RemoveDatabaseFromIssues(response.Status().GetIssues(), database)), 0, cookie);
94105
} else {
95106
actorSystem->Send(
96107
recipient,
97108
MakeResponse<TEvYdbCompute::TEvGetOperationResponse>(
109+
database,
98110
response.Status().GetIssues(),
99-
response.Status().GetStatus(), database),
111+
response.Status().GetStatus(),
112+
true),
100113
0, cookie);
101114
}
102115
} catch (...) {
103116
actorSystem->Send(
104117
recipient,
105118
MakeResponse<TEvYdbCompute::TEvGetOperationResponse>(
119+
database,
106120
CurrentExceptionMessage(),
107-
NYdb::EStatus::GENERIC_ERROR, database),
121+
NYdb::EStatus::GENERIC_ERROR,
122+
true),
108123
0, cookie);
109124
}
110125
});
@@ -124,16 +139,18 @@ class TYdbConnectorActor : public NActors::TActorBootstrapped<TYdbConnectorActor
124139
actorSystem->Send(
125140
recipient,
126141
MakeResponse<TEvYdbCompute::TEvFetchScriptResultResponse>(
142+
database,
127143
response.GetIssues(),
128-
response.GetStatus(), database),
144+
response.GetStatus()),
129145
0, cookie);
130146
}
131147
} catch (...) {
132148
actorSystem->Send(
133149
recipient,
134150
MakeResponse<TEvYdbCompute::TEvFetchScriptResultResponse>(
151+
database,
135152
CurrentExceptionMessage(),
136-
NYdb::EStatus::GENERIC_ERROR, database),
153+
NYdb::EStatus::GENERIC_ERROR),
137154
0, cookie);
138155
}
139156
});
@@ -148,15 +165,17 @@ class TYdbConnectorActor : public NActors::TActorBootstrapped<TYdbConnectorActor
148165
actorSystem->Send(
149166
recipient,
150167
MakeResponse<TEvYdbCompute::TEvCancelOperationResponse>(
168+
database,
151169
response.GetIssues(),
152-
response.GetStatus(), database),
170+
response.GetStatus()),
153171
0, cookie);
154172
} catch (...) {
155173
actorSystem->Send(
156174
recipient,
157175
MakeResponse<TEvYdbCompute::TEvCancelOperationResponse>(
176+
database,
158177
CurrentExceptionMessage(),
159-
NYdb::EStatus::GENERIC_ERROR, database),
178+
NYdb::EStatus::GENERIC_ERROR),
160179
0, cookie);
161180
}
162181
});
@@ -171,28 +190,30 @@ class TYdbConnectorActor : public NActors::TActorBootstrapped<TYdbConnectorActor
171190
actorSystem->Send(
172191
recipient,
173192
MakeResponse<TEvYdbCompute::TEvForgetOperationResponse>(
193+
database,
174194
response.GetIssues(),
175-
response.GetStatus(), database),
195+
response.GetStatus()),
176196
0, cookie);
177197
} catch (...) {
178198
actorSystem->Send(
179199
recipient,
180200
MakeResponse<TEvYdbCompute::TEvForgetOperationResponse>(
201+
database,
181202
CurrentExceptionMessage(),
182-
NYdb::EStatus::GENERIC_ERROR, database),
203+
NYdb::EStatus::GENERIC_ERROR),
183204
0, cookie);
184205
}
185206
});
186207
}
187208

188-
template<typename TResponse>
189-
static TResponse* MakeResponse(TString msg, NYdb::EStatus status, TString databasePath) {
190-
return new TResponse(NYql::TIssues{NYql::TIssue{RemoveDatabaseFromStr(msg, databasePath)}}, status);
209+
template<typename TResponse, typename... TArgs>
210+
static TResponse* MakeResponse(TString databasePath, TString msg, TArgs&&... args) {
211+
return new TResponse(NYql::TIssues{NYql::TIssue{RemoveDatabaseFromStr(msg, databasePath)}}, std::forward<TArgs>(args)...);
191212
}
192213

193-
template<typename TResponse>
194-
static TResponse* MakeResponse(const NYql::TIssues& issues, NYdb::EStatus status, TString databasePath) {
195-
return new TResponse(RemoveDatabaseFromIssues(issues, databasePath), status);
214+
template<typename TResponse, typename... TArgs>
215+
static TResponse* MakeResponse(TString databasePath, const NYql::TIssues& issues, TArgs&&... args) {
216+
return new TResponse(RemoveDatabaseFromIssues(issues, databasePath), std::forward<TArgs>(args)...);
196217
}
197218

198219
private:

0 commit comments

Comments
 (0)