Skip to content

Commit d65a0a9

Browse files
authored
control inflight pings in hive (#6916) (#7238)
1 parent 7beef2e commit d65a0a9

6 files changed

Lines changed: 49 additions & 1 deletion

File tree

ydb/core/mind/hive/hive_impl.cpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,7 @@ void THive::Handle(TEvPrivate::TEvBootTablets::TPtr&) {
495495
for (auto* node : unimportantNodes) {
496496
node->Ping();
497497
}
498+
ProcessNodePingQueue();
498499
TVector<TTabletId> tabletsToReleaseFromParent;
499500
TSideEffects sideEffects;
500501
sideEffects.Reset(SelfId());
@@ -685,11 +686,13 @@ void THive::Cleanup() {
685686

686687
void THive::Handle(TEvLocal::TEvStatus::TPtr& ev) {
687688
BLOG_D("Handle TEvLocal::TEvStatus for Node " << ev->Sender.NodeId() << ": " << ev->Get()->Record.ShortDebugString());
689+
RemoveFromPingInProgress(ev->Sender.NodeId());
688690
Execute(CreateStatus(ev->Sender, ev->Get()->Record));
689691
}
690692

691693
void THive::Handle(TEvLocal::TEvSyncTablets::TPtr& ev) {
692694
BLOG_D("THive::Handle::TEvSyncTablets");
695+
RemoveFromPingInProgress(ev->Sender.NodeId());
693696
Execute(CreateSyncTablets(ev->Sender, ev->Get()->Record));
694697
}
695698

@@ -743,6 +746,7 @@ void THive::Handle(TEvInterconnect::TEvNodeConnected::TPtr &ev) {
743746
void THive::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev) {
744747
TNodeId nodeId = ev->Get()->NodeId;
745748
BLOG_W("Handle TEvInterconnect::TEvNodeDisconnected, NodeId " << nodeId);
749+
RemoveFromPingInProgress(nodeId);
746750
if (ConnectedNodes.erase(nodeId)) {
747751
UpdateCounterNodesConnected(-1);
748752
}
@@ -915,6 +919,7 @@ void THive::Handle(TEvents::TEvUndelivered::TPtr &ev) {
915919
case TEvLocal::EvPing: {
916920
TNodeId nodeId = ev->Cookie;
917921
TNodeInfo* node = FindNode(nodeId);
922+
NodePingsInProgress.erase(nodeId);
918923
if (node != nullptr && ev->Sender == node->Local) {
919924
if (node->IsDisconnecting()) {
920925
// ping continiousily until we fully disconnected from the node
@@ -923,6 +928,7 @@ void THive::Handle(TEvents::TEvUndelivered::TPtr &ev) {
923928
KillNode(node->Id, node->Local);
924929
}
925930
}
931+
ProcessNodePingQueue();
926932
break;
927933
}
928934
};
@@ -1686,6 +1692,13 @@ void THive::UpdateCounterNodesConnected(i64 nodesConnectedDiff) {
16861692
}
16871693
}
16881694

1695+
void THive::UpdateCounterPingQueueSize() {
1696+
if (TabletCounters != nullptr) {
1697+
auto& counter = TabletCounters->Simple()[NHive::COUNTER_PINGQUEUE_SIZE];
1698+
counter.Set(NodePingQueue.size());
1699+
}
1700+
}
1701+
16891702
void THive::RecordTabletMove(const TTabletMoveInfo& moveInfo) {
16901703
TabletMoveHistory.PushBack(moveInfo);
16911704
TabletCounters->Cumulative()[NHive::COUNTER_TABLETS_MOVED].Increment(1);
@@ -2662,6 +2675,25 @@ void THive::ExecuteStartTablet(TFullTabletId tabletId, const TActorId& local, ui
26622675
Execute(CreateStartTablet(tabletId, local, cookie, external));
26632676
}
26642677

2678+
void THive::QueuePing(const TActorId& local) {
2679+
NodePingQueue.push(local);
2680+
}
2681+
2682+
void THive::ProcessNodePingQueue() {
2683+
while (!NodePingQueue.empty() && NodePingsInProgress.size() < GetMaxPingsInFlight()) {
2684+
TActorId local = NodePingQueue.front();
2685+
TNodeId node = local.NodeId();
2686+
NodePingQueue.pop();
2687+
NodePingsInProgress.insert(node);
2688+
SendPing(local, node);
2689+
}
2690+
}
2691+
2692+
void THive::RemoveFromPingInProgress(TNodeId node) {
2693+
NodePingsInProgress.erase(node);
2694+
ProcessNodePingQueue();
2695+
}
2696+
26652697
void THive::SendPing(const TActorId& local, TNodeId id) {
26662698
Send(local,
26672699
new TEvLocal::TEvPing(HiveId,

ydb/core/mind/hive/hive_impl.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,8 @@ class THive : public TActor<THive>, public TTabletExecutedFlat, public THiveShar
415415
TEventPriorityQueue<THive> EventQueue{*this};
416416
ui64 OperationsLogIndex = 0;
417417
std::vector<TActorId> ActorsWaitingToMoveTablets;
418+
std::queue<TActorId> NodePingQueue;
419+
std::unordered_set<TNodeId> NodePingsInProgress;
418420

419421
struct TPendingCreateTablet {
420422
NKikimrHive::TEvCreateTablet CreateTablet;
@@ -649,6 +651,7 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId
649651
void UpdateCounterBootQueueSize(ui64 bootQueueSize);
650652
void UpdateCounterEventQueueSize(i64 eventQueueSizeDiff);
651653
void UpdateCounterNodesConnected(i64 nodesConnectedDiff);
654+
void UpdateCounterPingQueueSize();
652655
void RecordTabletMove(const TTabletMoveInfo& info);
653656
bool DomainHasNodes(const TSubDomainKey &domainKey) const;
654657
void ProcessBootQueue();
@@ -677,7 +680,10 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId
677680
void UpdateRegisteredDataCenters();
678681
void AddRegisteredDataCentersNode(TDataCenterId dataCenterId, TNodeId nodeId);
679682
void RemoveRegisteredDataCentersNode(TDataCenterId dataCenterId, TNodeId nodeId);
683+
void QueuePing(const TActorId& local);
680684
void SendPing(const TActorId& local, TNodeId id);
685+
void RemoveFromPingInProgress(TNodeId node);
686+
void ProcessNodePingQueue();
681687
void SendReconnect(const TActorId& local);
682688
static THolder<TGroupFilter> BuildGroupParametersForChannel(const TLeaderTabletInfo& tablet, ui32 channelId);
683689
void KickTablet(const TTabletInfo& tablet);
@@ -938,8 +944,13 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId
938944
return CurrentConfig.GetNodeUsageRangeToKick();
939945
}
940946

947+
ui64 GetMaxPingsInFlight() const {
948+
return CurrentConfig.GetMaxPingsInFlight();
949+
}
950+
941951
bool GetLessSystemTabletsMoves() const {
942952
return CurrentConfig.GetLessSystemTabletsMoves();
953+
943954
}
944955

945956
static void ActualizeRestartStatistics(google::protobuf::RepeatedField<google::protobuf::uint64>& restartTimestamps, ui64 barrier);

ydb/core/mind/hive/node_info.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ void TNodeInfo::DeregisterInDomains() {
356356
void TNodeInfo::Ping() {
357357
Y_ABORT_UNLESS((bool)Local);
358358
BLOG_D("Node(" << Id << ") Ping(" << Local << ")");
359-
Hive.SendPing(Local, Id);
359+
Hive.QueuePing(Local);
360360
}
361361

362362
void TNodeInfo::SendReconnect(const TActorId& local) {

ydb/core/mind/hive/tx__register_node.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,9 @@ class TTxRegisterNode : public TTransactionBase<THive> {
8787
BLOG_D("THive::TTxRegisterNode(" << Local.NodeId() << ")::Complete");
8888
TNodeInfo* node = Self->FindNode(Local.NodeId());
8989
if (node != nullptr && node->Local) { // we send ping on every RegisterNode because we want to re-sync tablets upon every reconnection
90+
Self->NodePingsInProgress.erase(node->Id);
9091
node->Ping();
92+
Self->ProcessNodePingQueue();
9193
}
9294
}
9395
};

ydb/core/protos/config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1476,6 +1476,7 @@ message THiveConfig {
14761476
optional bool EnableDestroyOperations = 74 [default = false];
14771477
optional double NodeUsageRangeToKick = 75 [default = 0.2];
14781478
optional bool LessSystemTabletsMoves = 77 [default = true];
1479+
optional uint64 MaxPingsInFlight = 78 [default = 1000];
14791480
}
14801481

14811482
message TBlobCacheConfig {

ydb/core/protos/counters_hive.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ enum ESimpleCounters {
2929
COUNTER_IMBALANCED_OBJECTS = 19 [(CounterOpts) = {Name: "ImbalancedObjects"}];
3030
COUNTER_WORST_OBJECT_VARIANCE = 20 [(CounterOpts) = {Name: "WorstObjectVariance"}];
3131
COUNTER_STORAGE_SCATTER = 21 [(CounterOpts) = {Name: "StorageScatter"}];
32+
RESERVED22 = 22;
33+
COUNTER_PINGQUEUE_SIZE = 23 [(CounterOpts) = {Name: "PingQueueSize"}];
3234
}
3335

3436
enum ECumulativeCounters {

0 commit comments

Comments
 (0)