Skip to content

Commit 103513a

Browse files
narendly authored and Jiajun Wang committed
Add ChangeDetector interface and ResourceChangeDetector implementation (#388)
Add ChangeDetector interface and ResourceChangeDetector implementation. In order to efficiently react to changes happening to the cluster in the new WAGED rebalancer, a new component called ChangeDetector was added. Changelist: 1. Add ChangeDetector interface. 2. Implement ResourceChangeDetector. 3. Add ResourceChangeSnapshot (referred to as ResourceChangeCache in the original message), a wrapper for critical cluster metadata. 4. Add an integration test, TestResourceChangeDetector.
1 parent 73e4a5a commit 103513a

5 files changed

Lines changed: 705 additions & 5 deletions

File tree

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package org.apache.helix.controller.changedetector;
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one
5+
* or more contributor license agreements. See the NOTICE file
6+
* distributed with this work for additional information
7+
* regarding copyright ownership. The ASF licenses this file
8+
* to you under the Apache License, Version 2.0 (the
9+
* "License"); you may not use this file except in compliance
10+
* with the License. You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing,
15+
* software distributed under the License is distributed on an
16+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
* KIND, either express or implied. See the License for the
18+
* specific language governing permissions and limitations
19+
* under the License.
20+
*/
21+
22+
import java.util.Collection;
23+
import org.apache.helix.HelixConstants;
24+
25+
/**
26+
* ChangeDetector interface that will be used to track deltas in the cluster from one pipeline run
27+
* to another. The interface methods are designed to be flexible for both the resource pipeline and
28+
* the task pipeline.
29+
* TODO: Consider splitting this up into two different ChangeDetector interfaces:
30+
* TODO: PropertyBasedChangeDetector and PathBasedChangeDetector.
31+
*/
32+
public interface ChangeDetector {
33+
34+
/**
35+
* Returns all types of changes detected.
36+
* @return a collection of ChangeTypes
37+
*/
38+
Collection<HelixConstants.ChangeType> getChangeTypes();
39+
40+
/**
41+
* Returns the names of items that changed based on the change type given.
42+
* @return a collection of names of items that changed
43+
*/
44+
Collection<String> getChangesByType(HelixConstants.ChangeType changeType);
45+
46+
/**
47+
* Returns the names of items that were added based on the change type given.
48+
* @return a collection of names of items that were added
49+
*/
50+
Collection<String> getAdditionsByType(HelixConstants.ChangeType changeType);
51+
52+
/**
53+
* Returns the names of items that were removed based on the change type given.
54+
* @return a collection of names of items that were removed
55+
*/
56+
Collection<String> getRemovalsByType(HelixConstants.ChangeType changeType);
57+
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package org.apache.helix.controller.changedetector;
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one
5+
* or more contributor license agreements. See the NOTICE file
6+
* distributed with this work for additional information
7+
* regarding copyright ownership. The ASF licenses this file
8+
* to you under the Apache License, Version 2.0 (the
9+
* "License"); you may not use this file except in compliance
10+
* with the License. You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing,
15+
* software distributed under the License is distributed on an
16+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
* KIND, either express or implied. See the License for the
18+
* specific language governing permissions and limitations
19+
* under the License.
20+
*/
21+
22+
import com.google.common.collect.Sets;
23+
import java.util.Collection;
24+
import java.util.Collections;
25+
import java.util.HashMap;
26+
import java.util.HashSet;
27+
import java.util.Map;
28+
import org.apache.helix.HelixConstants;
29+
import org.apache.helix.HelixException;
30+
import org.apache.helix.HelixProperty;
31+
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
32+
33+
/**
34+
* ResourceChangeDetector implements ChangeDetector. It caches resource-related metadata from
35+
* Helix's main resource pipeline cache (DataProvider) and the computation results of change
36+
* detection.
37+
* WARNING: the methods of this class are not thread-safe.
38+
*/
39+
public class ResourceChangeDetector implements ChangeDetector {
40+
41+
private ResourceChangeSnapshot _oldSnapshot; // snapshot for previous pipeline run
42+
private ResourceChangeSnapshot _newSnapshot; // snapshot for this pipeline run
43+
44+
// The following caches the computation results
45+
private Map<HelixConstants.ChangeType, Collection<String>> _changedItems = new HashMap<>();
46+
private Map<HelixConstants.ChangeType, Collection<String>> _addedItems = new HashMap<>();
47+
private Map<HelixConstants.ChangeType, Collection<String>> _removedItems = new HashMap<>();
48+
49+
public ResourceChangeDetector() {
50+
_newSnapshot = new ResourceChangeSnapshot();
51+
}
52+
53+
/**
54+
* Compare the underlying HelixProperty objects and produce a collection of names of changed
55+
* properties.
56+
* @return
57+
*/
58+
private Collection<String> getChangedItems(Map<String, ? extends HelixProperty> oldPropertyMap,
59+
Map<String, ? extends HelixProperty> newPropertyMap) {
60+
Collection<String> changedItems = new HashSet<>();
61+
oldPropertyMap.forEach((name, property) -> {
62+
if (newPropertyMap.containsKey(name)
63+
&& !property.getRecord().equals(newPropertyMap.get(name).getRecord())) {
64+
changedItems.add(name);
65+
}
66+
});
67+
return changedItems;
68+
}
69+
70+
/**
71+
* Return a collection of names that are newly added.
72+
* @return
73+
*/
74+
private Collection<String> getAddedItems(Map<String, ? extends HelixProperty> oldPropertyMap,
75+
Map<String, ? extends HelixProperty> newPropertyMap) {
76+
return Sets.difference(newPropertyMap.keySet(), oldPropertyMap.keySet());
77+
}
78+
79+
/**
80+
* Return a collection of names that were removed.
81+
* @return
82+
*/
83+
private Collection<String> getRemovedItems(Map<String, ? extends HelixProperty> oldPropertyMap,
84+
Map<String, ? extends HelixProperty> newPropertyMap) {
85+
return Sets.difference(oldPropertyMap.keySet(), newPropertyMap.keySet());
86+
}
87+
88+
private void clearCachedComputation() {
89+
_changedItems.clear();
90+
_addedItems.clear();
91+
_removedItems.clear();
92+
}
93+
94+
/**
95+
* Based on the change type given and propertyMap type, call the right getters for propertyMap.
96+
* @param changeType
97+
* @param snapshot
98+
* @return
99+
*/
100+
private Map<String, ? extends HelixProperty> determinePropertyMapByType(
101+
HelixConstants.ChangeType changeType, ResourceChangeSnapshot snapshot) {
102+
switch (changeType) {
103+
case INSTANCE_CONFIG:
104+
return snapshot.getInstanceConfigMap();
105+
case IDEAL_STATE:
106+
return snapshot.getIdealStateMap();
107+
case RESOURCE_CONFIG:
108+
return snapshot.getResourceConfigMap();
109+
case LIVE_INSTANCE:
110+
return snapshot.getLiveInstances();
111+
default:
112+
throw new HelixException(String.format(
113+
"ResourceChangeDetector cannot compute the names of changes for the given ChangeType: %s",
114+
changeType));
115+
}
116+
}
117+
118+
/**
119+
* Makes the current newSnapshot the oldSnapshot and reads in the up-to-date snapshot for change
120+
* computation. To be called in the controller pipeline.
121+
* @param dataProvider newly refreshed DataProvider (cache)
122+
*/
123+
public synchronized void updateSnapshots(ResourceControllerDataProvider dataProvider) {
124+
// If there are changes, update internal states
125+
_oldSnapshot = new ResourceChangeSnapshot(_newSnapshot);
126+
_newSnapshot = new ResourceChangeSnapshot(dataProvider);
127+
dataProvider.clearRefreshedChangeTypes();
128+
129+
// Invalidate cached computation
130+
clearCachedComputation();
131+
}
132+
133+
@Override
134+
public Collection<HelixConstants.ChangeType> getChangeTypes() {
135+
return Collections.unmodifiableSet(_newSnapshot.getChangedTypes());
136+
}
137+
138+
@Override
139+
public Collection<String> getChangesByType(HelixConstants.ChangeType changeType) {
140+
return _changedItems.computeIfAbsent(changeType,
141+
changedItems -> getChangedItems(determinePropertyMapByType(changeType, _oldSnapshot),
142+
determinePropertyMapByType(changeType, _newSnapshot)));
143+
}
144+
145+
@Override
146+
public Collection<String> getAdditionsByType(HelixConstants.ChangeType changeType) {
147+
return _addedItems.computeIfAbsent(changeType,
148+
changedItems -> getAddedItems(determinePropertyMapByType(changeType, _oldSnapshot),
149+
determinePropertyMapByType(changeType, _newSnapshot)));
150+
}
151+
152+
@Override
153+
public Collection<String> getRemovalsByType(HelixConstants.ChangeType changeType) {
154+
return _removedItems.computeIfAbsent(changeType,
155+
changedItems -> getRemovedItems(determinePropertyMapByType(changeType, _oldSnapshot),
156+
determinePropertyMapByType(changeType, _newSnapshot)));
157+
}
158+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package org.apache.helix.controller.changedetector;
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one
5+
* or more contributor license agreements. See the NOTICE file
6+
* distributed with this work for additional information
7+
* regarding copyright ownership. The ASF licenses this file
8+
* to you under the Apache License, Version 2.0 (the
9+
* "License"); you may not use this file except in compliance
10+
* with the License. You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing,
15+
* software distributed under the License is distributed on an
16+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
* KIND, either express or implied. See the License for the
18+
* specific language governing permissions and limitations
19+
* under the License.
20+
*/
21+
22+
import java.util.HashMap;
23+
import java.util.HashSet;
24+
import java.util.Map;
25+
import java.util.Set;
26+
import org.apache.helix.HelixConstants;
27+
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
28+
import org.apache.helix.model.IdealState;
29+
import org.apache.helix.model.InstanceConfig;
30+
import org.apache.helix.model.LiveInstance;
31+
import org.apache.helix.model.ResourceConfig;
32+
33+
/**
34+
* ResourceChangeSnapshot is a POJO that contains the following Helix metadata:
35+
* 1. InstanceConfig
36+
* 2. IdealState
37+
* 3. ResourceConfig
38+
* 4. LiveInstance
39+
* 5. Changed property types
40+
* It serves as a snapshot of the main controller cache to enable the difference (change)
41+
* calculation between two rounds of the pipeline run.
42+
*/
43+
class ResourceChangeSnapshot {
44+
45+
private Set<HelixConstants.ChangeType> _changedTypes;
46+
private Map<String, InstanceConfig> _instanceConfigMap;
47+
private Map<String, IdealState> _idealStateMap;
48+
private Map<String, ResourceConfig> _resourceConfigMap;
49+
private Map<String, LiveInstance> _liveInstances;
50+
51+
/**
52+
* Default constructor that constructs an empty snapshot.
53+
*/
54+
ResourceChangeSnapshot() {
55+
_changedTypes = new HashSet<>();
56+
_instanceConfigMap = new HashMap<>();
57+
_idealStateMap = new HashMap<>();
58+
_resourceConfigMap = new HashMap<>();
59+
_liveInstances = new HashMap<>();
60+
}
61+
62+
/**
63+
* Constructor using controller cache (ResourceControllerDataProvider).
64+
* @param dataProvider
65+
*/
66+
ResourceChangeSnapshot(ResourceControllerDataProvider dataProvider) {
67+
_changedTypes = new HashSet<>(dataProvider.getRefreshedChangeTypes());
68+
_instanceConfigMap = new HashMap<>(dataProvider.getInstanceConfigMap());
69+
_idealStateMap = new HashMap<>(dataProvider.getIdealStates());
70+
_resourceConfigMap = new HashMap<>(dataProvider.getResourceConfigMap());
71+
_liveInstances = new HashMap<>(dataProvider.getLiveInstances());
72+
}
73+
74+
/**
75+
* Copy constructor for ResourceChangeCache.
76+
* @param cache
77+
*/
78+
ResourceChangeSnapshot(ResourceChangeSnapshot cache) {
79+
_changedTypes = new HashSet<>(cache._changedTypes);
80+
_instanceConfigMap = new HashMap<>(cache._instanceConfigMap);
81+
_idealStateMap = new HashMap<>(cache._idealStateMap);
82+
_resourceConfigMap = new HashMap<>(cache._resourceConfigMap);
83+
_liveInstances = new HashMap<>(cache._liveInstances);
84+
}
85+
86+
Set<HelixConstants.ChangeType> getChangedTypes() {
87+
return _changedTypes;
88+
}
89+
90+
Map<String, InstanceConfig> getInstanceConfigMap() {
91+
return _instanceConfigMap;
92+
}
93+
94+
Map<String, IdealState> getIdealStateMap() {
95+
return _idealStateMap;
96+
}
97+
98+
Map<String, ResourceConfig> getResourceConfigMap() {
99+
return _resourceConfigMap;
100+
}
101+
102+
Map<String, LiveInstance> getLiveInstances() {
103+
return _liveInstances;
104+
}
105+
}

helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Map;
2626
import java.util.Set;
2727

28+
import java.util.concurrent.ConcurrentHashMap;
2829
import org.apache.helix.HelixConstants;
2930
import org.apache.helix.HelixDataAccessor;
3031
import org.apache.helix.PropertyKey;
@@ -64,6 +65,9 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
6465
private Map<String, Map<String, MissingTopStateRecord>> _missingTopStateMap;
6566
private Map<String, Map<String, String>> _lastTopStateLocationMap;
6667

68+
// Maintain a set of all ChangeTypes for change detection
69+
private Set<HelixConstants.ChangeType> _refreshedChangeTypes;
70+
6771
public ResourceControllerDataProvider() {
6872
this(AbstractDataCache.UNKNOWN_CLUSTER);
6973
}
@@ -106,19 +110,21 @@ public String getObjName(ExternalView obj) {
106110
_idealMappingCache = new HashMap<>();
107111
_missingTopStateMap = new HashMap<>();
108112
_lastTopStateLocationMap = new HashMap<>();
113+
_refreshedChangeTypes = ConcurrentHashMap.newKeySet();
109114
}
110115

111116
public synchronized void refresh(HelixDataAccessor accessor) {
112117
long startTime = System.currentTimeMillis();
113118

114119
// Refresh base
115-
Set<HelixConstants.ChangeType> propertyRefreshed = super.doRefresh(accessor);
120+
Set<HelixConstants.ChangeType> changedTypes = super.doRefresh(accessor);
121+
_refreshedChangeTypes.addAll(changedTypes);
116122

117123
// Invalidate cached information if any of the important data has been refreshed
118-
if (propertyRefreshed.contains(HelixConstants.ChangeType.IDEAL_STATE)
119-
|| propertyRefreshed.contains(HelixConstants.ChangeType.LIVE_INSTANCE)
120-
|| propertyRefreshed.contains(HelixConstants.ChangeType.INSTANCE_CONFIG)
121-
|| propertyRefreshed.contains(HelixConstants.ChangeType.RESOURCE_CONFIG)) {
124+
if (changedTypes.contains(HelixConstants.ChangeType.IDEAL_STATE)
125+
|| changedTypes.contains(HelixConstants.ChangeType.LIVE_INSTANCE)
126+
|| changedTypes.contains(HelixConstants.ChangeType.INSTANCE_CONFIG)
127+
|| changedTypes.contains(HelixConstants.ChangeType.RESOURCE_CONFIG)) {
122128
clearCachedResourceAssignments();
123129
}
124130

@@ -261,6 +267,23 @@ public void setCachedIdealMapping(String resource, ZNRecord mapping) {
261267
_idealMappingCache.put(resource, mapping);
262268
}
263269

270+
/**
271+
* Return the set of all ChangeTypes that were refreshed prior to this round of rebalance. The caller
272+
* should clear this set by calling {@link #clearRefreshedChangeTypes()}.
273+
* @return the internal, mutable set of refreshed ChangeTypes (not a copy)
274+
*/
275+
public Set<HelixConstants.ChangeType> getRefreshedChangeTypes() {
276+
return _refreshedChangeTypes;
277+
}
278+
279+
/**
280+
* Clears the set of all ChangeTypes that were refreshed. The caller should have consumed all change
281+
* types by calling {@link #getRefreshedChangeTypes()}.
282+
*/
283+
public void clearRefreshedChangeTypes() {
284+
_refreshedChangeTypes.clear();
285+
}
286+
264287
public void clearCachedResourceAssignments() {
265288
_resourceAssignmentCache.clear();
266289
_idealMappingCache.clear();

0 commit comments

Comments
 (0)