Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,7 @@ struct TEvBlobStorage {
EvHugePreCompact,
EvHugePreCompactResult,
EvPDiskMetadataLoaded,
EvBalancingSendPartsOnMain,

EvYardInitResult = EvPut + 9 * 512, /// 268 636 672
EvLogResult,
Expand Down
61 changes: 53 additions & 8 deletions ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ namespace NBalancing {
} else if (ev->Sender == DeleterId) {
IsDeleteCompleted = true;
} else {
STLOG(PRI_WARN, BS_VDISK_BALANCING, BSVB05, "Unexpected id", (Id, ev->Sender));
STLOG(PRI_WARN, BS_VDISK_BALANCING, BSVB05, "Unexpected actor id", (Id, ev->Sender));
}
}

Expand All @@ -76,9 +76,12 @@ namespace NBalancing {

// Parts collected from the snapshot that are planned to be sent to main.
TBatchedQueue<TPartInfo> SendOnMainParts;
// Ids of local parts that are candidates for deletion.
TBatchedQueue<TLogoBlobID> TryDeleteParts;
// Full part data of the delete candidates, keyed by blob id. If a part is on main
// according to ingress but actually is not, it cannot be deleted and has to be sent
// to main instead, which requires its data.
std::unordered_map<TLogoBlobID, TVector<TPartInfo>> TryDeletePartsFullData;

TBatchManager BatchManager;

// Set when the actor is constructed; compared against EPOCH_TIMEOUT on every TEvCompleted.
TInstant StartTime;

///////////////////////////////////////////////////////////////////////////////////////////
// Main logic
///////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -98,8 +101,12 @@ namespace NBalancing {
}

void ScheduleJobQuant() {
Ctx->MonGroup.ReplTokenAquired()++;
Ctx->MonGroup.PlannedToSendOnMain() = SendOnMainParts.Data.size();
Ctx->MonGroup.CandidatesToDelete() = TryDeleteParts.Data.size();

// once repl token received, start balancing - waking up sender and deleter
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB02, VDISKP(Ctx->VCtx, "Schedule job quant"),
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB02, VDISKP(Ctx->VCtx, "Schedule job quant"),
(SendPartsLeft, SendOnMainParts.Size()), (DeletePartsLeft, TryDeleteParts.Size()),
(ConnectedVDisks, ConnectedVDisks.size()), (TotalVDisks, GInfo->GetTotalVDisksNum()));

