Skip to content

Commit e8d75a4

Browse files
gridnevvvitMrLolthe1st
authored andcommitted
Refactor memory tracking (ydb-platform#4510)
1 parent 4bb02ec commit e8d75a4

5 files changed

Lines changed: 69 additions & 104 deletions

File tree

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 8 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -285,12 +285,11 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
285285
auto requester = ev->Sender;
286286

287287
ui64 txId = msg.GetTxId();
288-
bool isScan = msg.HasSnapshot();
289288
const ui64 outputChunkMaxSize = msg.GetOutputChunkMaxSize();
290289

291290
YQL_ENSURE(msg.GetStartAllOrFail()); // todo: support partial start
292291

293-
LOG_D("TxId: " << txId << ", new " << (isScan ? "scan " : "") << "compute tasks request from " << requester
292+
LOG_D("TxId: " << txId << ", new compute tasks request from " << requester
294293
<< " with " << msg.GetTasks().size() << " tasks: " << TasksIdsStr(msg.GetTasks()));
295294

296295
NKqpNode::TTasksRequest request;
@@ -339,22 +338,10 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
339338
LOG_D("TxId: " << txId << ", task: " << taskCtx.TaskId << ", requested memory: " << taskCtx.Memory);
340339

341340
requestChannels += estimation.ChannelBuffersCount;
342-
request.TotalMemory += taskCtx.Memory;
343341
}
344342

345343
LOG_D("TxId: " << txId << ", channels: " << requestChannels
346-
<< ", computeActors: " << msg.GetTasks().size() << ", memory: " << request.TotalMemory);
347-
348-
auto txMemory = bucket.GetTxMemory(txId, memoryPool) + request.TotalMemory;
349-
if (txMemory > Config.GetQueryMemoryLimit()) {
350-
LOG_N("TxId: " << txId << ", requested too many memory: " << request.TotalMemory
351-
<< "(" << txMemory << " for this Tx), limit: " << Config.GetQueryMemoryLimit());
352-
353-
Counters->RmNotEnoughMemory->Inc();
354-
355-
return ReplyError(txId, request.Executer, msg, NKikimrKqp::TEvStartKqpTasksResponse::QUERY_MEMORY_LIMIT_EXCEEDED,
356-
TStringBuilder() << "Required: " << txMemory << ", limit: " << Config.GetQueryMemoryLimit());
357-
}
344+
<< ", computeActors: " << msg.GetTasks().size() << ", memory: " << request.CalculateTotalMemory());
358345

359346
TVector<ui64> allocatedTasks;
360347
allocatedTasks.reserve(msg.GetTasks().size());
@@ -368,29 +355,13 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
368355

369356
NRm::TKqpNotEnoughResources resourcesResponse;
370357
if (!ResourceManager()->AllocateResources(txId, task.first, resourcesRequest, &resourcesResponse)) {
371-
NKikimrKqp::TEvStartKqpTasksResponse::ENotStartedTaskReason failReason = NKikimrKqp::TEvStartKqpTasksResponse::INTERNAL_ERROR;
372-
TStringBuilder error;
373-
if (resourcesResponse.ScanQueryMemory()) {
374-
error << "TxId: " << txId << ", NodeId: " << SelfId().NodeId() << ", not enough memory, requested " << task.second.Memory;
375-
LOG_N(error);
376-
377-
failReason = NKikimrKqp::TEvStartKqpTasksResponse::NOT_ENOUGH_MEMORY;
378-
}
379-
380-
if (resourcesResponse.QueryMemoryLimit()) {
381-
error << "TxId: " << txId << ", NodeId: " << SelfId().NodeId() << ", memory limit exceeded, requested " << task.second.Memory;
382-
LOG_N(error);
383-
384-
failReason = NKikimrKqp::TEvStartKqpTasksResponse::QUERY_MEMORY_LIMIT_EXCEEDED;
385-
}
386-
387358
for (ui64 taskId : allocatedTasks) {
388359
ResourceManager()->FreeResources(txId, taskId);
389360
}
390361

391362
ResourceManager()->FreeExecutionUnits(executionUnits);
392363

393-
ReplyError(txId, request.Executer, msg, failReason, error);
364+
ReplyError(txId, request.Executer, msg, resourcesResponse.GetStatus(), resourcesResponse.GetFailReason());
394365
return;
395366
}
396367

