Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
Expand All @@ -51,6 +53,7 @@
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathBuilder;
Expand All @@ -77,6 +80,7 @@
import org.apache.helix.model.ControllerHistory;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.CustomizedStateConfig;
import org.apache.helix.model.EvacuationInfo;
import org.apache.helix.model.CustomizedView;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
Expand Down Expand Up @@ -478,6 +482,32 @@ public boolean isEvacuateFinished(String clusterName, String instanceName,
return !instanceHasCurrentStateOrMessage(clusterName, instanceName, exclusionTypes);
}

/**
* Returns a detailed evacuation status for the given instance.
*
* This is used by the Helix REST {@code isEvacuateFinished} command to return an extendable JSON
* response (for example, remaining partition count) without changing the {@link HelixAdmin}
* public interface.
*
* <p>The returned state is decided as follows:
* <ul>
* <li>{@code NOT_EVACUATING}, with reason {@code NOT_IN_EVACUATE_OPERATION}, when the instance
* config is {@code null} or its instance operation is not {@code EVACUATE};</li>
* <li>{@code IN_PROGRESS} when {@code instanceHasCurrentStateOrMessage} returns {@code true};</li>
* <li>{@code COMPLETED} otherwise.</li>
* </ul>
*
* @param clusterName name of the cluster the instance belongs to
* @param instanceName name of the instance to inspect
* @param exclusionTypes exclusion types, passed unchanged to
* {@code instanceHasCurrentStateOrMessage}
* @return a newly created {@link EvacuationInfo}; never {@code null}
*/
public EvacuationInfo getEvacuationStatus(String clusterName, String instanceName,
Set<InstanceDrainExclusionType> exclusionTypes) {
EvacuationInfo result = new EvacuationInfo();

InstanceConfig config = getInstanceConfig(clusterName, instanceName);
// A missing instance config and an operation other than EVACUATE are reported the same way.
if (config == null || config.getInstanceOperation().getOperation() !=
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we should have different reason code for config as null vs no evacuate operation assigned.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think so it will add any value, even if instance config is not present, instance is technically not evacauting, and in both cases the user will anyways go to the cluster to check the instance config.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At some point, we will propagate this to UI. For user facing things, we should be meticulous about what messages we show/return.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think no harm in adding it, added! Thanks for the suggestion!

InstanceConstants.InstanceOperation.EVACUATE) {
result.setState(EvacuationInfo.EvacuationState.NOT_EVACUATING);
result.setReason(EvacuationInfo.ReasonCode.NOT_IN_EVACUATE_OPERATION);
return result;
}

// result is handed to the helper so it can record extra details on it; the helper's boolean
// return value alone decides between IN_PROGRESS and COMPLETED.
boolean hasBlocking = instanceHasCurrentStateOrMessage(
clusterName, instanceName, exclusionTypes, result);
result.setState(hasBlocking ? EvacuationInfo.EvacuationState.IN_PROGRESS
: EvacuationInfo.EvacuationState.COMPLETED);
return result;
}