Expand All @@ -125,7 +132,7 @@ namespace NBalancing {
for (ui32 cnt = 0; It.Valid(); It.Next(), ++cnt) {
if (cnt % 100 == 99 && TDuration::Seconds(timer.Passed()) > JOB_GRANULARITY) {
// actor should not block the thread for a long time, so we should yield
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB04, VDISKP(Ctx->VCtx, "Collect keys"), (collected, cnt), (passed, timer.Passed()));
// STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB04, VDISKP(Ctx->VCtx, "Collect keys"), (collected, cnt), (passed, timer.Passed()));
Send(SelfId(), new NActors::TEvents::TEvWakeup());
return;
}
Expand All @@ -138,7 +145,7 @@ namespace NBalancing {

auto [moveMask, delMask] = merger.Ingress.HandoffParts(&top, Ctx->VCtx->ShortSelfVDisk, key);

if (auto partsToSend = merger.Ingress.LocalParts(top.GType) & moveMask; !partsToSend.Empty()) {
if (auto partsToSend = merger.Ingress.LocalParts(top.GType) & moveMask; !partsToSend.Empty() && SendOnMainParts.Size() < MAX_TO_SEND_PER_EPOCH) {
// collect parts to send on main
for (const auto& [parts, data]: merger.Parts) {
if (!(partsToSend & parts).Empty()) {
Expand All @@ -151,35 +158,71 @@ namespace NBalancing {
}
}

if (auto partsToDelete = merger.Ingress.LocalParts(top.GType) & delMask; !partsToDelete.Empty()) {
if (auto partsToDelete = merger.Ingress.LocalParts(top.GType) & delMask; !partsToDelete.Empty() && TryDeleteParts.Size() < MAX_TO_DELETE_PER_EPOCH) {
// collect parts to delete
auto key = It.GetCurKey().LogoBlobID();
for (ui8 partIdx = partsToDelete.FirstPosition(); partIdx < partsToDelete.GetSize(); partIdx = partsToDelete.NextPosition(partIdx)) {
TryDeleteParts.Data.emplace_back(TLogoBlobID(It.GetCurKey().LogoBlobID(), partIdx + 1));
TryDeleteParts.Data.emplace_back(TLogoBlobID(key, partIdx + 1));
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB10, VDISKP(Ctx->VCtx, "Delete"), (LogoBlobId, TryDeleteParts.Data.back().ToString()));
}

for (const auto& [parts, data]: merger.Parts) {
if (!(partsToDelete & parts).Empty()) {
TryDeletePartsFullData[key].emplace_back(TPartInfo{
.Key=key, .PartsMask=parts, .PartData=data
});
}
}
}

merger.Clear();

if (SendOnMainParts.Size() >= MAX_TO_SEND_PER_EPOCH && TryDeleteParts.Size() >= MAX_TO_DELETE_PER_EPOCH) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe || ?

If either of the limits is exceeded, the actor should stop.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that's not a mistake: I check TryDeleteParts.Size() < MAX_TO_DELETE_PER_EPOCH and SendOnMainParts.Size() < MAX_TO_SEND_PER_EPOCH above, before pushing to the queues. If I checked || here, there could be a situation where I only send parts on main for a long time, without any deletes.

// reached the limit of parts to send and delete
break;
}
}

STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB08, VDISKP(Ctx->VCtx, "Keys collected"),
(SendOnMainParts, SendOnMainParts.Data.size()), (TryDeleteParts, TryDeleteParts.Data.size()));
Ctx->MonGroup.PlannedToSendOnMain() = SendOnMainParts.Data.size();
Ctx->MonGroup.CandidatesToDelete() = TryDeleteParts.Data.size();

// start balancing
ContinueBalancing();
}

// Handles TEvCompleted reported by the sender or the deleter.
//
// The completion is first recorded in BatchManager. If the epoch has been running for
// longer than EPOCH_TIMEOUT, the actor stops: it bumps the EpochTimeouts counter, gives
// the replication token back and passes away. Otherwise, once the whole batch is
// completed, the token is released and balancing continues with the next batch.
void Handle(NActors::TEvents::TEvCompleted::TPtr ev) {
    BatchManager.Handle(ev);

    if (StartTime + EPOCH_TIMEOUT < TlsActivationContext->Now()) {
        Ctx->MonGroup.EpochTimeouts()++;
        STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB04, VDISKP(Ctx->VCtx, "Epoch timeout"));

        // NOTE(review): the token is released unconditionally on the assumption that one is
        // always held while a worker is still reporting completion (workers are woken up only
        // after TEvReplToken arrives) -- confirm that PassAway() does not release it as well.
        Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
        PassAway();

        // Must not fall through: the actor is dying, so it must not schedule another batch.
        return;
    }

    if (BatchManager.IsBatchCompleted()) {
        Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);

        ContinueBalancing();
    }
}

// Handles the deleter's report about delete candidates that turned out not to be on main.
//
// For every reported part id the stored full data of the blob is looked up in
// TryDeletePartsFullData, and the first entry whose parts mask contains that part is
// queued into SendOnMainParts. A blob missing from the map is a debug-only abort.
void Handle(TEvBalancingSendPartsOnMain::TPtr ev) {
    const auto& reportedIds = ev->Get()->Ids;

    Ctx->MonGroup.OnMainByIngressButNotRealy() += reportedIds.size();
    STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB05, VDISKP(Ctx->VCtx, "Received from deleter TEvBalancingSendPartsOnMain"), (Parts, reportedIds.size()));

    for (const auto& partId: reportedIds) {
        // the map is keyed by the blob id without a part index
        const auto found = TryDeletePartsFullData.find(TLogoBlobID(partId, 0));
        if (found == TryDeletePartsFullData.end()) {
            Y_DEBUG_ABORT_S("Part not found in TryDeletePartsFullData");
            continue;
        }

        for (const auto& candidate: found->second) {
            // part ids are 1-based, mask positions are 0-based
            if (!candidate.PartsMask.Get(partId.PartId() - 1)) {
                continue;
            }
            SendOnMainParts.Data.push_back(candidate);
            break;
        }
    }
}

///////////////////////////////////////////////////////////////////////////////////////////
// Helper functions
///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -241,6 +284,7 @@ namespace NBalancing {
cFunc(NActors::TEvents::TEvWakeup::EventType, CollectKeys)
cFunc(TEvReplToken::EventType, ScheduleJobQuant)
hFunc(NActors::TEvents::TEvCompleted, Handle)
hFunc(TEvBalancingSendPartsOnMain, Handle)

