Skip to content

Commit 8b3f859

Browse files
authored
Support cancel/forget/list forced compaction methods in RPC (#34676)
1 parent 09243bb commit 8b3f859

6 files changed

Lines changed: 91 additions & 2 deletions

File tree

ydb/core/grpc_services/rpc_cancel_operation.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <ydb/core/grpc_services/rpc_common/rpc_common.h>
99
#include <ydb/core/tx/schemeshard/schemeshard_build_index.h>
1010
#include <ydb/core/tx/schemeshard/schemeshard_export.h>
11+
#include <ydb/core/tx/schemeshard/schemeshard_forced_compaction.h>
1112
#include <ydb/core/tx/schemeshard/schemeshard_import.h>
1213
#include <ydb/library/actors/core/hfunc.h>
1314
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/library/operation_id/operation_id.h>
@@ -41,6 +42,8 @@ class TCancelOperationRPC: public TRpcOperationRequestActor<TCancelOperationRPC,
4142
return "[CancelIncrementalBackup]";
4243
case TOperationId::RESTORE:
4344
return "[CancelBackupCollectionRestore]";
45+
case TOperationId::COMPACTION:
46+
return "[CancelForcedCompaction]";
4447
default:
4548
return "[Untagged]";
4649
}
@@ -54,6 +57,8 @@ class TCancelOperationRPC: public TRpcOperationRequestActor<TCancelOperationRPC,
5457
return new TEvImport::TEvCancelImportRequest(TxId, GetDatabaseName(), RawOperationId);
5558
case TOperationId::BUILD_INDEX:
5659
return new TEvIndexBuilder::TEvCancelRequest(TxId, GetDatabaseName(), RawOperationId);
60+
case TOperationId::COMPACTION:
61+
return new TEvForcedCompaction::TEvCancelRequest(TxId, GetDatabaseName(), RawOperationId);
5762
default:
5863
Y_ABORT("unreachable");
5964
}
@@ -63,7 +68,8 @@ class TCancelOperationRPC: public TRpcOperationRequestActor<TCancelOperationRPC,
6368
const TOperationId::EKind kind = OperationId.GetKind();
6469
return kind == TOperationId::EXPORT
6570
|| kind == TOperationId::IMPORT
66-
|| kind == TOperationId::BUILD_INDEX;
71+
|| kind == TOperationId::BUILD_INDEX
72+
|| kind == TOperationId::COMPACTION;
6773
}
6874

6975
void Handle(TEvExport::TEvCancelExportResponse::TPtr& ev) {
@@ -93,6 +99,15 @@ class TCancelOperationRPC: public TRpcOperationRequestActor<TCancelOperationRPC,
9399
Reply(record.GetStatus(), record.GetIssues());
94100
}
95101

102+
void Handle(TEvForcedCompaction::TEvCancelResponse::TPtr& ev) {
103+
const auto& record = ev->Get()->Record;
104+
105+
LOG_D("Handle TEvForcedCompaction::TEvCancelResponse"
106+
<< ": record# " << record.ShortDebugString());
107+
108+
Reply(record.GetStatus(), record.GetIssues());
109+
}
110+
96111
public:
97112
using TRpcOperationRequestActor::TRpcOperationRequestActor;
98113

@@ -106,6 +121,7 @@ class TCancelOperationRPC: public TRpcOperationRequestActor<TCancelOperationRPC,
106121
case TOperationId::EXPORT:
107122
case TOperationId::IMPORT:
108123
case TOperationId::BUILD_INDEX:
124+
case TOperationId::COMPACTION:
109125
if (!TryGetId(OperationId, RawOperationId)) {
110126
return Reply(StatusIds::BAD_REQUEST, TIssuesIds::DEFAULT_ERROR, "Unable to extract operation id");
111127
}
@@ -140,6 +156,7 @@ class TCancelOperationRPC: public TRpcOperationRequestActor<TCancelOperationRPC,
140156
hFunc(TEvExport::TEvCancelExportResponse, Handle);
141157
hFunc(TEvImport::TEvCancelImportResponse, Handle);
142158
hFunc(TEvIndexBuilder::TEvCancelResponse, Handle);
159+
hFunc(TEvForcedCompaction::TEvCancelResponse, Handle);
143160
hFunc(NKqp::TEvCancelScriptExecutionOperationResponse, Handle);
144161
default:
145162
return StateBase(ev);

