Skip to content

Commit 7acc3c0

Browse files
committed
Add ChangeDetector interface and ResourceChangeDetector implementation
In order to efficiently react to changes happening to the cluster in the new WAGED rebalancer, a new component called ChangeDetector was added. Changelist: 1. Add ChangeDetector interface 2. Implement ResourceChangeDetector 3. Add ResourceChangeCache, a wrapper for critical cluster metadata
1 parent 3c3db0b commit 7acc3c0

4 files changed

Lines changed: 348 additions & 5 deletions

File tree

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package org.apache.helix.controller.changedetector;
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one
5+
* or more contributor license agreements. See the NOTICE file
6+
* distributed with this work for additional information
7+
* regarding copyright ownership. The ASF licenses this file
8+
* to you under the Apache License, Version 2.0 (the
9+
* "License"); you may not use this file except in compliance
10+
* with the License. You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing,
15+
* software distributed under the License is distributed on an
16+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
* KIND, either express or implied. See the License for the
18+
* specific language governing permissions and limitations
19+
* under the License.
20+
*/
21+
22+
import java.util.Collection;
23+
import org.apache.helix.HelixConstants;
24+
25+
/**
26+
* ChangeDetector interface that will be used to track deltas in the cluster from one pipeline run
27+
* to another. The interface methods are designed to be flexible for both the resource pipeline and
28+
* the task pipeline.
29+
* TODO: Consider splitting this up into two different ChangeDetector interfaces:
30+
* TODO: PropertyBasedChangeDetector and PathBasedChangeDetector.
31+
*/
32+
public interface ChangeDetector {
33+
34+
/**
35+
* Returns all types of changes detected.
36+
* @return a collection of ChangeTypes
37+
*/
38+
Collection<HelixConstants.ChangeType> getChangeTypes();
39+
40+
/**
41+
* Returns the names of items that changed based on the change type given.
42+
* @return a collection of names of items that changed
43+
*/
44+
Collection<String> getChangesByType(HelixConstants.ChangeType changeType);
45+
46+
/**
47+
* Returns the names of items that were added based on the change type given.
48+
* @return a collection of names of items that were added
49+
*/
50+
Collection<String> getAdditionsByType(HelixConstants.ChangeType changeType);
51+
52+
/**
53+
* Returns the names of items that were removed based on the change type given.
54+
* @return a collection of names of items that were removed
55+
*/
56+
Collection<String> getRemovalsByType(HelixConstants.ChangeType changeType);
57+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package org.apache.helix.controller.changedetector;
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one
5+
* or more contributor license agreements. See the NOTICE file
6+
* distributed with this work for additional information
7+
* regarding copyright ownership. The ASF licenses this file
8+
* to you under the Apache License, Version 2.0 (the
9+
* "License"); you may not use this file except in compliance
10+
* with the License. You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing,
15+
* software distributed under the License is distributed on an
16+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
* KIND, either express or implied. See the License for the
18+
* specific language governing permissions and limitations
19+
* under the License.
20+
*/
21+
22+
import java.util.HashMap;
23+
import java.util.HashSet;
24+
import java.util.Map;
25+
import java.util.Set;
26+
import org.apache.helix.HelixConstants;
27+
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
28+
import org.apache.helix.model.IdealState;
29+
import org.apache.helix.model.InstanceConfig;
30+
import org.apache.helix.model.LiveInstance;
31+
import org.apache.helix.model.ResourceConfig;
32+
33+
/**
34+
* ResourceChangeCache is a POJO that contains the following Helix metadata:
35+
* 1. InstanceConfig
36+
* 2. IdealState
37+
* 3. ResourceConfig
38+
* 4. LiveInstance
39+
* 5. Changed property types
40+
* It serves as a snapshot of the main controller cache to enable the difference (change)
41+
* calculation between two rounds of the pipeline run.
42+
*/
43+
class ResourceChangeCache {
44+
45+
private Set<HelixConstants.ChangeType> _changedTypes;
46+
private Map<String, InstanceConfig> _instanceConfigMap;
47+
private Map<String, IdealState> _idealStateMap;
48+
private Map<String, ResourceConfig> _resourceConfigMap;
49+
private Map<String, LiveInstance> _liveInstances;
50+
51+
/**
52+
* Constructor using controller cache (ResourceControllerDataProvider).
53+
* @param dataProvider
54+
*/
55+
ResourceChangeCache(ResourceControllerDataProvider dataProvider) {
56+
// Consume all changed types from DataProvider. This is because it is possible that the
57+
// DataProvider has gone through multiple rounds of rebalancing prior to the ChangeDetector
58+
// consuming changed types.
59+
_changedTypes = new HashSet<>(dataProvider.getRefreshedChangeTypes());
60+
dataProvider.clearRefreshedChangeTypes();
61+
62+
_instanceConfigMap = new HashMap<>(dataProvider.getInstanceConfigMap());
63+
_idealStateMap = new HashMap<>(dataProvider.getIdealStates());
64+
_resourceConfigMap = new HashMap<>(dataProvider.getResourceConfigMap());
65+
_liveInstances = new HashMap<>(dataProvider.getLiveInstances());
66+
}
67+
68+
/**
69+
* Copy constructor for ResourceChangeCache.
70+
* @param cache
71+
*/
72+
ResourceChangeCache(ResourceChangeCache cache) {
73+
_changedTypes = new HashSet<>(cache._changedTypes);
74+
_instanceConfigMap = new HashMap<>(cache._instanceConfigMap);
75+
_idealStateMap = new HashMap<>(cache._idealStateMap);
76+
_resourceConfigMap = new HashMap<>(cache._resourceConfigMap);
77+
_liveInstances = new HashMap<>(cache._liveInstances);
78+
}
79+
80+
Set<HelixConstants.ChangeType> getChangedTypes() {
81+
return _changedTypes;
82+
}
83+
84+
Map<String, InstanceConfig> getInstanceConfigMap() {
85+
return _instanceConfigMap;
86+
}
87+
88+
Map<String, IdealState> getIdealStateMap() {
89+
return _idealStateMap;
90+
}
91+
92+
Map<String, ResourceConfig> getResourceConfigMap() {
93+
return _resourceConfigMap;
94+
}
95+
96+
Map<String, LiveInstance> getLiveInstances() {
97+
return _liveInstances;
98+
}
99+
}
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
package org.apache.helix.controller.changedetector;
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one
5+
* or more contributor license agreements. See the NOTICE file
6+
* distributed with this work for additional information
7+
* regarding copyright ownership. The ASF licenses this file
8+
* to you under the Apache License, Version 2.0 (the
9+
* "License"); you may not use this file except in compliance
10+
* with the License. You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing,
15+
* software distributed under the License is distributed on an
16+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
* KIND, either express or implied. See the License for the
18+
* specific language governing permissions and limitations
19+
* under the License.
20+
*/
21+
22+
import java.util.Collection;
23+
import java.util.Collections;
24+
import java.util.HashMap;
25+
import java.util.HashSet;
26+
import java.util.Map;
27+
import org.apache.helix.HelixConstants;
28+
import org.apache.helix.HelixException;
29+
import org.apache.helix.HelixProperty;
30+
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
31+
32+
/**
33+
* ResourceChangeDetector implements ChangeDetector. It caches resource-related metadata from
34+
* Helix's main resource pipeline cache (DataProvider) and the computation results of change
35+
* detection.
36+
* WARNING: the methods of this class are not thread-safe.
37+
*/
38+
public class ResourceChangeDetector implements ChangeDetector {
39+
40+
private ResourceChangeCache _oldCache; // cache snapshot for previous pipeline run
41+
private ResourceChangeCache _newCache; // cache snapshot for this pipeline run
42+
43+
private Map<String, ? extends HelixProperty> _oldPropertyMap;
44+
private Map<String, ? extends HelixProperty> _newPropertyMap;
45+
46+
// The following caches the computation results
47+
private Map<HelixConstants.ChangeType, Collection<String>> _changedItems = new HashMap<>();
48+
private Map<HelixConstants.ChangeType, Collection<String>> _addedItems = new HashMap<>();
49+
private Map<HelixConstants.ChangeType, Collection<String>> _removedItems = new HashMap<>();
50+
51+
/**
52+
* Compare the underlying HelixProperty objects and produce a collection of names of changed
53+
* properties.
54+
* @return
55+
*/
56+
private Collection<String> getChangedItems() {
57+
Collection<String> changedItems = new HashSet<>();
58+
_oldPropertyMap.forEach((name, property) -> {
59+
if (_newPropertyMap.containsKey(name) && !property.equals(_newPropertyMap.get(name))) {
60+
changedItems.add(name);
61+
}
62+
});
63+
return changedItems;
64+
}
65+
66+
/**
67+
* Return a collection of names that are newly added.
68+
* @return
69+
*/
70+
private Collection<String> getAddedItems() {
71+
Collection<String> addedItems = new HashSet<>(_newPropertyMap.keySet());
72+
addedItems.removeAll(_oldPropertyMap.keySet());
73+
return addedItems;
74+
}
75+
76+
/**
77+
* Return a collection of names that were removed.
78+
* @return
79+
*/
80+
private Collection<String> getRemovedItems() {
81+
Collection<String> removedItems = new HashSet<>(_oldPropertyMap.keySet());
82+
removedItems.removeAll(_newPropertyMap.keySet());
83+
return removedItems;
84+
}
85+
86+
/**
87+
* Based on the change type given, call the right getters for two property maps and save the
88+
* references as oldPropertyMap and newPropertyMap.
89+
* @param changeType
90+
*/
91+
private void determinePropertyMapByType(HelixConstants.ChangeType changeType) {
92+
switch (changeType) {
93+
case INSTANCE_CONFIG:
94+
_oldPropertyMap = _oldCache.getInstanceConfigMap();
95+
_newPropertyMap = _newCache.getInstanceConfigMap();
96+
break;
97+
case IDEAL_STATE:
98+
_oldPropertyMap = _oldCache.getIdealStateMap();
99+
_newPropertyMap = _newCache.getIdealStateMap();
100+
break;
101+
case RESOURCE_CONFIG:
102+
_oldPropertyMap = _oldCache.getResourceConfigMap();
103+
_newPropertyMap = _newCache.getResourceConfigMap();
104+
break;
105+
case LIVE_INSTANCE:
106+
_oldPropertyMap = _oldCache.getLiveInstances();
107+
_newPropertyMap = _newCache.getLiveInstances();
108+
break;
109+
default:
110+
throw new HelixException(String
111+
.format("ResourceChangeDetector does not support the given change type: %s", changeType));
112+
}
113+
}
114+
115+
/**
116+
* Makes the current newCache the oldCache and reads in the up-to-date cache for change
117+
* computation. To be called in the controller pipeline.
118+
* @param dataProvider newly refreshed DataProvider (cache)
119+
*/
120+
public synchronized void updateCache(ResourceControllerDataProvider dataProvider) {
121+
// Do nothing if nothing has changed
122+
if (dataProvider.getRefreshedChangeTypes().isEmpty()) {
123+
return;
124+
}
125+
126+
// If there are changes, update internal states
127+
_oldCache = new ResourceChangeCache(_newCache);
128+
_newCache = new ResourceChangeCache(dataProvider);
129+
130+
// Invalidate cached computation
131+
_changedItems.clear();
132+
_addedItems.clear();
133+
_removedItems.clear();
134+
}
135+
136+
@Override
137+
public Collection<HelixConstants.ChangeType> getChangeTypes() {
138+
return Collections.unmodifiableSet(_newCache.getChangedTypes());
139+
}
140+
141+
@Override
142+
public Collection<String> getChangesByType(HelixConstants.ChangeType changeType) {
143+
return _changedItems.computeIfAbsent(changeType, changedItems -> {
144+
determinePropertyMapByType(changeType);
145+
return getChangedItems();
146+
});
147+
}
148+
149+
@Override
150+
public Collection<String> getAdditionsByType(HelixConstants.ChangeType changeType) {
151+
return _addedItems.computeIfAbsent(changeType, changedItems -> {
152+
determinePropertyMapByType(changeType);
153+
return getAddedItems();
154+
});
155+
}
156+
157+
@Override
158+
public Collection<String> getRemovalsByType(HelixConstants.ChangeType changeType) {
159+
return _removedItems.computeIfAbsent(changeType, changedItems -> {
160+
determinePropertyMapByType(changeType);
161+
return getRemovedItems();
162+
});
163+
}
164+
}

helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Map;
2727
import java.util.Set;
2828

29+
import java.util.concurrent.ConcurrentHashMap;
2930
import org.apache.helix.HelixConstants;
3031
import org.apache.helix.HelixDataAccessor;
3132
import org.apache.helix.PropertyKey;
@@ -65,6 +66,9 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
6566
private Map<String, Map<String, MissingTopStateRecord>> _missingTopStateMap;
6667
private Map<String, Map<String, String>> _lastTopStateLocationMap;
6768

69+
// Maintain a set of all ChangeTypes for change detection
70+
private Set<HelixConstants.ChangeType> _refreshedChangeTypes;
71+
6872
public ResourceControllerDataProvider() {
6973
this(AbstractDataCache.UNKNOWN_CLUSTER);
7074
}
@@ -107,19 +111,21 @@ public String getObjName(ExternalView obj) {
107111
_idealMappingCache = new HashMap<>();
108112
_missingTopStateMap = new HashMap<>();
109113
_lastTopStateLocationMap = new HashMap<>();
114+
_refreshedChangeTypes = ConcurrentHashMap.newKeySet();
110115
}
111116

112117
public synchronized void refresh(HelixDataAccessor accessor) {
113118
long startTime = System.currentTimeMillis();
114119

115120
// Refresh base
116-
Set<HelixConstants.ChangeType> propertyRefreshed = super.doRefresh(accessor);
121+
Set<HelixConstants.ChangeType> changedTypes = super.doRefresh(accessor);
122+
_refreshedChangeTypes.addAll(changedTypes);
117123

118124
// Invalidate cached information if any of the important data has been refreshed
119-
if (propertyRefreshed.contains(HelixConstants.ChangeType.IDEAL_STATE)
120-
|| propertyRefreshed.contains(HelixConstants.ChangeType.LIVE_INSTANCE)
121-
|| propertyRefreshed.contains(HelixConstants.ChangeType.INSTANCE_CONFIG)
122-
|| propertyRefreshed.contains(HelixConstants.ChangeType.RESOURCE_CONFIG)) {
125+
if (changedTypes.contains(HelixConstants.ChangeType.IDEAL_STATE)
126+
|| changedTypes.contains(HelixConstants.ChangeType.LIVE_INSTANCE)
127+
|| changedTypes.contains(HelixConstants.ChangeType.INSTANCE_CONFIG)
128+
|| changedTypes.contains(HelixConstants.ChangeType.RESOURCE_CONFIG)) {
123129
clearCachedResourceAssignments();
124130
}
125131

@@ -262,6 +268,23 @@ public void setCachedIdealMapping(String resource, ZNRecord mapping) {
262268
_idealMappingCache.put(resource, mapping);
263269
}
264270

271+
/**
272+
* Return the set of all PropertyTypes that changed prior to this round of rebalance. The caller
273+
* should clear this set by calling {@link #clearRefreshedChangeTypes()}.
274+
* @return
275+
*/
276+
public Set<HelixConstants.ChangeType> getRefreshedChangeTypes() {
277+
return _refreshedChangeTypes;
278+
}
279+
280+
/**
281+
* Clears the set of all PropertyTypes that changed. The caller will have consumed all change
282+
* types by calling {@link #getRefreshedChangeTypes()}.
283+
*/
284+
public void clearRefreshedChangeTypes() {
285+
_refreshedChangeTypes.clear();
286+
}
287+
265288
public void clearCachedResourceAssignments() {
266289
_resourceAssignmentCache.clear();
267290
_idealMappingCache.clear();

0 commit comments

Comments
 (0)