Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ cscope.*
.classpath
.project
.svn
.java-version
target/
/recipes/rabbitmq-consumer-group/target/
.idea
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ public static boolean isInstanceStable(HelixDataAccessor dataAccessor, String in
*/
public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor,
String instanceName) {
return siblingNodesActiveReplicaCheck(dataAccessor, instanceName, Collections.emptySet());
return siblingNodesActiveReplicaCheckWithDetails(dataAccessor, instanceName, Collections.emptySet()).isPassed();
}

/**
Expand All @@ -416,6 +416,29 @@ public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAcces
*/
public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor,
String instanceName, Set<String> toBeStoppedInstances) {
return siblingNodesActiveReplicaCheckWithDetails(dataAccessor, instanceName, toBeStoppedInstances).isPassed();
}

/**
* Check if sibling nodes of the instance meet min active replicas constraint with details
* Two instances are sibling of each other if they host the same partition. And sibling nodes
* that are in toBeStoppableInstances will be presumed to be stopped.
* WARNING: The check uses ExternalView to reduce network traffic but suffer from accuracy
* due to external view propagation latency
*
* This method returns detailed information about the first partition that fails the check,
* including resource name, partition name, current active replicas, and required minimum.
*
* TODO: Use in memory cache and query instance's currentStates
*
* @param dataAccessor A helper class to access the Helix data.
* @param instanceName An instance to be evaluated against this check.
* @param toBeStoppedInstances A set of instances presumed to be are already stopped. And it
* shouldn't contain the `instanceName`
* @return MinActiveReplicaCheckResult with pass/fail status and details of first failure
*/
public static MinActiveReplicaCheckResult siblingNodesActiveReplicaCheckWithDetails(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation doesnt have 1:1 backward compatibility, even if includeDetails is not passed, still we will enter this method and will check all the paritions nonetheless.
I will propose to add a flag in this method like "failOnFirst" which will be true if "includeDetails" wasnt passed or is false, and bail out on first failure.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation is fully backward compatible with existing. We still fail fast and provide details only for the first partition violating the min active replica check.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noticed. Why are we not collecting all the failed partitions?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea was to provide an indication to the failed partitions since anyways the result will be truncated.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The result will be truncated in Nimbus UI/Customized status. But we can add logging on ACM side for earlier debugging if we have this

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with many partitions part. Here's what I think can be smartly done.
if error list < 10, list all, and for all remaining, just give a count summary.

Maintaining backward compatibility is simple like I mentioned in the original comment with the use of "failOnFirst"

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imho, as already mentioned in response to other comment above, we need to iterate on this feature.

Adding more partition details in string format, I'm not very much convinced.
want to see customers feedback on the current version and iterate based on that.

We need a better request/response modelling to handle all other checks failures too in addition to MIN_ACTIVE_REPLICA check.
Have some thoughts on it. But want to iterate based on the feedback.

Adding more flags in the current API makes them irrelevant for other flavours of the APIs.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few observations:

  1. Logging it on ACM is also not a good idea if that information can be readily fetched.
  2. As @laxman-ch mentioned we can reiterate based on customer feedback and also, the current ask is for Nimbus UI, so this should be good.
  3. We should anyways have an API, which should be provided to give information on all partitions if this is something we (as Helix team) or the customers need.
  4. I like the idea of mentioning a count summary, @laxman-ch do you think we can take this up in this PR or can be a followup?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of mentioning a count summary, @laxman-ch do you think we can take this up in this PR or can be a followup?

Thought about it during the implementation and discussed with @LZD-PratyushBhatt too at that time.

Count info may not provide any info on which users can take an action without additional debugging/api calls which they are doing it today.

"500" partitions failed the replica check

And another reason for not including it is, don't want to pack too many details in a string message. Right approach is to add all these flags/details should be part of the request/response model.

Copy link
Collaborator

@LZD-PratyushBhatt LZD-PratyushBhatt Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By count, I was mentioning the count of INITIAL state (like OFFLINE) and ERROR state.

I was against just printing the first partition info, because if you have 10 partition in ERROR state, you fix one, that doesn't affect 9 others. Rebalancing doesn't have any affect on ERROR partitions.

A count summary atleast let's the USER know that they need to look at Current state of the participant to get list of all the initial/error state partitions.

Anyways, my comment/suggestion is non-binding and I won't block the PR because of this.

Overall LGTM

HelixDataAccessor dataAccessor, String instanceName, Set<String> toBeStoppedInstances) {
PropertyKey.Builder propertyKeyBuilder = dataAccessor.keyBuilder();
List<String> resources = dataAccessor.getChildNames(propertyKeyBuilder.idealStates());

Expand Down Expand Up @@ -464,15 +487,15 @@ public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAcces
}
}
if (numHealthySiblings < minActiveReplicas) {
_logger.info(
"Partition {} doesn't have enough active replicas in sibling nodes. NumHealthySiblings: {}, minActiveReplicas: {}",
partition, numHealthySiblings, minActiveReplicas);
return false;
_logger.warn(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to move this to debug log instead. IDTS it qualifies as WARN.
Also, debug because now we dont just check first failure, we keep going even after the faiilures, so this becomes a Hot path now. In the end we can post a summary.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we keep going even after the faiilures, so this becomes a Hot path now.

Nope. we are still failing fast and provide only the details of first partition that's violating the min active replica check. And I chose warn as this may require attention and helpful while debugging.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, I just noticed, why are we not collecting all the failed partitions?

"Instance {} min active replica check failed: Resource {} partition {} has {}/{} active replicas",
instanceName, resourceName, partition, numHealthySiblings, minActiveReplicas);
return MinActiveReplicaCheckResult.failed(resourceName, partition, numHealthySiblings, minActiveReplicas);
}
}
}
}

