Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions helix-core/src/main/java/org/apache/helix/HelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ConstraintItem;
Expand All @@ -30,8 +31,10 @@
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.MaintenanceSignal;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;


/*
* Helix cluster management
*/
Expand Down Expand Up @@ -576,4 +579,50 @@ void rebalance(String clusterName, String resourceName, int replica, String keyP
* Release resources
*/
void close();

/**
* Adds a resource with IdealState and ResourceConfig to be rebalanced by WAGED rebalancer with validation.
Comment thread
narendly marked this conversation as resolved.
* Validation includes the following:
* 1. Check ResourceConfig has the WEIGHT field
* 2. Check that all capacity keys from ClusterConfig are set up in the WEIGHT field
* 3. Check that all ResourceConfig's weightMap fields have all of the capacity keys
* @param clusterName
* @param idealState
* @param resourceConfig
* @return true if the resource has been added successfully. False otherwise
*/
boolean addResourceWithWeight(String clusterName, IdealState idealState,
ResourceConfig resourceConfig);

/**
* Batch-enables Waged rebalance for the names of resources given.
* @param clusterName
* @param resourceNames
* @return
*/
boolean enableWagedRebalance(String clusterName, List<String> resourceNames);

/**
* Validates the resources to see if their weight configs have been set properly.
* Validation includes the following:
* 1. Check ResourceConfig has the WEIGHT field
* 2. Check that all capacity keys from ClusterConfig are set up in the WEIGHT field
* 3. Check that all ResourceConfig's weightMap fields have all of the capacity keys
* @param resourceNames
* @return for each resource, true if the weight configs have been set properly, false otherwise
*/
Map<String, Boolean> validateResourcesForWagedRebalance(String clusterName,
List<String> resourceNames);

/**
* Validates the instances to ensure their weights in InstanceConfigs have been set up properly.
* Validation includes the following:
* 1. If default instance capacity is not set, check that the InstanceConfigs have the CAPACITY field
* 2. Check that all capacity keys defined in ClusterConfig are present in the CAPACITY field
* @param clusterName
* @param instancesNames
* @return
*/
Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
Comment thread
jiajunwang marked this conversation as resolved.
List<String> instancesNames);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* This class represents a possible allocation of the replication.
* Note that any usage updates to the AssignableNode are not thread safe.
Expand Down Expand Up @@ -113,15 +114,16 @@ void assignInitBatch(Collection<AssignableReplica> replicas) {
void assign(AssignableReplica assignableReplica) {
addToAssignmentRecord(assignableReplica);
assignableReplica.getCapacity().entrySet().stream()
.forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
.forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
}

/**
* Release a replica from the node.
* If the replication is not on this node, the assignable node is not updated.
* @param replica - the replica to be released
*/
void release(AssignableReplica replica) throws IllegalArgumentException {
void release(AssignableReplica replica)
throws IllegalArgumentException {
String resourceName = replica.getResourceName();
String partitionName = replica.getPartitionName();

Expand Down Expand Up @@ -320,12 +322,12 @@ private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig inst
private void addToAssignmentRecord(AssignableReplica replica) {
String resourceName = replica.getResourceName();
String partitionName = replica.getPartitionName();
if (_currentAssignedReplicaMap.containsKey(resourceName)
&& _currentAssignedReplicaMap.get(resourceName).containsKey(partitionName)) {
throw new HelixException(String.format(
"Resource %s already has a replica with state %s from partition %s on node %s",
replica.getResourceName(), replica.getReplicaState(), replica.getPartitionName(),
getInstanceName()));
if (_currentAssignedReplicaMap.containsKey(resourceName) && _currentAssignedReplicaMap
.get(resourceName).containsKey(partitionName)) {
throw new HelixException(String
.format("Resource %s already has a replica with state %s from partition %s on node %s",
replica.getResourceName(), replica.getReplicaState(), replica.getPartitionName(),
getInstanceName()));
} else {
_currentAssignedReplicaMap.computeIfAbsent(resourceName, key -> new HashMap<>())
.put(partitionName, replica);
Expand All @@ -348,23 +350,10 @@ private void updateCapacityAndUtilization(String capacityKey, int usage) {
*/
private Map<String, Integer> fetchInstanceCapacity(ClusterConfig clusterConfig,
InstanceConfig instanceConfig) {
// Fetch the capacity of instance from 2 possible sources according to the following priority.
// 1. The instance capacity that is configured in the instance config.
// 2. If the default instance capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
Map<String, Integer> instanceCapacity =
new HashMap<>(clusterConfig.getDefaultInstanceCapacityMap());
instanceCapacity.putAll(instanceConfig.getInstanceCapacityMap());

List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
// All the required keys must exist in the instance config.
if (!instanceCapacity.keySet().containsAll(requiredCapacityKeys)) {
throw new HelixException(String.format(
"The required capacity keys: %s are not fully configured in the instance: %s, capacity map: %s.",
requiredCapacityKeys.toString(), _instanceName, instanceCapacity.toString()));
}
validateAndGetInstanceCapacity(clusterConfig, instanceConfig);
// Remove all the non-required capacity items from the map.
instanceCapacity.keySet().retainAll(requiredCapacityKeys);

instanceCapacity.keySet().retainAll(clusterConfig.getInstanceCapacityKeys());
return instanceCapacity;
}

Expand All @@ -382,4 +371,30 @@ public int compareTo(AssignableNode o) {
public String toString() {
return _instanceName;
}

/**
* Validates and returns instance capacities. The validation logic ensures that all required capacity keys (in ClusterConfig) are present in InstanceConfig.
* @param clusterConfig
* @param instanceConfig
* @return
*/
public static Map<String, Integer> validateAndGetInstanceCapacity(ClusterConfig clusterConfig,
Comment thread
narendly marked this conversation as resolved.
Outdated
InstanceConfig instanceConfig) {
// Fetch the capacity of instance from 2 possible sources according to the following priority.
// 1. The instance capacity that is configured in the instance config.
// 2. If the default instance capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
Map<String, Integer> instanceCapacity =
new HashMap<>(clusterConfig.getDefaultInstanceCapacityMap());
instanceCapacity.putAll(instanceConfig.getInstanceCapacityMap());

List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
// All the required keys must exist in the instance config.
if (!instanceCapacity.keySet().containsAll(requiredCapacityKeys)) {
throw new HelixException(String.format(
"The required capacity keys: %s are not fully configured in the instance: %s, capacity map: %s.",
requiredCapacityKeys.toString(), instanceConfig.getInstanceName(),
instanceCapacity.toString()));
}
return instanceCapacity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,23 @@ private Map<String, Integer> fetchCapacityUsage(String partitionName,
"Invalid partition capacity configuration of resource: " + resourceConfig
.getResourceName(), ex);
}
Map<String, Integer> partitionCapacity =
validateAndGetPartitionCapacity(partitionName, resourceConfig, capacityMap, clusterConfig);
Comment thread
jiajunwang marked this conversation as resolved.
Outdated
// Remove the non-required capacity items.
partitionCapacity.keySet().retainAll(clusterConfig.getInstanceCapacityKeys());
return partitionCapacity;
}

/**
* Validates and returns partition capacities. The validation logic ensures that all required capacity keys (from ClusterConfig) are present in the ResourceConfig for the partition.
* @param partitionName
* @param resourceConfig
* @param clusterConfig
* @return
*/
public static Map<String, Integer> validateAndGetPartitionCapacity(String partitionName,
Comment thread
jiajunwang marked this conversation as resolved.
Outdated
ResourceConfig resourceConfig, Map<String, Map<String, Integer>> capacityMap,
ClusterConfig clusterConfig) {
// Fetch the capacity of partition from 3 possible sources according to the following priority.
// 1. The partition capacity that is explicitly configured in the resource config.
// 2. Or, the default partition capacity that is configured under partition name DEFAULT_PARTITION_KEY in the resource config.
Expand All @@ -167,9 +183,6 @@ private Map<String, Integer> fetchCapacityUsage(String partitionName,
requiredCapacityKeys.toString(), resourceConfig.getResourceName(), partitionName,
partitionCapacity.toString()));
}
// Remove the non-required capacity items.
partitionCapacity.keySet().retainAll(requiredCapacityKeys);

return partitionCapacity;
}
}
Loading