Skip to content

Commit 205e890

Browse files
authored
Merge db9dd74 into 45a24af
2 parents 45a24af + db9dd74 commit 205e890

20 files changed

Lines changed: 266 additions & 76 deletions

ydb/core/grpc_services/query/rpc_execute_query.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
289289
false, // keepSession
290290
false, // useCancelAfter
291291
syntax,
292-
true);
292+
true); // trailing support
293293

294294
if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) {
295295
NYql::TIssues issues;

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,8 +278,16 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
278278

279279
auto resultSize = ResponseEv->GetByteSize();
280280
if (resultSize > (int)ReplySizeLimit) {
281-
TString message = TStringBuilder() << "Query result size limit exceeded. ("
282-
<< resultSize << " > " << ReplySizeLimit << ")";
281+
TString message;
282+
if (ResponseEv->TxResults.size() == 1 && !ResponseEv->TxResults[0].QueryResultIndex.Defined()) {
283+
message = TStringBuilder() << "Intermediate data materialization exceeded size limit"
284+
<< " (" << resultSize << " > " << ReplySizeLimit << ")."
285+
<< " This usually happens when trying to write large amounts of data or to perform lookup"
286+
<< " by big collection of keys in single query. Consider using smaller batches of data.";
287+
} else {
288+
message = TStringBuilder() << "Query result size limit exceeded. ("
289+
<< resultSize << " > " << ReplySizeLimit << ")";
290+
}
283291

284292
auto issue = YqlIssue({}, TIssuesIds::KIKIMR_RESULT_UNAVAILABLE, message);
285293
ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, issue);

ydb/core/kqp/executer_actor/kqp_executer.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,16 @@ struct TEvKqpExecuter {
2828
ui64 ResultRowsCount = 0;
2929
ui64 ResultRowsBytes = 0;
3030

31-
explicit TEvTxResponse(TTxAllocatorState::TPtr allocState)
31+
enum class EExecutionType {
32+
Data,
33+
Scan,
34+
Scheme,
35+
Literal,
36+
} ExecutionType;
37+
38+
TEvTxResponse(TTxAllocatorState::TPtr allocState, EExecutionType type)
3239
: AllocState(std::move(allocState))
40+
, ExecutionType(type)
3341
{}
3442

3543
~TEvTxResponse();

ydb/core/kqp/executer_actor/kqp_executer_impl.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,9 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
9696
for (auto& tx : request.Transactions) {
9797
if (txsType) {
9898
YQL_ENSURE(*txsType == tx.Body->GetType(), "Mixed physical tx types in executer.");
99-
YQL_ENSURE(*txsType == NKqpProto::TKqpPhyTx::TYPE_DATA, "Cannot execute multiple non-data physical txs.");
99+
YQL_ENSURE((*txsType == NKqpProto::TKqpPhyTx::TYPE_DATA)
100+
|| (*txsType == NKqpProto::TKqpPhyTx::TYPE_GENERIC),
101+
"Cannot execute multiple non-data physical txs.");
100102
} else {
101103
txsType = tx.Body->GetType();
102104
}

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,10 @@ namespace NKqp {
6464
#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream)
6565
#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream)
6666

67-
enum class EExecType {
68-
Data,
69-
Scan
70-
};
67+
using EExecType = TEvKqpExecuter::TEvTxResponse::EExecutionType;
7168

7269
const ui64 MaxTaskSize = 48_MB;
70+
constexpr ui64 PotentialUnsigned64OverflowLimit = (std::numeric_limits<ui64>::max() >> 1);
7371

7472
std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const TStageInfo& stageInfo, const TTask& task);
7573

@@ -114,6 +112,7 @@ struct TEvPrivate {
114112

115113
template <class TDerived, EExecType ExecType>
116114
class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
115+
static_assert(ExecType == EExecType::Data || ExecType == EExecType::Scan);
117116
public:
118117
TKqpExecuterBase(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
119118
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
@@ -140,7 +139,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
140139
TasksGraph.GetMeta().Database = Database;
141140
TasksGraph.GetMeta().ChannelTransportVersion = chanTransportVersion;
142141
TasksGraph.GetMeta().UserRequestContext = userRequestContext;
143-
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc);
142+
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc, ExecType);
144143
ResponseEv->Orbit = std::move(Request.Orbit);
145144
Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph,
146145
ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats());
@@ -360,7 +359,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
360359
ui64 seqNo = ev->Get()->Record.GetSeqNo();
361360
i64 freeSpace = ev->Get()->Record.GetFreeSpace();
362361

363-
LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId
362+
LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId
364363
<< ", send ack to channelId: " << channelId
365364
<< ", seqNo: " << seqNo
366365
<< ", enough: " << ev->Get()->Record.GetEnough()
@@ -798,6 +797,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
798797

799798
auto ru = NRuCalc::CalcRequestUnit(consumption);
800799

800+
YQL_ENSURE(consumption.ReadIOStat.Rows < PotentialUnsigned64OverflowLimit);
801+
YQL_ENSURE(ru < PotentialUnsigned64OverflowLimit);
802+
801803
// Some heuristic to reduce overprice due to round part stats
802804
if (ru <= 100 && !force)
803805
return;
@@ -1661,6 +1663,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16611663
}
16621664

16631665
void AbortExecutionAndDie(TActorId abortSender, NYql::NDqProto::StatusIds::StatusCode status, const TString& message) {
1666+
if (AlreadyReplied) {
1667+
return;
1668+
}
1669+
16641670
LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << "," << message);
16651671
if (ExecuterSpan) {
16661672
ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status));
@@ -1674,6 +1680,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16741680
this->Send(Target, abortEv.Release());
16751681
}
16761682