@Override
public boolean isInstanceDrained(String clusterName, String instanceName) {
return !instanceHasCurrentStateOrMessage(clusterName, instanceName, Collections.emptySet());
Expand Down Expand Up @@ -811,6 +841,15 @@ public boolean forceKillInstance(String clusterName, String instanceName, String
*/
private boolean instanceHasCurrentStateOrMessage(String clusterName,
    String instanceName, Set<InstanceDrainExclusionType> exclusionTypes) {
  // Callers of this overload only need the boolean answer, so no EvacuationInfo is supplied
  // for the four-argument overload to fill in.
  final EvacuationInfo noDetails = null;
  return instanceHasCurrentStateOrMessage(clusterName, instanceName, exclusionTypes, noDetails);
}

/**
* Same as {@link #instanceHasCurrentStateOrMessage(String, String, Set)} but optionally fills
* {@code evacuationInfo} with additional details for REST responses.
*/
private boolean instanceHasCurrentStateOrMessage(String clusterName, String instanceName,
Set<InstanceDrainExclusionType> exclusionTypes, @Nullable EvacuationInfo evacuationInfo) {
HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();

Expand All @@ -825,11 +864,18 @@ private boolean instanceHasCurrentStateOrMessage(String clusterName,
if (sessions.isEmpty()) {
logger.info("Instance {} in cluster {} does not have any session. The instance can be removed.",
instanceName, clusterName);
if (evacuationInfo != null) {
evacuationInfo.setRemainingPartitionCount(0);
evacuationInfo.setPendingMessageCount(0);
}
return false;
}
if (sessions.size() > 1) {
logger.info("Instance {} in cluster {} is carrying over from prev session.",
instanceName, clusterName);
if (evacuationInfo != null) {
evacuationInfo.setReason(EvacuationInfo.ReasonCode.MULTIPLE_SESSIONS);
}
return true;
}

Expand All @@ -839,9 +885,29 @@ private boolean instanceHasCurrentStateOrMessage(String clusterName,
if (currentStates == null || currentStates.isEmpty()) {
logger.info("Instance {} in cluster {} does not have any current state.",
instanceName, clusterName);
if (evacuationInfo != null) {
evacuationInfo.setRemainingPartitionCount(0);
evacuationInfo.setPendingMessageCount(0);
}
return false;
}

// Calculate the max mtime across all CurrentState ZNodes for lastActivityTimestamp.
// Note: getChildValues(), which is used above to fetch currentStates, doesn't populate stat data,
// so we need to fetch the stats separately. This is not an expensive ZK op, as it's a batch call.
if (evacuationInfo != null) {
List<PropertyKey> currentStateKeys = new ArrayList<>();
for (CurrentState cs : currentStates) {
currentStateKeys.add(keyBuilder.currentState(instanceName, sessionId, cs.getResourceName()));
}
accessor.getPropertyStats(currentStateKeys).stream()
.filter(Objects::nonNull)
.max(Comparator.comparingLong(HelixProperty.Stat::getModifiedTime))
.map(HelixProperty.Stat::getModifiedTime)
.filter(maxMtime -> maxMtime > 0)
.ifPresent(evacuationInfo::setLastActivityTimestamp);
}

List<IdealState> idealStates = accessor.getChildValues(keyBuilder.idealStates(), true);

// Step 1: Get set of FULL_AUTO and CUSTOMIZED resources, except for excluded ones through exclusion list
Expand All @@ -856,35 +922,43 @@ private boolean instanceHasCurrentStateOrMessage(String clusterName,
List<PartitionInfo> partitionsStillOnInstance =
PartitionExclusionHelper.getCustomizedPartitionsStillOnInstance(
currentStates, idealStates, instanceName, allowedResources, filters);

boolean hasPartitionsStillOnInstance = !partitionsStillOnInstance.isEmpty();
logger.info("Instance {} in cluster {} (offline) has {} partitions still on instance after exclusions",
instanceName, clusterName, partitionsStillOnInstance.size());
if (evacuationInfo != null) {
evacuationInfo.setRemainingPartitionCount(partitionsStillOnInstance.size());
evacuationInfo.setPendingMessageCount(0);
}
return hasPartitionsStillOnInstance;
}

// Handle online instances - check for pending messages first
// Handle online instances - also report pending messages
List<String> messages = accessor.getChildNames(keyBuilder.messages(instanceName));
if (messages != null && !messages.isEmpty()) {
int pendingMessageCount = messages != null ? messages.size() : 0;
if (evacuationInfo != null) {
evacuationInfo.setPendingMessageCount(pendingMessageCount);
}
if (pendingMessageCount > 0) {
logger.info("Instance {} in cluster {} has {} pending messages.",
instanceName, clusterName, messages.size());
return true;
instanceName, clusterName, pendingMessageCount);
}

// Step 4: Collect all partitions from current states
// Step 4: Collect all partitions from current states (after resource-level exclusions)
List<PartitionInfo> allPartitions =
PartitionExclusionHelper.collectPartitions(currentStates, allowedResources);

// Step 5: Apply exclusions to the collected partitions
// Step 5: Apply partition-level exclusions
List<PartitionInfo> remainingPartitions =
PartitionExclusionHelper.applyExclusions(allPartitions, filters);

// Step 6: Check if any partitions remain after exclusions
boolean hasRemainingPartitions = !remainingPartitions.isEmpty();
logger.info("Instance {} in cluster {} has {} partitions after applying {} exclusions (from {} total)",
instanceName, clusterName, remainingPartitions.size(), exclusionTypes.size(), allPartitions.size());
if (evacuationInfo != null) {
evacuationInfo.setRemainingPartitionCount(remainingPartitions.size());
}

return hasRemainingPartitions;
return pendingMessageCount > 0 || hasRemainingPartitions;
}

/**
Expand Down
157 changes: 157 additions & 0 deletions helix-core/src/main/java/org/apache/helix/model/EvacuationInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package org.apache.helix.model;

/*
* Licensed to the Apache Software Foundation (ASF) under one
Comment thread
LZD-PratyushBhatt marked this conversation as resolved.
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import com.fasterxml.jackson.annotation.JsonInclude;

/**
* Data class representing the evacuation status of an instance.
* Used by the isEvacuateFinished REST API to return detailed information
* about the evacuation progress.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public class EvacuationInfo {

  /**
   * Enum representing the current state of evacuation.
   */
  public enum EvacuationState {
    /** Instance is not in EVACUATE operation. */
    NOT_EVACUATING,
    /** Evacuation is ongoing. */
    IN_PROGRESS,
    /** Evacuation finished successfully. */
    COMPLETED
  }

  /**
   * Enum representing reasons why evacuation may be blocked or incomplete.
   * Each constant carries the human-readable message that is stored as the reason text.
   */
  public enum ReasonCode {
    NOT_IN_EVACUATE_OPERATION("Instance is not in EVACUATE operation"),
    MULTIPLE_SESSIONS("Instance has multiple sessions and is carrying over from previous session");

    private final String message;

    ReasonCode(String message) {
      this.message = message;
    }

    /**
     * @return the human-readable message associated with this reason code
     */
    public String getMessage() {
      return message;
    }
  }

  private EvacuationState state;
  // Boxed types are used on purpose: null means "not reported", which is distinct from zero.
  private Integer remainingPartitionCount;
  private Integer pendingMessageCount;
  private String reason;
  private Long lastActivityTimestamp;

  /**
   * Default constructor, also used for Jackson deserialization.
   * The state starts as {@link EvacuationState#NOT_EVACUATING}; every other field is left
   * {@code null} until a setter is called.
   */
  public EvacuationInfo() {
    this.state = EvacuationState.NOT_EVACUATING;
    // remainingPartitionCount and pendingMessageCount are intentionally left null
    // so they won't be serialized for NOT_EVACUATING state
  }

  /**
   * Constructor that sets the state, both counts and the reason.
   * The last activity timestamp is not part of this constructor and stays {@code null}; use
   * {@link #setLastActivityTimestamp(Long)} to populate it.
   *
   * @param state the evacuation state
   * @param remainingPartitionCount number of remaining partitions, or {@code null} if not reported
   * @param pendingMessageCount number of pending messages, or {@code null} if not reported
   * @param reason free-form reason text, or {@code null} if there is none
   */
  public EvacuationInfo(EvacuationState state, Integer remainingPartitionCount,
      Integer pendingMessageCount, String reason) {
    this.state = state;
    this.remainingPartitionCount = remainingPartitionCount;
    this.pendingMessageCount = pendingMessageCount;
    this.reason = reason;
  }

  /**
   * @return the current evacuation state
   */
  public EvacuationState getState() {
    return state;
  }

  /**
   * @param state the evacuation state to record
   */
  public void setState(EvacuationState state) {
    this.state = state;
  }

  /**
   * Returns whether evacuation is complete.
   * This is a derived field for backward compatibility with clients expecting a boolean
   * "successful" field.
   *
   * @return true if state is COMPLETED, false otherwise
   */
  public boolean isSuccessful() {
    return state == EvacuationState.COMPLETED;
  }

  /**
   * @return the remaining partition count, or {@code null} if it has not been set
   */
  public Integer getRemainingPartitionCount() {
    return remainingPartitionCount;
  }

  /**
   * @param remainingPartitionCount the remaining partition count, or {@code null} to clear it
   */
  public void setRemainingPartitionCount(Integer remainingPartitionCount) {
    this.remainingPartitionCount = remainingPartitionCount;
  }

  /**
   * @return the pending message count, or {@code null} if it has not been set
   */
  public Integer getPendingMessageCount() {
    return pendingMessageCount;
  }

  /**
   * @param pendingMessageCount the pending message count, or {@code null} to clear it
   */
  public void setPendingMessageCount(Integer pendingMessageCount) {
    this.pendingMessageCount = pendingMessageCount;
  }

  /**
   * @return the reason text, or {@code null} if no reason has been set
   */
  public String getReason() {
    return reason;
  }

  /**
   * Sets the reason from free-form text.
   *
   * @param reason the reason text, or {@code null} to clear it
   */
  public void setReason(String reason) {
    this.reason = reason;
  }

  /**
   * Sets the reason using a predefined ReasonCode.
   * A {@code null} code clears the reason, mirroring what {@link #setReason(String)} does for
   * a {@code null} string, instead of failing with a NullPointerException.
   *
   * @param reasonCode the predefined reason, or {@code null} to clear the reason
   */
  public void setReason(ReasonCode reasonCode) {
    this.reason = reasonCode == null ? null : reasonCode.getMessage();
  }

  /**
   * @return the last activity timestamp, or {@code null} if it has not been set
   */
  public Long getLastActivityTimestamp() {
    return lastActivityTimestamp;
  }

  /**
   * Sets the timestamp of the last activity during evacuation.
   * This is the max modification time across all CurrentState ZNodes for the instance.
   *
   * @param lastActivityTimestamp the timestamp to record, or {@code null} to clear it
   */
  public void setLastActivityTimestamp(Long lastActivityTimestamp) {
    this.lastActivityTimestamp = lastActivityTimestamp;
  }

  @Override
  public String toString() {
    return "EvacuationInfo{" +
        "successful=" + isSuccessful() +
        ", state=" + state +
        ", remainingPartitionCount=" + remainingPartitionCount +
        ", pendingMessageCount=" + pendingMessageCount +
        ", reason='" + reason + '\'' +
        ", lastActivityTimestamp=" + lastActivityTimestamp +
        '}';
  }
}
Loading
Loading