Skip to content

Commit c089044

Browse files
committed
connection
1 parent 68c2445 commit c089044

18 files changed

Lines changed: 414 additions & 148 deletions

helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,13 @@
1919
* under the License.
2020
*/
2121

22+
import io.grpc.Server;
23+
import java.io.IOException;
24+
import java.util.concurrent.TimeUnit;
25+
import org.apache.helix.gateway.service.GatewayServiceManager;
26+
import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder;
27+
28+
2229
/**
2330
* Main class for Helix Gateway.
2431
* It starts the Helix Gateway grpc service.
@@ -28,7 +35,18 @@ public final class HelixGatewayMain {
2835
private HelixGatewayMain() {
2936
}
3037

31-
public static void main(String[] args) throws InterruptedException {
38+
public static void main(String[] args) throws InterruptedException, IOException {
39+
// Create a new server to listen on port 50051
40+
GatewayServiceManager manager = new GatewayServiceManager();
41+
Server server = new HelixGatewayGrpcServerBuilder().setPort(50051)
42+
.setGrpcService(manager.getGrpcService())
43+
.build();
3244

45+
server.start();
46+
System.out.println("Server started, listening on " + server.getPort());
47+
48+
// Wait for the server to shutdown
49+
server.awaitTermination(365, TimeUnit.DAYS);
3350
}
3451
}
52+
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.apache.helix.gateway.constant;
2+
3+
public class GatewayServiceGrpcDefaultConfig {
4+
public static final int DEFAULT_SERVER_HEARTBEAT_INTERVAL = 60;
5+
public static final int DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL = 60;
6+
public static final int DEFAULT_CLIENT_TIMEOUT = 5 * 60;
7+
}

helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
* under the License.
2020
*/
2121

