Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static java.lang.Math.max;

Expand All @@ -51,26 +52,28 @@ public class AssignableNode {
private Map<String, Integer> _maxCapacity;
private int _maxPartition; // maximum number of the partitions that can be assigned to the node.

// proposed assignment tracking
// <resource name, partition name set>
private Map<String, Set<String>> _currentAssignments;
// <resource name, top state partition name>
private Map<String, Set<String>> _currentTopStateAssignments;
// <capacity key, capacity value>
private Map<String, Integer> _currentCapacity;
// A map of <resource name, <partition name, replica>> that tracks the replicas assigned to the node.
private Map<String, Map<String, AssignableReplica>> _currentAssignedReplicaMap;
// A map of <capacity key, capacity value> that tracks the current available node capacity
private Map<String, Integer> _currentCapacityMap;
// The maximum capacity utilization (0.0 - 1.0) across all the capacity categories.
private float _highestCapacityUtilization;

/**
* Creates an assignable node and initializes its state through {@code refresh}.
*
* @param clusterConfig the Cluster Config of the cluster where the node is located
* @param instanceConfig the Instance Config of the node
* @param instanceName the name stored as this node's instance name
* @param existingAssignment A collection of replicas that have been pre-allocated to the node.
*/
AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName,
Collection<AssignableReplica> existingAssignment) {
// The name is set before refresh() is invoked; keep this order.
_instanceName = instanceName;
refresh(clusterConfig, instanceConfig, existingAssignment);
}

private void reset() {
_currentAssignments = new HashMap<>();
_currentTopStateAssignments = new HashMap<>();
_currentCapacity = new HashMap<>();
_currentAssignedReplicaMap = new HashMap<>();
_currentCapacityMap = new HashMap<>();
_highestCapacityUtilization = 0;
}

