Skip to content

Commit d3b63a3

Browse files
jiajunwangjiajunwang
authored andcommitted
Refine the WAGED rebalancer to minimize the partial rebalance workload. (apache#639)
* Refine the WAGED rebalancer to minimize the partial rebalance workload. Split the cluster module calculation method so that different rebalance logic can have different rebalance scope calculation logic. Also, refine the WAGED rebalancer logic to reduce duplicate code.
1 parent edf769f commit d3b63a3

7 files changed

Lines changed: 624 additions & 239 deletions

File tree

helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.util.HashMap;
2525
import java.util.HashSet;
2626
import java.util.Map;
27+
import java.util.Set;
28+
import java.util.stream.Collectors;
2729

2830
import com.google.common.collect.Sets;
2931
import org.apache.helix.HelixConstants;
@@ -173,4 +175,20 @@ public Collection<String> getRemovalsByType(HelixConstants.ChangeType changeType
173175
changedItems -> getRemovedItems(determinePropertyMapByType(changeType, _oldSnapshot),
174176
determinePropertyMapByType(changeType, _newSnapshot)));
175177
}
178+
179+
/**
180+
* @return A map contains all the changed items that are categorized by the change types.
181+
*/
182+
public Map<HelixConstants.ChangeType, Set<String>> getAllChanges() {
183+
return getChangeTypes().stream()
184+
.collect(Collectors.toMap(changeType -> changeType, changeType -> {
185+
Set<String> itemKeys = new HashSet<>();
186+
itemKeys.addAll(getAdditionsByType(changeType));
187+
itemKeys.addAll(getChangesByType(changeType));
188+
itemKeys.addAll(getRemovalsByType(changeType));
189+
return itemKeys;
190+
})).entrySet().stream().filter(changeEntry -> !changeEntry.getValue().isEmpty()).collect(
191+
Collectors
192+
.toMap(changeEntry -> changeEntry.getKey(), changeEntry -> changeEntry.getValue()));
193+
}
176194
}

helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java

Lines changed: 125 additions & 112 deletions
Large diffs are not rendered by default.

helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java

Lines changed: 284 additions & 58 deletions
Large diffs are not rendered by default.

helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ private void reportInstanceCapacityMetrics(ClusterStatusMonitor clusterStatusMon
254254

255255
Map<String, ResourceAssignment> currentStateAssignment =
256256
currentStateOutput.getAssignment(resourceToMonitorMap.keySet());
257-
ClusterModel clusterModel = ClusterModelProvider.generateClusterModelFromCurrentState(
257+
ClusterModel clusterModel = ClusterModelProvider.generateClusterModelFromExistingAssignment(
258258
dataProvider, resourceToMonitorMap, currentStateAssignment);
259259

260260
Map<String, Double> maxUsageMap = new HashMap<>();

helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java

Lines changed: 35 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.helix.controller.pipeline.Stage;
3838
import org.apache.helix.controller.pipeline.StageContext;
3939
import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
40+
import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
4041
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
4142
import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
4243
import org.apache.helix.controller.stages.AttributeName;
@@ -413,45 +414,45 @@ public String toString() {
413414
return verifierName + "(" + _clusterName + "@" + _zkClient + "@resources["
414415
+ (_resources != null ? Arrays.toString(_resources.toArray()) : "") + "])";
415416
}
416-
}
417417

418-
/**
419-
* A Dryrun WAGED rebalancer that only calculates the assignment based on the cluster status but
420-
* never update the rebalancer assignment metadata.
421-
* This rebalacer is used in the verifiers or tests.
422-
*/
423-
class DryrunWagedRebalancer extends WagedRebalancer {
424-
DryrunWagedRebalancer(String metadataStoreAddrs, String clusterName,
425-
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
426-
super(new ReadOnlyAssignmentMetadataStore(metadataStoreAddrs, clusterName),
427-
ConstraintBasedAlgorithmFactory.getInstance(preferences));
428-
}
418+
/**
419+
* A Dryrun WAGED rebalancer that only calculates the assignment based on the cluster status but
420+
* never update the rebalancer assignment metadata.
421+
* This rebalacer is used in the verifiers or tests.
422+
*/
423+
private class DryrunWagedRebalancer extends WagedRebalancer {
424+
DryrunWagedRebalancer(String metadataStoreAddrs, String clusterName,
425+
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
426+
super(new ReadOnlyAssignmentMetadataStore(metadataStoreAddrs, clusterName),
427+
ConstraintBasedAlgorithmFactory.getInstance(preferences));
428+
}
429429

430-
@Override
431-
protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
432-
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
433-
Set<String> activeNodes, CurrentStateOutput currentStateOutput)
434-
throws HelixRebalanceException {
435-
return getBestPossibleAssignment(getAssignmentMetadataStore(), currentStateOutput,
436-
resourceMap.keySet());
430+
@Override
431+
protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
432+
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
433+
Set<String> activeNodes, CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm)
434+
throws HelixRebalanceException {
435+
return getBestPossibleAssignment(getAssignmentMetadataStore(), currentStateOutput,
436+
resourceMap.keySet());
437+
}
437438
}
438-
}
439439