22+
import io.grpc.Status;
2223
import io.grpc.stub.StreamObserver;
2324
import java.util.HashMap;
2425
import java.util.Map;
@@ -29,6 +30,8 @@
2930
import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
3031
import org.apache.helix.gateway.util.PerKeyLockRegistry;
3132
import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
3235
import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc;
3336
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState;
3437
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;
@@ -40,6 +43,8 @@
4043
*/
4144
public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase
4245
implements HelixGatewayServiceProcessor {
46+
// create LOGGER
47+
private static final Logger logger = LoggerFactory.getLogger(HelixGatewayServiceGrpcService.class);
4348

4449
// Map to store the observer for each instance
4550
private final Map<String, StreamObserver<TransitionMessage>> _observerMap = new HashMap<>();
@@ -70,6 +75,7 @@ public StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterCla
7075

7176
@Override
7277
public void onNext(ShardStateMessage request) {
78+
logger.info("Receive message from instance: {}", request.toString());
7379
if (request.hasShardState()) {
7480
ShardState shardState = request.getShardState();
7581
updateObserver(shardState.getInstanceName(), shardState.getClusterName(), responseObserver);
@@ -79,11 +85,13 @@ public void onNext(ShardStateMessage request) {
7985

8086
@Override
8187
public void onError(Throwable t) {
88+
logger.info("Receive on error message: {}", t.getMessage());
8289
onClientClose(responseObserver);
8390
}
8491

8592
@Override
8693
public void onCompleted() {
94+
logger.info("Receive on complete message");
8795
onClientClose(responseObserver);
8896
}
8997
};
@@ -96,13 +104,46 @@ public void onCompleted() {
96104
* @return
97105
*/
98106
@Override
99-
public boolean sendStateTransitionMessage(String instanceName) {
107+
public void sendStateTransitionMessage(String instanceName) {
100108
StreamObserver<TransitionMessage> observer;
101109
observer = _observerMap.get(instanceName);
102110
if (observer != null) {
103111
observer.onNext(StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage());
104112
}
105-
return true;
113+
}
114+
115+
/**
116+
* Close the connection of the instance. If closed because of error, use the error reason to close the connection.
117+
* @param instanceName instance name
118+
* @param errorReason error reason for close
119+
*/
120+
@Override
121+
public void closeConnectionWithError(String instanceName, String errorReason) {
122+
logger.info("Close connection for instance: {} with error reason: {}", instanceName, errorReason);
123+
closeConnectionHelper(instanceName, errorReason, true);
124+
}
125+
126+
/**
127+
* Complete the connection of the instance.
128+
* @param instanceName instance name
129+
*/
130+
@Override
131+
public void completeConnection(String instanceName) {
132+
logger.info("Complete connection for instance: {}", instanceName);
133+
closeConnectionHelper(instanceName, null, false);
134+
}
135+
136+
137+
private void closeConnectionHelper(String instanceName, String errorReason, boolean withError) {
138+
StreamObserver<TransitionMessage> observer;
139+
observer = _observerMap.get(instanceName);
140+
if (observer != null) {
141+
if (withError) {
142+
observer.onError(Status.UNAVAILABLE.withDescription(errorReason).asRuntimeException());
143+
} else {
144+
observer.onCompleted();
145+
}
146+
}
106147
}
107148

108149
private void updateObserver(String instanceName, String clusterName,
@@ -120,6 +161,7 @@ private void onClientClose(
120161
Pair<String, String> instanceInfo = _reversedObserverMap.get(responseObserver);
121162
clusterName = instanceInfo.getRight();
122163
instanceName = instanceInfo.getLeft();
164+
logger.info("Client close connection for instance: {}", instanceName);
123165

124166
if (instanceName == null || clusterName == null) {
125167
// TODO: log error;

helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public void newGatewayServiceEvent(GatewayServiceEvent event) {
7777
if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) {
7878
_participantStateTransitionResultUpdator.submit(new shardStateUpdator(event));
7979
} else {
80-
_connectionEventProcessor.offerEvent(event.getInstanceName(), new participantConnectionProcessor(event));
80+
_connectionEventProcessor.offerEvent(event.getInstanceName(), new ParticipantConnectionProcessor(event));
8181
}
8282
}
8383

@@ -107,10 +107,10 @@ public void run() {
107107
* Create HelixGatewayService instance and register it to the manager.
108108
* It includes waiting for ZK connection, and also wait for previous LiveInstance to expire.
109109
*/
110-
class participantConnectionProcessor implements Runnable {
110+
class ParticipantConnectionProcessor implements Runnable {
111111
GatewayServiceEvent _event;
112112

113-
public participantConnectionProcessor(GatewayServiceEvent event) {
113+
public ParticipantConnectionProcessor(GatewayServiceEvent event) {
114114
_event = event;
115115
}
116116

@@ -129,7 +129,7 @@ public void run() {
129129
}
130130

131131
@VisibleForTesting
132-
HelixGatewayServiceGrpcService getGrpcService() {
132+
public HelixGatewayServiceGrpcService getGrpcService() {
133133
return _grpcService;
134134
}
135135

helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java

Lines changed: 0 additions & 30 deletions
This file was deleted.

helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public GatewayServiceManager getClusterManager() {
5454
* It creates a HelixParticipantManager and connects to the Helix controller.
5555
*/
5656
public void registerParticipant() {
57+
5758
// TODO: create participant manager and add to _participantsMap
5859
HelixManager manager = new ZKHelixManager("clusterName", "instanceName", InstanceType.PARTICIPANT, _zkAddress);
5960
manager.getStateMachineEngine()

helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,24 @@
2424
*/
2525
public interface HelixGatewayServiceProcessor {
2626

27-
public boolean sendStateTransitionMessage( String instanceName);
27+
/**
28+
* Send state transition message to Helix Gateway Service.
29+
* @param instanceName instance name
30+
* @return
31+
*/
32+
public void sendStateTransitionMessage( String instanceName);
33+
34+
/**
35+
* Close connection with error.
36+
* @param instanceName instance name
37+
* @param reason reason for closing connection
38+
*/
39+
public void closeConnectionWithError(String instanceName, String reason);
40+
41+
/**
42+
* Close connection with success.
43+
* @param instanceName instance name
44+
*/
45+
public void completeConnection(String instanceName);
2846

2947
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package org.apache.helix.gateway.util;
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one
5+
* or more contributor license agreements. See the NOTICE file
6+
* distributed with this work for additional information
7+
* regarding copyright ownership. The ASF licenses this file
8+
* to you under the Apache License, Version 2.0 (the
9+
* "License"); you may not use this file except in compliance
10+
* with the License. You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing,
15+
* software distributed under the License is distributed on an
16+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
* KIND, either express or implied. See the License for the
18+
* specific language governing permissions and limitations
19+
* under the License.
20+
*/
21+
22+
import io.grpc.BindableService;
23+
import io.grpc.Server;
24+
import io.grpc.ServerBuilder;
25+
import io.grpc.protobuf.services.ProtoReflectionService;
26+
import java.util.concurrent.TimeUnit;
27+
28+
import static org.apache.helix.gateway.constant.GatewayServiceGrpcDefaultConfig.*;
29+
30+
31+
/**
32+
* Builder class to create a Helix gateway service server with custom configurations.
33+
*/
34+
public class HelixGatewayGrpcServerBuilder {
35+
private int port;
36+
private BindableService service;
37+
private int serverHeartBeatInterval = DEFAULT_SERVER_HEARTBEAT_INTERVAL;
38+
private int maxAllowedClientHeartBeatInterval = DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL;
39+
private int clientTimeout = DEFAULT_CLIENT_TIMEOUT;
40+
private boolean enableReflectionService = true;
41+
42+
public HelixGatewayGrpcServerBuilder setPort(int port) {
43+
this.port = port;
44+
return this;
45+
}
46+
47+
public HelixGatewayGrpcServerBuilder setServerHeartBeatInterval(int serverHeartBeatInterval) {
48+
this.serverHeartBeatInterval = serverHeartBeatInterval;
49+
return this;
50+
}
51+
52+
public HelixGatewayGrpcServerBuilder setMaxAllowedClientHeartBeatInterval(int maxAllowedClientHeartBeatInterval) {
53+
this.maxAllowedClientHeartBeatInterval = maxAllowedClientHeartBeatInterval;
54+
return this;
55+
}
56+
57+
public HelixGatewayGrpcServerBuilder setClientTimeout(int clientTimeout) {
58+
this.clientTimeout = clientTimeout;
59+
return this;
60+
}
61+
62+
public HelixGatewayGrpcServerBuilder setGrpcService(BindableService service) {
63+
this.service = service;
64+
return this;
65+
}
66+
67+
public HelixGatewayGrpcServerBuilder enableReflectionService(boolean enableReflectionService) {
68+
this.enableReflectionService = enableReflectionService;
69+
return this;
70+
}
71+
72+
public Server build() {
73+
validate();
74+
75+
ServerBuilder serverBuilder = ServerBuilder.forPort(port)
76+
.addService(service)
77+
.keepAliveTime(serverHeartBeatInterval, TimeUnit.SECONDS) // HeartBeat time
78+
.keepAliveTimeout(clientTimeout, TimeUnit.SECONDS) // KeepAlive client timeout
79+
.permitKeepAliveTime(maxAllowedClientHeartBeatInterval, TimeUnit.SECONDS) // Permit min HeartBeat time
80+
.permitKeepAliveWithoutCalls(true); // Allow KeepAlive forever without active RPCs
81+
82+
if (enableReflectionService) {
83+
serverBuilder.addService(ProtoReflectionService.newInstance());
84+
}
85+
86+
return serverBuilder
87+
.build();
88+
}
89+
90+
private void validate() {
91+
if (port == 0 || service == null) {
92+
throw new IllegalArgumentException("Port and service must be set");
93+
}
94+
if (clientTimeout < maxAllowedClientHeartBeatInterval) {
95+
throw new IllegalArgumentException("Client timeout is less than max allowed client heartbeat interval");
96+
}
97+
}
98+
}
99+

helix-gateway/src/test/java/org/apache/helix/gateway/DummyTest.java

Lines changed: 0 additions & 21 deletions
This file was deleted.

helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyBlockingExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
import java.util.concurrent.CountDownLatch;
44
import java.util.concurrent.TimeUnit;
55
import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
6-
import org.testng.annotations.Test;
76
import org.testng.Assert;
7+
import org.testng.annotations.Test;
88

99

1010
public class TestPerKeyBlockingExecutor {

0 commit comments

Comments
 (0)