Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 51 additions & 34 deletions helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -706,11 +706,12 @@ public void enableP2PMessage(boolean enabled) {

/**
* Set the required Instance Capacity Keys.
* @param capacityKeys
* @param capacityKeys - the capacity key list.
* If null, the capacity keys item will be removed from the config.
*/
public void setInstanceCapacityKeys(List<String> capacityKeys) {
if (capacityKeys == null || capacityKeys.isEmpty()) {
throw new IllegalArgumentException("The input instance capacity key list is empty.");
if (capacityKeys == null) {
_record.getListFields().remove(ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name());
}
_record.setListField(ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(), capacityKeys);
}
Expand Down Expand Up @@ -744,7 +745,8 @@ public Map<String, Integer> getDefaultInstanceCapacityMap() {
* If the instance capacity is not configured in either Instance Config nor Cluster Config, the
* cluster topology is considered invalid. So the rebalancer may stop working.
* @param capacityDataMap - map of instance capacity data
* @throws IllegalArgumentException - when any of the data value is a negative number or when the map is empty
* If null, the default capacity map item will be removed from the config.
* @throws IllegalArgumentException - when any of the data value is a negative number
*/
public void setDefaultInstanceCapacityMap(Map<String, Integer> capacityDataMap)
throws IllegalArgumentException {
Expand All @@ -769,7 +771,8 @@ public Map<String, Integer> getDefaultPartitionWeightMap() {
* If the partition weight is not configured in either Resource Config nor Cluster Config, the
* cluster topology is considered invalid. So the rebalancer may stop working.
* @param weightDataMap - map of partition weight data
* @throws IllegalArgumentException - when any of the data value is a negative number or when the map is empty
* If null, the default weight map item will be removed from the config.
* @throws IllegalArgumentException - when any of the data value is a negative number
*/
public void setDefaultPartitionWeightMap(Map<String, Integer> weightDataMap)
throws IllegalArgumentException {
Expand All @@ -788,39 +791,43 @@ private Map<String, Integer> getDefaultCapacityMap(ClusterConfigProperty capacit
private void setDefaultCapacityMap(ClusterConfigProperty capacityPropertyType,
Map<String, Integer> capacityDataMap) throws IllegalArgumentException {
if (capacityDataMap == null) {
throw new IllegalArgumentException("Default capacity data is null");
_record.getMapFields().remove(capacityPropertyType.name());
} else {
Map<String, String> data = new HashMap<>();
capacityDataMap.entrySet().stream().forEach(entry -> {
if (entry.getValue() < 0) {
throw new IllegalArgumentException(String
.format("Default capacity data contains a negative value: %s = %d", entry.getKey(),
entry.getValue()));
}
data.put(entry.getKey(), Integer.toString(entry.getValue()));
});
_record.setMapField(capacityPropertyType.name(), data);
}
Map<String, String> data = new HashMap<>();
capacityDataMap.entrySet().stream().forEach(entry -> {
if (entry.getValue() < 0) {
throw new IllegalArgumentException(String
.format("Default capacity data contains a negative value: %s = %d", entry.getKey(),
entry.getValue()));
}
data.put(entry.getKey(), Integer.toString(entry.getValue()));
});
_record.setMapField(capacityPropertyType.name(), data);
}

/**
* Set the global rebalancer's assignment preference.
* @param preference A map of the GlobalRebalancePreferenceKey and the corresponding weight.
* The ratio of the configured weights will determine the rebalancer's behavior.
* If null, the preference item will be removed from the config.
*/
public void setGlobalRebalancePreference(Map<GlobalRebalancePreferenceKey, Integer> preference) {
Map<String, String> preferenceMap = new HashMap<>();

preference.entrySet().stream().forEach(entry -> {
if (entry.getValue() > MAX_REBALANCE_PREFERENCE
|| entry.getValue() < MIN_REBALANCE_PREFERENCE) {
throw new IllegalArgumentException(String
.format("Invalid global rebalance preference configuration. Key %s, Value %d.",
entry.getKey().name(), entry.getValue()));
}
preferenceMap.put(entry.getKey().name(), Integer.toString(entry.getValue()));
});

_record.setMapField(ClusterConfigProperty.REBALANCE_PREFERENCE.name(), preferenceMap);
if (preference == null) {
_record.getMapFields().remove(ClusterConfigProperty.REBALANCE_PREFERENCE.name());
} else {
Map<String, String> preferenceMap = new HashMap<>();
preference.entrySet().stream().forEach(entry -> {
if (entry.getValue() > MAX_REBALANCE_PREFERENCE
|| entry.getValue() < MIN_REBALANCE_PREFERENCE) {
throw new IllegalArgumentException(String
.format("Invalid global rebalance preference configuration. Key %s, Value %d.",
entry.getKey().name(), entry.getValue()));
}
preferenceMap.put(entry.getKey().name(), Integer.toString(entry.getValue()));
});
_record.setMapField(ClusterConfigProperty.REBALANCE_PREFERENCE.name(), preferenceMap);
}
}

/**
Expand Down Expand Up @@ -859,14 +866,24 @@ public boolean isGlobalRebalanceAsyncModeEnabled() {

/**
* Set the abnormal state resolver class map.
* @param resolverMap - the resolver map
* If null, the resolver map item will be removed from the config.
*/
public void setAbnormalStateResolverMap(Map<String, String> resolverMap) {
if (resolverMap.values().stream()
.anyMatch(className -> className == null || className.isEmpty())) {
throw new IllegalArgumentException(
"Invalid Abnormal State Resolver Map definition. Class name cannot be empty.");
if (resolverMap == null) {
_record.getMapFields().remove(ClusterConfigProperty.ABNORMAL_STATES_RESOLVER_MAP.name());
} else {
if (resolverMap.entrySet().stream().anyMatch(e -> {
String stateModelDefName = e.getKey();
String className = e.getValue();
return stateModelDefName == null || stateModelDefName.isEmpty() || className == null
|| className.isEmpty();
})) {
throw new IllegalArgumentException(
"Invalid Abnormal State Resolver Map definition. StateModel definition name and the resolver class name cannot be empty.");
}
_record.setMapField(ClusterConfigProperty.ABNORMAL_STATES_RESOLVER_MAP.name(), resolverMap);
}
_record.setMapField(ClusterConfigProperty.ABNORMAL_STATES_RESOLVER_MAP.name(), resolverMap);
}

public Map<String, String> getAbnormalStateResolverMap() {
Expand Down
30 changes: 14 additions & 16 deletions helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.base.Splitter;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.util.HelixUtil;
Expand Down Expand Up @@ -534,7 +533,8 @@ public Map<String, Integer> getInstanceCapacityMap() {
/**
* Set the instance capacity information with an Integer mapping.
* @param capacityDataMap - map of instance capacity data
* @throws IllegalArgumentException - when any of the data value is a negative number or when the map is incomplete
* If null, the capacity map item will be removed from the config.
* @throws IllegalArgumentException - when any of the data value is a negative number
*
* This information is required by the global rebalancer.
* @see <a href="Rebalance Algorithm">
Expand All @@ -547,21 +547,19 @@ public Map<String, Integer> getInstanceCapacityMap() {
public void setInstanceCapacityMap(Map<String, Integer> capacityDataMap)
throws IllegalArgumentException {
if (capacityDataMap == null) {
throw new IllegalArgumentException("Capacity Data is null");
_record.getMapFields().remove(InstanceConfigProperty.INSTANCE_CAPACITY_MAP.name());
} else {
Map<String, String> capacityData = new HashMap<>();
capacityDataMap.entrySet().stream().forEach(entry -> {
if (entry.getValue() < 0) {
throw new IllegalArgumentException(String
.format("Capacity Data contains a negative value: %s = %d", entry.getKey(),
entry.getValue()));
}
capacityData.put(entry.getKey(), Integer.toString(entry.getValue()));
});
_record.setMapField(InstanceConfigProperty.INSTANCE_CAPACITY_MAP.name(), capacityData);
}

Map<String, String> capacityData = new HashMap<>();

capacityDataMap.entrySet().stream().forEach(entry -> {
if (entry.getValue() < 0) {
throw new IllegalArgumentException(String
.format("Capacity Data contains a negative value: %s = %d", entry.getKey(),
entry.getValue()));
}
capacityData.put(entry.getKey(), Integer.toString(entry.getValue()));
});

_record.setMapField(InstanceConfigProperty.INSTANCE_CAPACITY_MAP.name(), capacityData);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,17 @@ public void testSetCapacityKeys() {

Assert.assertEquals(keys, testConfig.getRecord()
.getListField(ClusterConfig.ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name()));
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testSetCapacityKeysEmptyList() {
ClusterConfig testConfig = new ClusterConfig("testId");
testConfig.setInstanceCapacityKeys(Collections.emptyList());

Assert.assertEquals(testConfig.getRecord()
.getListField(ClusterConfig.ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name()),
Collections.emptyList());

testConfig.setInstanceCapacityKeys(null);

Assert.assertTrue(testConfig.getRecord()
.getListField(ClusterConfig.ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name()) == null);
}

@Test
Expand Down Expand Up @@ -119,6 +124,17 @@ public void testSetRebalancePreference() {
Assert.assertEquals(testConfig.getRecord()
.getMapField(ClusterConfig.ClusterConfigProperty.REBALANCE_PREFERENCE.name()),
mapFieldData);

testConfig.setGlobalRebalancePreference(Collections.emptyMap());

Assert.assertEquals(testConfig.getRecord()
.getMapField(ClusterConfig.ClusterConfigProperty.REBALANCE_PREFERENCE.name()),
Collections.emptyMap());

testConfig.setGlobalRebalancePreference(null);

Assert.assertTrue(testConfig.getRecord()
.getMapField(ClusterConfig.ClusterConfigProperty.REBALANCE_PREFERENCE.name()) == null);
}

@Test(expectedExceptions = IllegalArgumentException.class)
Expand Down Expand Up @@ -165,17 +181,17 @@ public void testSetInstanceCapacityMap() {

Assert.assertEquals(testConfig.getRecord().getMapField(ClusterConfig.ClusterConfigProperty.
DEFAULT_INSTANCE_CAPACITY_MAP.name()), capacityDataMapString);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Default capacity data is null")
public void testSetInstanceCapacityMapEmpty() {
Map<String, Integer> capacityDataMap = new HashMap<>();

ClusterConfig testConfig = new ClusterConfig("testConfig");
// The following operation can be done, this will clear the default values
testConfig.setDefaultInstanceCapacityMap(capacityDataMap);
// The following operation will fail
testConfig.setDefaultInstanceCapacityMap(Collections.emptyMap());

Assert.assertEquals(testConfig.getRecord().getMapField(ClusterConfig.ClusterConfigProperty.
DEFAULT_INSTANCE_CAPACITY_MAP.name()), Collections.emptyMap());

testConfig.setDefaultInstanceCapacityMap(null);

Assert.assertTrue(testConfig.getRecord().getMapField(ClusterConfig.ClusterConfigProperty.
DEFAULT_INSTANCE_CAPACITY_MAP.name()) == null);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Default capacity data contains a negative value: item3 = -3")
Expand Down Expand Up @@ -220,17 +236,17 @@ public void testSetPartitionWeightMap() {

Assert.assertEquals(testConfig.getRecord().getMapField(ClusterConfig.ClusterConfigProperty.
DEFAULT_PARTITION_WEIGHT_MAP.name()), weightDataMapString);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Default capacity data is null")
public void testSetPartitionWeightMapEmpty() {
Map<String, Integer> weightDataMap = new HashMap<>();

ClusterConfig testConfig = new ClusterConfig("testConfig");
// The following operation can be done, this will clear the default values
testConfig.setDefaultPartitionWeightMap(weightDataMap);
// The following operation will fail
testConfig.setDefaultPartitionWeightMap(Collections.emptyMap());

Assert.assertEquals(testConfig.getRecord().getMapField(ClusterConfig.ClusterConfigProperty.
DEFAULT_PARTITION_WEIGHT_MAP.name()), Collections.emptyMap());

testConfig.setDefaultPartitionWeightMap(null);

Assert.assertTrue(testConfig.getRecord().getMapField(ClusterConfig.ClusterConfigProperty.
DEFAULT_PARTITION_WEIGHT_MAP.name()) == null);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Default capacity data contains a negative value: item3 = -3")
Expand Down Expand Up @@ -264,12 +280,48 @@ public void testAbnormalStatesResolverConfig() {
// Default value is empty
Assert.assertEquals(testConfig.getAbnormalStateResolverMap(), Collections.EMPTY_MAP);
// Test set
Map<String, String> resolverMap = ImmutableMap.of(MasterSlaveSMD.name,
MockAbnormalStateResolver.class.getName());
Map<String, String> resolverMap =
ImmutableMap.of(MasterSlaveSMD.name, MockAbnormalStateResolver.class.getName());
testConfig.setAbnormalStateResolverMap(resolverMap);
Assert.assertEquals(testConfig.getAbnormalStateResolverMap(), resolverMap);
// Test empty the map
testConfig.setAbnormalStateResolverMap(Collections.emptyMap());
Assert.assertEquals(testConfig.getAbnormalStateResolverMap(), Collections.EMPTY_MAP);

testConfig.setAbnormalStateResolverMap(null);
Assert.assertTrue(testConfig.getRecord()
.getMapField(ClusterConfig.ClusterConfigProperty.ABNORMAL_STATES_RESOLVER_MAP.name())
== null);
}

@Test
public void testSetInvalidAbnormalStatesResolverConfig() {
ClusterConfig testConfig = new ClusterConfig("testConfig");

Map<String, String> resolverMap = new HashMap<>();
resolverMap.put(null, MockAbnormalStateResolver.class.getName());
trySetInvalidAbnormalStatesResolverMap(testConfig, resolverMap);

resolverMap.clear();
resolverMap.put("", MockAbnormalStateResolver.class.getName());
trySetInvalidAbnormalStatesResolverMap(testConfig, resolverMap);

resolverMap.clear();
resolverMap.put(MasterSlaveSMD.name, null);
trySetInvalidAbnormalStatesResolverMap(testConfig, resolverMap);

resolverMap.clear();
resolverMap.put(MasterSlaveSMD.name, "");
trySetInvalidAbnormalStatesResolverMap(testConfig, resolverMap);
}

private void trySetInvalidAbnormalStatesResolverMap(ClusterConfig testConfig,
Map<String, String> resolverMap) {
try {
testConfig.setAbnormalStateResolverMap(resolverMap);
Assert.fail("Invalid resolver setup shall fail.");
} catch (IllegalArgumentException ex) {
// expected
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
* under the License.
*/

import java.util.Collections;
import java.util.Map;

import com.google.common.collect.ImmutableMap;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* Created with IntelliJ IDEA.
* User: zzhang
Expand Down Expand Up @@ -91,26 +90,26 @@ public void testSetInstanceCapacityMap() {
"item2", 2,
"item3", 3);

Map<String, String> capacityDataMapString = ImmutableMap.of("item1", "1",
"item2", "2",
"item3", "3");
Map<String, String> capacityDataMapString =
ImmutableMap.of("item1", "1", "item2", "2", "item3", "3");

InstanceConfig testConfig = new InstanceConfig("testConfig");
testConfig.setInstanceCapacityMap(capacityDataMap);

Assert.assertEquals(testConfig.getRecord().getMapField(InstanceConfig.InstanceConfigProperty.
INSTANCE_CAPACITY_MAP.name()), capacityDataMapString);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Capacity Data is null")
public void testSetInstanceCapacityMapEmpty() {
Map<String, Integer> capacityDataMap = new HashMap<>();

InstanceConfig testConfig = new InstanceConfig("testConfig");
// This operation shall be done. This will clear the instance capacity map in the InstanceConfig
testConfig.setInstanceCapacityMap(capacityDataMap);
// This operation will fall.
testConfig.setInstanceCapacityMap(Collections.emptyMap());

Assert.assertEquals(testConfig.getRecord().getMapField(InstanceConfig.InstanceConfigProperty.
INSTANCE_CAPACITY_MAP.name()), Collections.emptyMap());

// This operation shall be done. This will remove the instance capacity map in the InstanceConfig
testConfig.setInstanceCapacityMap(null);

Assert.assertTrue(testConfig.getRecord().getMapField(InstanceConfig.InstanceConfigProperty.
INSTANCE_CAPACITY_MAP.name()) == null);
}

@Test(expectedExceptions = IllegalArgumentException.class,
Expand Down
Loading