Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.apache.helix.gateway;

/**
* Main class for Helix Gateway.
* It starts the Helix Gateway grpc service.
*/
public final class HelixGatewayMain {

private HelixGatewayMain() {
}

public static void main(String[] args) throws InterruptedException {

}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
package org.apache.helix.gateway.grpcservice;

import io.grpc.stub.StreamObserver;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.gateway.service.GatewayServiceManager;
import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
import proto.org.apache.helix.gateway.*;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*;

public class HelixGatewayServiceService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase {
import java.util.Map;


/**
* Helix Gateway Service GRPC UI implementation.
*/
public class HelixGatewayServiceService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase
implements HelixGatewayServiceProcessor {

Map<String, StreamObserver<TransitionMessage>> _observerMap =
new ConcurrentHashMap<String, StreamObserver<TransitionMessage>>();

@Override
public StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage> report(
Expand All @@ -16,6 +29,12 @@ public StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterCla
public void onNext(ShardStateMessage request) {
// called when a client sends a message
//....
String instanceName = request.getInstanceName();
if (!_observerMap.containsValue(instanceName)) {
// update state map
updateObserver(instanceName, responseObserver);
}
// process the message
}

@Override
Expand All @@ -31,4 +50,18 @@ public void onCompleted() {
}
};
}

@Override
public boolean sendStateTransitionMessage(String instanceName) {
return false;
}

@Override
public void sendEventToManager(GatewayServiceManager.GateWayServiceEvent event) {

}

public void updateObserver(String instanceName, StreamObserver<TransitionMessage> streamObserver) {
_observerMap.put(instanceName, streamObserver);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.apache.helix.gateway.service;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.helix.gateway.grpcservice.HelixGatewayServiceService;


/**
* A top layer class that send/receive messages from Grpc end point, and dispatch them to corrsponding gateway services.
* 1. get event from Grpc service
* 2. Maintain a gateway service registry, one gateway service maps to one Helix cluster
* 3. On init connect, create the participant manager
* 4. For ST reply message, update the tracker
*/

public class GatewayServiceManager {

HelixGatewayServiceService _helixGatewayServiceService;

HelixGatewayServiceProcessor _helixGatewayServiceProcessor;

Map<String, HelixGatewayService> _helixGatewayServiceMap;

// TODO: add thread pool for init
// single thread tp for update

public enum EventType {
CONNECT, // init connection to gateway service
UPDATE, // update state transition result
DISCONNECT // shutdown connection to gateway service.
}

public class GateWayServiceEvent {
// event type
EventType eventType;
// event data
String clusterName;
String participantName;

// todo: add more fields
}

public GatewayServiceManager() {
_helixGatewayServiceMap = new ConcurrentHashMap<>();
}

public AtomicBoolean sendTransitionRequestToApplicationInstance() {

return null;
}

public void updateShardState() {

}

public void newParticipantConnecting() {

}

public void participantDisconnected() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.apache.helix.gateway.service;

/**
* Factory class to create GatewayServiceManager
*/
public class GatewayServiceManagerFactory {

public GatewayServiceManager createGatewayServiceManager() {
return new GatewayServiceManager();
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,35 @@
package org.apache.helix.gateway.service;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.gateway.statemodel.HelixGatewayOnlineOfflineStateModelFactory;
import org.apache.helix.manager.zk.ZKHelixManager;


/**
* A service object for each Helix cluster.
* This service object manages the Helix participants in the cluster.
*/
public class HelixGatewayService {
final private Map<String, Map<String, HelixManager>> _participantsMap;

final private String _zkAddress;
private final ClusterManager _clusterManager;

public HelixGatewayService(String zkAddress) {
private final GatewayServiceManager _gatewayServiceManager;
private Map<String, Map<String, AtomicBoolean>> _flagMap;
public HelixGatewayService(GatewayServiceManager gatewayServiceManager, String zkAddress) {
_participantsMap = new ConcurrentHashMap<>();
_zkAddress = zkAddress;
_clusterManager = new ClusterManager();
_gatewayServiceManager = gatewayServiceManager;
_flagMap = new ConcurrentHashMap<>();
}

public ClusterManager getClusterManager() {
return _clusterManager;
public GatewayServiceManager getClusterManager() {
return _gatewayServiceManager;
}

public void start() {
Expand All @@ -32,9 +40,8 @@ public void registerParticipant() {
// TODO: create participant manager and add to _participantsMap
HelixManager manager = new ZKHelixManager("clusterName", "instanceName", InstanceType.PARTICIPANT, _zkAddress);
manager.getStateMachineEngine()
.registerStateModelFactory("OnlineOffline", new HelixGatewayOnlineOfflineStateModelFactory(_clusterManager));
.registerStateModelFactory("OnlineOffline", new HelixGatewayOnlineOfflineStateModelFactory(_gatewayServiceManager));
try {
_clusterManager.addChannel();
manager.connect();
} catch (Exception e) {
throw new RuntimeException(e);
Expand All @@ -45,11 +52,36 @@ public void deregisterParticipant(String clusterName, String participantName) {
HelixManager manager = _participantsMap.get(clusterName).remove(participantName);
if (manager != null) {
manager.disconnect();
_clusterManager.removeChannel(participantName);
removeChannel(participantName);
}
}

public void addChannel() {
// _flagMap.computeIfAbsent(mockApplication.getInstanceName(), k -> new ConcurrentHashMap<>());
}

public void removeChannel(String instanceName) {
_flagMap.remove(instanceName);
}

public AtomicBoolean sendMessage() {
AtomicBoolean flag = new AtomicBoolean(false);
return flag;
}

public void receiveSTResponse() {
// AtomicBoolean flag = _flagMap.get(instanceName).remove(response.getMessageId());
}

public void newParticipantConnecting(){

}

public void participantDisconnected(){

}

public void stop() {
System.out.println("Stoping Helix Gateway Service");
System.out.println("Stopping Helix Gateway Service");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.apache.helix.gateway.service;

/**
* Translate from/to GRPC function call to Helix Gateway Service event.
*/
public interface HelixGatewayServiceProcessor {

public boolean sendStateTransitionMessage(String instanceName);

public void sendEventToManager(GatewayServiceManager.GateWayServiceEvent event);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

public class ReplicaStateTracker {

boolean compareTargetState(){
boolean compareTargetState() {
return true;
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,47 @@

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.helix.NotificationContext;
import org.apache.helix.gateway.service.ClusterManager;
import org.apache.helix.gateway.service.GatewayServiceManager;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;

public class HelixGatewayOnlineOfflineStateModel extends StateModel {
private boolean _firstTime = true;
private ClusterManager _clusterManager;
private GatewayServiceManager _gatewayServiceManager;

private String _resourceName;
private String _partitionKey;

private AtomicBoolean _completed;

public HelixGatewayOnlineOfflineStateModel(String resourceName, String partitionKey,
ClusterManager clusterManager) {
GatewayServiceManager gatewayServiceManager) {
_resourceName = resourceName;
_partitionKey = partitionKey;
_clusterManager = clusterManager;
_gatewayServiceManager = gatewayServiceManager;
}

public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
if (_firstTime) {
wait(_clusterManager.sendMessage());
wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
System.out.println(
"Message for " + message.getPartitionName() + " instance " + message.getTgtName() + " with ADD for "
+ message.getResourceName() + " processed");
_firstTime = false;
}
wait(_clusterManager.sendMessage());
wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
System.out.println("Message for " + message.getPartitionName() + " instance " + message.getTgtName()
+ " with CHANGE_ROLE_OFFLINE_ONLINE for " + message.getResourceName() + " processed");
}

public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
wait(_clusterManager.sendMessage());
wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
System.out.println("Message for " + message.getPartitionName() + " instance " + message.getTgtName()
+ " with CHANGE_ROLE_ONLINE_OFFLINE for " + message.getResourceName() + " processed");
}

public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
wait(_clusterManager.sendMessage());
wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
System.out.println(
"Message for " + message.getPartitionName() + " instance " + message.getTgtName() + " with REMOVE for "
+ message.getResourceName() + " processed");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package org.apache.helix.gateway.statemodel;

import org.apache.helix.gateway.service.ClusterManager;
import org.apache.helix.gateway.service.GatewayServiceManager;
import org.apache.helix.gateway.statemodel.HelixGatewayOnlineOfflineStateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;

public class HelixGatewayOnlineOfflineStateModelFactory extends StateModelFactory<HelixGatewayOnlineOfflineStateModel> {
private ClusterManager _clusterManager;
private GatewayServiceManager _clusterManager;

public HelixGatewayOnlineOfflineStateModelFactory(ClusterManager clusterManager) {
public HelixGatewayOnlineOfflineStateModelFactory(GatewayServiceManager clusterManager) {
_clusterManager = clusterManager;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.apache.helix.gateway.util;

import org.apache.helix.gateway.service.GatewayServiceManager;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;


public final class StateTransitionMessageTranslateUtil {

public static TransitionMessage translateSTMsgToProto() {
return null;
}

public static GatewayServiceManager.GateWayServiceEvent translateProtoToSTMsg(ShardStateMessage message) {
return null;
}
}
Loading