@@ -553,19 +524,13 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
553524

554525
void TerminateTx(ui64 txId, const TString& reason) {
555526
auto& bucket = GetStateBucketByTx(Buckets, txId);
556-
auto tasksToAbort = bucket.RemoveTx(txId);
527+
auto tasksToAbort = bucket.GetTasksByTxId(txId);
557528

558529
if (!tasksToAbort.empty()) {
559-
LOG_D("TxId: " << txId << ", cancel granted resources");
560-
ResourceManager()->FreeResources(txId);
561-
562-
for (const auto& tasksRequest: tasksToAbort) {
563-
ResourceManager()->FreeExecutionUnits(tasksRequest.InFlyTasks.size());
564-
for (const auto& [taskId, task] : tasksRequest.InFlyTasks) {
565-
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::UNSPECIFIED,
566-
reason);
567-
Send(task.ComputeActorId, abortEv.Release());
568-
}
530+
for (const auto& [taskId, computeActorId]: tasksToAbort) {
531+
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::UNSPECIFIED,
532+
reason);
533+
Send(computeActorId, abortEv.Release());
569534
}
570535
}
571536
}

ydb/core/kqp/node_service/kqp_node_state.h

Lines changed: 16 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -30,25 +30,25 @@ struct TTasksRequest {
3030
// when task is finished it will be removed from this map
3131
THashMap<ui64, TTaskContext> InFlyTasks;
3232
TInstant Deadline;
33-
ui64 TotalMemory = 0;
3433
TActorId Executer;
3534
TActorId TimeoutTimer;
3635
bool ExecutionCancelled = false;
36+
37+
ui64 CalculateTotalMemory() const {
38+
ui64 result = 0;
39+
for(auto [id, task]: InFlyTasks) {
40+
result += task.Memory;
41+
}
42+
return result;
43+
}
3744
};
3845

3946
struct TTxMeta {
40-
ui64 TotalMemory = 0;
4147
NRm::EKqpMemoryPool MemoryPool = NRm::EKqpMemoryPool::Unspecified;
4248
ui32 TotalComputeActors = 0;
4349
TInstant StartTime;
4450
};
4551