return true;
return MinActiveReplicaCheckResult.passed();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.helix.util;

/**
* Result container for minimum active replica validation checks.
* Provides details about validation outcome and specific failure information.
*/
public class MinActiveReplicaCheckResult {
private final boolean passed;
private final String resourceName;
private final String partitionName;
private final int currentActiveReplicas;
private final int requiredMinActiveReplicas;

private MinActiveReplicaCheckResult(boolean passed, String resourceName, String partitionName,
int currentActiveReplicas, int requiredMinActiveReplicas) {
this.passed = passed;
this.resourceName = resourceName;
this.partitionName = partitionName;
this.currentActiveReplicas = currentActiveReplicas;
this.requiredMinActiveReplicas = requiredMinActiveReplicas;
}

public static MinActiveReplicaCheckResult passed() {
return new MinActiveReplicaCheckResult(true, null, null, -1, -1);
}

public static MinActiveReplicaCheckResult failed(String resourceName, String partitionName,
int currentActiveReplicas, int requiredMinActiveReplicas) {
return new MinActiveReplicaCheckResult(false, resourceName, partitionName,
currentActiveReplicas, requiredMinActiveReplicas);
}

public boolean isPassed() {
return passed;
}

public String getResourceName() {
return resourceName;
}

public String getPartitionName() {
return partitionName;
}

public int getCurrentActiveReplicas() {
return currentActiveReplicas;
}

public int getRequiredMinActiveReplicas() {
return requiredMinActiveReplicas;
}

@Override
public String toString() {
if (passed) {
return "MIN_ACTIVE_REPLICA_CHECK_FAILED: passed";
} else {
return String.format("MIN_ACTIVE_REPLICA_CHECK_FAILED: Resource %s partition %s has %d/%d active replicas",
resourceName, partitionName, currentActiveReplicas, requiredMinActiveReplicas);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
Expand Down Expand Up @@ -589,13 +588,116 @@ public void TestSiblingNodesActiveReplicaCheckSuccessWithReplicaInErrorState() {
Assert.assertFalse(result);
}

@Test
public void testSiblingNodesActiveReplicaCheckWithDetails() throws Exception {
Mock mock = new Mock();

ClusterConfig clusterConfig = mock(ClusterConfig.class);

InstanceConfig instanceConfig = new InstanceConfig(TEST_INSTANCE);
instanceConfig.setDomain("zone=1");

// set ideal state
IdealState idealState = mock(IdealState.class);
when(idealState.isEnabled()).thenReturn(true);
when(idealState.isValid()).thenReturn(true);
when(idealState.getStateModelDefRef()).thenReturn("MasterSlave");

// set external view
ExternalView externalView = mock(ExternalView.class);
when(externalView.getMinActiveReplicas()).thenReturn(2);
when(externalView.getStateModelDefRef()).thenReturn("MasterSlave");
when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("testResource_0"));

StateModelDefinition stateModelDefinition = mock(StateModelDefinition.class);
when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE");

doReturn(ImmutableList.of("testResource")).when(mock.dataAccessor)
.getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
doReturn(idealState).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
doReturn(externalView).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
doReturn(stateModelDefinition).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));
when(mock.dataAccessor.getProperty(BUILDER.instanceConfig(TEST_INSTANCE))).thenReturn(instanceConfig);
when(mock.dataAccessor.getProperty(BUILDER.clusterConfig())).thenReturn(clusterConfig);

// Test with sufficient active replicas - should pass
when(externalView.getStateMap("testResource_0")).thenReturn(
ImmutableMap.of(TEST_INSTANCE, "Master", "instance1", "Slave", "instance2", "Slave"));

MinActiveReplicaCheckResult result =
InstanceValidationUtil.siblingNodesActiveReplicaCheckWithDetails(mock.dataAccessor, TEST_INSTANCE, Collections.emptySet());

Assert.assertTrue(result.isPassed());
Assert.assertNull(result.getPartitionName());

// Test with insufficient active replicas after instance removal - should fail
System.out.println("Setting up second scenario...");
when(externalView.getStateMap("testResource_0")).thenReturn(
ImmutableMap.of(TEST_INSTANCE, "Master", "instance1", "ERROR", "instance2", "ERROR"));

result = InstanceValidationUtil.siblingNodesActiveReplicaCheckWithDetails(mock.dataAccessor, TEST_INSTANCE, Collections.emptySet());

Assert.assertFalse(result.isPassed());
Assert.assertNotNull(result.getPartitionName());
Assert.assertEquals("testResource_0", result.getPartitionName());
Assert.assertEquals("testResource", result.getResourceName());
Assert.assertEquals(0, result.getCurrentActiveReplicas());
Assert.assertEquals(2, result.getRequiredMinActiveReplicas());
}

