Skip to content

Commit bbb031a

Browse files
xyuanlujunkaixue
authored andcommitted
Gateway - Add GatewayCurrentStateCache for gateway service (#2895)
This pull request introduces a caching mechanism for the Helix Gateway service. The main changes include: Addition of a new GatewayCurrentStateCache class in GatewayCurrentStateCache.java, which manages caching of current and target states for instances in a cluster.
1 parent ad33675 commit bbb031a

3 files changed

Lines changed: 273 additions & 1 deletion

File tree

helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,18 @@
2222
import com.google.common.collect.ImmutableSet;
2323
import java.io.IOException;
2424
import java.util.Collections;
25+
import java.util.HashMap;
2526
import java.util.Map;
2627
import java.util.concurrent.ConcurrentHashMap;
2728
import java.util.concurrent.ExecutorService;
2829
import java.util.concurrent.Executors;
30+
import org.apache.helix.common.caches.CurrentStateCache;
2931
import org.apache.helix.gateway.api.constant.GatewayServiceEventType;
3032
import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
3133
import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
3234
import org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory;
3335
import org.apache.helix.gateway.participant.HelixGatewayParticipant;
36+
import org.apache.helix.gateway.util.GatewayCurrentStateCache;
3437
import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
3538

3639

@@ -60,6 +63,8 @@ public class GatewayServiceManager {
6063

6164
private final GatewayServiceChannelConfig _gatewayServiceChannelConfig;
6265

66+
private final Map<String, GatewayCurrentStateCache> _currentStateCacheMap;
67+
6368
public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig gatewayServiceChannelConfig) {
6469
_helixGatewayParticipantMap = new ConcurrentHashMap<>();
6570
_zkAddress = zkAddress;
@@ -68,6 +73,7 @@ public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig gatew
6873
_connectionEventProcessor =
6974
new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); // todo: make it configurable
7075
_gatewayServiceChannelConfig = gatewayServiceChannelConfig;
76+
_currentStateCacheMap = new HashMap<>();
7177
}
7278

