Skip to content

Commit 8541304

Browse files
jiajunwangJiajun Wang
authored andcommitted
Improve the algorithm so it prioritizes the assignment to the idle nodes when the constraint evaluation results are the same (#651)
This is to get rid of the randomness when the algorithm result is a tie. Usually, when the algorithm picks up the nodes with the same score, more partition movements will be triggered on a cluster change.
1 parent 4e43125 commit 8541304

1 file changed

Lines changed: 33 additions & 8 deletions

File tree

helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
*/
2121

2222
import java.util.ArrayList;
23-
import java.util.Comparator;
23+
import java.util.Collection;
2424
import java.util.HashMap;
2525
import java.util.List;
2626
import java.util.Map;
@@ -63,18 +63,22 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
6363
}
6464

6565
@Override
66-
public OptimalAssignment calculate(ClusterModel clusterModel) throws HelixRebalanceException {
66+
public OptimalAssignment calculate(ClusterModel clusterModel)
67+
throws HelixRebalanceException {
6768
OptimalAssignment optimalAssignment = new OptimalAssignment();
6869
List<AssignableNode> nodes = new ArrayList<>(clusterModel.getAssignableNodes().values());
70+
Set<String> busyInstances =
71+
getBusyInstances(clusterModel.getContext().getBestPossibleAssignment().values());
6972
// Sort the replicas so the input is stable for the greedy algorithm.
7073
// For the other algorithm implementation, this sorting could be unnecessary.
7174
for (AssignableReplica replica : getOrderedAssignableReplica(clusterModel)) {
7275
Optional<AssignableNode> maybeBestNode =
73-
getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(), optimalAssignment);
76+
getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(), busyInstances,
77+
optimalAssignment);
7478
// stop immediately if any replica cannot find best assignable node
7579
if (optimalAssignment.hasAnyFailure()) {
76-
String errorMessage = String.format(
77-
"Unable to find any available candidate node for partition %s; Fail reasons: %s",
80+
String errorMessage = String
81+
.format("Unable to find any available candidate node for partition %s; Fail reasons: %s",
7882
replica.getPartitionName(), optimalAssignment.getFailures());
7983
throw new HelixRebalanceException(errorMessage,
8084
HelixRebalanceException.Type.FAILED_TO_CALCULATE);
@@ -89,7 +93,7 @@ public OptimalAssignment calculate(ClusterModel clusterModel) throws HelixRebala
8993

9094
private Optional<AssignableNode> getNodeWithHighestPoints(AssignableReplica replica,
9195
List<AssignableNode> assignableNodes, ClusterContext clusterContext,
92-
OptimalAssignment optimalAssignment) {
96+
Set<String> busyInstances, OptimalAssignment optimalAssignment) {
9397
Map<AssignableNode, List<HardConstraint>> hardConstraintFailures = new ConcurrentHashMap<>();
9498
List<AssignableNode> candidateNodes = assignableNodes.parallelStream().filter(candidateNode -> {
9599
boolean isValid = true;
@@ -113,8 +117,18 @@ private Optional<AssignableNode> getNodeWithHighestPoints(AssignableReplica repl
113117

114118
return candidateNodes.parallelStream().map(node -> new HashMap.SimpleEntry<>(node,
115119
getAssignmentNormalizedScore(node, replica, clusterContext)))
116-
.max(Comparator.comparingDouble((scoreEntry) -> scoreEntry.getValue()))
117-
.map(Map.Entry::getKey);
120+
.max((nodeEntry1, nodeEntry2) -> {
121+
int scoreCompareResult = nodeEntry1.getValue().compareTo(nodeEntry2.getValue());
122+
if (scoreCompareResult == 0) {
123+
// If the evaluation scores of 2 nodes are the same, the algorithm assigns the replica
124+
// to the idle node first.
125+
int idleScore1 = busyInstances.contains(nodeEntry1.getKey().getInstanceName()) ? 0 : 1;
126+
int idleScore2 = busyInstances.contains(nodeEntry2.getKey().getInstanceName()) ? 0 : 1;
127+
return idleScore1 - idleScore2;
128+
} else {
129+
return scoreCompareResult;
130+
}
131+
}).map(Map.Entry::getKey);
118132
}
119133

120134
private double getAssignmentNormalizedScore(AssignableNode node, AssignableReplica replica,
@@ -200,4 +214,15 @@ private List<AssignableReplica> getOrderedAssignableReplica(ClusterModel cluster
200214
});
201215
return orderedAssignableReplicas;
202216
}
217+
218+
/**
219+
* @param assignments A collection of resource replicas assignment.
220+
* @return A set of instance names that have at least one replica assigned in the input assignments.
221+
*/
222+
private Set<String> getBusyInstances(Collection<ResourceAssignment> assignments) {
223+
return assignments.stream().flatMap(
224+
resourceAssignment -> resourceAssignment.getRecord().getMapFields().values().stream()
225+
.flatMap(instanceStateMap -> instanceStateMap.keySet().stream())
226+
.collect(Collectors.toSet()).stream()).collect(Collectors.toSet());
227+
}
203228
}

0 commit comments

Comments
 (0)