ydb/core/grpc_services/rpc_forget_operation.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <ydb/core/tx/schemeshard/schemeshard_backup.h>
1111
#include <ydb/core/tx/schemeshard/schemeshard_build_index.h>
1212
#include <ydb/core/tx/schemeshard/schemeshard_export.h>
13+
#include <ydb/core/tx/schemeshard/schemeshard_forced_compaction.h>
1314
#include <ydb/core/tx/schemeshard/schemeshard_import.h>
1415
#include <ydb/library/actors/core/hfunc.h>
1516
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/library/operation_id/operation_id.h>
@@ -41,6 +42,8 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
4142
return "[ForgetIncrementalBackup]";
4243
case TOperationId::RESTORE:
4344
return "[ForgetBackupCollectionRestore]";
45+
case TOperationId::COMPACTION:
46+
return "[ForgetForcedCompaction]";
4447
default:
4548
return "[Untagged]";
4649
}
@@ -58,6 +61,8 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
5861
return new TEvBackup::TEvForgetIncrementalBackupRequest(TxId, GetDatabaseName(), RawOperationId);
5962
case TOperationId::RESTORE:
6063
return new TEvBackup::TEvForgetBackupCollectionRestoreRequest(TxId, GetDatabaseName(), RawOperationId);
64+
case TOperationId::COMPACTION:
65+
return new TEvForcedCompaction::TEvForgetRequest(TxId, GetDatabaseName(), RawOperationId);
6166
default:
6267
Y_ABORT("unreachable");
6368
}
@@ -69,7 +74,8 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
6974
|| kind == TOperationId::IMPORT
7075
|| kind == TOperationId::BUILD_INDEX
7176
|| kind == TOperationId::INCREMENTAL_BACKUP
72-
|| kind == TOperationId::RESTORE;
77+
|| kind == TOperationId::RESTORE
78+
|| kind == TOperationId::COMPACTION;
7379
}
7480

7581
void Handle(TEvExport::TEvForgetExportResponse::TPtr& ev) {
@@ -99,6 +105,15 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
99105
Reply(record.GetStatus(), record.GetIssues());
100106
}
101107