1683+
AlreadyReplied = true;
16771684
LOG_E("Sending timeout response to: " << Target);
16781685
this->Send(Target, ResponseEv.release());
16791686

@@ -1685,6 +1692,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16851692
virtual void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status,
16861693
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues)
16871694
{
1695+
if (AlreadyReplied) {
1696+
return;
1697+
}
1698+
16881699
if (Planner) {
16891700
for (auto computeActor : Planner->GetPendingComputeActors()) {
16901701
LOG_D("terminate compute actor " << computeActor.first);
@@ -1694,6 +1705,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16941705
}
16951706
}
16961707

1708+
AlreadyReplied = true;
16971709
auto& response = *ResponseEv->Record.MutableResponse();
16981710

16991711
response.SetStatus(status);
@@ -1907,6 +1919,8 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
19071919
THashMap<ui64, TActorId> ResultChannelToComputeActor;
19081920
THashMap<NYql::NDq::TStageId, THashMap<ui64, TShardInfo>> SourceScanStageIdToParititions;
19091921

1922+
bool AlreadyReplied = false;
1923+
19101924
private:
19111925
static constexpr TDuration ResourceUsageUpdateInterval = TDuration::MilliSeconds(100);
19121926
};

ydb/core/kqp/executer_actor/kqp_executer_stats.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -746,7 +746,7 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
746746
for (auto& p2 : p.second.Egress) {
747747
ExportAggAsyncBufferStats(p2.second, (*stageStats.MutableEgress())[p2.first]);
748748
}
749-
}
749+
}
750750
}
751751