7379
/**
@@ -131,7 +137,6 @@ public void run() {
131137
}
132138
}
133139

134-
135140
public void startService() throws IOException {
136141
_gatewayServiceChannel.start();
137142
}
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
package org.apache.helix.gateway.util;
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one
5+
* or more contributor license agreements. See the NOTICE file
6+
* distributed with this work for additional information
7+
* regarding copyright ownership. The ASF licenses this file
8+
* to you under the Apache License, Version 2.0 (the
9+
* "License"); you may not use this file except in compliance
10+
* with the License. You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing,
15+
* software distributed under the License is distributed on an
16+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
* KIND, either express or implied. See the License for the
18+
* specific language governing permissions and limitations
19+
* under the License.
20+
*/
21+
22+
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import com.fasterxml.jackson.databind.node.ObjectNode;
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
27+
28+
/**
29+
* A cache to store the current target assignment, and the reported current state of the instances in a cluster.
30+
*/
31+
public class GatewayCurrentStateCache {
32+
static ObjectMapper mapper = new ObjectMapper();
33+
String _clusterName;
34+
35+
// A cache of current state. It should be updated by the HelixGatewayServiceChannel
36+
// instance -> resource state (resource -> shard -> target state)
37+
Map<String, ShardStateMap> _currentStateMap;
38+
39+
// A cache of target state.
40+
// instance -> resource state (resource -> shard -> target state)
41+
Map<String, ShardStateMap> _targetStateMap;
42+
43+
public GatewayCurrentStateCache(String clusterName) {
44+
_clusterName = clusterName;
45+
_currentStateMap = new HashMap<>();
46+
_targetStateMap = new HashMap<>();
47+
}
48+
49+
public String getCurrentState(String instance, String resource, String shard) {
50+
return _currentStateMap.get(instance).getState(resource, shard);
51+
}
52+
53+
public String getTargetState(String instance, String resource, String shard) {
54+
return _targetStateMap.get(instance).getState(resource, shard);
55+
}
56+
57+
/**
58+
* Update the cached current state of instances in a cluster, and return the diff of the change.
59+
* @param newCurrentStateMap The new current state map of instances in the cluster
60+
* @return
61+
*/
62+
public Map<String, Map<String, Map<String, String>>> updateCacheWithNewCurrentStateAndGetDiff(
63+
Map<String, Map<String, Map<String, String>>> newCurrentStateMap) {
64+
Map<String, Map<String, Map<String, String>>> diff = new HashMap<>();
65+
for (String instance : newCurrentStateMap.keySet()) {
66+
Map<String, Map<String, String>> newCurrentState = newCurrentStateMap.get(instance);
67+
diff.put(instance, _currentStateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>()))
68+
.updateAndGetDiff(newCurrentState));
69+
}
70+
return diff;
71+
}
72+
73+
/**
74+
* Update the cache with the current state diff.
75+
* All existing target states remains the same
76+
* @param currentStateDiff
77+
*/
78+
public void updateCacheWithCurrentStateDiff(Map<String, Map<String, Map<String, String>>> currentStateDiff) {
79+
for (String instance : currentStateDiff.keySet()) {
80+
Map<String, Map<String, String>> currentStateDiffMap = currentStateDiff.get(instance);
81+
updateShardStateMapWithDiff(_currentStateMap, instance, currentStateDiffMap);
82+
}
83+
}
84+
85+
/**
86+
* Update the target state with the changed target state maps.
87+
* All existing target states remains the same
88+
* @param targetStateChangeMap
89+
*/
90+
public void updateTargetStateWithDiff(String instance, Map<String, Map<String, String>> targetStateChangeMap) {
91+
updateShardStateMapWithDiff(_targetStateMap, instance, targetStateChangeMap);
92+
}
93+
94+
/**
95+
* Serialize the target state assignments to a JSON Node.
96+
* example : {"instance1":{"resource1":{"shard1":"ONLINE","shard2":"OFFLINE"}}}}
97+
*/
98+
public ObjectNode serializeTargetAssignmentsToJSON() {
99+
ObjectNode root = mapper.createObjectNode();
100+
for (Map.Entry<String, ShardStateMap> entry : _targetStateMap.entrySet()) {
101+
root.set(entry.getKey(), entry.getValue().toJSONNode());
102+
}
103+
return root;
104+
}
105+
106+
private void updateShardStateMapWithDiff(Map<String, ShardStateMap> stateMap, String instance,
107+
Map<String, Map<String, String>> diffMap) {
108+
if (diffMap == null || diffMap.isEmpty()) {
109+
return;
110+
}
111+
stateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>())).updateWithDiff(diffMap);
112+
}
113+
114+
public static class ShardStateMap {
115+
Map<String, Map<String, String>> _stateMap;
116+
117+
public ShardStateMap(Map<String, Map<String, String>> stateMap) {
118+
_stateMap = stateMap;
119+
}
120+
121+
public String getState(String instance, String shard) {
122+
return _stateMap.get(instance).get(shard);
123+
}
124+
125+
private Map<String, Map<String, String>> getShardStateMap() {
126+
return _stateMap;
127+
}
128+
129+
private void updateWithDiff(Map<String, Map<String, String>> diffMap) {
130+
for (Map.Entry<String, Map<String, String>> diffEntry : diffMap.entrySet()) {
131+
String resource = diffEntry.getKey();
132+
Map<String, String> diffCurrentState = diffEntry.getValue();
133+
if (_stateMap.get(resource) != null) {
134+
_stateMap.get(resource).entrySet().forEach(currentMapEntry -> {
135+
String shard = currentMapEntry.getKey();
136+
if (diffCurrentState.get(shard) != null) {
137+
currentMapEntry.setValue(diffCurrentState.get(shard));
138+
}
139+
});
140+
} else {
141+
_stateMap.put(resource, diffCurrentState);
142+
}
143+
}
144+
}
145+
146+
private Map<String, Map<String, String>> updateAndGetDiff(Map<String, Map<String, String>> newCurrentStateMap) {
147+
Map<String, Map<String, String>> diff = new HashMap<>();
148+
for (Map.Entry<String, Map<String, String>> entry : newCurrentStateMap.entrySet()) {
149+
String resource = entry.getKey();
150+
Map<String, String> newCurrentState = entry.getValue();
151+
Map<String, String> oldCurrentState = _stateMap.get(resource);
152+
if (oldCurrentState == null) {
153+
diff.put(resource, newCurrentState);
154+
continue;
155+
}
156+
if (!oldCurrentState.equals(newCurrentState)) {
157+
for (String shard : newCurrentState.keySet()) {
158+
if (oldCurrentState.get(shard) == null || !oldCurrentState.get(shard).equals(newCurrentState.get(shard))) {
159+
diff.computeIfAbsent(resource, k -> new HashMap<>()).put(shard, newCurrentState.get(shard));
160+
}
161+
}
162+
}
163+
}
164+
_stateMap = newCurrentStateMap;
165+
return diff;
166+
}
167+
168+
/**
169+
* Serialize the shard state map to a JSON object.
170+
* @return a JSON object representing the shard state map. Example: {"shard1":"ONLINE","shard2":"OFFLINE"}
171+
*/
172+
public ObjectNode toJSONNode() {
173+
ObjectNode root = mapper.createObjectNode();
174+
for (Map.Entry<String, Map<String, String>> entry : _stateMap.entrySet()) {
175+
String resource = entry.getKey();
176+
ObjectNode resourceNode = mapper.createObjectNode();
177+
for (Map.Entry<String, String> shardEntry : entry.getValue().entrySet()) {
178+
resourceNode.put(shardEntry.getKey(), shardEntry.getValue());
179+
}
180+
root.set(resource, resourceNode);
181+
}
182+
return root;
183+
}
184+
}
185+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package org.apache.helix.gateway.utils;
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one
5+
* or more contributor license agreements. See the NOTICE file
6+
* distributed with this work for additional information
7+
* regarding copyright ownership. The ASF licenses this file
8+
* to you under the Apache License, Version 2.0 (the
9+
* "License"); you may not use this file except in compliance
10+
* with the License. You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing,
15+
* software distributed under the License is distributed on an
16+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
* KIND, either express or implied. See the License for the
18+
* specific language governing permissions and limitations
19+
* under the License.
20+
*/
21+
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
import org.apache.helix.gateway.util.GatewayCurrentStateCache;
25+
import org.testng.Assert;
26+
import org.testng.annotations.BeforeMethod;
27+
import org.testng.annotations.Test;
28+
29+
30+
public class TestGatewayCurrentStateCache {
31+
private GatewayCurrentStateCache cache;
32+
33+
@BeforeMethod
34+
public void setUp() {
35+
cache = new GatewayCurrentStateCache("TestCluster");
36+
}
37+
38+
@Test
39+
public void testUpdateCacheWithNewCurrentStateAndGetDiff() {
40+
Map<String, Map<String, Map<String, String>>> newState = new HashMap<>();
41+
Map<String, Map<String, String>> instanceState = new HashMap<>();
42+
Map<String, String> shardState = new HashMap<>();
43+
shardState.put("shard1", "ONLINE");
44+
instanceState.put("resource1", shardState);
45+
newState.put("instance1", instanceState);
46+
47+
Map<String, Map<String, Map<String, String>>> diff = cache.updateCacheWithNewCurrentStateAndGetDiff(newState);
48+
49+
Assert.assertNotNull(diff);
50+
Assert.assertEquals(diff.size(), 1);
51+
Assert.assertEquals(diff.get("instance1").get("resource1").get("shard1"), "ONLINE");
52+
}
53+
54+
@Test
55+
public void testUpdateCacheWithCurrentStateDiff() {
56+
Map<String, Map<String, Map<String, String>>> diff = new HashMap<>();
57+
Map<String, Map<String, String>> instanceState = new HashMap<>();
58+
Map<String, String> shardState = new HashMap<>();
59+
shardState.put("shard2", "ONLINE");
60+
shardState.put("shard1", "ONLINE");
61+
instanceState.put("resource1", shardState);
62+
diff.put("instance1", instanceState);
63+
64+
cache.updateCacheWithCurrentStateDiff(diff);
65+
66+
Assert.assertEquals(cache.getCurrentState("instance1", "resource1", "shard1"), "ONLINE");
67+
Assert.assertEquals(cache.getCurrentState("instance1", "resource1", "shard2"), "ONLINE");
68+
}
69+
70+
@Test
71+
public void testUpdateTargetStateWithDiff() {
72+
Map<String, Map<String, String>> targetStateChange = new HashMap<>();
73+
Map<String, String> shardState = new HashMap<>();
74+
shardState.put("shard1", "OFFLINE");
75+
targetStateChange.put("resource1", shardState);
76+
77+
cache.updateTargetStateWithDiff("instance1", targetStateChange);
78+
79+
Assert.assertEquals(cache.getTargetState("instance1", "resource1", "shard1"), "OFFLINE");
80+
Assert.assertEquals(cache.serializeTargetAssignmentsToJSON().toString(), "{\"instance1\":{\"resource1\":{\"shard1\":\"OFFLINE\"}}}");
81+
}
82+
}

0 commit comments

Comments
 (0)