Skip to content

Commit 867431a

Browse files
jiajunwangJiajun Wang
authored and committed
Implement Cluster Model Provider. (#392)
* Implement Cluster Model Provider. The model provider is called in the WAGED rebalancer to generate the Cluster Model based on the current cluster status. The major responsibility of the provider is to parse all the assignable replicas and identify which replicas need to be reassigned. Note that if the current best possible assignment is still valid, the rebalancer won't need to recalculate the partition assignment. Also, add unit tests to verify the main logic.
1 parent 5371c30 commit 867431a

6 files changed

Lines changed: 517 additions & 12 deletions

File tree

helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,12 @@ public class ClusterDataDetector<T extends BaseControllerDataProvider> {
3535
* All the cluster change type that may trigger a WAGED rebalancer re-calculation.
3636
*/
3737
public enum ChangeType {
38+
BaselineAssignmentChange,
3839
InstanceConfigChange,
3940
ClusterConfigChange,
4041
ResourceConfigChange,
41-
InstanceStateChange,
4242
ResourceIdealStatesChange,
43+
InstanceStateChange,
4344
OtherChange
4445
}
4546

helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class AssignableNode {
5252
private int _maxPartition; // maximum number of the partitions that can be assigned to the node.
5353

5454
// proposed assignment tracking
55-
// <resource name, partition name>
55+
// <resource name, partition name set>
5656
private Map<String, Set<String>> _currentAssignments;
5757
// <resource name, top state partition name>
5858
private Map<String, Set<String>> _currentTopStateAssignments;

helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
*/
2121

2222
import org.apache.helix.HelixException;
23-
import org.apache.helix.model.IdealState;
2423
import org.apache.helix.model.ResourceAssignment;
2524

2625
import java.util.Collections;
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
package org.apache.helix.controller.rebalancer.waged.model;
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one
5+
* or more contributor license agreements. See the NOTICE file
6+
* distributed with this work for additional information
7+
* regarding copyright ownership. The ASF licenses this file
8+
* to you under the Apache License, Version 2.0 (the
9+
* "License"); you may not use this file except in compliance
10+
* with the License. You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing,
15+
* software distributed under the License is distributed on an
16+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
* KIND, either express or implied. See the License for the
18+
* specific language governing permissions and limitations
19+
* under the License.
20+
*/
21+
22+
import org.apache.helix.HelixException;
23+
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
24+
import org.apache.helix.controller.rebalancer.waged.ClusterDataDetector;
25+
import org.apache.helix.model.ClusterConfig;
26+
import org.apache.helix.model.IdealState;
27+
import org.apache.helix.model.InstanceConfig;
28+
import org.apache.helix.model.Resource;
29+
import org.apache.helix.model.ResourceAssignment;
30+
import org.apache.helix.model.ResourceConfig;
31+
import org.apache.helix.model.StateModelDefinition;
32+
33+
import java.util.Collections;
34+
import java.util.HashMap;
35+
import java.util.HashSet;
36+
import java.util.Map;
37+
import java.util.Optional;
38+
import java.util.Set;
39+
import java.util.stream.Collectors;
40+
41+
/**
42+
* This util class generates Cluster Model object based on the controller's data cache.
43+
*/
44+
public class ClusterModelProvider {
45+
46+
/**
47+
* @param dataProvider The controller's data cache.
48+
* @param resourceMap The full list of the resources to be rebalanced. Note that any
49+
* resources that are not in this list will be removed from the
50+
* final assignment.
51+
* @param activeInstances The active instances that will be used in the calculation.
52+
* Note this list can be different from the real active node list
53+
* according to the rebalancer logic.
54+
* @param clusterChanges All the cluster changes that happened after the previous rebalance.
55+
* @param baselineAssignment The persisted Baseline assignment.
56+
* @param bestPossibleAssignment The persisted Best Possible assignment that was generated in the
57+
* previous rebalance.
58+
* @return Generate a new Cluster Model object according to the current cluster status.
59+
*/
60+
public static ClusterModel generateClusterModel(ResourceControllerDataProvider dataProvider,
61+
Map<String, Resource> resourceMap, Set<String> activeInstances,
62+
Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges,
63+
Map<String, ResourceAssignment> baselineAssignment,
64+
Map<String, ResourceAssignment> bestPossibleAssignment) {
65+
// Generate replica objects for all the resource partitions.
66+
// <resource, replica set>
67+
Map<String, Set<AssignableReplica>> replicaMap =
68+
parseAllReplicas(dataProvider, resourceMap, activeInstances.size());
69+
70+
// Check if the replicas need to be reassigned.
71+
Map<String, Set<AssignableReplica>> allocatedReplicas =
72+
new HashMap<>(); // <instanceName, replica set>
73+
Set<AssignableReplica> toBeAssignedReplicas =
74+
findToBeAssignedReplicas(replicaMap, clusterChanges, activeInstances,
75+
bestPossibleAssignment, allocatedReplicas);
76+
77+
// Construct all the assignable nodes and initialize with the allocated replicas.
78+
Set<AssignableNode> assignableNodes =
79+
parseAllNodes(dataProvider.getClusterConfig(), dataProvider.getInstanceConfigMap(),
80+
activeInstances, allocatedReplicas);
81+
82+
// Construct and initialize cluster context.
83+
ClusterContext context = new ClusterContext(
84+
replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()),
85+
activeInstances.size());
86+
// Initial the cluster context with the allocated assignments.
87+
context.setAssignmentForFaultZoneMap(mapAssignmentToFaultZone(assignableNodes));
88+
89+
return new ClusterModel(context, toBeAssignedReplicas, assignableNodes, baselineAssignment,
90+
bestPossibleAssignment);
91+
}
92+
93+
/**
94+
* Find the minimum set of replicas that need to be reassigned.
95+
* A replica needs to be reassigned if one of the following condition is true:
96+
* 1. Cluster topology (the cluster config / any instance config) has been updated.
97+
* 2. The baseline assignment has been updated.
98+
* 3. The resource config has been updated.
99+
* 4. The resource idealstate has been updated. TODO remove this condition when all resource configurations are migrated to resource config.
100+
* 5. If the current best possible assignment does not contain the partition's valid assignment.
101+
*
102+
* @param replicaMap A map contains all the replicas grouped by resource name.
103+
* @param clusterChanges A map contains all the important metadata updates that happened after the previous rebalance.
104+
* @param activeInstances All the instances that are alive and enabled.
105+
* @param bestPossibleAssignment The current best possible assignment.
106+
* @param allocatedReplicas Return the allocated replicas grouped by the target instance name.
107+
* @return The replicas that need to be reassigned.
108+
*/
109+
private static Set<AssignableReplica> findToBeAssignedReplicas(
110+
Map<String, Set<AssignableReplica>> replicaMap,
111+
Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges, Set<String> activeInstances,
112+
Map<String, ResourceAssignment> bestPossibleAssignment,
113+
Map<String, Set<AssignableReplica>> allocatedReplicas) {
114+
Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
115+
if (clusterChanges.containsKey(ClusterDataDetector.ChangeType.ClusterConfigChange)
116+
|| clusterChanges.containsKey(ClusterDataDetector.ChangeType.InstanceConfigChange)
117+
|| clusterChanges.containsKey(ClusterDataDetector.ChangeType.BaselineAssignmentChange)) {
118+
// If the cluster topology or baseline assignment has been modified, need to reassign all replicas
119+
toBeAssignedReplicas
120+
.addAll(replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()));
121+
} else {
122+
// check each resource to identify the allocated replicas and to-be-assigned replicas.
123+
for (String resourceName : replicaMap.keySet()) {
124+
Set<AssignableReplica> replicas = replicaMap.get(resourceName);
125+
// 1. if the resource config/idealstate is changed, need to reassign.
126+
// 2. if the resource does appear in the best possible assignment, need to reassign.
127+
if (clusterChanges.getOrDefault(ClusterDataDetector.ChangeType.ResourceConfigChange,
128+
Collections.emptySet()).contains(resourceName) || clusterChanges
129+
.getOrDefault(ClusterDataDetector.ChangeType.ResourceIdealStatesChange,
130+
Collections.emptySet()).contains(resourceName) || !bestPossibleAssignment
131+
.containsKey(resourceName)) {
132+
toBeAssignedReplicas.addAll(replicas);
133+
continue; // go to check next resource
134+
} else {
135+
// check for every best possible assignments to identify if the related replicas need to reassign.
136+
ResourceAssignment assignment = bestPossibleAssignment.get(resourceName);
137+
// <partition, <instance, state>>
138+
Map<String, Map<String, String>> stateMap = assignment.getMappedPartitions().stream()
139+
.collect(Collectors.toMap(partition -> partition.getPartitionName(),
140+
partition -> new HashMap<>(assignment.getReplicaMap(partition))));
141+
for (AssignableReplica replica : replicas) {
142+
// Find any ACTIVE instance allocation that has the same state with the replica
143+
Optional<Map.Entry<String, String>> instanceNameOptional =
144+
stateMap.getOrDefault(replica.getPartitionName(), Collections.emptyMap()).entrySet()
145+
.stream().filter(instanceStateMap ->
146+
instanceStateMap.getValue().equals(replica.getReplicaState()) && activeInstances
147+
.contains(instanceStateMap.getKey())).findAny();
148+
// 3. if no such an instance in the bestPossible assignment, need to reassign the replica
149+
if (!instanceNameOptional.isPresent()) {
150+
toBeAssignedReplicas.add(replica);
151+
continue; // go to check the next replica
152+
} else {
153+
String instanceName = instanceNameOptional.get().getKey();
154+
// * cleanup the best possible state map record,
155+
// * so the selected instance won't be picked up again for the another replica check
156+
stateMap.getOrDefault(replica.getPartitionName(), Collections.emptyMap())
157+
.remove(instanceName);
158+
// the current best possible assignment for this replica is valid,
159+
// add to the allocated replica list.
160+
allocatedReplicas.computeIfAbsent(instanceName, key -> new HashSet<>()).add(replica);
161+
}
162+
}
163+
}
164+
}
165+
}
166+
return toBeAssignedReplicas;
167+
}
168+
169+
/**
170+
* Parse all the nodes that can be assigned replicas based on the configurations.
171+
*
172+
* @param clusterConfig The cluster configuration.
173+
* @param instanceConfigMap A map of all the instance configuration.
174+
* @param activeInstances All the instances that are online and enabled.
175+
* @param allocatedReplicas A map of all the assigned replicas, which will not be reassigned during the rebalance.
176+
* @return A map of assignable node set, <InstanceName, node set>.
177+
*/
178+
private static Set<AssignableNode> parseAllNodes(ClusterConfig clusterConfig,
179+
Map<String, InstanceConfig> instanceConfigMap, Set<String> activeInstances,
180+
Map<String, Set<AssignableReplica>> allocatedReplicas) {
181+
return activeInstances.stream().map(
182+
instanceName -> new AssignableNode(clusterConfig, instanceConfigMap.get(instanceName),
183+
instanceName, allocatedReplicas.getOrDefault(instanceName, Collections.emptySet())))
184+
.collect(Collectors.toSet());
185+
}
186+
187+
/**
188+
* Parse all the replicas that need to be reallocated from the cluster data cache.
189+
*
190+
* @param dataProvider The cluster status cache that contains the current cluster status.
191+
* @param resourceMap All the valid resources that are managed by the rebalancer.
192+
* @return A map of assignable replica set, <ResourceName, replica set>.
193+
*/
194+
private static Map<String, Set<AssignableReplica>> parseAllReplicas(
195+
ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap,
196+
int instanceCount) {
197+
Map<String, Set<AssignableReplica>> totalReplicaMap = new HashMap<>();
198+
199+
for (String resourceName : resourceMap.keySet()) {
200+
ResourceConfig config = dataProvider.getResourceConfig(resourceName);
201+
IdealState is = dataProvider.getIdealState(resourceName);
202+
if (is == null) {
203+
throw new HelixException(
204+
"Cannot find the resource ideal state for resource: " + resourceName);
205+
}
206+
String defName = is.getStateModelDefRef();
207+
StateModelDefinition def = dataProvider.getStateModelDef(defName);
208+
if (def == null) {
209+
throw new IllegalArgumentException(String
210+
.format("Cannot find state model definition %s for resource %s.",
211+
is.getStateModelDefRef(), resourceName));
212+
}
213+
214+
Map<String, Integer> stateCountMap =
215+
def.getStateCountMap(instanceCount, is.getReplicaCount(instanceCount));
216+
217+
for (String partition : is.getPartitionSet()) {
218+
for (Map.Entry<String, Integer> entry : stateCountMap.entrySet()) {
219+
String state = entry.getKey();
220+
for (int i = 0; i < entry.getValue(); i++) {
221+
totalReplicaMap.computeIfAbsent(resourceName, key -> new HashSet<>()).add(
222+
new AssignableReplica(config, partition, state,
223+
def.getStatePriorityMap().get(state)));
224+
}
225+
}
226+
}
227+
}
228+
return totalReplicaMap;
229+
}
230+
231+
/**
232+
* @return A map contains the assignments for each fault zone. <fault zone, <resource, set of partitions>>
233+
*/
234+
private static Map<String, Map<String, Set<String>>> mapAssignmentToFaultZone(
235+
Set<AssignableNode> assignableNodes) {
236+
Map<String, Map<String, Set<String>>> faultZoneAssignmentMap = new HashMap<>();
237+
assignableNodes.stream().forEach(node -> {
238+
for (Map.Entry<String, Set<String>> resourceMap : node.getCurrentAssignmentsMap()
239+
.entrySet()) {
240+
faultZoneAssignmentMap.computeIfAbsent(node.getFaultZone(), k -> new HashMap<>())
241+
.computeIfAbsent(resourceMap.getKey(), k -> new HashSet<>())
242+
.addAll(resourceMap.getValue());
243+
}
244+
});
245+
return faultZoneAssignmentMap;
246+
}
247+
}

helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import static org.mockito.Mockito.when;
4242

4343
public abstract class AbstractTestClusterModel {
44+
protected static String _sessionId = "testSessionId";
4445
protected String _testInstanceId;
4546
protected List<String> _resourceNames;
4647
protected List<String> _partitionNames;
@@ -73,16 +74,27 @@ public void initialize() {
7374
_testFaultZoneId = "testZone";
7475
}
7576

77+
InstanceConfig createMockInstanceConfig(String instanceId) {
78+
InstanceConfig testInstanceConfig = new InstanceConfig(instanceId);
79+
testInstanceConfig.setInstanceCapacityMap(_capacityDataMap);
80+
testInstanceConfig.addTag(_testInstanceTags.get(0));
81+
testInstanceConfig.setInstanceEnabled(true);
82+
testInstanceConfig.setZoneId(_testFaultZoneId);
83+
return testInstanceConfig;
84+
}
85+
86+
LiveInstance createMockLiveInstance(String instanceId) {
87+
LiveInstance testLiveInstance = new LiveInstance(instanceId);
88+
testLiveInstance.setSessionId(_sessionId);
89+
return testLiveInstance;
90+
}
91+
7692
protected ResourceControllerDataProvider setupClusterDataCache() throws IOException {
7793
ResourceControllerDataProvider testCache = Mockito.mock(ResourceControllerDataProvider.class);
7894

7995
// 1. Set up the default instance information with capacity configuration.
80-
InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceId");
81-
testInstanceConfig.setInstanceCapacityMap(_capacityDataMap);
82-
testInstanceConfig.addTag(_testInstanceTags.get(0));
96+
InstanceConfig testInstanceConfig = createMockInstanceConfig(_testInstanceId);
8397
testInstanceConfig.setInstanceEnabledForPartition("TestResource", "TestPartition", false);
84-
testInstanceConfig.setInstanceEnabled(true);
85-
testInstanceConfig.setZoneId(_testFaultZoneId);
8698
Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
8799
instanceConfigMap.put(_testInstanceId, testInstanceConfig);
88100
when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
@@ -95,8 +107,7 @@ protected ResourceControllerDataProvider setupClusterDataCache() throws IOExcept
95107
when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
96108

97109
// 3. Mock the live instance node for the default instance.
98-
LiveInstance testLiveInstance = new LiveInstance(_testInstanceId);
99-
testLiveInstance.setSessionId("testSessionId");
110+
LiveInstance testLiveInstance = createMockLiveInstance(_testInstanceId);
100111
Map<String, LiveInstance> liveInstanceMap = new HashMap<>();
101112
liveInstanceMap.put(_testInstanceId, testLiveInstance);
102113
when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
@@ -130,7 +141,7 @@ protected ResourceControllerDataProvider setupClusterDataCache() throws IOExcept
130141
Map<String, CurrentState> currentStatemap = new HashMap<>();
131142
currentStatemap.put(_resourceNames.get(0), testCurrentStateResource1);
132143
currentStatemap.put(_resourceNames.get(1), testCurrentStateResource2);
133-
when(testCache.getCurrentState(_testInstanceId, "testSessionId")).thenReturn(currentStatemap);
144+
when(testCache.getCurrentState(_testInstanceId, _sessionId)).thenReturn(currentStatemap);
134145

135146
// 5. Set up the resource config for the two resources with the partition weight.
136147
Map<String, Integer> capacityDataMapResource1 = new HashMap<>();
@@ -162,7 +173,7 @@ protected ResourceControllerDataProvider setupClusterDataCache() throws IOExcept
162173
protected Set<AssignableReplica> generateReplicas(ResourceControllerDataProvider dataProvider) {
163174
// Create assignable replica based on the current state.
164175
Map<String, CurrentState> currentStatemap =
165-
dataProvider.getCurrentState(_testInstanceId, "testSessionId");
176+
dataProvider.getCurrentState(_testInstanceId, _sessionId);
166177
Set<AssignableReplica> assignmentSet = new HashSet<>();
167178
for (CurrentState cs : currentStatemap.values()) {
168179
ResourceConfig resourceConfig = dataProvider.getResourceConfig(cs.getResourceName());

0 commit comments

Comments
 (0)