diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index 998da2bd8274..a32f9ec57151 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -737,6 +737,7 @@ struct TEvBlobStorage { EvHugePreCompact, EvHugePreCompactResult, EvPDiskMetadataLoaded, + EvBalancingSendPartsOnMain, EvYardInitResult = EvPut + 9 * 512, /// 268 636 672 EvLogResult, diff --git a/ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp b/ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp index c4990b306643..80152edc22f3 100644 --- a/ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp +++ b/ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp @@ -51,7 +51,7 @@ namespace NBalancing { } else if (ev->Sender == DeleterId) { IsDeleteCompleted = true; } else { - STLOG(PRI_WARN, BS_VDISK_BALANCING, BSVB05, "Unexpected id", (Id, ev->Sender)); + STLOG(PRI_WARN, BS_VDISK_BALANCING, BSVB05, "Unexpected actor id", (Id, ev->Sender)); } } @@ -76,9 +76,12 @@ namespace NBalancing { TBatchedQueue SendOnMainParts; TBatchedQueue TryDeleteParts; + std::unordered_map> TryDeletePartsFullData; // if part on main by ingress, but actualy it is not, we could not delete it, so we need to send it on main TBatchManager BatchManager; + TInstant StartTime; + /////////////////////////////////////////////////////////////////////////////////////////// // Main logic /////////////////////////////////////////////////////////////////////////////////////////// @@ -98,8 +101,12 @@ namespace NBalancing { } void ScheduleJobQuant() { + Ctx->MonGroup.ReplTokenAquired()++; + Ctx->MonGroup.PlannedToSendOnMain() = SendOnMainParts.Data.size(); + Ctx->MonGroup.CandidatesToDelete() = TryDeleteParts.Data.size(); + // once repl token received, start balancing - waking up sender and deleter - STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB02, VDISKP(Ctx->VCtx, "Schedule job quant"), + STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB02, VDISKP(Ctx->VCtx, "Schedule job quant"), (SendPartsLeft, SendOnMainParts.Size()), (DeletePartsLeft, TryDeleteParts.Size()), (ConnectedVDisks, ConnectedVDisks.size()), (TotalVDisks, GInfo->GetTotalVDisksNum())); @@ -125,7 +132,7 @@ namespace NBalancing { for (ui32 cnt = 0; It.Valid(); It.Next(), ++cnt) { if (cnt % 100 == 99 && TDuration::Seconds(timer.Passed()) > JOB_GRANULARITY) { // actor should not block the thread for a long time, so we should yield - STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB04, VDISKP(Ctx->VCtx, "Collect keys"), (collected, cnt), (passed, timer.Passed())); + // STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB04, VDISKP(Ctx->VCtx, "Collect keys"), (collected, cnt), (passed, timer.Passed())); Send(SelfId(), new NActors::TEvents::TEvWakeup()); return; } @@ -138,7 +145,7 @@ namespace NBalancing { auto [moveMask, delMask] = merger.Ingress.HandoffParts(&top, Ctx->VCtx->ShortSelfVDisk, key); - if (auto partsToSend = merger.Ingress.LocalParts(top.GType) & moveMask; !partsToSend.Empty()) { + if (auto partsToSend = merger.Ingress.LocalParts(top.GType) & moveMask; !partsToSend.Empty() && SendOnMainParts.Size() < MAX_TO_SEND_PER_EPOCH) { // collect parts to send on main for (const auto& [parts, data]: merger.Parts) { if (!(partsToSend & parts).Empty()) { @@ -151,21 +158,33 @@ namespace NBalancing { } } - if (auto partsToDelete = merger.Ingress.LocalParts(top.GType) & delMask; !partsToDelete.Empty()) { + if (auto partsToDelete = merger.Ingress.LocalParts(top.GType) & delMask; !partsToDelete.Empty() && TryDeleteParts.Size() < MAX_TO_DELETE_PER_EPOCH) { // collect parts to delete + auto key = It.GetCurKey().LogoBlobID(); for (ui8 partIdx = partsToDelete.FirstPosition(); partIdx < partsToDelete.GetSize(); partIdx = partsToDelete.NextPosition(partIdx)) { - TryDeleteParts.Data.emplace_back(TLogoBlobID(It.GetCurKey().LogoBlobID(), partIdx + 1)); + TryDeleteParts.Data.emplace_back(TLogoBlobID(key, partIdx + 1)); STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB10, VDISKP(Ctx->VCtx, "Delete"), (LogoBlobId, TryDeleteParts.Data.back().ToString())); } + + for (const auto& [parts, data]: merger.Parts) { + if (!(partsToDelete & parts).Empty()) { + TryDeletePartsFullData[key].emplace_back(TPartInfo{ + .Key=key, .PartsMask=parts, .PartData=data + }); + } + } } merger.Clear(); + + if (SendOnMainParts.Size() >= MAX_TO_SEND_PER_EPOCH && TryDeleteParts.Size() >= MAX_TO_DELETE_PER_EPOCH) { + // reached the limit of parts to send and delete + break; + } } STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB08, VDISKP(Ctx->VCtx, "Keys collected"), (SendOnMainParts, SendOnMainParts.Data.size()), (TryDeleteParts, TryDeleteParts.Data.size())); - Ctx->MonGroup.PlannedToSendOnMain() = SendOnMainParts.Data.size(); - Ctx->MonGroup.CandidatesToDelete() = TryDeleteParts.Data.size(); // start balancing ContinueBalancing(); @@ -173,6 +192,13 @@ namespace NBalancing { void Handle(NActors::TEvents::TEvCompleted::TPtr ev) { BatchManager.Handle(ev); + + if (StartTime + EPOCH_TIMEOUT < TlsActivationContext->Now()) { + Ctx->MonGroup.EpochTimeouts()++; + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB04, VDISKP(Ctx->VCtx, "Epoch timeout")); + PassAway(); + } + if (BatchManager.IsBatchCompleted()) { Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken); @@ -180,6 +206,23 @@ namespace NBalancing { } } + void Handle(TEvBalancingSendPartsOnMain::TPtr ev) { + Ctx->MonGroup.OnMainByIngressButNotRealy() += ev->Get()->Ids.size(); + STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB05, VDISKP(Ctx->VCtx, "Received from deleter TEvBalancingSendPartsOnMain"), (Parts, ev->Get()->Ids.size())); + for (const auto& id: ev->Get()->Ids) { + if (auto it = TryDeletePartsFullData.find(TLogoBlobID(id, 0)); it != TryDeletePartsFullData.end()) { + for (const auto& part: it->second) { + if (part.PartsMask.Get(id.PartId() - 1)) { + SendOnMainParts.Data.push_back(part); + break; + } + } + } else { + Y_DEBUG_ABORT_S("Part not found in TryDeletePartsFullData"); + } + } + } + /////////////////////////////////////////////////////////////////////////////////////////// // Helper functions /////////////////////////////////////////////////////////////////////////////////////////// @@ -241,6 +284,7 @@ namespace NBalancing { cFunc(NActors::TEvents::TEvWakeup::EventType, CollectKeys) cFunc(TEvReplToken::EventType, ScheduleJobQuant) hFunc(NActors::TEvents::TEvCompleted, Handle) + hFunc(TEvBalancingSendPartsOnMain, Handle) // System events hFunc(NActors::TEvents::TEvUndelivered, Handle) @@ -259,6 +303,7 @@ namespace NBalancing { , It(Ctx->Snap.HullCtx, &Ctx->Snap.LogoBlobsSnap) , SendOnMainParts(BATCH_SIZE) , TryDeleteParts(BATCH_SIZE) + , StartTime(TlsActivationContext->Now()) { } diff --git a/ydb/core/blobstorage/vdisk/balance/defs.h b/ydb/core/blobstorage/vdisk/balance/defs.h index 6206e24565ca..42cf3f477899 100644 --- a/ydb/core/blobstorage/vdisk/balance/defs.h +++ b/ydb/core/blobstorage/vdisk/balance/defs.h @@ -72,5 +72,18 @@ namespace NBalancing { static constexpr ui32 BATCH_SIZE = 32; + static constexpr ui32 MAX_TO_SEND_PER_EPOCH = 1000; + static constexpr ui32 MAX_TO_DELETE_PER_EPOCH = 1000; + static constexpr TDuration EPOCH_TIMEOUT = TDuration::Minutes(1); + + + struct TEvBalancingSendPartsOnMain : TEventLocal { + TEvBalancingSendPartsOnMain(const TVector& ids) + : Ids(ids) + {} + + TVector Ids; + }; + } // NBalancing } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/balance/deleter.cpp b/ydb/core/blobstorage/vdisk/balance/deleter.cpp index e189f3c6988e..5aba8e28fd0a 100644 --- a/ydb/core/blobstorage/vdisk/balance/deleter.cpp +++ b/ydb/core/blobstorage/vdisk/balance/deleter.cpp @@ -153,7 +153,7 @@ namespace { ++Responses; ++Ctx->MonGroup.MarkedReadyToDeleteResponse(); Ctx->MonGroup.MarkedReadyToDeleteWithResponseBytes() += GInfo->GetTopology().GType.PartSize(ev->Get()->Id); - STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB21, VDISKP(Ctx->VCtx, "Deleted local"), (LogoBlobID, ev->Get()->Id)); + STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB21, VDISKP(Ctx->VCtx, "Deleted local"), (LogoBlobID, ev->Get()->Id)); } bool IsDone() const { @@ -200,7 +200,8 @@ namespace { if (ev->Get()->Tag != REQUEST_TIMEOUT_TAG) { return; } - STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB24, VDISKP(Ctx->VCtx, "SendRequestsToCheckPartsOnMain timeout")); + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB24, VDISKP(Ctx->VCtx, "CandidatesToDeleteAskFromMainBatchTimeout")); + Ctx->MonGroup.CandidatesToDeleteAskFromMainBatchTimeout()++; DeleteLocalParts(); } @@ -217,23 +218,34 @@ namespace { // DeleteState /////////////////////////////////////////////////////////////////////////////////////////// + ui32 CheckPartsOnMain() { + TVector partsNotOnMain; + for (const auto& part: PartsRequester.GetResult()) { + if (!part.HasOnMain) { + partsNotOnMain.push_back(part.Key); + } + } + + if (!partsNotOnMain.empty()) { + STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB29, VDISKP(Ctx->VCtx, "Send TEvBalancingSendPartsOnMain")); + Send(NotifyId, new TEvBalancingSendPartsOnMain(std::move(partsNotOnMain))); + } + + ui32 partsOnMain = PartsRequester.GetResult().size() - partsNotOnMain.size(); + STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB30, VDISKP(Ctx->VCtx, "DeleteLocalParts"), (Parts, PartsRequester.GetResult().size()), (PartsOnMain, partsOnMain)); + + return partsOnMain; + } + void DeleteLocalParts() { Become(&TThis::DeleteState); - if (PartsRequester.GetResult().empty()) { + if (CheckPartsOnMain() == 0) { STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB25, VDISKP(Ctx->VCtx, "Nothing to delete. PassAway")); PassAway(); return; } - { - ui32 partsOnMain = 0; - for (const auto& part: PartsRequester.GetResult()) { - partsOnMain += part.HasOnMain; - } - STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB26, VDISKP(Ctx->VCtx, "DeleteLocalParts"), (Parts, PartsRequester.GetResult().size()), (PartsOnMain, partsOnMain)); - } - PartsDeleter.DeleteParts(SelfId(), PartsRequester.GetResult()); Schedule(TDuration::Seconds(15), new NActors::TEvents::TEvWakeup(DELETE_TIMEOUT_TAG)); // delete timeout @@ -251,12 +263,14 @@ namespace { if (ev->Get()->Tag != DELETE_TIMEOUT_TAG) { return; } - STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB28, VDISKP(Ctx->VCtx, "DeleteLocalParts timeout")); + STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB31, VDISKP(Ctx->VCtx, "MarkReadyBatchTimeout")); + Ctx->MonGroup.MarkReadyBatchTimeout()++; PassAway(); } void PassAway() override { Send(NotifyId, new NActors::TEvents::TEvCompleted(DELETER_ID)); + STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB32, VDISKP(Ctx->VCtx, "TDeleter::PassAway")); TActorBootstrapped::PassAway(); } diff --git a/ydb/core/blobstorage/vdisk/balance/sender.cpp b/ydb/core/blobstorage/vdisk/balance/sender.cpp index 35bdfcdb3ecf..befe283a9664 100644 --- a/ydb/core/blobstorage/vdisk/balance/sender.cpp +++ b/ydb/core/blobstorage/vdisk/balance/sender.cpp @@ -190,7 +190,7 @@ namespace { } else { ++Ctx->MonGroup.SentOnMain(); Ctx->MonGroup.SentOnMainWithResponseBytes() += GInfo->GetTopology().GType.PartSize(LogoBlobIDFromLogoBlobID(ev->Get()->Record.GetBlobID())); - STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB14, VDISKP(Ctx->VCtx, "Put done"), (Msg, ev->Get()->ToString())); + STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB14, VDISKP(Ctx->VCtx, "Put done"), (Msg, ev->Get()->ToString())); } } @@ -204,7 +204,7 @@ namespace { } ++Ctx->MonGroup.SentOnMain(); Ctx->MonGroup.SentOnMainWithResponseBytes() += GInfo->GetTopology().GType.PartSize(LogoBlobIDFromLogoBlobID(item.GetBlobID())); - STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB16, VDISKP(Ctx->VCtx, "MultiPut done"), (Key, LogoBlobIDFromLogoBlobID(item.GetBlobID()).ToString())); + STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB16, VDISKP(Ctx->VCtx, "MultiPut done"), (Key, LogoBlobIDFromLogoBlobID(item.GetBlobID()).ToString())); } } @@ -229,6 +229,8 @@ namespace { void ReadPartsFromDisk() { Become(&TThis::StateRead); + STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB29, VDISKP(Ctx->VCtx, "ReadPartsFromDisk"), (Parts, Reader.GetPartsSize())); + if (Reader.GetPartsSize() == 0) { STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB10, VDISKP(Ctx->VCtx, "Nothing to read. PassAway")); PassAway(); @@ -255,7 +257,8 @@ namespace { if (ev->Get()->Tag != READ_TIMEOUT_TAG) { return; } - STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB17, VDISKP(Ctx->VCtx, "TimeoutRead"), (Requests, Reader.GetPartsSize()), (Responses, Reader.GetResponses())); + STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB17, VDISKP(Ctx->VCtx, "ReadFromHandoffBatchTimeout"), (Requests, Reader.GetPartsSize()), (Responses, Reader.GetResponses())); + Ctx->MonGroup.ReadFromHandoffBatchTimeout()++; SendPartsOnMain(); } @@ -274,6 +277,8 @@ namespace { void SendPartsOnMain() { Become(&TThis::StateSend); + STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB29, VDISKP(Ctx->VCtx, "SendPartsOnMain"), (Parts, Reader.GetResult().size())); + if (Reader.GetResult().empty()) { STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB18, VDISKP(Ctx->VCtx, "Nothing to send. PassAway")); PassAway(); @@ -297,12 +302,14 @@ namespace { if (ev->Get()->Tag != SEND_TIMEOUT_TAG) { return; } - STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB19, VDISKP(Ctx->VCtx, "TimeoutSend")); + STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB19, VDISKP(Ctx->VCtx, "SendOnMainBatchTimeout")); + Ctx->MonGroup.SendOnMainBatchTimeout()++; PassAway(); } void PassAway() override { Send(NotifyId, new NActors::TEvents::TEvCompleted(SENDER_ID)); + STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB28, VDISKP(Ctx->VCtx, "TSender::PassAway")); TActorBootstrapped::PassAway(); } diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h b/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h index 93b476cbc202..1ec41710c1ae 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h @@ -593,38 +593,53 @@ public: GROUP_CONSTRUCTOR(TBalancingGroup) { COUNTER_INIT(BalancingIterations, true); + COUNTER_INIT(EpochTimeouts, true); + COUNTER_INIT(ReplTokenAquired, true); + COUNTER_INIT(OnMainByIngressButNotRealy, true); COUNTER_INIT(PlannedToSendOnMain, false); COUNTER_INIT(CandidatesToDelete, false); COUNTER_INIT(ReadFromHandoffBytes, true); COUNTER_INIT(ReadFromHandoffResponseBytes, true); + COUNTER_INIT(ReadFromHandoffBatchTimeout, true); COUNTER_INIT(SentOnMain, true); COUNTER_INIT(SentOnMainBytes, true); COUNTER_INIT(SentOnMainWithResponseBytes, true); + COUNTER_INIT(SendOnMainBatchTimeout, true); COUNTER_INIT(CandidatesToDeleteAskedFromMain, true); COUNTER_INIT(CandidatesToDeleteAskedFromMainResponse, true); + COUNTER_INIT(CandidatesToDeleteAskFromMainBatchTimeout, true); COUNTER_INIT(MarkedReadyToDelete, true); COUNTER_INIT(MarkedReadyToDeleteBytes, true); COUNTER_INIT(MarkedReadyToDeleteResponse, true); COUNTER_INIT(MarkedReadyToDeleteWithResponseBytes, true); + COUNTER_INIT(MarkReadyBatchTimeout, true); } COUNTER_DEF(BalancingIterations); + COUNTER_DEF(EpochTimeouts); + COUNTER_DEF(ReplTokenAquired); + COUNTER_DEF(OnMainByIngressButNotRealy); + COUNTER_DEF(PlannedToSendOnMain); COUNTER_DEF(ReadFromHandoffBytes); COUNTER_DEF(ReadFromHandoffResponseBytes); + COUNTER_DEF(ReadFromHandoffBatchTimeout); COUNTER_DEF(SentOnMain); COUNTER_DEF(SentOnMainBytes); COUNTER_DEF(SentOnMainWithResponseBytes); + COUNTER_DEF(SendOnMainBatchTimeout); COUNTER_DEF(CandidatesToDelete); COUNTER_DEF(CandidatesToDeleteAskedFromMain); COUNTER_DEF(CandidatesToDeleteAskedFromMainResponse); + COUNTER_DEF(CandidatesToDeleteAskFromMainBatchTimeout); COUNTER_DEF(MarkedReadyToDelete); COUNTER_DEF(MarkedReadyToDeleteBytes); COUNTER_DEF(MarkedReadyToDeleteResponse); COUNTER_DEF(MarkedReadyToDeleteWithResponseBytes); + COUNTER_DEF(MarkReadyBatchTimeout); }; /////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp index 2a8fbfa5a3ae..0eae700fe6e6 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp @@ -2552,6 +2552,10 @@ namespace NKikimr { } void RunBalancing(const TActorContext &ctx) { + if (VCtx->GroupId.GetRawId() == 0) { + // don't run balancing for the static group + return; + } if (!Config->FeatureFlags.GetUseVDisksBalancing() || VCtx->Top->GType.GetErasure() == TErasureType::ErasureMirror3of4) { return; }