Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,22 @@


public class TestPartitionMigrationBase extends ZkTestBase {
final int NUM_NODE = 6;
protected final int NUM_NODE = 6;
protected static final int START_PORT = 12918;
protected static final int _PARTITIONS = 50;

protected final String CLASS_NAME = getShortClassName();
protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
protected ClusterControllerManager _controller;

List<MockParticipantManager> _participants = new ArrayList<>();
protected List<MockParticipantManager> _participants = new ArrayList<>();
protected int _replica = 3;
protected int _minActiveReplica = _replica - 1;
ZkHelixClusterVerifier _clusterVerifier;
protected ZkHelixClusterVerifier _clusterVerifier;
protected List<String> _testDBs = new ArrayList<>();

MigrationStateVerifier _migrationVerifier;
HelixManager _manager;
protected MigrationStateVerifier _migrationVerifier;
protected HelixManager _manager;
protected ConfigAccessor _configAccessor;


Expand Down Expand Up @@ -134,7 +134,7 @@ protected Map<String, IdealState> createTestDBs(long delayTime) throws Interrupt
return idealStateMap;
}

class MigrationStateVerifier implements IdealStateChangeListener, ExternalViewChangeListener {
protected class MigrationStateVerifier implements IdealStateChangeListener, ExternalViewChangeListener {
static final int EXTRA_REPLICA = 1;

boolean _hasMoreReplica = false;
Expand All @@ -144,7 +144,6 @@ class MigrationStateVerifier implements IdealStateChangeListener, ExternalViewCh
boolean trackEnabled = false;
Map<String, IdealState> _resourceMap;


public MigrationStateVerifier(Map<String, IdealState> resourceMap, HelixManager manager) {
_resourceMap = resourceMap;
_manager = manager;
Expand Down Expand Up @@ -243,7 +242,6 @@ public void reset() {
}
}


@AfterClass
public void afterClass() throws Exception {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,29 @@
import java.util.HashMap;
import java.util.Map;

import org.apache.helix.integration.rebalancer.PartitionMigration.TestExpandCluster;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.rebalancer.PartitionMigration.TestPartitionMigrationBase;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


public class TestWagedExpandCluster extends TestExpandCluster {
public class TestWagedExpandCluster extends TestPartitionMigrationBase {
Map<String, IdealState> _resourceMap;

@BeforeClass
public void beforeClass()
throws Exception {
super.beforeClass();
_resourceMap = createTestDBs(1000000);
// TODO remove this sleep after fix https://github.com/apache/helix/issues/526
Thread.sleep(1000);
_migrationVerifier = new MigrationStateVerifier(_resourceMap, _manager);
}

protected Map<String, IdealState> createTestDBs(long delayTime) {
Map<String, IdealState> idealStateMap = new HashMap<>();
int i = 0;
Expand All @@ -49,4 +66,94 @@ protected Map<String, IdealState> createTestDBs(long delayTime) {

return idealStateMap;
}

@Test
public void testClusterExpansion()
throws Exception {
Assert.assertTrue(_clusterVerifier.verifyByPolling());

_migrationVerifier.start();

// expand cluster by adding instance one by one
int numNodes = _participants.size();
for (int i = numNodes; i < numNodes + NUM_NODE; i++) {
String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
MockParticipantManager participant = createAndStartParticipant(storageNodeName);
_participants.add(participant);
Thread.sleep(50);
}

Assert.assertTrue(_clusterVerifier.verifyByPolling());
Assert.assertFalse(_migrationVerifier.hasLessReplica());
Assert.assertFalse(_migrationVerifier.hasMoreReplica());

_migrationVerifier.stop();
}

@Test(dependsOnMethods = {"testClusterExpansion"})
public void testClusterExpansionByEnableInstance()
throws Exception {
Assert.assertTrue(_clusterVerifier.verifyByPolling());

_migrationVerifier.reset();
_migrationVerifier.start();

int numNodes = _participants.size();
// add new instances with all disabled
for (int i = numNodes; i < numNodes + NUM_NODE; i++) {
String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
InstanceConfig config = InstanceConfig.toInstanceConfig(storageNodeName);
config.setInstanceEnabled(false);
config.getRecord().getSimpleFields()
.remove(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name());

_gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, config);

// start dummy participants
MockParticipantManager participant =
new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
participant.syncStart();
_participants.add(participant);
}

// enable new instance one by one
for (int i = numNodes; i < numNodes + NUM_NODE; i++) {
String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
_gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, storageNodeName, true);
Thread.sleep(100);
}

Assert.assertTrue(_clusterVerifier.verifyByPolling());
Assert.assertFalse(_migrationVerifier.hasLessReplica());
_migrationVerifier.stop();
}

@Test(dependsOnMethods = {"testClusterExpansion", "testClusterExpansionByEnableInstance"})
public void testClusterShrink()
throws Exception {
ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
clusterConfig.setDelayRebalaceEnabled(false);
clusterConfig.setRebalanceDelayTime(0);
_configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);

Assert.assertTrue(_clusterVerifier.verifyByPolling());

_migrationVerifier.reset();
_migrationVerifier.start();

// remove instance one by one
for (int i = 0; i < NUM_NODE; i++) {
String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
MockParticipantManager participant = _participants.get(i);
participant.syncStop();
_gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, storageNodeName, false);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
}

Assert.assertTrue(_clusterVerifier.verifyByPolling());
Assert.assertFalse(_migrationVerifier.hasLessMinActiveReplica());
Assert.assertFalse(_migrationVerifier.hasMoreReplica());

_migrationVerifier.stop();
}
}