// System events
hFunc(NActors::TEvents::TEvUndelivered, Handle)
Expand All @@ -259,6 +303,7 @@ namespace NBalancing {
, It(Ctx->Snap.HullCtx, &Ctx->Snap.LogoBlobsSnap)
, SendOnMainParts(BATCH_SIZE)
, TryDeleteParts(BATCH_SIZE)
, StartTime(TlsActivationContext->Now())
{
}

Expand Down
13 changes: 13 additions & 0 deletions ydb/core/blobstorage/vdisk/balance/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,18 @@ namespace NBalancing {

static constexpr ui32 BATCH_SIZE = 32;

// Once SendOnMainParts reaches this size, no more parts to send are collected in the epoch.
static constexpr ui32 MAX_TO_SEND_PER_EPOCH = 1000;
// Once TryDeleteParts reaches this size, no more delete candidates are collected in the epoch.
static constexpr ui32 MAX_TO_DELETE_PER_EPOCH = 1000;
// Time budget of a single balancing epoch; the balancing actor passes away after it elapses.
static constexpr TDuration EPOCH_TIMEOUT = TDuration::Minutes(1);


// Local event carrying ids of parts that have to be sent on main.
// The deleter sends it to the balancing actor for delete candidates that are not on main.
struct TEvBalancingSendPartsOnMain
    : TEventLocal<TEvBalancingSendPartsOnMain, TEvBlobStorage::EvBalancingSendPartsOnMain>
{
    // ids of the parts to send on main
    TVector<TLogoBlobID> Ids;

    // Stores a copy of `ids`; the caller's vector is left untouched.
    TEvBalancingSendPartsOnMain(const TVector<TLogoBlobID>& ids)
        : Ids(ids)
    {
    }
};

} // NBalancing
} // NKikimr
38 changes: 26 additions & 12 deletions ydb/core/blobstorage/vdisk/balance/deleter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ namespace {
++Responses;
++Ctx->MonGroup.MarkedReadyToDeleteResponse();
Ctx->MonGroup.MarkedReadyToDeleteWithResponseBytes() += GInfo->GetTopology().GType.PartSize(ev->Get()->Id);
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB21, VDISKP(Ctx->VCtx, "Deleted local"), (LogoBlobID, ev->Get()->Id));
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB21, VDISKP(Ctx->VCtx, "Deleted local"), (LogoBlobID, ev->Get()->Id));
}

bool IsDone() const {
Expand Down Expand Up @@ -200,7 +200,8 @@ namespace {
if (ev->Get()->Tag != REQUEST_TIMEOUT_TAG) {
return;
}
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB24, VDISKP(Ctx->VCtx, "SendRequestsToCheckPartsOnMain timeout"));
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB24, VDISKP(Ctx->VCtx, "CandidatesToDeleteAskFromMainBatchTimeout"));
Ctx->MonGroup.CandidatesToDeleteAskFromMainBatchTimeout()++;
DeleteLocalParts();
}

Expand All @@ -217,23 +218,34 @@ namespace {
// DeleteState
///////////////////////////////////////////////////////////////////////////////////////////

// Splits the requester's results into parts that are on main and parts that are not.
//
// Ids of the parts that are not on main are sent to NotifyId in a
// TEvBalancingSendPartsOnMain event (nothing is sent when there are none).
//
// Returns the number of parts that are on main.
ui32 CheckPartsOnMain() {
    const auto& results = PartsRequester.GetResult();

    TVector<TLogoBlobID> partsNotOnMain;
    for (const auto& part: results) {
        if (!part.HasOnMain) {
            partsNotOnMain.push_back(part.Key);
        }
    }

    // Must be computed before partsNotOnMain is handed over with std::move below:
    // reading a moved-from vector would silently break this count as soon as the
    // event constructor starts moving from its argument instead of copying it.
    const ui32 partsOnMain = results.size() - partsNotOnMain.size();

    if (!partsNotOnMain.empty()) {
        STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB29, VDISKP(Ctx->VCtx, "Send TEvBalancingSendPartsOnMain"));
        Send(NotifyId, new TEvBalancingSendPartsOnMain(std::move(partsNotOnMain)));
    }

    STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB30, VDISKP(Ctx->VCtx, "DeleteLocalParts"), (Parts, results.size()), (PartsOnMain, partsOnMain));

    return partsOnMain;
}