@Test
public void testSiblingNodesActiveReplicaCheckWithDetailsFailFast() throws Exception {
Mock mock = new Mock();

ClusterConfig clusterConfig = mock(ClusterConfig.class);

InstanceConfig instanceConfig = new InstanceConfig(TEST_INSTANCE);
instanceConfig.setDomain("zone=1");

// set ideal state
IdealState idealState = mock(IdealState.class);
when(idealState.isEnabled()).thenReturn(true);
when(idealState.isValid()).thenReturn(true);
when(idealState.getStateModelDefRef()).thenReturn("MasterSlave");

// set external view - this one will fail the check
ExternalView externalView = mock(ExternalView.class);
when(externalView.getMinActiveReplicas()).thenReturn(2);
when(externalView.getStateModelDefRef()).thenReturn("MasterSlave");
when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("resource1_0"));
when(externalView.getStateMap("resource1_0")).thenReturn(
ImmutableMap.of(TEST_INSTANCE, "Master", "instance1", "ERROR")); // Failure scenario - only 0 healthy siblings after removal

StateModelDefinition stateModelDefinition = mock(StateModelDefinition.class);
when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE");

doReturn(ImmutableList.of("resource1")).when(mock.dataAccessor)
.getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
doReturn(idealState).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
doReturn(externalView).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
doReturn(stateModelDefinition).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

when(mock.dataAccessor.getProperty(BUILDER.instanceConfig(TEST_INSTANCE))).thenReturn(instanceConfig);
when(mock.dataAccessor.getProperty(BUILDER.clusterConfig())).thenReturn(clusterConfig);

MinActiveReplicaCheckResult result =
InstanceValidationUtil.siblingNodesActiveReplicaCheckWithDetails(mock.dataAccessor, TEST_INSTANCE, Collections.emptySet());

// Should fail on the resource
Assert.assertFalse(result.isPassed());
Assert.assertEquals("resource1_0", result.getPartitionName());
Assert.assertEquals("resource1", result.getResourceName());
Assert.assertEquals(0, result.getCurrentActiveReplicas());
Assert.assertEquals(2, result.getRequiredMinActiveReplicas());
}

private class Mock {
HelixDataAccessor dataAccessor;
ConfigAccessor configAccessor;

Mock() {
this.dataAccessor = mock(HelixDataAccessor.class);
this.configAccessor = mock(ConfigAccessor.class);
when(dataAccessor.keyBuilder()).thenReturn(BUILDER);
}
}
Expand Down
Loading
Loading