108+
void Handle(TEvForcedCompaction::TEvForgetResponse::TPtr& ev) {
109+
const auto& record = ev->Get()->Record;
110+
111+
LOG_D("Handle TEvForcedCompaction::TEvForgetResponse"
112+
<< ": record# " << record.ShortDebugString());
113+
114+
Reply(record.GetStatus(), record.GetIssues());
115+
}
116+
102117
void Handle(NKqp::TEvForgetScriptExecutionOperationResponse::TPtr& ev) {
103118
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> issuesProto;
104119
NYql::IssuesToMessage(ev->Get()->Issues, &issuesProto);
@@ -144,6 +159,7 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
144159
case TOperationId::BUILD_INDEX:
145160
case TOperationId::INCREMENTAL_BACKUP:
146161
case TOperationId::RESTORE:
162+
case TOperationId::COMPACTION:
147163
if (!TryGetId(OperationId, RawOperationId)) {
148164
return Reply(StatusIds::BAD_REQUEST, TIssuesIds::DEFAULT_ERROR, "Unable to extract operation id");
149165
}
@@ -170,6 +186,7 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
170186
hFunc(TEvExport::TEvForgetExportResponse, Handle);
171187
hFunc(TEvImport::TEvForgetImportResponse, Handle);
172188
hFunc(TEvIndexBuilder::TEvForgetResponse, Handle);
189+
hFunc(TEvForcedCompaction::TEvForgetResponse, Handle);
173190
hFunc(NKqp::TEvForgetScriptExecutionOperationResponse, Handle);
174191
hFunc(TEvBackup::TEvForgetIncrementalBackupResponse, Handle);
175192
hFunc(TEvBackup::TEvForgetBackupCollectionRestoreResponse, Handle);

ydb/core/grpc_services/rpc_get_operation.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <ydb/core/tx/schemeshard/schemeshard_backup.h>
1919
#include <ydb/core/tx/schemeshard/schemeshard_build_index.h>
2020
#include <ydb/core/tx/schemeshard/schemeshard_export.h>
21+
#include <ydb/core/tx/schemeshard/schemeshard_forced_compaction.h>
2122
#include <ydb/core/tx/schemeshard/schemeshard_import.h>
2223
#include <ydb/core/tx/tx_proxy/proxy.h>
2324
#include <ydb/library/actors/core/hfunc.h>
@@ -62,6 +63,8 @@ class TGetOperationRPC
6263
return "[GetIncrementalBackup]";
6364
case TOperationId::RESTORE:
6465
return "[GetBackupCollectionRestore]";
66+
case TOperationId::COMPACTION:
67+
return "[GetForcedCompaction]";
6568
default:
6669
return "[Untagged]";
6770
}
@@ -79,6 +82,8 @@ class TGetOperationRPC
7982
return new NSchemeShard::TEvBackup::TEvGetIncrementalBackupRequest(GetDatabaseName(), RawOperationId_);
8083
case TOperationId::RESTORE:
8184
return new NSchemeShard::TEvBackup::TEvGetBackupCollectionRestoreRequest(GetDatabaseName(), RawOperationId_);
85+
case TOperationId::COMPACTION:
86+
return new NSchemeShard::TEvForcedCompaction::TEvGetRequest(GetDatabaseName(), RawOperationId_);
8287
default:
8388
Y_ABORT("unreachable");
8489
}
@@ -111,6 +116,7 @@ class TGetOperationRPC
111116
case TOperationId::BUILD_INDEX:
112117
case TOperationId::INCREMENTAL_BACKUP:
113118
case TOperationId::RESTORE:
119+
case TOperationId::COMPACTION:
114120
if (!TryGetId(OperationId_, RawOperationId_)) {
115121
return ReplyWithStatus(StatusIds::BAD_REQUEST);
116122
}
@@ -138,6 +144,7 @@ class TGetOperationRPC
138144
HFunc(NSchemeShard::TEvExport::TEvGetExportResponse, Handle);
139145
HFunc(NSchemeShard::TEvImport::TEvGetImportResponse, Handle);
140146
HFunc(NSchemeShard::TEvIndexBuilder::TEvGetResponse, Handle);
147+
HFunc(NSchemeShard::TEvForcedCompaction::TEvGetResponse, Handle);
141148
HFunc(NKqp::TEvGetScriptExecutionOperationResponse, Handle);
142149
HFunc(NSchemeShard::TEvBackup::TEvGetIncrementalBackupResponse, Handle);
143150
HFunc(NSchemeShard::TEvBackup::TEvGetBackupCollectionRestoreResponse, Handle);
@@ -262,6 +269,22 @@ class TGetOperationRPC
262269
}
263270
}
264271

