Skip to content

Commit 3c9c084

Browse files
jiajunwangJiajun Wang
authored andcommitted
Integrate the WAGED rebalancer with all the related components. (#466)
1. Integrate with the algorithm, assignment metadata store, etc. Fix several conflicting interfaces and logics so as to all the rebalancer run correctly. 2. Complete OptimalAssignment. 3. Add integration tests to ensure the correctness of rebalancing logic.
1 parent f3cc6a6 commit 3c9c084

21 files changed

Lines changed: 1597 additions & 222 deletions

helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@
2323
* Exception thrown by Helix due to rebalance failures.
2424
*/
2525
public class HelixRebalanceException extends Exception {
26+
// TODO: Adding static description or other necessary fields into the enum instances for
27+
// TODO: supporting the rebalance monitor to understand the exception.
2628
public enum Type {
2729
INVALID_CLUSTER_STATUS,
2830
INVALID_REBALANCER_STATUS,
2931
FAILED_TO_CALCULATE,
32+
INVALID_INPUT,
3033
UNKNOWN_FAILURE
3134
}
3235

helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
*/
2121

2222
import java.io.IOException;
23-
import java.util.Arrays;
23+
import java.util.Collections;
2424
import java.util.HashMap;
2525
import java.util.Map;
26+
27+
import com.google.common.annotations.VisibleForTesting;
28+
import org.I0Itec.zkclient.exception.ZkNoNodeException;
2629
import org.I0Itec.zkclient.serialize.ZkSerializer;
2730
import org.apache.helix.BucketDataAccessor;
2831
import org.apache.helix.HelixException;
29-
import org.apache.helix.HelixManager;
3032
import org.apache.helix.HelixProperty;
3133
import org.apache.helix.ZNRecord;
3234
import org.apache.helix.manager.zk.ZNRecordJacksonSerializer;
@@ -50,33 +52,42 @@ public class AssignmentMetadataStore {
5052
private Map<String, ResourceAssignment> _globalBaseline;
5153
private Map<String, ResourceAssignment> _bestPossibleAssignment;
5254

55+
AssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
56+
this(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
57+
}
58+
5359
AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) {
5460
_dataAccessor = bucketDataAccessor;
5561
_baselinePath = String.format(BASELINE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
5662
_bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
5763
}
5864

59-
AssignmentMetadataStore(HelixManager helixManager) {
60-
this(new ZkBucketDataAccessor(helixManager.getMetadataStoreConnectionString()),
61-
helixManager.getClusterName());
62-
}
63-
6465
public Map<String, ResourceAssignment> getBaseline() {
6566
// Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK
6667
if (_globalBaseline == null) {
67-
HelixProperty baseline =
68-
_dataAccessor.compressedBucketRead(_baselinePath, HelixProperty.class);
69-
_globalBaseline = splitAssignments(baseline);
68+
try {
69+
HelixProperty baseline =
70+
_dataAccessor.compressedBucketRead(_baselinePath, HelixProperty.class);
71+
_globalBaseline = splitAssignments(baseline);
72+
} catch (ZkNoNodeException ex) {
73+
// Metadata does not exist, so return an empty map
74+
_globalBaseline = Collections.emptyMap();
75+
}
7076
}
7177
return _globalBaseline;
7278
}
7379

7480
public Map<String, ResourceAssignment> getBestPossibleAssignment() {
7581
// Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK
7682
if (_bestPossibleAssignment == null) {
77-
HelixProperty baseline =
78-
_dataAccessor.compressedBucketRead(_bestPossiblePath, HelixProperty.class);
79-
_bestPossibleAssignment = splitAssignments(baseline);
83+
try {
84+
HelixProperty baseline =
85+
_dataAccessor.compressedBucketRead(_bestPossiblePath, HelixProperty.class);
86+
_bestPossibleAssignment = splitAssignments(baseline);
87+
} catch (ZkNoNodeException ex) {
88+
// Metadata does not exist, so return an empty map
89+
_bestPossibleAssignment = Collections.emptyMap();
90+
}
8091
}
8192
return _bestPossibleAssignment;
8293
}
@@ -113,6 +124,16 @@ public void persistBestPossibleAssignment(
113124
_bestPossibleAssignment = bestPossibleAssignment;
114125
}
115126

127+
protected void finalize() {
128+
// To ensure all resources are released.
129+
close();
130+
}
131+
132+
// Close to release all the resources.
133+
public void close() {
134+
_dataAccessor.disconnect();
135+
}
136+
116137
/**
117138
* Produces one HelixProperty that contains all assignment data.
118139
* @param name
@@ -123,8 +144,9 @@ private HelixProperty combineAssignments(String name,
123144
Map<String, ResourceAssignment> assignmentMap) {
124145
HelixProperty property = new HelixProperty(name);
125146
// Add each resource's assignment as a simple field in one ZNRecord
147+
// Node that don't use Arrays.toString() for the record converting. The deserialize will fail.
126148
assignmentMap.forEach((resource, assignment) -> property.getRecord().setSimpleField(resource,
127-
Arrays.toString(SERIALIZER.serialize(assignment.getRecord()))));
149+
new String(SERIALIZER.serialize(assignment.getRecord()))));
128150
return property;
129151
}
130152

@@ -138,8 +160,8 @@ private Map<String, ResourceAssignment> splitAssignments(HelixProperty property)
138160
// Convert each resource's assignment String into a ResourceAssignment object and put it in a
139161
// map
140162
property.getRecord().getSimpleFields()
141-
.forEach((resource, assignment) -> assignmentMap.put(resource,
142-
new ResourceAssignment((ZNRecord) SERIALIZER.deserialize(assignment.getBytes()))));
163+
.forEach((resource, assignmentStr) -> assignmentMap.put(resource,
164+
new ResourceAssignment((ZNRecord) SERIALIZER.deserialize(assignmentStr.getBytes()))));
143165
return assignmentMap;
144166
}
145167
}

0 commit comments

Comments
 (0)