Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@
import org.apache.helix.ZNRecord;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
Expand All @@ -50,7 +49,6 @@
*/
public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> {
private static final Logger LOG = LoggerFactory.getLogger(DelayedAutoRebalancer.class);
private static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler();

@Override
public IdealState computeNewIdealState(String resourceName,
Expand Down Expand Up @@ -79,7 +77,8 @@ public IdealState computeNewIdealState(String resourceName,

ClusterConfig clusterConfig = clusterData.getClusterConfig();
ResourceConfig resourceConfig = clusterData.getResourceConfig(resourceName);
boolean delayRebalanceEnabled = isDelayRebalanceEnabled(currentIdealState, clusterConfig);
boolean delayRebalanceEnabled =
DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig);

if (resourceConfig != null) {
userDefinedPreferenceList = resourceConfig.getPreferenceLists();
Expand Down Expand Up @@ -110,16 +109,18 @@ public IdealState computeNewIdealState(String resourceName,

Set<String> activeNodes = liveEnabledNodes;
if (delayRebalanceEnabled) {
long delay = getRebalanceDelay(currentIdealState, clusterConfig);
activeNodes = getActiveInstances(allNodes, currentIdealState, liveEnabledNodes,
clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
clusterData.getInstanceConfigMap(), delay, clusterConfig);
long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig);
activeNodes = DelayedRebalanceUtil
.getActiveNodes(allNodes, currentIdealState, liveEnabledNodes,
clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
clusterData.getInstanceConfigMap(), delay, clusterConfig);

Set<String> offlineOrDisabledInstances = new HashSet<>(activeNodes);
offlineOrDisabledInstances.removeAll(liveEnabledNodes);
setRebalanceScheduler(currentIdealState, offlineOrDisabledInstances,
clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
clusterData.getInstanceConfigMap(), delay, clusterConfig);
DelayedRebalanceUtil.setRebalanceScheduler(currentIdealState.getResourceName(), true,
offlineOrDisabledInstances, clusterData.getInstanceOfflineTimeMap(),
clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(), delay,
clusterConfig, _manager);
}

if (allNodes.isEmpty() || activeNodes.isEmpty()) {
Expand Down Expand Up @@ -162,16 +163,16 @@ public IdealState computeNewIdealState(String resourceName,
.computePartitionAssignment(allNodeList, liveEnabledNodeList, currentMapping, clusterData);
ZNRecord finalMapping = newIdealMapping;

if (isDelayRebalanceEnabled(currentIdealState, clusterConfig)) {
if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig)) {
List<String> activeNodeList = new ArrayList<>(activeNodes);
Collections.sort(activeNodeList);
int minActiveReplicas = getMinActiveReplica(currentIdealState, replicaCount);
int minActiveReplicas =
DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, replicaCount);

ZNRecord newActiveMapping = _rebalanceStrategy
.computePartitionAssignment(allNodeList, activeNodeList, currentMapping, clusterData);
finalMapping =
getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping, liveEnabledNodes,
replicaCount, minActiveReplicas);
finalMapping = getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping,
liveEnabledNodes, replicaCount, minActiveReplicas);
}

finalMapping.getListFields().putAll(userDefinedPreferenceList);
Expand Down Expand Up @@ -202,162 +203,15 @@ private IdealState generateNewIdealState(String resourceName, IdealState current
return newIdealState;
}

/**
 * Get all active instances: the live-and-enabled instances plus every offline or disabled
 * instance whose computed inactive time is still in the future and whose instance config has
 * delayed rebalance enabled.
 *
 * @param allNodes               every instance considered for the resource
 * @param idealState             ideal state of the resource, used to check whether delayed
 *                               rebalance is enabled
 * @param liveEnabledNodes       instances that are both live and enabled; always part of the result
 * @param instanceOfflineTimeMap instance name to the time it went offline (may lack entries)
 * @param liveNodes              instances that are currently live
 * @param instanceConfigMap      instance name to its config (may lack entries)
 * @param delay                  rebalance delay added to the offline/disabled time
 * @param clusterConfig          cluster-level configuration
 * @return a new set; equal to {@code liveEnabledNodes} when delayed rebalance is not enabled
 */
private Set<String> getActiveInstances(Set<String> allNodes, IdealState idealState,
    Set<String> liveEnabledNodes, Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
    Map<String, InstanceConfig> instanceConfigMap, long delay, ClusterConfig clusterConfig) {
  Set<String> activeInstances = new HashSet<>(liveEnabledNodes);

  if (!isDelayRebalanceEnabled(idealState, clusterConfig)) {
    return activeInstances;
  }

  Set<String> offlineOrDisabledInstances = new HashSet<>(allNodes);
  offlineOrDisabledInstances.removeAll(liveEnabledNodes);

  long currentTime = System.currentTimeMillis();
  for (String ins : offlineOrDisabledInstances) {
    InstanceConfig instanceConfig = instanceConfigMap.get(ins);
    // Filter on the config BEFORE computing the inactive time: getInactiveTime reads the
    // instance config, so an instance without one must never reach it. Instances that opted out
    // of delayed rebalance can never be added either, so skip the computation for them too.
    if (instanceConfig == null || !instanceConfig.isDelayRebalanceEnabled()) {
      continue;
    }

    long inactiveTime = getInactiveTime(ins, liveNodes, instanceOfflineTimeMap.get(ins), delay,
        instanceConfig, clusterConfig);
    // A result of -1 (no inactive time could be determined) is never greater than the current
    // time, so such instances are left out.
    if (inactiveTime > currentTime) {
      activeInstances.add(ins);
    }
  }

  return activeInstances;
}

/**
 * Schedule (or cancel) the timer-driven rebalance of a resource so that it fires at the closest
 * future time at which one of the given offline or disabled instances becomes inactive.
 *
 * <ul>
 * <li>If delayed rebalance is not enabled, any scheduled rebalance for the resource is removed.</li>
 * <li>If no instance has an inactive time in the future, the scheduled rebalance is removed.</li>
 * <li>Otherwise a rebalance is scheduled, unless one is already scheduled at or before that
 * time.</li>
 * </ul>
 */
private void setRebalanceScheduler(IdealState idealState, Set<String> offlineOrDisabledInstances,
    Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
    Map<String, InstanceConfig> instanceConfigMap, long delay,
    ClusterConfig clusterConfig) {
  final String resource = idealState.getResourceName();

  if (!isDelayRebalanceEnabled(idealState, clusterConfig)) {
    _rebalanceScheduler.removeScheduledRebalance(resource);
    return;
  }

  // Find the earliest inactive time that still lies in the future.
  final long now = System.currentTimeMillis();
  long earliest = Long.MAX_VALUE;
  for (String instance : offlineOrDisabledInstances) {
    final long expiry =
        getInactiveTime(instance, liveNodes, instanceOfflineTimeMap.get(instance), delay,
            instanceConfigMap.get(instance), clusterConfig);
    final boolean inFuture = expiry != -1 && expiry > now;
    if (inFuture && expiry < earliest) {
      earliest = expiry;
    }
  }

  // Nothing left to wait for: drop whatever timer exists for this resource.
  if (earliest == Long.MAX_VALUE) {
    final long removedTime = _rebalanceScheduler.removeScheduledRebalance(resource);
    if (LOG.isDebugEnabled()) {
      LOG.debug(String
          .format("Remove exist rebalance timer for resource %s at %d\n", resource, removedTime));
    }
    return;
  }

  // Keep an existing timer that already fires at or before the computed time.
  final long scheduled = _rebalanceScheduler.getRebalanceTime(resource);
  if (scheduled >= 0 && scheduled <= earliest) {
    return;
  }

  _rebalanceScheduler.scheduleRebalance(_manager, resource, earliest);
  if (LOG.isDebugEnabled()) {
    LOG.debug(String
        .format("Set next rebalance time for resource %s at time %d\n", resource, earliest));
  }
}

/**
 * Compute the time at which an offline or disabled instance should be treated as inactive.
 *
 * <p>Two candidates are considered and the smaller one wins:
 * <ul>
 * <li>{@code offlineTime + delay}, when the instance is not in {@code liveInstances} and a
 * positive offline time is known;</li>
 * <li>{@code disabledTime + delay}, when the instance is disabled in its own config or listed in
 * the cluster config's disabled instances. When both sources apply, the earlier disable time is
 * used.</li>
 * </ul>
 *
 * @param instance       instance name
 * @param liveInstances  names of the currently live instances
 * @param offlineTime    time the instance went offline, or {@code null} if unknown
 * @param delay          rebalance delay added to the offline/disabled time
 * @param instanceConfig config of the instance; may be {@code null}, in which case only the
 *                       cluster-level disabled list is consulted for the disabled time
 * @param clusterConfig  cluster-level configuration
 * @return the computed inactive time, or -1 if neither candidate could be determined
 */
private long getInactiveTime(String instance, Set<String> liveInstances, Long offlineTime,
    long delay, InstanceConfig instanceConfig, ClusterConfig clusterConfig) {
  long inactiveTime = Long.MAX_VALUE;

  // check the time instance went offline.
  if (!liveInstances.contains(instance)) {
    if (offlineTime != null && offlineTime > 0 && offlineTime + delay < inactiveTime) {
      inactiveTime = offlineTime + delay;
    }
  }

  // check the time instance got disabled.
  // Read the cluster-level disabled map once instead of on every use.
  Map<String, String> disabledInstances = clusterConfig.getDisabledInstances();
  boolean batchDisabled = disabledInstances != null && disabledInstances.containsKey(instance);
  // An instance without a config cannot be individually disabled; guard against the NPE.
  boolean instanceDisabled = instanceConfig != null && !instanceConfig.getInstanceEnabled();

  if (instanceDisabled || batchDisabled) {
    long disabledTime = instanceConfig != null ? instanceConfig.getInstanceEnabledTime() : -1;
    if (batchDisabled) {
      // Update batch disable time
      long batchDisableTime = Long.parseLong(disabledInstances.get(instance));
      if (disabledTime == -1 || disabledTime > batchDisableTime) {
        disabledTime = batchDisableTime;
      }
    }
    if (disabledTime > 0 && disabledTime + delay < inactiveTime) {
      inactiveTime = disabledTime + delay;
    }
  }

  if (inactiveTime == Long.MAX_VALUE) {
    return -1;
  }

  return inactiveTime;
}

/**
 * Resolve the rebalance delay for a resource: the value from the ideal state when it is
 * non-negative, otherwise the value from the cluster config.
 */
private long getRebalanceDelay(IdealState idealState, ClusterConfig clusterConfig) {
  final long resourceDelay = idealState.getRebalanceDelay();
  return resourceDelay >= 0 ? resourceDelay : clusterConfig.getRebalanceDelayTime();
}

/**
 * Delayed rebalance is in effect only when the resolved delay is positive and both the ideal
 * state and the cluster config have it enabled.
 */
private boolean isDelayRebalanceEnabled(IdealState idealState, ClusterConfig clusterConfig) {
  if (getRebalanceDelay(idealState, clusterConfig) <= 0) {
    return false;
  }
  if (!idealState.isDelayRebalanceEnabled()) {
    return false;
  }
  return clusterConfig.isDelayRebalaceEnabled();
}

/**
 * Build the mapping to use while delayed rebalance is in effect.
 *
 * <p>When {@code minActiveReplica >= numReplica} the ideal mapping is returned as is. Otherwise
 * a new record named after the resource is returned whose list fields are computed by
 * {@link DelayedRebalanceUtil#getFinalDelayedMapping} from the list fields of the ideal and
 * active mappings, the live instances and the minimum active replica count.
 *
 * @param idealState       ideal state of the resource; supplies the name of the returned record
 * @param newIdealMapping  mapping computed over the live-and-enabled instances
 * @param newActiveMapping mapping computed over the active instances
 * @param liveInstances    instances regarded as live when counting active replicas
 * @param numReplica       configured replica count
 * @param minActiveReplica minimum number of active replicas to maintain
 * @return {@code newIdealMapping} itself, or a new record holding the delayed mapping
 */
private ZNRecord getFinalDelayedMapping(IdealState idealState, ZNRecord newIdealMapping,
    ZNRecord newActiveMapping, Set<String> liveInstances, int numReplica, int minActiveReplica) {
  if (minActiveReplica >= numReplica) {
    return newIdealMapping;
  }
  ZNRecord finalMapping = new ZNRecord(idealState.getResourceName());
  // The per-partition selection is delegated to DelayedRebalanceUtil. Computing it here as well
  // would be wasted work, because setListFields installs the utility's result as the record's
  // list fields.
  finalMapping.setListFields(DelayedRebalanceUtil
      .getFinalDelayedMapping(newIdealMapping.getListFields(), newActiveMapping.getListFields(),
          liveInstances, minActiveReplica));
  return finalMapping;
}

Expand Down Expand Up @@ -391,10 +245,11 @@ public ResourceAssignment computeBestPossiblePartitionState(ResourceControllerDa
Set<String> liveNodes = cache.getLiveInstances().keySet();

ClusterConfig clusterConfig = cache.getClusterConfig();
long delayTime = getRebalanceDelay(idealState, clusterConfig);
Set<String> activeNodes = getActiveInstances(allNodes, idealState, liveNodes,
cache.getInstanceOfflineTimeMap(), cache.getLiveInstances().keySet(),
cache.getInstanceConfigMap(), delayTime, clusterConfig);
long delayTime = DelayedRebalanceUtil.getRebalanceDelay(idealState, clusterConfig);
Set<String> activeNodes = DelayedRebalanceUtil
.getActiveNodes(allNodes, idealState, liveNodes, cache.getInstanceOfflineTimeMap(),
cache.getLiveInstances().keySet(), cache.getInstanceConfigMap(), delayTime,
clusterConfig);

String stateModelDefName = idealState.getStateModelDefRef();
StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
Expand All @@ -419,14 +274,6 @@ public ResourceAssignment computeBestPossiblePartitionState(ResourceControllerDa
return partitionMapping;
}

/**
 * Resolve the minimum active replica count: the value from the ideal state when it is
 * non-negative, otherwise the given replica count.
 */
private int getMinActiveReplica(IdealState idealState, int replicaCount) {
  final int configured = idealState.getMinActiveReplicas();
  return configured < 0 ? replicaCount : configured;
}

/**
* compute best state for resource in AUTO ideal state mode
* @param liveInstances
Expand Down
Loading