Skip to content

Commit 043f445

Browse files
authored
Add Java API for adding and validating resources for WAGED rebalancer (#570)
Add Java API methods for adding and validating resources for the WAGED rebalancer. This is a set of convenience APIs, provided through HelixAdmin, that users can call to more easily add resources and validate them for use with the WAGED rebalancer. Changelist: 1. Add API methods in HelixAdmin; 2. Implement these methods; 3. Add tests.
1 parent 22e6b87 commit 043f445

8 files changed

Lines changed: 538 additions & 77 deletions

File tree

helix-core/src/main/java/org/apache/helix/HelixAdmin.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.IOException;
2323
import java.util.List;
2424
import java.util.Map;
25+
2526
import org.apache.helix.model.ClusterConstraints;
2627
import org.apache.helix.model.ClusterConstraints.ConstraintType;
2728
import org.apache.helix.model.ConstraintItem;
@@ -30,8 +31,10 @@
3031
import org.apache.helix.model.IdealState;
3132
import org.apache.helix.model.InstanceConfig;
3233
import org.apache.helix.model.MaintenanceSignal;
34+
import org.apache.helix.model.ResourceConfig;
3335
import org.apache.helix.model.StateModelDefinition;
3436

37+
3538
/*
3639
* Helix cluster management
3740
*/
@@ -576,4 +579,50 @@ void rebalance(String clusterName, String resourceName, int replica, String keyP
576579
* Release resources
577580
*/
578581
void close();
582+
583+
/**
584+
* Adds a resource with IdealState and ResourceConfig to be rebalanced by WAGED rebalancer with validation.
585+
* Validation includes the following:
586+
* 1. Check ResourceConfig has the WEIGHT field
587+
* 2. Check that all capacity keys from ClusterConfig are set up in the WEIGHT field
588+
* 3. Check that all ResourceConfig's weightMap fields have all of the capacity keys
589+
* @param clusterName
590+
* @param idealState
591+
* @param resourceConfig
592+
* @return true if the resource has been added successfully. False otherwise
593+
*/
594+
boolean addResourceWithWeight(String clusterName, IdealState idealState,
595+
ResourceConfig resourceConfig);
596+
597+
/**
598+
* Batch-enables WAGED rebalance for the resources with the given names.
599+
* @param clusterName
600+
* @param resourceNames
601+
* @return
602+
*/
603+
boolean enableWagedRebalance(String clusterName, List<String> resourceNames);
604+
605+
/**
606+
* Validates the resources to see if their weight configs have been set properly.
607+
* Validation includes the following:
608+
* 1. Check ResourceConfig has the WEIGHT field
609+
* 2. Check that all capacity keys from ClusterConfig are set up in the WEIGHT field
610+
* 3. Check that all ResourceConfig's weightMap fields have all of the capacity keys
611+
* @param resourceNames
612+
* @return for each resource, true if the weight configs have been set properly, false otherwise
613+
*/
614+
Map<String, Boolean> validateResourcesForWagedRebalance(String clusterName,
615+
List<String> resourceNames);
616+
617+
/**
618+
* Validates the instances to ensure their weights in InstanceConfigs have been set up properly.
619+
* Validation includes the following:
620+
* 1. If default instance capacity is not set, check that the InstanceConfigs have the CAPACITY field
621+
* 2. Check that all capacity keys defined in ClusterConfig are present in the CAPACITY field
622+
* @param clusterName
623+
* @param instancesNames
624+
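* @return for each instance, true if its capacity configs have been set up properly, false otherwise (TODO confirm against the implementation)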
625+
*/
626+
Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
627+
List<String> instancesNames);
579628
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package org.apache.helix.controller.rebalancer.util;
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one
5+
* or more contributor license agreements. See the NOTICE file
6+
* distributed with this work for additional information
7+
* regarding copyright ownership. The ASF licenses this file
8+
* to you under the Apache License, Version 2.0 (the
9+
* "License"); you may not use this file except in compliance
10+
* with the License. You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing,
15+
* software distributed under the License is distributed on an
16+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
* KIND, either express or implied. See the License for the
18+
* specific language governing permissions and limitations
19+
* under the License.
20+
*/
21+
22+
import java.util.HashMap;
23+
import java.util.List;
24+
import java.util.Map;
25+
26+
import org.apache.helix.HelixException;
27+
import org.apache.helix.model.ClusterConfig;
28+
import org.apache.helix.model.InstanceConfig;
29+
import org.apache.helix.model.ResourceConfig;
30+
31+
32+
/**
33+
* A util class that contains validation-related static methods for WAGED rebalancer.
34+
*/
35+
public class WagedValidationUtil {
36+
/**
37+
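* Validates and returns instance capacities. The capacity map is built from the default instance capacity in ClusterConfig, overridden by the values in InstanceConfig. The validation logic ensures that all required capacity keys (in ClusterConfig) are present in that merged map; otherwise a HelixException is thrown.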
38+
* @param clusterConfig
39+
* @param instanceConfig
40+
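* @return the merged capacity map of the instance (ClusterConfig defaults overridden by InstanceConfig values)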
41+
*/
42+
public static Map<String, Integer> validateAndGetInstanceCapacity(ClusterConfig clusterConfig,
43+
InstanceConfig instanceConfig) {
44+
// Fetch the capacity of instance from 2 possible sources according to the following priority.
45+
// 1. The instance capacity that is configured in the instance config.
46+
// 2. If the default instance capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
47+
Map<String, Integer> instanceCapacity =
48+
new HashMap<>(clusterConfig.getDefaultInstanceCapacityMap());
49+
instanceCapacity.putAll(instanceConfig.getInstanceCapacityMap());
50+
51+
List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
52+
// All the required keys must exist in the instance config.
53+
if (!instanceCapacity.keySet().containsAll(requiredCapacityKeys)) {
54+
throw new HelixException(String.format(
55+
"The required capacity keys: %s are not fully configured in the instance: %s, capacity map: %s.",
56+
requiredCapacityKeys.toString(), instanceConfig.getInstanceName(),
57+
instanceCapacity.toString()));
58+
}
59+
return instanceCapacity;
60+
}
61+
62+
/**
63+
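* Validates and returns partition capacities. The weight map is built from the default partition weight in ClusterConfig, overridden by the entry for the partition (or, failing that, the DEFAULT_PARTITION_KEY entry) in the supplied capacityMap. The validation logic ensures that all required capacity keys (from ClusterConfig) are present in that merged map; otherwise a HelixException is thrown.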
64+
* @param partitionName
65+
* @param resourceConfig
66+
* @param clusterConfig
67+
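* @return the merged weight map of the partition (ClusterConfig defaults overridden by the partition-level or default-partition values)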
68+
*/
69+
public static Map<String, Integer> validateAndGetPartitionCapacity(String partitionName,
70+
ResourceConfig resourceConfig, Map<String, Map<String, Integer>> capacityMap,
71+
ClusterConfig clusterConfig) {
72+
// Fetch the capacity of partition from 3 possible sources according to the following priority.
73+
// 1. The partition capacity that is explicitly configured in the resource config.
74+
// 2. Or, the default partition capacity that is configured under partition name DEFAULT_PARTITION_KEY in the resource config.
75+
// 3. If the default partition capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
76+
Map<String, Integer> partitionCapacity =
77+
new HashMap<>(clusterConfig.getDefaultPartitionWeightMap());
78+
partitionCapacity.putAll(capacityMap.getOrDefault(partitionName,
79+
capacityMap.getOrDefault(ResourceConfig.DEFAULT_PARTITION_KEY, new HashMap<>())));
80+
81+
List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
82+
// If any required capacity key is missing from the merged partition weight map, fail the model creation.
83+
if (!partitionCapacity.keySet().containsAll(requiredCapacityKeys)) {
84+
throw new HelixException(String.format(
85+
"The required capacity keys: %s are not fully configured in the resource: %s, partition: %s, weight map: %s.",
86+
requiredCapacityKeys.toString(), resourceConfig.getResourceName(), partitionName,
87+
partitionCapacity.toString()));
88+
}
89+
return partitionCapacity;
90+
}
91+
}

helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,13 @@
3131
import com.google.common.collect.ImmutableMap;
3232
import com.google.common.collect.ImmutableSet;
3333
import org.apache.helix.HelixException;
34+
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
3435
import org.apache.helix.model.ClusterConfig;
3536
import org.apache.helix.model.InstanceConfig;
3637
import org.slf4j.Logger;
3738
import org.slf4j.LoggerFactory;
3839

40+
3941
/**
4042
* This class represents a possible allocation of the replication.
4143
* Note that any usage updates to the AssignableNode are not thread safe.
@@ -113,15 +115,16 @@ void assignInitBatch(Collection<AssignableReplica> replicas) {
113115
void assign(AssignableReplica assignableReplica) {
114116
addToAssignmentRecord(assignableReplica);
115117
assignableReplica.getCapacity().entrySet().stream()
116-
.forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
118+
.forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
117119
}
118120

119121
/**
120122
* Release a replica from the node.
121123
* If the replication is not on this node, the assignable node is not updated.
122124
* @param replica - the replica to be released
123125
*/
124-
void release(AssignableReplica replica) throws IllegalArgumentException {
126+
void release(AssignableReplica replica)
127+
throws IllegalArgumentException {
125128
String resourceName = replica.getResourceName();
126129
String partitionName = replica.getPartitionName();
127130

@@ -320,12 +323,12 @@ private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig inst
320323
private void addToAssignmentRecord(AssignableReplica replica) {
321324
String resourceName = replica.getResourceName();
322325
String partitionName = replica.getPartitionName();
323-
if (_currentAssignedReplicaMap.containsKey(resourceName)
324-
&& _currentAssignedReplicaMap.get(resourceName).containsKey(partitionName)) {
325-
throw new HelixException(String.format(
326-
"Resource %s already has a replica with state %s from partition %s on node %s",
327-
replica.getResourceName(), replica.getReplicaState(), replica.getPartitionName(),
328-
getInstanceName()));
326+
if (_currentAssignedReplicaMap.containsKey(resourceName) && _currentAssignedReplicaMap
327+
.get(resourceName).containsKey(partitionName)) {
328+
throw new HelixException(String
329+
.format("Resource %s already has a replica with state %s from partition %s on node %s",
330+
replica.getResourceName(), replica.getReplicaState(), replica.getPartitionName(),
331+
getInstanceName()));
329332
} else {
330333
_currentAssignedReplicaMap.computeIfAbsent(resourceName, key -> new HashMap<>())
331334
.put(partitionName, replica);
@@ -348,23 +351,10 @@ private void updateCapacityAndUtilization(String capacityKey, int usage) {
348351
*/
349352
private Map<String, Integer> fetchInstanceCapacity(ClusterConfig clusterConfig,
350353
InstanceConfig instanceConfig) {
351-
// Fetch the capacity of instance from 2 possible sources according to the following priority.
352-
// 1. The instance capacity that is configured in the instance config.
353-
// 2. If the default instance capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
354354
Map<String, Integer> instanceCapacity =
355-
new HashMap<>(clusterConfig.getDefaultInstanceCapacityMap());
356-
instanceCapacity.putAll(instanceConfig.getInstanceCapacityMap());
357-
358-
List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
359-
// All the required keys must exist in the instance config.
360-
if (!instanceCapacity.keySet().containsAll(requiredCapacityKeys)) {
361-
throw new HelixException(String.format(
362-
"The required capacity keys: %s are not fully configured in the instance: %s, capacity map: %s.",
363-
requiredCapacityKeys.toString(), _instanceName, instanceCapacity.toString()));
364-
}
355+
WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig, instanceConfig);
365356
// Remove all the non-required capacity items from the map.
366-
instanceCapacity.keySet().retainAll(requiredCapacityKeys);
367-
357+
instanceCapacity.keySet().retainAll(clusterConfig.getInstanceCapacityKeys());
368358
return instanceCapacity;
369359
}
370360

helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Map;
2626

2727
import org.apache.helix.HelixException;
28+
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
2829
import org.apache.helix.model.ClusterConfig;
2930
import org.apache.helix.model.ResourceConfig;
3031
import org.apache.helix.model.StateModelDefinition;
@@ -149,27 +150,10 @@ private Map<String, Integer> fetchCapacityUsage(String partitionName,
149150
"Invalid partition capacity configuration of resource: " + resourceConfig
150151
.getResourceName(), ex);
151152
}
152-
153-
// Fetch the capacity of partition from 3 possible sources according to the following priority.
154-
// 1. The partition capacity that is explicitly configured in the resource config.
155-
// 2. Or, the default partition capacity that is configured under partition name DEFAULT_PARTITION_KEY in the resource config.
156-
// 3. If the default partition capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
157-
Map<String, Integer> partitionCapacity =
158-
new HashMap<>(clusterConfig.getDefaultPartitionWeightMap());
159-
partitionCapacity.putAll(capacityMap.getOrDefault(partitionName,
160-
capacityMap.getOrDefault(ResourceConfig.DEFAULT_PARTITION_KEY, new HashMap<>())));
161-
162-
List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
163-
// If any required capacity key is not configured in the resource config, fail the model creating.
164-
if (!partitionCapacity.keySet().containsAll(requiredCapacityKeys)) {
165-
throw new HelixException(String.format(
166-
"The required capacity keys: %s are not fully configured in the resource: %s, partition: %s, weight map: %s.",
167-
requiredCapacityKeys.toString(), resourceConfig.getResourceName(), partitionName,
168-
partitionCapacity.toString()));
169-
}
153+
Map<String, Integer> partitionCapacity = WagedValidationUtil
154+
.validateAndGetPartitionCapacity(partitionName, resourceConfig, capacityMap, clusterConfig);
170155
// Remove the non-required capacity items.
171-
partitionCapacity.keySet().retainAll(requiredCapacityKeys);
172-
156+
partitionCapacity.keySet().retainAll(clusterConfig.getInstanceCapacityKeys());
173157
return partitionCapacity;
174158
}
175159
}

0 commit comments

Comments
 (0)