Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
Expand Down Expand Up @@ -57,17 +58,18 @@
* Compares the currentState, pendingState with IdealState and generate messages
*/
public class MessageGenerationPhase extends AbstractBaseStage {
private final static String NO_DESIRED_STATE = "NoDesiredState";
private static final String NO_DESIRED_STATE = "NoDesiredState";

// If an invalid pending message is left on a host (e.g. a message telling the participant
// to change from SLAVE to MASTER when the participant is already at MASTER), we wait for
// a timeout; if the participant still has not cleaned the message up, the controller
// purges it proactively to unblock further state transitions.
public final static long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil
public static final long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil
.getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60 * 1000);
private final static String PENDING_MESSAGE = "pending message";
private final static String STALE_MESSAGE = "stale message";
private static final String PENDING_MESSAGE = "pending message";
private static final String STALE_MESSAGE = "stale message";
private static final String OFFLINE = "OFFLINE";

private static Logger logger = LoggerFactory.getLogger(MessageGenerationPhase.class);

Expand Down Expand Up @@ -163,6 +165,18 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
// desired-state->list of generated-messages
Map<String, List<Message>> messageMap = new HashMap<>();

/*
* Calculate the current active replica count based on state model type.
* This represents the number of replicas currently serving traffic for this partition
* Active replicas include: top states, secondary top states(excluding OFFLINE) and ERROR
* states.
* Active replicas exclude: OFFLINE and DROPPED states.
* All qualifying state transitions for this partition will receive this same value,
* allowing clients to understand the current availability level and prioritize accordingly.
*/
int currentActiveReplicaCount =
calculateCurrentActiveReplicaCount(currentStateMap, stateModelDef);

for (String instanceName : instanceStateMap.keySet()) {

Set<Message> staleMessages = cache.getStaleMessagesByInstance(instanceName);
Expand Down Expand Up @@ -250,17 +264,39 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
pendingMessage, manager, resource, partition, sessionIdMap, instanceName,
stateModelDef, cancellationMessage, isCancellationEnabled);
} else {
// Set currentActiveReplicaNumber to provide metadata for potential message
// prioritization by participant.
// Assign the current active replica count to all qualifying upward transitions for this
// partition.
// This ensures consistent prioritization metadata across concurrent state transitions.
// -1 indicates no prioritization metadata, e.g. downward ST messages get -1.
int currentActiveReplicaNumber = -1;

/*
* Assign currentActiveReplicaNumber for qualifying upward state transitions.
* Criteria for assignment:
* - Must be an upward state transition according to state model
* - Target state must be considered active (according to state model type)
*/
if (stateModelDef.isUpwardStateTransition(currentState, nextState)
&& isStateActive(nextState, stateModelDef)) {

// All qualifying transitions for this partition get the same
// currentActiveReplicaNumber
currentActiveReplicaNumber = currentActiveReplicaCount;
}

// Create new state transition message
message = MessageUtil
.createStateTransitionMessage(manager.getInstanceName(), manager.getSessionId(),
resource, partition.getPartitionName(), instanceName, currentState, nextState,
sessionIdMap.get(instanceName), stateModelDef.getId());
message = MessageUtil.createStateTransitionMessage(manager.getInstanceName(),
manager.getSessionId(), resource, partition.getPartitionName(), instanceName,
currentState, nextState, sessionIdMap.get(instanceName), stateModelDef.getId(),
currentActiveReplicaNumber);

if (logger.isDebugEnabled()) {
LogUtil.logDebug(logger, _eventId, String.format(
"Resource %s partition %s for instance %s with currentState %s and nextState %s",
"Resource %s partition %s for instance %s with currentState %s, nextState %s and currentActiveReplicaNumber %d",
resource.getResourceName(), partition.getPartitionName(), instanceName,
currentState, nextState));
currentState, nextState, currentActiveReplicaNumber));
}
}
}
Expand Down Expand Up @@ -290,7 +326,66 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
} // end of for-each-partition
}

