Skip to content

Commit 08a2015

Browse files
authored
Refine the WAGED rebalancer related interfaces for integration (apache#431)
* Refine the WAGED rebalancer related interfaces and do the initial integration with the BestPossibleStateCalStage. - Modify the BestPossibleStateCalStage logic to plug in the WAGED rebalancer. - Refine ClusterModel to integrate with the ClusterDataDetector implementation. - Enable getting the changed details for Cluster Config in the change detector, which is required by the WAGED rebalancer.
1 parent 672c9cb commit 08a2015

9 files changed

Lines changed: 202 additions & 260 deletions

File tree

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package org.apache.helix;
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one
5+
* or more contributor license agreements. See the NOTICE file
6+
* distributed with this work for additional information
7+
* regarding copyright ownership. The ASF licenses this file
8+
* to you under the Apache License, Version 2.0 (the
9+
* "License"); you may not use this file except in compliance
10+
* with the License. You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing,
15+
* software distributed under the License is distributed on an
16+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
* KIND, either express or implied. See the License for the
18+
* specific language governing permissions and limitations
19+
* under the License.
20+
*/
21+
22+
/**
23+
* Exception thrown by Helix due to rebalance failures.
24+
*/
25+
public class HelixRebalanceException extends Exception {
  /**
   * Categories of rebalance failure carried by a {@link HelixRebalanceException}.
   *
   * <p>Declared {@code public} because the public constructor and {@link #getFailureType()}
   * expose this type; callers in other packages must be able to name it.
   */
  public enum RebalanceFailureType {
    INVALID_CLUSTER_STATUS,
    INVALID_REBALANCER_STATUS,
    FAILED_TO_CALCULATE,
    UNKNOWN_FAILURE
  }

  private final RebalanceFailureType _type;

  /**
   * Creates a rebalance exception whose message is suffixed with the failure type.
   *
   * @param message description of the failure; formatted with {@code %s}, so {@code null}
   *                is rendered as the text "null"
   * @param type    the failure category; {@code null} is treated as
   *                {@link RebalanceFailureType#UNKNOWN_FAILURE} so that building the exception
   *                never throws and hides the failure being reported
   * @param cause   the underlying cause, may be {@code null}
   */
  public HelixRebalanceException(String message, RebalanceFailureType type, Throwable cause) {
    super(String.format("%s. Failure Type: %s", message, orUnknown(type).name()), cause);
    _type = orUnknown(type);
  }

  /**
   * @return the failure category of this exception; never {@code null}
   */
  public RebalanceFailureType getFailureType() {
    return _type;
  }

  // Maps a missing failure type to UNKNOWN_FAILURE. Static so it can be used in the super(...) call.
  private static RebalanceFailureType orUnknown(RebalanceFailureType type) {
    return type == null ? RebalanceFailureType.UNKNOWN_FAILURE : type;
  }
}

helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,17 @@
2020
*/
2121

2222
import com.google.common.collect.Sets;
23+
import org.apache.helix.HelixConstants;
24+
import org.apache.helix.HelixProperty;
25+
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
2329
import java.util.Collection;
2430
import java.util.Collections;
2531
import java.util.HashMap;
2632
import java.util.HashSet;
2733
import java.util.Map;
28-
import org.apache.helix.HelixConstants;
29-
import org.apache.helix.HelixException;
30-
import org.apache.helix.HelixProperty;
31-
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
3234

3335
/**
3436
* ResourceChangeDetector implements ChangeDetector. It caches resource-related metadata from
@@ -37,6 +39,7 @@
3739
* WARNING: the methods of this class are not thread-safe.
3840
*/
3941
public class ResourceChangeDetector implements ChangeDetector {
42+
private static final Logger LOG = LoggerFactory.getLogger(ResourceChangeDetector.class.getName());
4043

4144
private ResourceChangeSnapshot _oldSnapshot; // snapshot for previous pipeline run
4245
private ResourceChangeSnapshot _newSnapshot; // snapshot for this pipeline run
@@ -108,10 +111,13 @@ private void clearCachedComputation() {
108111
return snapshot.getResourceConfigMap();
109112
case LIVE_INSTANCE:
110113
return snapshot.getLiveInstances();
114+
case CONFIG:
115+
return Collections.emptyMap();
111116
default:
112-
throw new HelixException(String.format(
113-
"ResourceChangeDetector cannot compute the names of changes for the given ChangeType: %s",
114-
changeType));
117+
LOG.warn(
118+
"ResourceChangeDetector cannot compute the names of changes for the given ChangeType: {}",
119+
changeType);
120+
return Collections.emptyMap();
115121
}
116122
}
117123

helix-core/src/main/java/org/apache/helix/controller/rebalancer/GlobalRebalancer.java

Lines changed: 0 additions & 67 deletions
This file was deleted.

helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java

Lines changed: 0 additions & 73 deletions
This file was deleted.

helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java

Lines changed: 0 additions & 54 deletions
This file was deleted.

helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@
1919
* under the License.
2020
*/
2121

22-
import org.apache.helix.HelixException;
2322
import org.apache.helix.HelixManager;
23+
import org.apache.helix.HelixRebalanceException;
24+
import org.apache.helix.controller.changedetector.ResourceChangeDetector;
2425
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
25-
import org.apache.helix.controller.rebalancer.GlobalRebalancer;
26+
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
27+
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
28+
import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintsRebalanceAlgorithm;
2629
import org.apache.helix.controller.stages.CurrentStateOutput;
2730
import org.apache.helix.model.IdealState;
2831
import org.apache.helix.model.Resource;
@@ -36,23 +39,57 @@
3639
* A placeholder before we have the implementation.
3740
* Weight-Aware Globally-Even Distribute Rebalancer.
3841
*
39-
* @see <a href="Design Document">https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer</a>
42+
* @see <a href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer">
43+
* Design Document
44+
* </a>
4045
*/
41-
public class WagedRebalancer implements GlobalRebalancer<ResourceControllerDataProvider> {
46+
public class WagedRebalancer {
4247
private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class);
4348

44-
@Override
45-
public void init(HelixManager manager) { }
49+
// --------- The following fields are placeholders and need replacement. -----------//
50+
// TODO Shall we make the metadata store a static threadlocal object as well to avoid reinitialization?
51+
private final AssignmentMetadataStore _assignmentMetadataStore;
52+
private final RebalanceAlgorithm _rebalanceAlgorithm;
53+
// ------------------------------------------------------------------------------------//
4654

47-
@Override
48-
public Map<String, IdealState> computeNewIdealState(CurrentStateOutput currentStateOutput,
49-
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap)
50-
throws HelixException {
51-
return new HashMap<>();
55+
// The cluster change detector is a stateful object. Make it static to avoid unnecessary
56+
// reinitialization.
57+
private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
58+
new ThreadLocal<>();
59+
private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
60+
61+
private ResourceChangeDetector getChangeDetector() {
62+
if (CHANGE_DETECTOR_THREAD_LOCAL.get() == null) {
63+
CHANGE_DETECTOR_THREAD_LOCAL.set(new ResourceChangeDetector());
64+
}
65+
return CHANGE_DETECTOR_THREAD_LOCAL.get();
66+
}
67+
68+
public WagedRebalancer(HelixManager helixManager) {
69+
// TODO init the metadata store according to its requirements when integrating, or change to a final static method if possible.
70+
_assignmentMetadataStore = new AssignmentMetadataStore();
71+
// TODO init the algorithm according to the requirements when integrating.
72+
_rebalanceAlgorithm = new ConstraintsRebalanceAlgorithm();
73+
74+
// Use the mapping calculator in DelayedAutoRebalancer for calculating the final assignment
75+
// output.
76+
// This calculator will translate the best possible assignment into an applicable state mapping
77+
// based on the current states.
78+
// TODO abstract and separate the mapping calculator logic from the DelayedAutoRebalancer
79+
_mappingCalculator = new DelayedAutoRebalancer();
5280
}
5381

54-
@Override
55-
public RebalanceFailureReason getFailureReason() {
56-
return new RebalanceFailureReason(RebalanceFailureType.UNKNOWN_FAILURE);
82+
/**
83+
* Compute the new IdealStates for all the resources input. The IdealStates include both the new
84+
* partition assignment (in the listFields) and the new replica state mapping (in the mapFields).
85+
* @param clusterData The Cluster status data provider.
86+
* @param resourceMap A map containing all the rebalancing resources.
87+
* @param currentStateOutput The present Current State of the cluster.
88+
* @return A map containing the computed new IdealStates.
89+
*/
90+
public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvider clusterData,
91+
Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput)
92+
throws HelixRebalanceException {
93+
return new HashMap<>();
5794
}
5895
}

0 commit comments

Comments
 (0)