Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.HashMap;
import java.util.Map;

import com.google.common.annotations.VisibleForTesting;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.BucketDataAccessor;
Expand All @@ -51,20 +50,27 @@ public class AssignmentMetadataStore {
private String _bestPossiblePath;
private Map<String, ResourceAssignment> _globalBaseline;
private Map<String, ResourceAssignment> _bestPossibleAssignment;
private boolean _useCache;

AssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
this(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
this(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName, true);
}

AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) {
protected AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) {
this(bucketDataAccessor, clusterName, true);
}

protected AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName,
boolean useCache) {
_dataAccessor = bucketDataAccessor;
_baselinePath = String.format(BASELINE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
_bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
_useCache = useCache;
}

public Map<String, ResourceAssignment> getBaseline() {
// Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK
if (_globalBaseline == null) {
if (!_useCache || _globalBaseline == null) {
try {
HelixProperty baseline =
_dataAccessor.compressedBucketRead(_baselinePath, HelixProperty.class);
Expand All @@ -79,7 +85,7 @@ public Map<String, ResourceAssignment> getBaseline() {

public Map<String, ResourceAssignment> getBestPossibleAssignment() {
// Return the in-memory best possible assignment. If null, read from ZK. This is to minimize reads from ZK
if (_bestPossibleAssignment == null) {
if (!_useCache || _bestPossibleAssignment == null) {
try {
HelixProperty baseline =
_dataAccessor.compressedBucketRead(_bestPossiblePath, HelixProperty.class);
Expand All @@ -95,7 +101,7 @@ public Map<String, ResourceAssignment> getBestPossibleAssignment() {
public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
// TODO: Make the write async?
// If baseline hasn't changed, skip writing to metadata store
if (compareAssignments(_globalBaseline, globalBaseline)) {
if (_useCache && compareAssignments(_globalBaseline, globalBaseline)) {
return;
}
// Persist to ZK
Expand All @@ -115,7 +121,7 @@ public void persistBestPossibleAssignment(
Map<String, ResourceAssignment> bestPossibleAssignment) {
// TODO: Make the write async?
// If bestPossibleAssignment hasn't changed, skip writing to metadata store
if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment)) {
if (_useCache && compareAssignments(_bestPossibleAssignment, bestPossibleAssignment)) {
return;
}
// Persist to ZK
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixManager;
Expand Down Expand Up @@ -70,58 +71,64 @@ public class WagedRebalancer {
// When any of the following change happens, the rebalancer needs to do a global rebalance which
// contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline.
private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG,
HelixConstants.ChangeType.IDEAL_STATE,
HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
ImmutableSet
.of(HelixConstants.ChangeType.RESOURCE_CONFIG, HelixConstants.ChangeType.IDEAL_STATE,
HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
// To identify if the preference has been configured or not.
private static final Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer>
NOT_CONFIGURED_PREFERENCE = ImmutableMap
.of(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, -1,
ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, -1);

private final ResourceChangeDetector _changeDetector;
private final HelixManager _manager;
private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
private final AssignmentMetadataStore _assignmentMetadataStore;
private final RebalanceAlgorithm _rebalanceAlgorithm;
private RebalanceAlgorithm _rebalanceAlgorithm;
private Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> _preference =
NOT_CONFIGURED_PREFERENCE;
private MetricCollector _metricCollector;

private static AssignmentMetadataStore constructAssignmentStore(HelixManager helixManager) {
AssignmentMetadataStore assignmentMetadataStore = null;
if (helixManager != null) {
String metadataStoreAddrs = helixManager.getMetadataStoreConnectionString();
String clusterName = helixManager.getClusterName();
if (metadataStoreAddrs != null && clusterName != null) {
assignmentMetadataStore = new AssignmentMetadataStore(metadataStoreAddrs, clusterName);
}
private static AssignmentMetadataStore constructAssignmentStore(String metadataStoreAddrs,
String clusterName) {
if (metadataStoreAddrs != null && clusterName != null) {
return new AssignmentMetadataStore(metadataStoreAddrs, clusterName);
}
return assignmentMetadataStore;
return null;
}

public WagedRebalancer(HelixManager helixManager,
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences,
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference,
MetricCollector metricCollector) {
this(constructAssignmentStore(helixManager),
ConstraintBasedAlgorithmFactory.getInstance(preferences),
this(helixManager == null ? null
: constructAssignmentStore(helixManager.getMetadataStoreConnectionString(),
helixManager.getClusterName()),
ConstraintBasedAlgorithmFactory.getInstance(preference),
// Use DelayedAutoRebalancer as the mapping calculator for the final assignment output.
// Mapping calculator will translate the best possible assignment into the applicable state
// mapping based on the current states.
// TODO abstract and separate the main mapping calculator logic from DelayedAutoRebalancer
new DelayedAutoRebalancer(),
// Helix Manager is required for the rebalancer scheduler
helixManager, metricCollector);
_preference = ImmutableMap.copyOf(preference);
}

/**
* This constructor will use null for HelixManager and MetricCollector. With null HelixManager,
* the rebalancer will rebalance solely based on CurrentStates. With null MetricCollector, the
* the rebalancer will not schedule for a future delayed rebalance. With null MetricCollector, the
* rebalancer will not emit JMX metrics.
* @param assignmentMetadataStore
* @param algorithm
* @param mappingCalculator
*/
protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator) {
this(assignmentMetadataStore, algorithm, mappingCalculator, null, null);
RebalanceAlgorithm algorithm) {
this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, null);
}

/**
* This constructor will use null for HelixManager and MetricCollector. With null HelixManager,
* the rebalancer will rebalance solely based on CurrentStates.
* This constructor will use null for HelixManager. With null HelixManager, the rebalancer will
* not schedule for a future delayed rebalance.
* @param assignmentMetadataStore
* @param algorithm
* @param metricCollector
Expand Down Expand Up @@ -149,11 +156,25 @@ private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
_changeDetector = new ResourceChangeDetector(true);
}

/**
 * Replaces the rebalance algorithm with one built from {@code newPreference} and records the
 * new preference.
 * <p>
 * Nothing is changed when the current preference is still NOT_CONFIGURED_PREFERENCE (no
 * preference was configured at construction), or when the current preference already equals
 * {@code newPreference}.
 * @param newPreference the preference to switch to
 */
public void updatePreference(
    Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> newPreference) {
  // Only act when a preference was configured AND it differs from the requested one.
  if (!_preference.equals(NOT_CONFIGURED_PREFERENCE) && !_preference.equals(newPreference)) {
    // Build the algorithm before recording the preference, so a failure while building
    // leaves the recorded preference untouched.
    _rebalanceAlgorithm = ConstraintBasedAlgorithmFactory.getInstance(newPreference);
    _preference = ImmutableMap.copyOf(newPreference);
  }
}

/**
 * Releases the resources held by this rebalancer: closes the assignment metadata store when
 * one is present, then unregisters the metric collector.
 */
public void close() {
  final AssignmentMetadataStore store = _assignmentMetadataStore;
  if (store != null) {
    store.close();
  }
  // NOTE(review): dereferenced without a null guard; presumably the constructor always sets
  // a collector -- confirm.
  _metricCollector.unregister();
}

/**
Expand Down Expand Up @@ -231,9 +252,43 @@ public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvi
}

// Coordinate baseline recalculation and partial rebalance according to the cluster changes.
protected Map<String, IdealState> computeBestPossibleStates(
private Map<String, IdealState> computeBestPossibleStates(
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
final CurrentStateOutput currentStateOutput) throws HelixRebalanceException {
final CurrentStateOutput currentStateOutput)
throws HelixRebalanceException {
Set<String> activeNodes = DelayedRebalanceUtil
.getActiveNodes(clusterData.getAllInstances(), clusterData.getEnabledLiveInstances(),
clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
clusterData.getInstanceConfigMap(), clusterData.getClusterConfig());

// Schedule (or unschedule) delayed rebalance according to the delayed rebalance config.
delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());

Map<String, IdealState> newIdealStates = convertResourceAssignment(clusterData,
computeBestPossibleAssignment(clusterData, resourceMap, activeNodes, currentStateOutput));

// The additional rebalance overwrite is required since the calculated mapping may contain
Comment thread
jiajunwang marked this conversation as resolved.
Outdated
// some delayed rebalanced assignments.
if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) {
applyRebalanceOverwrite(newIdealStates, clusterData, resourceMap,
getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
resourceMap.keySet()));
}
// Replace the assignment if user-defined preference list is configured.
// Note the user-defined list is intentionally applied to the final mapping after calculation.
// This is to avoid persisting it into the assignment store, which impacts the long term
// assignment evenness and partition movements.
newIdealStates.entrySet().stream().forEach(idealStateEntry -> applyUserDefinedPreferenceList(
clusterData.getResourceConfig(idealStateEntry.getKey()), idealStateEntry.getValue()));

return newIdealStates;
}

// Coordinate baseline recalculation and partial rebalance according to the cluster changes.
protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
Set<String> activeNodes, final CurrentStateOutput currentStateOutput)
throws HelixRebalanceException {
getChangeDetector().updateSnapshots(clusterData);
// Get all the changed items' information
Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
Expand All @@ -257,37 +312,11 @@ protected Map<String, IdealState> computeBestPossibleStates(
refreshBaseline(clusterData, clusterChanges, resourceMap, currentStateOutput);
}

Set<String> activeNodes = DelayedRebalanceUtil.getActiveNodes(clusterData.getAllInstances(),
clusterData.getEnabledLiveInstances(), clusterData.getInstanceOfflineTimeMap(),
clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(),
clusterData.getClusterConfig());

// Schedule (or unschedule) delayed rebalance according to the delayed rebalance config.
delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());

// Perform partial rebalance
Map<String, ResourceAssignment> newAssignment =
partialRebalance(clusterData, clusterChanges, resourceMap, activeNodes, currentStateOutput);

Map<String, IdealState> finalIdealStateMap =
convertResourceAssignment(clusterData, newAssignment);

// The additional rebalance overwrite is required since the calculated mapping may contain
// some delayed rebalanced assignments.
if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) {
applyRebalanceOverwrite(finalIdealStateMap, clusterData, resourceMap, clusterChanges,
getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
resourceMap.keySet()));
}
// Replace the assignment if user-defined preference list is configured.
// Note the user-defined list is intentionally applied to the final mapping after calculation.
// This is to avoid persisting it into the assignment store, which impacts the long term
// assignment evenness and partition movements.
finalIdealStateMap.entrySet().stream()
.forEach(idealStateEntry -> applyUserDefinedPreferenceList(
clusterData.getResourceConfig(idealStateEntry.getKey()), idealStateEntry.getValue()));

return finalIdealStateMap;
return newAssignment;
}

/**
Expand Down Expand Up @@ -503,7 +532,7 @@ private void validateInput(ResourceControllerDataProvider clusterData,
Set<String> nonCompatibleResources = resourceMap.entrySet().stream().filter(resourceEntry -> {
IdealState is = clusterData.getIdealState(resourceEntry.getKey());
return is == null || !is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
|| !getClass().getName().equals(is.getRebalancerClassName());
|| !WagedRebalancer.class.getName().equals(is.getRebalancerClassName());
}).map(Map.Entry::getKey).collect(Collectors.toSet());
if (!nonCompatibleResources.isEmpty()) {
throw new HelixRebalanceException(String.format(
Expand Down Expand Up @@ -554,7 +583,7 @@ private Map<String, ResourceAssignment> getBaselineAssignment(
* assignmentMetadataStore, return the current state assignment.
* @throws HelixRebalanceException
*/
private Map<String, ResourceAssignment> getBestPossibleAssignment(
protected Map<String, ResourceAssignment> getBestPossibleAssignment(
AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput,
Set<String> resources) throws HelixRebalanceException {
Map<String, ResourceAssignment> currentBestAssignment = Collections.emptyMap();
Expand Down Expand Up @@ -614,18 +643,16 @@ private void delayedRebalanceSchedule(ResourceControllerDataProvider clusterData
* @param idealStateMap the calculated ideal states.
* @param clusterData the cluster data cache.
* @param resourceMap the rebalanced resource map.
* @param clusterChanges the detected cluster changes that trigger the rebalance.
* @param baseline the baseline assignment
*/
private void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
Map<String, ResourceAssignment> baseline) throws HelixRebalanceException {
Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
// Note that the calculation used the baseline as the input only. This is for minimizing
// unnecessary partition movement.
Map<String, IdealState> activeIdealStates = convertResourceAssignment(clusterData,
calculateAssignment(clusterData, clusterChanges, resourceMap, enabledLiveInstances,
calculateAssignment(clusterData, Collections.emptyMap(), resourceMap, enabledLiveInstances,
Collections.emptyMap(), baseline));
for (String resourceName : idealStateMap.keySet()) {
// The new calculated ideal state before overwrite
Expand Down Expand Up @@ -664,6 +691,10 @@ private void applyUserDefinedPreferenceList(ResourceConfig resourceConfig,
}
}

/**
 * Returns the assignment metadata store held by this rebalancer.
 * @return the value of {@code _assignmentMetadataStore}; it can be null, which is why close()
 *         null-checks it before closing
 */
protected AssignmentMetadataStore getAssignmentMetadataStore() {
return _assignmentMetadataStore;
}

/**
 * Returns the metric collector held by this rebalancer.
 * @return the value of {@code _metricCollector}
 */
protected MetricCollector getMetricCollector() {
return _metricCollector;
}
Expand Down
Loading