private boolean shouldCreateSTCancellation(Message pendingMessage, String desiredState,
/**
 * Counts the replicas of a partition that are currently in an active state.
 * A replica is counted when its current state is the state model's top state, one of the
 * active secondary top states (see {@link #getActiveSecondaryTopStates}), or ERROR.
 * OFFLINE and DROPPED replicas are never counted because
 * {@link #getActiveSecondaryTopStates} filters those states out.
 * @param currentStateMap current state of every replica of the partition; only the values
 *          (state names) are inspected
 * @param stateModelDef state model definition used to resolve top and secondary top states
 * @return the number of replicas currently in active states, used as the
 *         currentActiveReplicaNumber for the partition
 */
private int calculateCurrentActiveReplicaCount(Map<String, String> currentStateMap,
    StateModelDefinition stateModelDef) {
  // These values do not depend on the replica being inspected, so resolve them once
  // instead of rebuilding the filtered secondary-state list for every stream element.
  final List<String> activeSecondaryTopStates = getActiveSecondaryTopStates(stateModelDef);
  final String errorState = HelixDefinedState.ERROR.name();

  // NOTE(review): if getTopState() returns a String, contains() is a substring match rather
  // than an equality check - confirm that this is the intended comparison.
  return (int) currentStateMap.values().stream()
      .filter(state -> stateModelDef.getTopState().contains(state) // top state
          || activeSecondaryTopStates.contains(state) // serving secondary top states
          || errorState.equals(state)) // ERROR replicas are still counted as active
      .count();
}

/**
 * Returns the secondary top states of the state model with the non-serving states
 * (OFFLINE and DROPPED) removed.
 * The filtering is needed because getSecondTopStates() can report OFFLINE as a secondary
 * top state for some state models. Example - OnlineOffline:
 * getSecondTopStates() = ["OFFLINE"] because OFFLINE transitions directly to ONLINE, so the
 * filtered result is an empty list.
 * @param stateModelDef state model definition whose secondary top states are inspected
 * @return the secondary top states that are neither OFFLINE nor DROPPED
 */
private List<String> getActiveSecondaryTopStates(StateModelDefinition stateModelDef) {
  final String droppedState = HelixDefinedState.DROPPED.name();
  return stateModelDef.getSecondTopStates().stream()
      // Keep only states that are neither of the two non-serving states.
      .filter(candidate -> !(OFFLINE.equals(candidate) || droppedState.equals(candidate)))
      .collect(Collectors.toList());
}

/**
 * Tells whether a state counts as active for the given state model.
 * A state is active when it is ERROR, the state model's top state, or one of the active
 * secondary top states (secondary top states without OFFLINE and DROPPED).
 * ERROR is checked first and is treated as active for every state model.
 * @param state state name to classify
 * @param stateModelDef state model definition used to resolve top and secondary top states
 * @return true if the state is considered active, false otherwise
 */
private boolean isStateActive(String state, StateModelDefinition stateModelDef) {
  final boolean isErrorState = HelixDefinedState.ERROR.name().equals(state);
  // Evaluation order is deliberate: ERROR short-circuits before the state model is consulted.
  // NOTE(review): if getTopState() returns a String, contains() is a substring match -
  // confirm that this is the intended comparison.
  return isErrorState
      || stateModelDef.getTopState().contains(state)
      || getActiveSecondaryTopStates(stateModelDef).contains(state);
}

private boolean shouldCreateSTCancellation(Message pendingMessage, String desiredState,
String initialState) {
if (pendingMessage == null) {
return false;
Expand Down
44 changes: 37 additions & 7 deletions helix-core/src/main/java/org/apache/helix/model/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ public enum Attributes {
RELAY_FROM,
EXPIRY_PERIOD,
SRC_CLUSTER,
ST_REBALANCE_TYPE
ST_REBALANCE_TYPE,
CURRENT_ACTIVE_REPLICA_NUMBER
}

/**
Expand Down Expand Up @@ -137,12 +138,8 @@ public enum STRebalanceType {
/**
* Compares the creation time of two Messages
*/
public static final Comparator<Message> CREATE_TIME_COMPARATOR = new Comparator<Message>() {
@Override
public int compare(Message m1, Message m2) {
// Boxes both creation timestamps and delegates to Long.compareTo, so m1 sorts before m2
// when m1 has the smaller (earlier) timestamp, i.e. ascending order.
return new Long(m1.getCreateTimeStamp()).compareTo(new Long(m2.getCreateTimeStamp()));
}
};
// Orders messages by ascending creation timestamp (earliest message first). Building the
// comparator from the key extractor avoids accidentally swapping the two arguments, which
// would silently reverse the ordering.
public static final Comparator<Message> CREATE_TIME_COMPARATOR =
    Comparator.comparingLong(Message::getCreateTimeStamp);

/**
* Instantiate a message
Expand Down Expand Up @@ -935,6 +932,39 @@ public void setSrcClusterName(String clusterName) {
_record.setSimpleField(Attributes.SRC_CLUSTER.name(), clusterName);
}

/**
 * Stores the current active replica count used for participant-side message prioritization.
 * The value is the number of replicas of this partition that were in active states
 * (including ERROR) when the state transition message was generated. Active states include
 * top states, secondary top states (for single-top state models) and ERROR states.
 * Participants can use this metadata to prioritize recovery scenarios (low active counts)
 * over load balancing scenarios (high active counts) in custom thread pools or message
 * handlers; for example, 2→3 transitions get higher priority than 3→4 transitions.
 * The value -1 is used for transitions that carry no prioritization metadata (e.g. downward
 * transitions).
 * @param currentActiveReplicaNumber the number of currently active replicas: -1 when there
 *          is no prioritization metadata, >=0 for transitions carrying the current
 *          availability level
 */
public void setCurrentActiveReplicaNumber(int currentActiveReplicaNumber) {
  final String fieldName = Attributes.CURRENT_ACTIVE_REPLICA_NUMBER.name();
  _record.setIntField(fieldName, currentActiveReplicaNumber);
}

/**
 * Returns the current active replica count recorded for this partition at message
 * generation time.
 * The value is the number of replicas in active states (including ERROR) before any state
 * transition occurs, which lets participants prioritize messages by current availability.
 * @return the current active replica count, or -1 when the field is not set or the message
 *         carries no prioritization metadata (e.g. downward state transitions)
 */
public int getCurrentActiveReplicaNumber() {
  // Fallback returned by the record when the field has never been written.
  final int noPrioritizationMetadata = -1;
  return _record.getIntField(Attributes.CURRENT_ACTIVE_REPLICA_NUMBER.name(),
      noPrioritizationMetadata);
}

/**
* Check if this message is targeted for a controller
* @return true if this is a controller message, false otherwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,4 +505,23 @@ public static Map<String, Integer> getStateCounts(Map<String, String> stateMap)
}
return stateCounts;
}

/**
 * Checks whether a state transition is upward.
 * A transition is upward when the destination state has a numerically smaller entry than
 * the source state in {@link #getStatePriorityMap()}. If either state is missing from that
 * map the transition cannot be ranked and is reported as not upward.
 * @param fromState source state
 * @param toState destination state
 * @return true if it's an upward state transition, false otherwise
 */
public boolean isUpwardStateTransition(String fromState, String toState) {
  final Map<String, Integer> priorities = getStatePriorityMap();
  final Integer fromPriority = priorities.get(fromState);
  final Integer toPriority = priorities.get(toState);

  // The null checks must come first: they guard the unboxing in the comparison.
  return fromPriority != null && toPriority != null && toPriority < fromPriority;
}
}
94 changes: 70 additions & 24 deletions helix-core/src/main/java/org/apache/helix/util/MessageUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static Message createStateTransitionCancellationMessage(String srcInstanc
toState);

Message message =
createStateTransitionMessage(Message.MessageType.STATE_TRANSITION_CANCELLATION,
createBasicStateTransitionMessage(Message.MessageType.STATE_TRANSITION_CANCELLATION,
srcInstanceName, srcSessionId, resource, partitionName, instanceName, currentState,
nextState, sessionId, stateModelDefName);

Expand All @@ -60,28 +60,6 @@ public static Message createStateTransitionCancellationMessage(String srcInstanc
return null;
}

/**
 * Creates a STATE_TRANSITION message for one replica.
 * Delegates to the overload that takes the message type, then sets the default retry count
 * and copies the resource group name and resource tag from the resource when they are not
 * null.
 * @param srcInstanceName source instance name
 * @param srcSessionId source session id
 * @param resource resource the partition belongs to
 * @param partitionName partition name
 * @param instanceName target instance name
 * @param currentState current state
 * @param nextState next state
 * @param tgtSessionId target session id
 * @param stateModelDefName state model definition name
 * @return state transition message
 */
public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId,
Resource resource, String partitionName, String instanceName, String currentState,
String nextState, String tgtSessionId, String stateModelDefName) {
Message message =
createStateTransitionMessage(Message.MessageType.STATE_TRANSITION, srcInstanceName,
srcSessionId, resource, partitionName, instanceName, currentState, nextState, tgtSessionId,
stateModelDefName);

// Set the retry count for state transition messages.
// TODO: make the retry count configurable in ClusterConfig or IdealState
message.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);

// Resource group name and tag are optional; only propagate them when present.
if (resource.getResourceGroupName() != null) {
message.setResourceGroupName(resource.getResourceGroupName());
}
if (resource.getResourceTag() != null) {
message.setResourceTag(resource.getResourceTag());
}

return message;
}

/**
* Creates a message to change participant status
* {@link org.apache.helix.model.LiveInstance.LiveInstanceStatus}
Expand Down Expand Up @@ -121,7 +99,7 @@ private static Message createBasicMessage(Message.MessageType messageType, Strin
}

/* Creates state transition or state transition cancellation message */
private static Message createStateTransitionMessage(Message.MessageType messageType,
private static Message createBasicStateTransitionMessage(Message.MessageType messageType,
String srcInstanceName, String srcSessionId, Resource resource, String partitionName,
String instanceName, String currentState, String nextState, String tgtSessionId,
String stateModelDefName) {
Expand All @@ -136,4 +114,72 @@ private static Message createStateTransitionMessage(Message.MessageType messageT

return message;
}

/**
 * Builds a STATE_TRANSITION message that carries replica prioritization metadata.
 * On top of the basic state transition message this sets the default retry count, copies
 * the resource group name and resource tag when the resource defines them, and records the
 * supplied active replica count.
 * @param srcInstanceName source instance name
 * @param srcSessionId source session id
 * @param resource resource the partition belongs to
 * @param partitionName partition name
 * @param instanceName target instance name
 * @param currentState current state
 * @param nextState next state
 * @param tgtSessionId target session id
 * @param stateModelDefName state model definition name
 * @param currentActiveReplicaNumber the number of replicas currently in active states for
 *          this partition before any state transitions occur. Participants can use it to
 *          prioritize messages by availability level (e.g. 0→1 recovery scenarios get
 *          higher priority than 2→3 load balancing scenarios). Pass -1 for transitions
 *          that should not be prioritized (downward transitions). Active states include
 *          top states, secondary top states (for single-top state models) and ERROR
 *          states, since they are still considered active by Helix.
 * @return state transition message
 */
public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId,
    Resource resource, String partitionName, String instanceName, String currentState,
    String nextState, String tgtSessionId, String stateModelDefName,
    int currentActiveReplicaNumber) {
  final Message stMessage = createBasicStateTransitionMessage(
      Message.MessageType.STATE_TRANSITION, srcInstanceName, srcSessionId, resource,
      partitionName, instanceName, currentState, nextState, tgtSessionId, stateModelDefName);

  // TODO: make the retry count configurable in ClusterConfig or IdealState
  stMessage.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);

  // Group name and tag are optional on the resource; propagate them only when set.
  final String resourceGroupName = resource.getResourceGroupName();
  if (resourceGroupName != null) {
    stMessage.setResourceGroupName(resourceGroupName);
  }
  final String resourceTag = resource.getResourceTag();
  if (resourceTag != null) {
    stMessage.setResourceTag(resourceTag);
  }

  // Metadata for participant-side prioritization.
  stMessage.setCurrentActiveReplicaNumber(currentActiveReplicaNumber);
  return stMessage;
}

/**
 * Builds a STATE_TRANSITION message without prioritization metadata.
 * Kept for backward compatibility: it delegates to the overload that accepts a
 * currentActiveReplicaNumber and passes -1.
 * @param srcInstanceName source instance name
 * @param srcSessionId source session id
 * @param resource resource the partition belongs to
 * @param partitionName partition name
 * @param instanceName target instance name
 * @param currentState current state
 * @param nextState next state
 * @param tgtSessionId target session id
 * @param stateModelDefName state model definition name
 * @return state transition message
 */
public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId,
    Resource resource, String partitionName, String instanceName, String currentState,
    String nextState, String tgtSessionId, String stateModelDefName) {
  // -1 marks an ST message that needs no prioritization metadata.
  final int noPrioritizationMetadata = -1;
  return createStateTransitionMessage(srcInstanceName, srcSessionId, resource, partitionName,
      instanceName, currentState, nextState, tgtSessionId, stateModelDefName,
      noPrioritizationMetadata);
}
}
Loading
Loading