272+
void Handle(NSchemeShard::TEvForcedCompaction::TEvGetResponse::TPtr& ev, const TActorContext& ctx) {
273+
const auto& record = ev->Get()->Record;
274+
275+
LOG_D("Handle TEvForcedCompaction::TEvGetResponse"
276+
<< ": record# " << record.ShortDebugString());
277+
278+
if (record.GetStatus() != Ydb::StatusIds::SUCCESS) {
279+
ReplyGetOperationResponse(true, ctx, record.GetStatus());
280+
} else {
281+
TEvGetOperationRequest::TResponse resp;
282+
283+
::NKikimr::NGRpcService::ToOperation(record.GetForcedCompaction(), resp.mutable_operation());
284+
Reply(resp, ctx);
285+
}
286+
}
287+
265288
void Handle(NKqp::TEvGetScriptExecutionOperationResponse::TPtr& ev, const TActorContext& ctx) {
266289
TEvGetOperationRequest::TResponse resp;
267290
auto deferred = resp.mutable_operation();

ydb/core/grpc_services/rpc_list_operations.cpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include <ydb/core/tx/schemeshard/schemeshard_backup.h>
1515
#include <ydb/core/tx/schemeshard/schemeshard_build_index.h>
1616
#include <ydb/core/tx/schemeshard/schemeshard_export.h>
17+
#include <ydb/core/tx/schemeshard/schemeshard_forced_compaction.h>
1718
#include <ydb/core/tx/schemeshard/schemeshard_import.h>
1819
#include <ydb/core/tx/schemeshard/olap/bg_tasks/events/global.h>
1920
#include <ydb/library/actors/core/hfunc.h>
@@ -58,6 +59,8 @@ class TListOperationsRPC
5859
return "[ListIncrementalBackups]";
5960
case TOperationId::RESTORE:
6061
return "[ListBackupCollectionRestores]";
62+
case TOperationId::COMPACTION:
63+
return "[ListForcedCompactions]";
6164
default:
6265
return "[Untagged]";
6366
}
@@ -79,6 +82,8 @@ class TListOperationsRPC
7982
return new TEvBackup::TEvListIncrementalBackupsRequest(GetDatabaseName(), request.page_size(), request.page_token());
8083
case TOperationId::RESTORE:
8184
return new TEvBackup::TEvListBackupCollectionRestoresRequest(GetDatabaseName(), request.page_size(), request.page_token());
85+
case TOperationId::COMPACTION:
86+
return new TEvForcedCompaction::TEvListRequest(GetDatabaseName(), request.page_size(), request.page_token());
8287
default:
8388
Y_ABORT("unreachable");
8489
}
@@ -140,6 +145,26 @@ class TListOperationsRPC
140145
Reply(response);
141146
}
142147

148+
void Handle(TEvForcedCompaction::TEvListResponse::TPtr& ev) {
149+
const auto& record = ev->Get()->Record;
150+
151+
LOG_D("Handle TEvForcedCompaction::TEvListResponse"
152+
<< ": record# " << record.ShortDebugString());
153+
154+
TResponse response;
155+
156+
response.set_status(record.GetStatus());
157+
if (record.GetIssues().size()) {
158+
response.mutable_issues()->CopyFrom(record.GetIssues());
159+
}
160+
for (const auto& entry : record.GetEntries()) {
161+
auto operation = response.add_operations();
162+
::NKikimr::NGRpcService::ToOperation(entry, operation);
163+
}
164+
response.set_next_page_token(record.GetNextPageToken());
165+
Reply(response);
166+
}
167+
143168
void Handle(NSchemeShard::NBackground::TEvListResponse::TPtr& ev) {
144169
const auto& record = ev->Get()->Record;
145170

@@ -224,6 +249,7 @@ class TListOperationsRPC
224249
case TOperationId::SS_BG_TASKS:
225250
case TOperationId::INCREMENTAL_BACKUP:
226251
case TOperationId::RESTORE:
252+
case TOperationId::COMPACTION:
227253
break;
228254
case TOperationId::SCRIPT_EXECUTION:
229255
SendListScriptExecutions();
@@ -242,6 +268,7 @@ class TListOperationsRPC
242268
hFunc(TEvImport::TEvListImportsResponse, Handle);
243269
hFunc(NSchemeShard::NBackground::TEvListResponse, Handle);
244270
hFunc(TEvIndexBuilder::TEvListResponse, Handle);
271+
hFunc(TEvForcedCompaction::TEvListResponse, Handle);
245272
hFunc(NKqp::TEvListScriptExecutionOperationsResponse, Handle);
246273
hFunc(TEvBackup::TEvListIncrementalBackupsResponse, Handle);
247274
hFunc(TEvBackup::TEvListBackupCollectionRestoresResponse, Handle);

ydb/public/sdk/cpp/include/ydb-cpp-sdk/library/operation_id/operation_id.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class TOperationId {
2828
SS_BG_TASKS = 10,
2929
INCREMENTAL_BACKUP = 11,
3030
RESTORE = 12,
31+
COMPACTION = 13,
3132
};
3233

3334
struct TData {

ydb/public/sdk/cpp/src/library/operation_id/operation_id.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,10 @@ TOperationId::EKind ParseKind(const std::string_view value) {
326326
return TOperationId::RESTORE;
327327
}
328328

329+
if (value.starts_with("compaction")) {
330+
return TOperationId::COMPACTION;
331+
}
332+
329333
return TOperationId::UNUSED;
330334
}
331335

0 commit comments

Comments
 (0)