Expand All @@ -80,8 +83,8 @@ private void reset() {
* refreshed. This is under the assumption that the capacity mappings of InstanceConfig and ResourceConfig could be
* subject to change. If the assumption is no longer true, this function should become private.
*
* @param clusterConfig - the Cluster Config of the cluster where the node is located
* @param instanceConfig - the Instance Config of the node
* @param clusterConfig - the Cluster Config of the cluster where the node is located
* @param instanceConfig - the Instance Config of the node
* @param existingAssignment - all the existing replicas that are currently assigned to the node
*/
private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
Expand All @@ -92,7 +95,7 @@ private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
if (instanceCapacity.isEmpty()) {
instanceCapacity = clusterConfig.getDefaultInstanceCapacityMap();
}
_currentCapacity.putAll(instanceCapacity);
_currentCapacityMap.putAll(instanceCapacity);
_faultZone = computeFaultZone(clusterConfig, instanceConfig);
_instanceTags = new HashSet<>(instanceConfig.getTags());
_disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
Expand All @@ -108,78 +111,110 @@ private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
* @param assignableReplica - the replica to be assigned
*/
void assign(AssignableReplica assignableReplica) {
if (!addToAssignmentRecord(assignableReplica, _currentAssignments)) {
throw new HelixException(String
.format("Resource %s already has a replica from partition %s on node %s",
assignableReplica.getResourceName(), assignableReplica.getPartitionName(),
getInstanceName()));
} else {
if (assignableReplica.isReplicaTopState()) {
addToAssignmentRecord(assignableReplica, _currentTopStateAssignments);
}
assignableReplica.getCapacity().entrySet().stream().forEach(
capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
}
addToAssignmentRecord(assignableReplica);
assignableReplica.getCapacity().entrySet().stream()
.forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
}

/**
* Release a replica from the node.
* If the replica is not on this node, the assignable node is not updated.
*
* @param assignableReplica - the replica to be released
* @param replica - the replica to be released
*/
void release(AssignableReplica assignableReplica) throws IllegalArgumentException {
String resourceName = assignableReplica.getResourceName();
String partitionName = assignableReplica.getPartitionName();
void release(AssignableReplica replica) throws IllegalArgumentException {
String resourceName = replica.getResourceName();
String partitionName = replica.getPartitionName();

// Check if the release is necessary
if (!_currentAssignments.containsKey(resourceName)) {
if (!_currentAssignedReplicaMap.containsKey(resourceName)) {
LOG.warn("Resource {} is not on node {}. Ignore the release call.", resourceName,
getInstanceName());
return;
}
Set<String> partitions = _currentAssignments.get(resourceName);
if (!partitions.contains(partitionName)) {
LOG.warn(String
.format("Resource %s does not have a replica from partition %s on node %s", resourceName,
partitionName, getInstanceName()));

Map<String, AssignableReplica> partitionMap = _currentAssignedReplicaMap.get(resourceName);
if (!partitionMap.containsKey(partitionName) || !partitionMap.get(partitionName)
.equals(replica)) {
LOG.warn("Replica {} is not assigned to node {}. Ignore the release call.",
replica.toString(), getInstanceName());
return;
}

partitions.remove(assignableReplica.getPartitionName());
if (assignableReplica.isReplicaTopState()) {
_currentTopStateAssignments.get(resourceName).remove(partitionName);
}
AssignableReplica removedReplica = partitionMap.remove(partitionName);
// Recalculate utilization because of release
_highestCapacityUtilization = 0;
assignableReplica.getCapacity().entrySet().stream()
removedReplica.getCapacity().entrySet().stream()
.forEach(entry -> updateCapacityAndUtilization(entry.getKey(), -1 * entry.getValue()));
}

public Map<String, Set<String>> getCurrentAssignmentsMap() {
return _currentAssignments;
/**
 * Collects every replica recorded on this node, across all resources, into a new set.
 *
 * @return A set of all assigned replicas on the node.
 */
public Set<AssignableReplica> getAssignedReplicas() {
  Set<AssignableReplica> assignedReplicas = new HashSet<>();
  for (Map<String, AssignableReplica> replicasOfResource : _currentAssignedReplicaMap.values()) {
    assignedReplicas.addAll(replicasOfResource.values());
  }
  return assignedReplicas;
}

public Set<String> getCurrentAssignmentsByResource(String resource) {
return _currentAssignments.getOrDefault(resource, Collections.emptySet());
/**
 * Builds a snapshot of the partitions assigned to this node, grouped by resource.
 * Both the returned map and the partition name sets in it are fresh copies, so modifying them
 * does not affect the assignment tracked by this node.
 *
 * @return The current assignment in a map of <resource name, set of partition names>
 */
public Map<String, Set<String>> getAssignedPartitionsMap() {
  Map<String, Set<String>> assignmentMap = new HashMap<>();
  for (Map.Entry<String, Map<String, AssignableReplica>> resourceEntry : _currentAssignedReplicaMap
      .entrySet()) {
    // Map.keySet() is a live view backed by the internal map: removing an element through it
    // would drop the replica record without releasing its capacity. Hand out a copy instead.
    assignmentMap.put(resourceEntry.getKey(), new HashSet<>(resourceEntry.getValue().keySet()));
  }
  return assignmentMap;
}

public Set<String> getCurrentTopStateAssignmentsByResource(String resource) {
return _currentTopStateAssignments.getOrDefault(resource, Collections.emptySet());
/**
 * Looks up the partitions of one resource that have a replica on this node.
 * The result is the key set of the node's replica map for that resource (or of an empty map when
 * the resource is unknown), i.e. a view rather than a copy.
 *
 * @param resource Resource name
 * @return A set of the current assigned replicas' partition names in the specified resource.
 */
public Set<String> getAssignedPartitionsByResource(String resource) {
  Map<String, AssignableReplica> replicasOfResource =
      _currentAssignedReplicaMap.getOrDefault(resource, Collections.emptyMap());
  return replicasOfResource.keySet();
}

public int getTopStateAssignmentTotalSize() {
return _currentTopStateAssignments.values().stream().mapToInt(Set::size).sum();
/**
 * Looks up the partitions of one resource whose replica on this node is in the top state.
 *
 * @param resource Resource name
 * @return A new set of the partition names, in the specified resource, whose assigned replica
 *         reports {@code isReplicaTopState()}. Empty if the resource has no replica on the node.
 */
public Set<String> getAssignedTopStatePartitionsByResource(String resource) {
  Map<String, AssignableReplica> replicasOfResource =
      _currentAssignedReplicaMap.getOrDefault(resource, Collections.emptyMap());
  Set<String> topStatePartitions = new HashSet<>();
  for (Map.Entry<String, AssignableReplica> partitionEntry : replicasOfResource.entrySet()) {
    if (partitionEntry.getValue().isReplicaTopState()) {
      topStatePartitions.add(partitionEntry.getKey());
    }
  }
  return topStatePartitions;
}

public int getCurrentAssignmentCount() {
return _currentAssignments.values().stream().mapToInt(Set::size).sum();
/**
 * Counts, across all resources, the replicas on this node that are in the top state.
 *
 * @return The total count of assigned top state partitions.
 */
public long getAssignedTopStatePartitionsCount() {
  long topStateCount = 0;
  for (Map<String, AssignableReplica> replicasOfResource : _currentAssignedReplicaMap.values()) {
    for (AssignableReplica assignedReplica : replicasOfResource.values()) {
      if (assignedReplica.isReplicaTopState()) {
        topStateCount++;
      }
    }
  }
  return topStateCount;
}

/**
 * Sums the sizes of the per-resource replica maps.
 *
 * @return The total count of assigned replicas.
 */
public long getAssignedReplicaCount() {
  int replicaCount = 0;
  for (Map<String, AssignableReplica> replicasOfResource : _currentAssignedReplicaMap.values()) {
    replicaCount += replicasOfResource.size();
  }
  return replicaCount;
}

/**
* @return The current available capacity.
*/
public Map<String, Integer> getCurrentCapacity() {
return _currentCapacity;
return _currentCapacityMap;
}

/**
* Return the most concerning capacity utilization number, used for even partition assignment.
* This is the highest utilization number among all the capacity categories.
* For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 0.6}, then this call shall
* return 0.9.
* @return The highest utilization number of the node among all the capacity categories.
*/
public float getHighestCapacityUtilization() {
return _highestCapacityUtilization;
}
Expand All @@ -196,14 +231,23 @@ public String getFaultZone() {
return _faultZone;
}

/**
* @return A map of <resource name, list of partition names> that contains all the partitions that are disabled on
* the node.
*/
public Map<String, List<String>> getDisabledPartitionsMap() {
return _disabledPartitionsMap;
}

/**
* Returns the node's max capacity map. The internal map itself is returned, not a copy.
*
* @return A map of <capacity category, capacity number> that describes the max capacity of the node.
*/
public Map<String, Integer> getMaxCapacity() {
return _maxCapacity;
}

/**
* @return The maximum number of partitions that are allowed to be allocated on the node.
*/
public int getMaxPartition() {
return _maxPartition;
}
Expand Down Expand Up @@ -268,10 +312,7 @@ private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig inst
private void assignNewBatch(Collection<AssignableReplica> replicas) {
Map<String, Integer> totalPartitionCapacity = new HashMap<>();
for (AssignableReplica replica : replicas) {
addToAssignmentRecord(replica, _currentAssignments);
if (replica.isReplicaTopState()) {
addToAssignmentRecord(replica, _currentTopStateAssignments);
}
addToAssignmentRecord(replica);
// increment the capacity requirement according to partition's capacity configuration.
for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) {
totalPartitionCapacity.compute(capacity.getKey(),
Expand All @@ -287,16 +328,28 @@ private void assignNewBatch(Collection<AssignableReplica> replicas) {
}
}

private boolean addToAssignmentRecord(AssignableReplica replica,
Map<String, Set<String>> currentAssignments) {
return currentAssignments.computeIfAbsent(replica.getResourceName(), k -> new HashSet<>())
.add(replica.getPartitionName());
/**
 * Records the replica under its resource name and partition name in the node's replica map.
 * A node holds at most one replica per partition of a resource.
 *
 * @param replica the replica to record
 * @throws HelixException if the replica has already been assigned to the node.
 */
private void addToAssignmentRecord(AssignableReplica replica) {
  String resourceName = replica.getResourceName();
  String partitionName = replica.getPartitionName();
  // Look up (or lazily create) the per-resource map once, then check for a conflict.
  Map<String, AssignableReplica> partitionMap =
      _currentAssignedReplicaMap.computeIfAbsent(resourceName, key -> new HashMap<>());
  if (partitionMap.containsKey(partitionName)) {
    throw new HelixException(String
        .format("Resource %s already has a replica with state %s from partition %s on node %s",
            replica.getResourceName(), replica.getReplicaState(), replica.getPartitionName(),
            getInstanceName()));
  }
  partitionMap.put(partitionName, replica);
}

private void updateCapacityAndUtilization(String capacityKey, int valueToSubtract) {
if (_currentCapacity.containsKey(capacityKey)) {
int newCapacity = _currentCapacity.get(capacityKey) - valueToSubtract;
_currentCapacity.put(capacityKey, newCapacity);
if (_currentCapacityMap.containsKey(capacityKey)) {
int newCapacity = _currentCapacityMap.get(capacityKey) - valueToSubtract;
_currentCapacityMap.put(capacityKey, newCapacity);
// For the purpose of constraint calculation, the max utilization cannot be larger than 100%.
float utilization = Math.min(
(float) (_maxCapacity.get(capacityKey) - newCapacity) / _maxCapacity.get(capacityKey), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,18 @@ public int compareTo(AssignableReplica replica) {
return 0;
}

/**
 * Two replicas are equal when {@code compareTo} reports no difference between them.
 * NOTE(review): no matching {@code hashCode()} override is visible in this view; confirm one
 * exists, since replicas are collected into hash-based sets elsewhere.
 *
 * @param obj the object to compare against
 * @return true if {@code obj} is an AssignableReplica and compares as 0 to this replica
 */
@Override
public boolean equals(Object obj) {
  // instanceof evaluates to false for null, so no separate null check is needed.
  if (!(obj instanceof AssignableReplica)) {
    return false;
  }
  return compareTo((AssignableReplica) obj) == 0;
}

/**
 * Builds a replica key of the form {@code <resourceName>-<partitionName>-<state>}.
 *
 * @param resourceName the resource name
 * @param partitionName the partition name
 * @param state the replica state
 * @return the three parts joined by '-'; a null part is rendered as the text "null"
 */
public static String generateReplicaKey(String resourceName, String partitionName, String state) {
  return String.join("-", resourceName, partitionName, state);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ private static Map<String, Map<String, Set<String>>> mapAssignmentToFaultZone(
Set<AssignableNode> assignableNodes) {
Map<String, Map<String, Set<String>>> faultZoneAssignmentMap = new HashMap<>();
assignableNodes.stream().forEach(node -> {
for (Map.Entry<String, Set<String>> resourceMap : node.getCurrentAssignmentsMap()
for (Map.Entry<String, Set<String>> resourceMap : node.getAssignedPartitionsMap()
.entrySet()) {
faultZoneAssignmentMap.computeIfAbsent(node.getFaultZone(), k -> new HashMap<>())
.computeIfAbsent(resourceMap.getKey(), k -> new HashSet<>())
Expand Down
Loading