Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,15 @@ private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
Collection<AssignableReplica> existingAssignment) {
reset();

_currentCapacity.putAll(instanceConfig.getInstanceCapacityMap());
Map<String, Integer> instanceCapacity = instanceConfig.getInstanceCapacityMap();
if (instanceCapacity.isEmpty()) {
instanceCapacity = clusterConfig.getDefaultInstanceCapacityMap();
}
_currentCapacity.putAll(instanceCapacity);
_faultZone = computeFaultZone(clusterConfig, instanceConfig);
_instanceTags = new HashSet<>(instanceConfig.getTags());
_disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
_maxCapacity = instanceConfig.getInstanceCapacityMap();
_maxCapacity = instanceCapacity;
_maxPartition = clusterConfig.getMaxPartitionsPerInstance();

assignNewBatch(existingAssignment);
Expand Down
44 changes: 44 additions & 0 deletions helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
import org.apache.helix.api.config.StateTransitionTimeoutConfig;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Cluster configurations
Expand Down Expand Up @@ -86,6 +88,8 @@ public enum ClusterConfigProperty {

// The required instance capacity keys for resource partition assignment calculation.
INSTANCE_CAPACITY_KEYS,
// The default instance capacity if no capacity is configured in the Instance Config node.
DEFAULT_INSTANCE_CAPACITY_MAP,
// The preference of the rebalance result.
// EVENNESS - Evenness of the resource utilization, partition, and top state distribution.
// LESS_MOVEMENT - the tendency of keeping the current assignment instead of moving the partition for optimal assignment.
Expand Down Expand Up @@ -700,6 +704,46 @@ public List<String> getInstanceCapacityKeys() {
return capacityKeys;
}

/**
* Get the default instance capacity information from the map fields.
* @return data map if it exists, or empty map
*/
public Map<String, Integer> getDefaultInstanceCapacityMap() {
Map<String, String> capacityData =
_record.getMapField(ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP.name());

if (capacityData != null) {
return capacityData.entrySet().stream().collect(
Collectors.toMap(entry -> entry.getKey(), entry -> Integer.parseInt(entry.getValue())));
}
return Collections.emptyMap();
}

/**
* Set the default instance capacity information with an Integer mapping
* @param capacityDataMap - map of instance capacity data
* @throws IllegalArgumentException - when any of the data value is a negative number or when the map is empty
*/
public void setDefaultInstanceCapacityMap(Map<String, Integer> capacityDataMap)
throws IllegalArgumentException {
if (capacityDataMap == null || capacityDataMap.size() == 0) {
throw new IllegalArgumentException("Default Instance Capacity Data is empty");
}

Map<String, String> capacityData = new HashMap<>();

capacityDataMap.entrySet().stream().forEach(entry -> {
Comment thread
jiajunwang marked this conversation as resolved.
if (entry.getValue() < 0) {
throw new IllegalArgumentException(String
.format("Default Instance Capacity Data contains a negative value: %s = %d",
entry.getKey(), entry.getValue()));
}
capacityData.put(entry.getKey(), Integer.toString(entry.getValue()));
});

_record.setMapField(ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP.name(), capacityData);
}

/**
* Set the global rebalancer's assignment preference.
* @param preference A map of the GlobalRebalancePreferenceKey and the corresponding weight.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,4 +200,17 @@ public void testParseFaultZone() throws IOException {

Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance/");
}

@Test
public void testDefaultInstanceCapacity() {
ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId");
testClusterConfig.setDefaultInstanceCapacityMap(_capacityDataMap);

InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId");

AssignableNode assignableNode =
new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId,
Collections.emptyList());
Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
Comment thread
narendly marked this conversation as resolved.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
*/

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.helix.ZNRecord;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -127,4 +129,56 @@ public void testSetRebalancePreferenceInvalidNumber() {
ClusterConfig testConfig = new ClusterConfig("testId");
testConfig.setGlobalRebalancePreference(preference);
}

@Test
public void testGetInstanceCapacityMap() {
Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, "item2", 2, "item3", 3);

Map<String, String> capacityDataMapString =
ImmutableMap.of("item1", "1", "item2", "2", "item3", "3");

ZNRecord rec = new ZNRecord("testId");
rec.setMapField(ClusterConfig.ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP.name(),
capacityDataMapString);
ClusterConfig testConfig = new ClusterConfig(rec);

Assert.assertTrue(testConfig.getDefaultInstanceCapacityMap().equals(capacityDataMap));
}

@Test
public void testGetInstanceCapacityMapEmpty() {
ClusterConfig testConfig = new ClusterConfig("testId");

Assert.assertTrue(testConfig.getDefaultInstanceCapacityMap().equals(Collections.emptyMap()));
}

@Test
public void testSetInstanceCapacityMap() {
Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, "item2", 2, "item3", 3);

Map<String, String> capacityDataMapString =
ImmutableMap.of("item1", "1", "item2", "2", "item3", "3");

ClusterConfig testConfig = new ClusterConfig("testConfig");
testConfig.setDefaultInstanceCapacityMap(capacityDataMap);

Assert.assertEquals(testConfig.getRecord().getMapField(ClusterConfig.ClusterConfigProperty.
DEFAULT_INSTANCE_CAPACITY_MAP.name()), capacityDataMapString);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Default Instance Capacity Data is empty")
public void testSetInstanceCapacityMapEmpty() {
Map<String, Integer> capacityDataMap = new HashMap<>();

ClusterConfig testConfig = new ClusterConfig("testConfig");
testConfig.setDefaultInstanceCapacityMap(capacityDataMap);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Default Instance Capacity Data contains a negative value: item3 = -3")
public void testSetInstanceCapacityMapInvalid() {
Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, "item2", 2, "item3", -3);

ClusterConfig testConfig = new ClusterConfig("testConfig");
testConfig.setDefaultInstanceCapacityMap(capacityDataMap);
}
}