752752
void TQueryExecutionStats::Finish() {
@@ -761,7 +761,7 @@ void TQueryExecutionStats::Finish() {
761761
Result->SetDurationUs(FinishTs.MicroSeconds() - StartTs.MicroSeconds());
762762

763763
// Result->Result* feilds are (temporary?) commented out in proto due to lack of use
764-
//
764+
//
765765
// Result->SetResultBytes(ResultBytes);
766766
// Result->SetResultRows(ResultRows);
767767

@@ -838,6 +838,11 @@ void TProgressStatEntry::Out(IOutputStream& o) const {
838838
}
839839

840840
void TProgressStat::Set(const NDqProto::TDqComputeActorStats& stats) {
841+
if (Cur.Defined) {
842+
Cur = TEntry();
843+
}
844+
845+
Cur.Defined = true;
841846
Cur.ComputeTime += TDuration::MicroSeconds(stats.GetCpuTimeUs());
842847
for (auto& task : stats.GetTasks()) {
843848
for (auto& table: task.GetTables()) {
@@ -848,7 +853,7 @@ void TProgressStat::Set(const NDqProto::TDqComputeActorStats& stats) {
848853
}
849854

850855
TProgressStat::TEntry TProgressStat::GetLastUsage() const {
851-
return Cur - Total;
856+
return Cur.Defined ? Cur - Total : Cur;
852857
}
853858

854859
void TProgressStat::Update() {

ydb/core/kqp/executer_actor/kqp_executer_stats.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ struct TTableStat {
191191
struct TProgressStatEntry {
192192
TDuration ComputeTime;
193193
TTableStat ReadIOStat;
194+
bool Defined = false;
194195

195196
TProgressStatEntry& operator+=(const TProgressStatEntry& rhs);
196197

ydb/core/kqp/executer_actor/kqp_literal_executer.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,9 @@ class TKqpLiteralExecuter {
7979
, LiteralExecuterSpan(TWilsonKqp::LiteralExecuter, std::move(Request.TraceId), "LiteralExecuter")
8080
, UserRequestContext(userRequestContext)
8181
{
82-
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc);
82+
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(
83+
Request.TxAlloc, TEvKqpExecuter::TEvTxResponse::EExecutionType::Literal);
84+
8385
ResponseEv->Orbit = std::move(Request.Orbit);
8486
Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph,
8587
ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats());

ydb/core/kqp/executer_actor/kqp_result_channel.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ class TResultCommonChannelProxy : public NActors::TActor<TResultCommonChannelPro
5656
}
5757
} catch (const yexception& ex) {
5858
InternalError(ex.what());
59+
} catch (const NKikimr::TMemoryLimitExceededException& ex) {
60+
InternalError("Memory limit exceeded exception", NYql::NDqProto::StatusIds::PRECONDITION_FAILED);
5961
}
6062
}
6163

@@ -97,10 +99,10 @@ class TResultCommonChannelProxy : public NActors::TActor<TResultCommonChannelPro
9799
Send(ComputeActor, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ ChannelId);
98100
}
99101

100-
void InternalError(const TString& msg) {
102+
void InternalError(const TString& msg, const NYql::NDqProto::StatusIds_StatusCode& code = NYql::NDqProto::StatusIds::INTERNAL_ERROR) {
101103
LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, msg);
102104

103-
auto evAbort = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, msg);
105+
auto evAbort = MakeHolder<TEvKqp::TEvAbortExecution>(code, msg);
104106
Send(Executer, evAbort.Release());
105107

106108
Become(&TResultCommonChannelProxy::DeadState);
@@ -112,7 +114,7 @@ class TResultCommonChannelProxy : public NActors::TActor<TResultCommonChannelPro
112114
switch (ev->GetTypeRewrite()) {
113115
hFunc(TEvents::TEvPoison, HandlePoison);
114116
}
115-
117+
116118
} catch(const yexception& ex) {
117119
InternalError(ex.what());
118120
}

ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
6969
YQL_ENSURE(PhyTx);
7070
YQL_ENSURE(PhyTx->GetType() == NKqpProto::TKqpPhyTx::TYPE_SCHEME);
7171

72-
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(nullptr);
72+
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(
73+
nullptr,
74+
TEvKqpExecuter::TEvTxResponse::EExecutionType::Scheme);
7375
}
7476

7577
void StartBuildOperation() {

0 commit comments

Comments
 (0)