void DeleteLocalParts() {
Become(&TThis::DeleteState);

if (PartsRequester.GetResult().empty()) {
if (CheckPartsOnMain() == 0) {
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB25, VDISKP(Ctx->VCtx, "Nothing to delete. PassAway"));
PassAway();
return;
}

{
ui32 partsOnMain = 0;
for (const auto& part: PartsRequester.GetResult()) {
partsOnMain += part.HasOnMain;
}
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB26, VDISKP(Ctx->VCtx, "DeleteLocalParts"), (Parts, PartsRequester.GetResult().size()), (PartsOnMain, partsOnMain));
}

PartsDeleter.DeleteParts(SelfId(), PartsRequester.GetResult());

Schedule(TDuration::Seconds(15), new NActors::TEvents::TEvWakeup(DELETE_TIMEOUT_TAG)); // delete timeout
Expand All @@ -251,12 +263,14 @@ namespace {
if (ev->Get()->Tag != DELETE_TIMEOUT_TAG) {
return;
}
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB28, VDISKP(Ctx->VCtx, "DeleteLocalParts timeout"));
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB31, VDISKP(Ctx->VCtx, "MarkReadyBatchTimeout"));
Ctx->MonGroup.MarkReadyBatchTimeout()++;
PassAway();
}

// Terminates the deleter. Before the actor is destroyed, NotifyId is told that
// the deleter has finished via TEvCompleted tagged with DELETER_ID.
void PassAway() override {
// the notification has to be sent before the base class tears the actor down
Send(NotifyId, new NActors::TEvents::TEvCompleted(DELETER_ID));
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB32, VDISKP(Ctx->VCtx, "TDeleter::PassAway"));
TActorBootstrapped::PassAway();
}

Expand Down
15 changes: 11 additions & 4 deletions ydb/core/blobstorage/vdisk/balance/sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ namespace {
} else {
++Ctx->MonGroup.SentOnMain();
Ctx->MonGroup.SentOnMainWithResponseBytes() += GInfo->GetTopology().GType.PartSize(LogoBlobIDFromLogoBlobID(ev->Get()->Record.GetBlobID()));
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB14, VDISKP(Ctx->VCtx, "Put done"), (Msg, ev->Get()->ToString()));
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB14, VDISKP(Ctx->VCtx, "Put done"), (Msg, ev->Get()->ToString()));
}
}

Expand All @@ -204,7 +204,7 @@ namespace {
}
++Ctx->MonGroup.SentOnMain();
Ctx->MonGroup.SentOnMainWithResponseBytes() += GInfo->GetTopology().GType.PartSize(LogoBlobIDFromLogoBlobID(item.GetBlobID()));
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB16, VDISKP(Ctx->VCtx, "MultiPut done"), (Key, LogoBlobIDFromLogoBlobID(item.GetBlobID()).ToString()));
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB16, VDISKP(Ctx->VCtx, "MultiPut done"), (Key, LogoBlobIDFromLogoBlobID(item.GetBlobID()).ToString()));
}
}

Expand All @@ -229,6 +229,8 @@ namespace {
void ReadPartsFromDisk() {
Become(&TThis::StateRead);

STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB29, VDISKP(Ctx->VCtx, "ReadPartsFromDisk"), (Parts, Reader.GetPartsSize()));

if (Reader.GetPartsSize() == 0) {
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB10, VDISKP(Ctx->VCtx, "Nothing to read. PassAway"));
PassAway();
Expand All @@ -255,7 +257,8 @@ namespace {
if (ev->Get()->Tag != READ_TIMEOUT_TAG) {
return;
}
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB17, VDISKP(Ctx->VCtx, "TimeoutRead"), (Requests, Reader.GetPartsSize()), (Responses, Reader.GetResponses()));
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB17, VDISKP(Ctx->VCtx, "ReadFromHandoffBatchTimeout"), (Requests, Reader.GetPartsSize()), (Responses, Reader.GetResponses()));
Ctx->MonGroup.ReadFromHandoffBatchTimeout()++;
SendPartsOnMain();
}

Expand All @@ -274,6 +277,8 @@ namespace {
void SendPartsOnMain() {
Become(&TThis::StateSend);

STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB29, VDISKP(Ctx->VCtx, "SendPartsOnMain"), (Parts, Reader.GetResult().size()));

if (Reader.GetResult().empty()) {
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB18, VDISKP(Ctx->VCtx, "Nothing to send. PassAway"));
PassAway();
Expand All @@ -297,12 +302,14 @@ namespace {
if (ev->Get()->Tag != SEND_TIMEOUT_TAG) {
return;
}
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB19, VDISKP(Ctx->VCtx, "TimeoutSend"));
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB19, VDISKP(Ctx->VCtx, "SendOnMainBatchTimeout"));
Ctx->MonGroup.SendOnMainBatchTimeout()++;
PassAway();
}

