Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ public enum ClusterConfigProperty {
// Specifies job types and used for quota allocation
QUOTA_TYPES,

/**
* Configurable characteristics of the WAGED rebalancer.
* TODO: Split the WAGED rebalancer configuration items to the other config file.
*/
// The required instance capacity keys for resource partition assignment calculation.
INSTANCE_CAPACITY_KEYS,
// The default instance capacity if no capacity is configured in the Instance Config node.
Expand All @@ -94,7 +98,18 @@ public enum ClusterConfigProperty {
// The preference of the rebalance result.
// EVENNESS - Evenness of the resource utilization, partition, and top state distribution.
// LESS_MOVEMENT - the tendency of keeping the current assignment instead of moving the partition for optimal assignment.
REBALANCE_PREFERENCE
REBALANCE_PREFERENCE,
// Specify if the WAGED rebalancer should asynchronously perform the global rebalance, which is
// in general slower than the partial rebalance.
// Note that asynchronous global rebalance calculation will reduce the controller rebalance
// delay. But it may cause more partition movements. This is because the partial rebalance will
// be performed with a stale baseline. The rebalance result would be an intermediate one and
// could be changed again when a new baseline is calculated.
// For more details, please refer to
// https://github.com/apache/helix/wiki/Weight-aware-Globally-Evenly-distributed-Rebalancer#rebalance-coordinator
//
// Default to be true.
GLOBAL_REBALANCE_ASYNC_MODE
}

public enum GlobalRebalancePreferenceKey {
Expand All @@ -121,6 +136,7 @@ public enum GlobalRebalancePreferenceKey {
.put(GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1).build();
private final static int MAX_REBALANCE_PREFERENCE = 10;
private final static int MIN_REBALANCE_PREFERENCE = 0;
private final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true;

/**
* Instantiate for a specific cluster
Expand Down Expand Up @@ -822,6 +838,19 @@ public Map<GlobalRebalancePreferenceKey, Integer> getGlobalRebalancePreference()
return DEFAULT_GLOBAL_REBALANCE_PREFERENCE;
}

/**
* Set the asynchronous global rebalance mode.
* @param isAsync true if the global rebalance should be performed asynchronously
*/
public void setGlobalRebalanceAsyncMode(boolean isAsync) {
_record.setBooleanField(ClusterConfigProperty.GLOBAL_REBALANCE_ASYNC_MODE.name(), isAsync);
}

public boolean isGlobalRebalanceAsyncModeEnabled() {
return _record.getBooleanField(ClusterConfigProperty.GLOBAL_REBALANCE_ASYNC_MODE.name(),
DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED);
}

/**
* Get IdealState rules defined in the cluster config.
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@
* under the License.
*/

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.helix.ZNRecord;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.helix.model.ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS;
import static org.apache.helix.model.ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT;

Expand Down Expand Up @@ -239,4 +239,21 @@ public void testSetPartitionWeightMapInvalid() {
ClusterConfig testConfig = new ClusterConfig("testConfig");
testConfig.setDefaultPartitionWeightMap(weightDataMap);
}

@Test
public void testAsyncGlobalRebalanceOption() {
ClusterConfig testConfig = new ClusterConfig("testConfig");
// Default value is true.
Assert.assertEquals(testConfig.isGlobalRebalanceAsyncModeEnabled(), true);
// Test get the option
testConfig.getRecord()
.setBooleanField(ClusterConfig.ClusterConfigProperty.GLOBAL_REBALANCE_ASYNC_MODE.name(),
false);
Assert.assertEquals(testConfig.isGlobalRebalanceAsyncModeEnabled(), false);
// Test set the option
testConfig.setGlobalRebalanceAsyncMode(true);
Assert.assertEquals(testConfig.getRecord()
.getBooleanField(ClusterConfig.ClusterConfigProperty.GLOBAL_REBALANCE_ASYNC_MODE.name(),
false), true);
}
}