46-
struct TRemoveTaskContext {
47-
ui64 TotalMemory = 0;
48-
ui64 ComputeActorsNumber = 0;
49-
bool FinixTx = false;
50-
};
51-
5252
class TState {
5353
public:
5454
struct TRequestId {
@@ -57,7 +57,6 @@ class TState {
5757
};
5858

5959
struct TRemoveTaskContext {
60-
ui64 TotalMemory = 0;
6160
ui64 ComputeActorsNumber = 0;
6261
bool FinixTx = false;
6362
TActorId Requester;
@@ -73,18 +72,9 @@ class TState {
7372
return Requests.contains(std::make_pair(txId, requester));
7473
}
7574

76-
ui64 GetTxMemory(ui64 txId, NRm::EKqpMemoryPool memoryPool) const {
77-
TReadGuard guard(RWLock);
78-
if (auto* meta = Meta.FindPtr(txId)) {
79-
return meta->MemoryPool == memoryPool ? meta->TotalMemory : 0;
80-
}
81-
return 0;
82-
}
83-
8475
void NewRequest(ui64 txId, const TActorId& requester, TTasksRequest&& request, NRm::EKqpMemoryPool memoryPool) {
8576
TWriteGuard guard(RWLock);
8677
auto& meta = Meta[txId];
87-
meta.TotalMemory += request.TotalMemory;
8878
meta.TotalComputeActors += request.InFlyTasks.size();
8979
if (!meta.StartTime) {
9080
meta.StartTime = TAppData::TimeProvider->Now();
@@ -109,21 +99,15 @@ class TState {
10999

110100
auto taskIt = requestIt->second.InFlyTasks.find(taskId);
111101
if (taskIt != requestIt->second.InFlyTasks.end()) {
112-
auto task = std::move(taskIt->second);
113102
requestIt->second.InFlyTasks.erase(taskIt);
114-
115-
Y_DEBUG_ABORT_UNLESS(requestIt->second.TotalMemory >= task.Memory);
116-
requestIt->second.TotalMemory -= task.Memory;
117103
requestIt->second.ExecutionCancelled |= !success;
118104

119105
auto& meta = Meta[txId];
120-
Y_DEBUG_ABORT_UNLESS(meta.TotalMemory >= task.Memory);
121106
Y_DEBUG_ABORT_UNLESS(meta.TotalComputeActors >= 1);
122-
meta.TotalMemory -= task.Memory;
123107
meta.TotalComputeActors--;
124108

125109
auto ret = TRemoveTaskContext{
126-
requestIt->second.TotalMemory, requestIt->second.InFlyTasks.size(), meta.TotalComputeActors == 0, senderIt->second
110+
requestIt->second.InFlyTasks.size(), meta.TotalComputeActors == 0, senderIt->second
127111
};
128112

129113
if (requestIt->second.InFlyTasks.empty()) {
@@ -157,24 +141,21 @@ class TState {
157141
return RemoveRequestImpl(txId, requester);
158142
}
159143

160-
std::vector<TTasksRequest> RemoveTx(ui64 txId) {
144+
// return the vector of pairs where the first element is a taskId
145+
// and the second one is the compute actor id associated with this task.
146+
std::vector<std::pair<ui64, TActorId>> GetTasksByTxId(ui64 txId) {
161147
TWriteGuard guard(RWLock);
162-
Meta.erase(txId);
163-
164148
YQL_ENSURE(Requests.size() == SenderIdsByTxId.size());
165149
const auto senders = SenderIdsByTxId.equal_range(txId);
166-
std::vector<TTasksRequest> ret;
150+
std::vector<std::pair<ui64, TActorId>> ret;
167151
for (auto senderIt = senders.first; senderIt != senders.second; ++senderIt) {
168152
auto requestIt = Requests.find(*senderIt);
169153
YQL_ENSURE(requestIt != Requests.end());
170-
171-
ret.push_back(std::move(requestIt->second));
172-
Requests.erase(requestIt);
154+
for(const auto& [taskId, task] : requestIt->second.InFlyTasks) {
155+
ret.push_back({taskId, task.ComputeActorId});
156+
}
173157
}
174158

175-
SenderIdsByTxId.erase(txId);
176-
YQL_ENSURE(Requests.size() == SenderIdsByTxId.size());
177-
178159
return ret;
179160
}
180161

@@ -208,15 +189,13 @@ class TState {
208189
for (auto& [txId, requests] : byTx) {
209190
auto& meta = Meta[txId];
210191
str << " TxId: " << txId << Endl;
211-
str << " Memory: " << meta.TotalMemory << Endl;
212192
str << " MemoryPool: " << (ui32) meta.MemoryPool << Endl;
213193
str << " Compute actors: " << meta.TotalComputeActors << Endl;
214194
str << " Start time: " << meta.StartTime << Endl;
215195
str << " Requests:" << Endl;
216196
for (auto& [requester, request] : requests) {
217197
str << " Requester: " << requester << Endl;
218198
str << " Deadline: " << request->Deadline << Endl;
219-
str << " Memory: " << request->TotalMemory << Endl;
220199
str << " In-fly tasks:" << Endl;
221200
for (auto& [taskId, task] : request->InFlyTasks) {
222201
str << " Task: " << taskId << Endl;
@@ -250,9 +229,7 @@ class TState {
250229
YQL_ENSURE(Requests.size() == SenderIdsByTxId.size());
251230

252231
auto& meta = Meta[txId];
253-
Y_DEBUG_ABORT_UNLESS(meta.TotalMemory >= ret->TotalMemory);
254232
Y_DEBUG_ABORT_UNLESS(meta.TotalComputeActors >= 1);
255-
meta.TotalMemory -= ret->TotalMemory;
256233
meta.TotalComputeActors -= ret->InFlyTasks.size();
257234

258235
if (meta.TotalComputeActors == 0) {

ydb/core/kqp/node_service/kqp_node_ut.cpp

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -482,8 +482,8 @@ void KqpNode::NotEnoughMemory() {
482482
UNIT_ASSERT_VALUES_EQUAL(1, record.GetTxId());
483483
UNIT_ASSERT_VALUES_EQUAL(1, record.GetNotStartedTasks().size());
484484
auto& task = record.GetNotStartedTasks()[0];
485-
UNIT_ASSERT_EQUAL(NKikimrKqp::TEvStartKqpTasksResponse::QUERY_MEMORY_LIMIT_EXCEEDED, task.GetReason());
486-
UNIT_ASSERT_STRINGS_EQUAL("Required: 201000, limit: 30000", task.GetMessage());
485+
UNIT_ASSERT_EQUAL(NKikimrKqp::TEvStartKqpTasksResponse::NOT_ENOUGH_MEMORY, task.GetReason());
486+
UNIT_ASSERT_STRING_CONTAINS(task.GetMessage(), "Not enough memory for query, requested: 201000");
487487
}
488488

489489
AssertResourceBrokerSensors(0, 0, 0, 0, 0);
@@ -670,6 +670,13 @@ void KqpNode::ExecuterLost() {
670670
UNIT_ASSERT_VALUES_EQUAL("executer lost", abortEvent->Get()->Record.GetLegacyMessage());
671671
}
672672

673+
{
674+
NKikimr::TActorSystemStub stub;
675+
TMap<ui64, TMockComputeActor> Task2ActorEmpty;
676+
CompFactory->Task2Actor.swap(Task2ActorEmpty);
677+
Task2ActorEmpty.clear();
678+
}
679+
673680
size_t iterations = 30;
674681
while (KqpCounters->RmComputeActors->Val() != 0 && iterations > 0) {
675682
Sleep(TDuration::MilliSeconds(300));
@@ -731,6 +738,13 @@ void KqpNode::TerminateTx() {
731738
auto abortEvent = Runtime->GrabEdgeEvent<TEvKqp::TEvAbortExecution>(computeActor.ActorId);
732739
UNIT_ASSERT_VALUES_EQUAL("terminate", abortEvent->Get()->Record.GetLegacyMessage());
733740
}
741+
742+
{
743+
NKikimr::TActorSystemStub stub;
744+
TMap<ui64, TMockComputeActor> Task2ActorEmpty;
745+
CompFactory->Task2Actor.swap(Task2ActorEmpty);
746+
Task2ActorEmpty.clear();
747+
}
734748
}
735749

736750
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), 0);

ydb/core/kqp/rm_service/kqp_rm_service.cpp

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -212,10 +212,12 @@ class TKqpResourceManager : public IKqpResourceManager {
212212

213213
with_lock (Lock) {
214214
if (Y_UNLIKELY(!ResourceBroker)) {
215-
LOG_AS_E("AllocateResources: not ready yet. TxId: " << txId << ", taskId: " << taskId);
215+
TStringBuilder reason;
216+
reason << "AllocateResources: not ready yet. TxId: " << txId << ", taskId: " << taskId;
216217
if (details) {
217-
details->SetNotReady();
218+
details->FailReason = reason;
218219
}
220+
219221
return false;
220222
}
221223

@@ -228,9 +230,11 @@ class TKqpResourceManager : public IKqpResourceManager {
228230

229231
if (!hasScanQueryMemory) {
230232
Counters->RmNotEnoughMemory->Inc();
231-
LOG_AS_N("TxId: " << txId << ", taskId: " << taskId << ". Not enough ScanQueryMemory, requested: " << resources.Memory);
233+
TStringBuilder reason;
234+
reason << "TxId: " << txId << ", taskId: " << taskId << ". Not enough memory for query, requested: " << resources.Memory;
232235
if (details) {
233-
details->SetScanQueryMemory();
236+
details->FailReason = reason;
237+
details->Status = NKikimrKqp::TEvStartKqpTasksResponse::NOT_ENOUGH_MEMORY;
234238
}
235239
return false;
236240
}
@@ -242,18 +246,21 @@ class TKqpResourceManager : public IKqpResourceManager {
242246
auto& txBucket = TxBucket(txId);
243247
with_lock (txBucket.Lock) {
244248
if (auto it = txBucket.Txs.find(txId); it != txBucket.Txs.end()) {
245-
if (it->second.TxScanQueryMemory + resources.Memory > queryMemoryLimit) {
249+
ui64 txTotalRequestedMemory = it->second.TxScanQueryMemory + resources.Memory;
250+
if (txTotalRequestedMemory > queryMemoryLimit) {
246251
auto unguard = ::Unguard(txBucket.Lock);
247252

248253
with_lock (Lock) {
249254
ScanQueryMemoryResource.Release(resources.Memory);
250255
} // with_lock (Lock)
251256

252257
Counters->RmNotEnoughMemory->Inc();
253-
LOG_AS_N("TxId: " << txId << ", taskId: " << taskId << ". Query memory limit exceeded: "
254-
<< "requested " << (it->second.TxScanQueryMemory + resources.Memory));
258+
TStringBuilder reason;
259+
reason << "TxId: " << txId << ", taskId: " << taskId << ". Query memory limit exceeded: "
260+
<< "requested " << txTotalRequestedMemory;
255261
if (details) {
256-
details->SetQueryMemoryLimit();
262+
details->FailReason = reason;
263+
details->Status = NKikimrKqp::TEvStartKqpTasksResponse::QUERY_MEMORY_LIMIT_EXCEEDED;
257264
}
258265
return false;
259266
}
@@ -271,10 +278,13 @@ class TKqpResourceManager : public IKqpResourceManager {
271278
} // with_lock (Lock)
272279

273280
Counters->RmNotEnoughMemory->Inc();
274-
LOG_AS_N("TxId: " << txId << ", taskId: " << taskId << ". Not enough ScanQueryMemory: "
275-
<< "requested " << resources.Memory);
281+
TStringBuilder reason;
282+
reason << "TxId: " << txId << ", taskId: " << taskId << ". Not enough ScanQueryMemory: "
283+
<< "requested " << resources.Memory;
284+
LOG_AS_N(reason);
276285
if (details) {
277-
details->SetScanQueryMemory();
286+
details->Status = NKikimrKqp::TEvStartKqpTasksResponse::NOT_ENOUGH_MEMORY;
287+
details->FailReason = reason;
278288
}
279289
return false;
280290
}

ydb/core/kqp/rm_service/kqp_rm_service.h

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,16 @@ struct TKqpResourcesRequest {
4545

4646
/// detailed information on allocation failure
4747
struct TKqpNotEnoughResources {
48-
std::bitset<32> State;
48+
NKikimrKqp::TEvStartKqpTasksResponse::ENotStartedTaskReason Status = NKikimrKqp::TEvStartKqpTasksResponse::INTERNAL_ERROR;
49+
TString FailReason;
4950

50-
bool NotReady() const { return State.test(0); }
51-
bool QueryMemoryLimit() const { return State.test(1); }
52-
bool ScanQueryMemory() const { return State.test(2); }
53-
bool DataQueryMemory() const { return State.test(3); }
51+
NKikimrKqp::TEvStartKqpTasksResponse::ENotStartedTaskReason GetStatus() const {
52+
return Status;
53+
}
5454

55-
void SetNotReady() { State.set(0); }
56-
void SetQueryMemoryLimit() { State.set(1); }
57-
void SetScanQueryMemory() { State.set(2); }
58-
void SetDataQueryMemory() { State.set(3); }
55+
TString GetFailReason() const {
56+
return FailReason;
57+
}
5958
};
6059

6160
/// local resources snapshot

0 commit comments

Comments
 (0)