// Terminates the sender. Before the actor is destroyed, NotifyId is told that
// the sender has finished via TEvCompleted tagged with SENDER_ID.
void PassAway() override {
// the notification has to be sent before the base class tears the actor down
Send(NotifyId, new NActors::TEvents::TEvCompleted(SENDER_ID));
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB28, VDISKP(Ctx->VCtx, "TSender::PassAway"));
TActorBootstrapped::PassAway();
}

Expand Down
15 changes: 15 additions & 0 deletions ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h
Original file line number Diff line number Diff line change
Expand Up @@ -593,38 +593,53 @@ public:
// Initializes every counter of the balancing monitoring group.
// NOTE(review): the second COUNTER_INIT argument presumably marks the counter as a
// derivative (rate); only the two queue-size counters pass false -- confirm against the macro.
GROUP_CONSTRUCTOR(TBalancingGroup)
{
// epoch-level counters
COUNTER_INIT(BalancingIterations, true);
COUNTER_INIT(EpochTimeouts, true);
COUNTER_INIT(ReplTokenAquired, true);
COUNTER_INIT(OnMainByIngressButNotRealy, true);

// current queue sizes; the balancing actor overwrites them instead of incrementing
COUNTER_INIT(PlannedToSendOnMain, false);
COUNTER_INIT(CandidatesToDelete, false);

// reading parts from handoff and sending them on main
COUNTER_INIT(ReadFromHandoffBytes, true);
COUNTER_INIT(ReadFromHandoffResponseBytes, true);
COUNTER_INIT(ReadFromHandoffBatchTimeout, true);
COUNTER_INIT(SentOnMain, true);
COUNTER_INIT(SentOnMainBytes, true);
COUNTER_INIT(SentOnMainWithResponseBytes, true);
COUNTER_INIT(SendOnMainBatchTimeout, true);

// checking delete candidates against main and deleting local parts
COUNTER_INIT(CandidatesToDeleteAskedFromMain, true);
COUNTER_INIT(CandidatesToDeleteAskedFromMainResponse, true);
COUNTER_INIT(CandidatesToDeleteAskFromMainBatchTimeout, true);
COUNTER_INIT(MarkedReadyToDelete, true);
COUNTER_INIT(MarkedReadyToDeleteBytes, true);
COUNTER_INIT(MarkedReadyToDeleteResponse, true);
COUNTER_INIT(MarkedReadyToDeleteWithResponseBytes, true);
COUNTER_INIT(MarkReadyBatchTimeout, true);
}

// Counter definitions of the balancing group; every counter listed here has a
// matching COUNTER_INIT in the group constructor.

// epoch-level counters
COUNTER_DEF(BalancingIterations);
COUNTER_DEF(EpochTimeouts);
COUNTER_DEF(ReplTokenAquired);
COUNTER_DEF(OnMainByIngressButNotRealy);

// sending parts on main
COUNTER_DEF(PlannedToSendOnMain);
COUNTER_DEF(ReadFromHandoffBytes);
COUNTER_DEF(ReadFromHandoffResponseBytes);
COUNTER_DEF(ReadFromHandoffBatchTimeout);
COUNTER_DEF(SentOnMain);
COUNTER_DEF(SentOnMainBytes);
COUNTER_DEF(SentOnMainWithResponseBytes);
COUNTER_DEF(SendOnMainBatchTimeout);
// deleting local parts
COUNTER_DEF(CandidatesToDelete);
COUNTER_DEF(CandidatesToDeleteAskedFromMain);
COUNTER_DEF(CandidatesToDeleteAskedFromMainResponse);
COUNTER_DEF(CandidatesToDeleteAskFromMainBatchTimeout);
COUNTER_DEF(MarkedReadyToDelete);
COUNTER_DEF(MarkedReadyToDeleteBytes);
COUNTER_DEF(MarkedReadyToDeleteResponse);
COUNTER_DEF(MarkedReadyToDeleteWithResponseBytes);
COUNTER_DEF(MarkReadyBatchTimeout);
};

///////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2552,6 +2552,10 @@ namespace NKikimr {
}

void RunBalancing(const TActorContext &ctx) {
if (VCtx->GroupId.GetRawId() == 0) {
// don't run balancing for the static group
return;
}
if (!Config->FeatureFlags.GetUseVDisksBalancing() || VCtx->Top->GType.GetErasure() == TErasureType::ErasureMirror3of4) {
return;
}
Expand Down