Skip to content

Commit 9260ee0

Browse files
authored
Fix. Return metadata in case of empty result for query service. (#2897)
1 parent 690b5b7 commit 9260ee0

5 files changed

Lines changed: 52 additions & 25 deletions

File tree

ydb/core/grpc_services/query/rpc_execute_query.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
289289
false, // keepSession
290290
false, // useCancelAfter
291291
syntax,
292-
true);
292+
true); // trailing support
293293

294294
if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) {
295295
NYql::TIssues issues;

ydb/core/kqp/query_data/kqp_query_data.cpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,12 @@ Ydb::ResultSet* TKqpExecuterTxResult::GetYdb(google::protobuf::Arena* arena, TMa
7777
return ydbResult;
7878
}
7979

80-
Ydb::ResultSet* TKqpExecuterTxResult::GetTrailingYdb(google::protobuf::Arena* arena) {
80+
Ydb::ResultSet* TKqpExecuterTxResult::ExtractTrailingYdb(google::protobuf::Arena* arena) {
8181
if (!HasTrailingResult)
8282
return nullptr;
8383

8484
Ydb::ResultSet* ydbResult = google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(arena);
85-
if (TrailingResult.rows().size() > 0) {
86-
ydbResult->Swap(&TrailingResult);
87-
}
85+
ydbResult->Swap(&TrailingResult);
8886

8987
return ydbResult;
9088
}
@@ -237,10 +235,10 @@ NKikimrMiniKQL::TResult* TQueryData::GetMkqlTxResult(const NKqpProto::TKqpPhyRes
237235
return TxResults[txIndex][resultIndex].GetMkql(arena);
238236
}
239237

240-
Ydb::ResultSet* TQueryData::GetTrailingTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena) {
238+
Ydb::ResultSet* TQueryData::ExtractTrailingTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena) {
241239
auto txIndex = rb.GetTxResultBinding().GetTxIndex();
242240
auto resultIndex = rb.GetTxResultBinding().GetResultIndex();
243-
return TxResults[txIndex][resultIndex].GetTrailingYdb(arena);
241+
return TxResults[txIndex][resultIndex].ExtractTrailingYdb(arena);
244242
}
245243

246244

ydb/core/kqp/query_data/kqp_query_data.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ struct TKqpExecuterTxResult {
100100
NKikimrMiniKQL::TResult* GetMkql(google::protobuf::Arena* arena);
101101
NKikimrMiniKQL::TResult GetMkql();
102102
Ydb::ResultSet* GetYdb(google::protobuf::Arena* arena, TMaybe<ui64> rowsLimitPerWrite);
103-
Ydb::ResultSet* GetTrailingYdb(google::protobuf::Arena* arena);
103+
Ydb::ResultSet* ExtractTrailingYdb(google::protobuf::Arena* arena);
104104

105105
void FillMkql(NKikimrMiniKQL::TResult* mkqlResult);
106106
void FillYdb(Ydb::ResultSet* ydbResult, TMaybe<ui64> rowsLimitPerWrite);
@@ -253,7 +253,7 @@ class TQueryData : NMiniKQL::ITerminator {
253253
TTypedUnboxedValue GetTxResult(ui32 txIndex, ui32 resultIndex);
254254
NKikimrMiniKQL::TResult* GetMkqlTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena);
255255
Ydb::ResultSet* GetYdbTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena, TMaybe<ui64> rowsLimitPerWrite);
256-
Ydb::ResultSet* GetTrailingTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena);
256+
Ydb::ResultSet* ExtractTrailingTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena);
257257

258258
std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue> GetInternalBindingValue(const NKqpProto::TKqpPhyParamBinding& paramBinding);
259259
TTypedUnboxedValue& GetParameterUnboxedValue(const TString& name);

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1656,7 +1656,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
16561656
size_t trailingResultsCount = 0;
16571657
for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) {
16581658
if (QueryState->IsStreamResult()) {
1659-
auto ydbResult = QueryState->QueryData->GetTrailingTxResult(
1659+
auto ydbResult = QueryState->QueryData->ExtractTrailingTxResult(
16601660
phyQuery.GetResultBindings(i), response->GetArena());
16611661

16621662
if (ydbResult) {

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -140,26 +140,55 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
140140
auto kikimr = DefaultKikimrRunner();
141141
auto db = kikimr.GetQueryClient();
142142

143-
auto it = db.StreamExecuteQuery(R"(
144-
SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0;
145-
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
146-
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
143+
{
144+
auto it = db.StreamExecuteQuery(R"(
145+
SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0;
146+
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
147+
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
148+
149+
ui64 count = 0;
150+
for (;;) {
151+
auto streamPart = it.ReadNext().GetValueSync();
152+
if (!streamPart.IsSuccess()) {
153+
UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString());
154+
break;
155+
}
147156

148-
ui64 count = 0;
149-
for (;;) {
150-
auto streamPart = it.ReadNext().GetValueSync();
151-
if (!streamPart.IsSuccess()) {
152-
UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString());
153-
break;
157+
if (streamPart.HasResultSet()) {
158+
auto resultSet = streamPart.ExtractResultSet();
159+
count += resultSet.RowsCount();
160+
}
154161
}
155162

156-
if (streamPart.HasResultSet()) {
157-
auto resultSet = streamPart.ExtractResultSet();
158-
count += resultSet.RowsCount();
159-
}
163+
UNIT_ASSERT_VALUES_EQUAL(count, 2);
160164
}
161165

162-
UNIT_ASSERT_VALUES_EQUAL(count, 2);
166+
{
167+
auto it = db.StreamExecuteQuery(R"(
168+
SELECT Key, Value2 FROM TwoShard WHERE false ORDER BY Key > 0;
169+
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
170+
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
171+
172+
ui32 rsCount = 0;
173+
ui32 columns = 0;
174+
for (;;) {
175+
auto streamPart = it.ReadNext().GetValueSync();
176+
if (!streamPart.IsSuccess()) {
177+
UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString());
178+
break;
179+
}
180+
181+
if (streamPart.HasResultSet()) {
182+
auto resultSet = streamPart.ExtractResultSet();
183+
columns = resultSet.ColumnsCount();
184+
CompareYson(R"([])", FormatResultSetYson(resultSet));
185+
rsCount++;
186+
}
187+
}
188+
189+
UNIT_ASSERT_VALUES_EQUAL(rsCount, 1);
190+
UNIT_ASSERT_VALUES_EQUAL(columns, 2);
191+
}
163192
}
164193

165194
void CheckQueryResult(TExecuteQueryResult result) {

0 commit comments

Comments
 (0)