440-
class ReadOnlyAssignmentMetadataStore extends AssignmentMetadataStore {
441-
ReadOnlyAssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
442-
super(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
443-
}
440+
private class ReadOnlyAssignmentMetadataStore extends AssignmentMetadataStore {
441+
ReadOnlyAssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
442+
super(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
443+
}
444444

445-
@Override
446-
public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
447-
// Update the in-memory reference only
448-
_globalBaseline = globalBaseline;
449-
}
445+
@Override
446+
public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
447+
// Update the in-memory reference only
448+
_globalBaseline = globalBaseline;
449+
}
450450

451-
@Override
452-
public void persistBestPossibleAssignment(
453-
Map<String, ResourceAssignment> bestPossibleAssignment) {
454-
// Update the in-memory reference only
455-
_bestPossibleAssignment = bestPossibleAssignment;
451+
@Override
452+
public void persistBestPossibleAssignment(
453+
Map<String, ResourceAssignment> bestPossibleAssignment) {
454+
// Update the in-memory reference only
455+
_bestPossibleAssignment = bestPossibleAssignment;
456+
}
456457
}
457458
}

helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,7 @@ protected ResourceControllerDataProvider setupClusterDataCache() throws IOExcept
114114
}
115115

116116
@Test
117-
public void testRebalance()
118-
throws IOException, HelixRebalanceException {
117+
public void testRebalance() throws IOException, HelixRebalanceException {
119118
_metadataStore.clearMetadataStore();
120119
WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
121120

@@ -128,6 +127,10 @@ public void testRebalance()
128127
.forEach(partition -> resource.addPartition(partition));
129128
return resource;
130129
}));
130+
// Mocking the change types for triggering a baseline rebalance.
131+
when(clusterData.getRefreshedChangeTypes())
132+
.thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
133+
131134
Map<String, IdealState> newIdealStates =
132135
rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
133136
Map<String, ResourceAssignment> algorithmResult = _algorithm.getRebalanceResult();
@@ -150,6 +153,9 @@ public void testPartialRebalance() throws IOException, HelixRebalanceException {
150153
.forEach(partition -> resource.addPartition(partition));
151154
return resource;
152155
}));
156+
// Mocking the change types for triggering a baseline rebalance.
157+
when(clusterData.getRefreshedChangeTypes())
158+
.thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
153159

154160
// Test with partial resources listed in the resourceMap input.
155161
// Remove the first resource from the input. Note it still exists in the cluster data cache.
@@ -175,6 +181,9 @@ public void testRebalanceWithCurrentState() throws IOException, HelixRebalanceEx
175181
.forEach(partition -> resource.addPartition(partition));
176182
return resource;
177183
}));
184+
// Mocking the change types for triggering a baseline rebalance.
185+
when(clusterData.getRefreshedChangeTypes())
186+
.thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
178187

179188
// Test with current state exists, so the rebalancer should calculate for the intermediate state
180189
// Create current state based on the cluster data cache.
@@ -256,12 +265,12 @@ public void testInvalidClusterStatus() throws IOException, HelixRebalanceExcepti
256265
Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName)));
257266
try {
258267
rebalancer.computeBestPossibleAssignment(clusterData, resourceMap,
259-
clusterData.getEnabledLiveInstances(), new CurrentStateOutput());
268+
clusterData.getEnabledLiveInstances(), new CurrentStateOutput(), _algorithm);
260269
Assert.fail("Rebalance shall fail.");
261270
} catch (HelixRebalanceException ex) {
262271
Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.INVALID_CLUSTER_STATUS);
263272
Assert.assertEquals(ex.getMessage(),
264-
"Failed to generate cluster model. Failure Type: INVALID_CLUSTER_STATUS");
273+
"Failed to generate cluster model for partial rebalance. Failure Type: INVALID_CLUSTER_STATUS");
265274
}
266275

267276
// The rebalance will be done with empty mapping result since there is no previously calculated
@@ -321,7 +330,7 @@ public void testAlgorithmException()
321330
// Calculation will fail
322331
try {
323332
rebalancer.computeBestPossibleAssignment(clusterData, resourceMap,
324-
clusterData.getEnabledLiveInstances(), new CurrentStateOutput());
333+
clusterData.getEnabledLiveInstances(), new CurrentStateOutput(), badAlgorithm);
325334
Assert.fail("Rebalance shall fail.");
326335
} catch (HelixRebalanceException ex) {
327336
Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);

0 commit comments

Comments
 (0)