Skip to content

Commit 2d619df

Browse files
xyuanlujunkaixue
authored andcommitted
Gateway service - service structure dummy class (#2840)
Gateway service - service structure dummy class
1 parent 2b44e87 commit 2d619df

13 files changed

Lines changed: 216 additions & 69 deletions
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package org.apache.helix.gateway;
2+
3+
/**
4+
* Main class for Helix Gateway.
5+
* It starts the Helix Gateway grpc service.
6+
*/
7+
public final class HelixGatewayMain {
8+
9+
private HelixGatewayMain() {
10+
}
11+
12+
public static void main(String[] args) throws InterruptedException {
13+
14+
}
15+
}

helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,23 @@
11
package org.apache.helix.gateway.grpcservice;
22

33
import io.grpc.stub.StreamObserver;
4+
import java.util.concurrent.ConcurrentHashMap;
5+
import org.apache.helix.gateway.service.GatewayServiceManager;
6+
import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
47
import proto.org.apache.helix.gateway.*;
58
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*;
69

7-
public class HelixGatewayServiceService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase {
10+
import java.util.Map;
11+
12+
13+
/**
14+
* Helix Gateway Service GRPC UI implementation.
15+
*/
16+
public class HelixGatewayServiceService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase
17+
implements HelixGatewayServiceProcessor {
18+
19+
Map<String, StreamObserver<TransitionMessage>> _observerMap =
20+
new ConcurrentHashMap<String, StreamObserver<TransitionMessage>>();
821

922
@Override
1023
public StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage> report(
@@ -16,6 +29,12 @@ public StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterCla
1629
public void onNext(ShardStateMessage request) {
1730
// called when a client sends a message
1831
//....
32+
String instanceName = request.getInstanceName();
33+
if (!_observerMap.containsValue(instanceName)) {
34+
// update state map
35+
updateObserver(instanceName, responseObserver);
36+
}
37+
// process the message
1938
}
2039

2140
@Override
@@ -31,4 +50,18 @@ public void onCompleted() {
3150
}
3251
};
3352
}
53+
54+
@Override
55+
public boolean sendStateTransitionMessage(String instanceName) {
56+
return false;
57+
}
58+
59+
@Override
60+
public void sendEventToManager(GatewayServiceManager.GateWayServiceEvent event) {
61+
62+
}
63+
64+
public void updateObserver(String instanceName, StreamObserver<TransitionMessage> streamObserver) {
65+
_observerMap.put(instanceName, streamObserver);
66+
}
3467
}

helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java

Lines changed: 0 additions & 33 deletions
This file was deleted.
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package org.apache.helix.gateway.service;
2+
3+
import java.util.Map;
4+
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.atomic.AtomicBoolean;
7+
import org.apache.helix.gateway.grpcservice.HelixGatewayServiceService;
8+
9+
10+
/**
11+
* A top layer class that send/receive messages from Grpc end point, and dispatch them to corrsponding gateway services.
12+
* 1. get event from Grpc service
13+
* 2. Maintain a gateway service registry, one gateway service maps to one Helix cluster
14+
* 3. On init connect, create the participant manager
15+
* 4. For ST reply message, update the tracker
16+
*/
17+
18+
public class GatewayServiceManager {
19+
20+
HelixGatewayServiceService _helixGatewayServiceService;
21+
22+
HelixGatewayServiceProcessor _helixGatewayServiceProcessor;
23+
24+
Map<String, HelixGatewayService> _helixGatewayServiceMap;
25+
26+
// TODO: add thread pool for init
27+
// single thread tp for update
28+
29+
public enum EventType {
30+
CONNECT, // init connection to gateway service
31+
UPDATE, // update state transition result
32+
DISCONNECT // shutdown connection to gateway service.
33+
}
34+
35+
public class GateWayServiceEvent {
36+
// event type
37+
EventType eventType;
38+
// event data
39+
String clusterName;
40+
String participantName;
41+
42+
// todo: add more fields
43+
}
44+
45+
public GatewayServiceManager() {
46+
_helixGatewayServiceMap = new ConcurrentHashMap<>();
47+
}
48+
49+
public AtomicBoolean sendTransitionRequestToApplicationInstance() {
50+
51+
return null;
52+
}
53+
54+
public void updateShardState() {
55+
56+
}
57+
58+
public void newParticipantConnecting() {
59+
60+
}
61+
62+
public void participantDisconnected() {
63+
64+
}
65+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package org.apache.helix.gateway.service;
2+
3+
/**
4+
* Factory class to create GatewayServiceManager
5+
*/
6+
public class GatewayServiceManagerFactory {
7+
8+
public GatewayServiceManager createGatewayServiceManager() {
9+
return new GatewayServiceManager();
10+
}
11+
}
Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,35 @@
11
package org.apache.helix.gateway.service;
22

3+
import java.util.List;
34
import java.util.Map;
45
import java.util.concurrent.ConcurrentHashMap;
6+
7+
import java.util.concurrent.atomic.AtomicBoolean;
58
import org.apache.helix.HelixManager;
69
import org.apache.helix.InstanceType;
710
import org.apache.helix.gateway.statemodel.HelixGatewayOnlineOfflineStateModelFactory;
811
import org.apache.helix.manager.zk.ZKHelixManager;
912

1013

14+
/**
15+
* A service object for each Helix cluster.
16+
* This service object manages the Helix participants in the cluster.
17+
*/
1118
public class HelixGatewayService {
1219
final private Map<String, Map<String, HelixManager>> _participantsMap;
1320

1421
final private String _zkAddress;
15-
private final ClusterManager _clusterManager;
16-
17-
public HelixGatewayService(String zkAddress) {
22+
private final GatewayServiceManager _gatewayServiceManager;
23+
private Map<String, Map<String, AtomicBoolean>> _flagMap;
24+
public HelixGatewayService(GatewayServiceManager gatewayServiceManager, String zkAddress) {
1825
_participantsMap = new ConcurrentHashMap<>();
1926
_zkAddress = zkAddress;
20-
_clusterManager = new ClusterManager();
27+
_gatewayServiceManager = gatewayServiceManager;
28+
_flagMap = new ConcurrentHashMap<>();
2129
}
2230

23-
public ClusterManager getClusterManager() {
24-
return _clusterManager;
31+
public GatewayServiceManager getClusterManager() {
32+
return _gatewayServiceManager;
2533
}
2634

2735
public void start() {
@@ -32,9 +40,8 @@ public void registerParticipant() {
3240
// TODO: create participant manager and add to _participantsMap
3341
HelixManager manager = new ZKHelixManager("clusterName", "instanceName", InstanceType.PARTICIPANT, _zkAddress);
3442
manager.getStateMachineEngine()
35-
.registerStateModelFactory("OnlineOffline", new HelixGatewayOnlineOfflineStateModelFactory(_clusterManager));
43+
.registerStateModelFactory("OnlineOffline", new HelixGatewayOnlineOfflineStateModelFactory(_gatewayServiceManager));
3644
try {
37-
_clusterManager.addChannel();
3845
manager.connect();
3946
} catch (Exception e) {
4047
throw new RuntimeException(e);
@@ -45,11 +52,36 @@ public void deregisterParticipant(String clusterName, String participantName) {
4552
HelixManager manager = _participantsMap.get(clusterName).remove(participantName);
4653
if (manager != null) {
4754
manager.disconnect();
48-
_clusterManager.removeChannel(participantName);
55+
removeChannel(participantName);
4956
}
5057
}
5158

59+
public void addChannel() {
60+
// _flagMap.computeIfAbsent(mockApplication.getInstanceName(), k -> new ConcurrentHashMap<>());
61+
}
62+
63+
public void removeChannel(String instanceName) {
64+
_flagMap.remove(instanceName);
65+
}
66+
67+
public AtomicBoolean sendMessage() {
68+
AtomicBoolean flag = new AtomicBoolean(false);
69+
return flag;
70+
}
71+
72+
public void receiveSTResponse() {
73+
// AtomicBoolean flag = _flagMap.get(instanceName).remove(response.getMessageId());
74+
}
75+
76+
public void newParticipantConnecting(){
77+
78+
}
79+
80+
public void participantDisconnected(){
81+
82+
}
83+
5284
public void stop() {
53-
System.out.println("Stoping Helix Gateway Service");
85+
System.out.println("Stopping Helix Gateway Service");
5486
}
5587
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package org.apache.helix.gateway.service;
2+
3+
/**
4+
* Translate from/to GRPC function call to Helix Gateway Service event.
5+
*/
6+
public interface HelixGatewayServiceProcessor {
7+
8+
public boolean sendStateTransitionMessage(String instanceName);
9+
10+
public void sendEventToManager(GatewayServiceManager.GateWayServiceEvent event);
11+
}

helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
public class ReplicaStateTracker {
44

5-
boolean compareTargetState(){
5+
boolean compareTargetState() {
66
return true;
77
}
88

helix-gateway/src/main/java/org/apache/helix/gateway/service/StateTransitionMessageTranslator.java

Lines changed: 0 additions & 4 deletions
This file was deleted.

helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,47 +2,47 @@
22

33
import java.util.concurrent.atomic.AtomicBoolean;
44
import org.apache.helix.NotificationContext;
5-
import org.apache.helix.gateway.service.ClusterManager;
5+
import org.apache.helix.gateway.service.GatewayServiceManager;
66
import org.apache.helix.model.Message;
77
import org.apache.helix.participant.statemachine.StateModel;
88

99
public class HelixGatewayOnlineOfflineStateModel extends StateModel {
1010
private boolean _firstTime = true;
11-
private ClusterManager _clusterManager;
11+
private GatewayServiceManager _gatewayServiceManager;
1212

1313
private String _resourceName;
1414
private String _partitionKey;
1515

1616
private AtomicBoolean _completed;
1717

1818
public HelixGatewayOnlineOfflineStateModel(String resourceName, String partitionKey,
19-
ClusterManager clusterManager) {
19+
GatewayServiceManager gatewayServiceManager) {
2020
_resourceName = resourceName;
2121
_partitionKey = partitionKey;
22-
_clusterManager = clusterManager;
22+
_gatewayServiceManager = gatewayServiceManager;
2323
}
2424

2525
public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
2626
if (_firstTime) {
27-
wait(_clusterManager.sendMessage());
27+
wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
2828
System.out.println(
2929
"Message for " + message.getPartitionName() + " instance " + message.getTgtName() + " with ADD for "
3030
+ message.getResourceName() + " processed");
3131
_firstTime = false;
3232
}
33-
wait(_clusterManager.sendMessage());
33+
wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
3434
System.out.println("Message for " + message.getPartitionName() + " instance " + message.getTgtName()
3535
+ " with CHANGE_ROLE_OFFLINE_ONLINE for " + message.getResourceName() + " processed");
3636
}
3737

3838
public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
39-
wait(_clusterManager.sendMessage());
39+
wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
4040
System.out.println("Message for " + message.getPartitionName() + " instance " + message.getTgtName()
4141
+ " with CHANGE_ROLE_ONLINE_OFFLINE for " + message.getResourceName() + " processed");
4242
}
4343

4444
public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
45-
wait(_clusterManager.sendMessage());
45+
wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
4646
System.out.println(
4747
"Message for " + message.getPartitionName() + " instance " + message.getTgtName() + " with REMOVE for "
4848
+ message.getResourceName() + " processed");

0 commit comments

Comments
 (0)