Skip to content

Commit 7a29324

Browse files
committed
Create Gateway service channel factory (#2883)
This PR introduces a new configuration system for the Helix Gateway service, allowing for more flexible and customizable channel configurations. The main changes include: -Introduction of GatewayServiceChannelConfig class to manage various channel configurations. -Implementation of a factory pattern (HelixGatewayServiceChannelFactory) for creating appropriate service channels based on the configuration. -The PR also includes various improvements in error handling, logging, and code organization.
1 parent bf66bcf commit 7a29324

19 files changed

+527
-176
lines changed

helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,35 +19,32 @@
1919
* under the License.
2020
*/
2121

22-
import io.grpc.Server;
2322
import java.io.IOException;
24-
import java.util.concurrent.TimeUnit;
25-
import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService;
2623
import org.apache.helix.gateway.service.GatewayServiceManager;
27-
import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder;
24+
import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
25+
26+
import static java.lang.Integer.*;
2827

2928

3029
/**
3130
* Main class for Helix Gateway.
3231
* It starts the Helix Gateway grpc service.
32+
* args0: zk address
33+
* args1: helix gateway groc server port
3334
*/
3435
public final class HelixGatewayMain {
3536

3637
private HelixGatewayMain() {
3738
}
3839

39-
public static void main(String[] args) throws InterruptedException, IOException {
40-
// Create a new server to listen on port 50051
41-
GatewayServiceManager manager = new GatewayServiceManager();
42-
Server server = new HelixGatewayGrpcServerBuilder().setPort(50051)
43-
.setGrpcService((HelixGatewayServiceGrpcService)manager.getHelixGatewayServiceProcessor())
44-
.build();
45-
46-
server.start();
47-
System.out.println("Server started, listening on " + server.getPort());
40+
public static void main(String[] args) throws IOException {
41+
// Create a new server
42+
GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder =
43+
new GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder();
44+
GatewayServiceManager manager =
45+
new GatewayServiceManager(args[0], builder.setGrpcServerPort(parseInt(args[1])).build());
4846

49-
// Wait for the server to shutdown
50-
server.awaitTermination(365, TimeUnit.DAYS);
47+
manager.startService();
5148
}
5249
}
5350

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package org.apache.helix.gateway.api.constant;
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one
5+
* or more contributor license agreements. See the NOTICE file
6+
* distributed with this work for additional information
7+
* regarding copyright ownership. The ASF licenses this file
8+
* to you under the Apache License, Version 2.0 (the
9+
* "License"); you may not use this file except in compliance
10+
* with the License. You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing,
15+
* software distributed under the License is distributed on an
16+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
* KIND, either express or implied. See the License for the
18+
* specific language governing permissions and limitations
19+
* under the License.
20+
*/
21+
22+
public class GatewayServiceConfigConstant {
23+
public static final int DEFAULT_SERVER_HEARTBEAT_INTERVAL = 60;
24+
public static final int DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL = 60;
25+
public static final int DEFAULT_CLIENT_TIMEOUT = 5 * 60;
26+
public static final int DEFAULT_POLL_INTERVAL_SEC = 60;
27+
}

helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java renamed to helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceEventType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package org.apache.helix.gateway.constant;
1+
package org.apache.helix.gateway.api.constant;
22

33
/*
44
* Licensed to the Apache Software Foundation (ASF) under one

helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
* under the License.
2020
*/
2121

22+
import java.io.IOException;
2223
import org.apache.helix.gateway.service.GatewayServiceEvent;
2324
import org.apache.helix.gateway.service.GatewayServiceManager;
2425
import org.apache.helix.model.Message;
@@ -54,6 +55,19 @@ default void pushClientEventToGatewayManager(GatewayServiceManager gatewayServic
5455
gatewayServiceManager.onGatewayServiceEvent(event);
5556
}
5657

58+
/**
59+
* Start the gateway service channel.
60+
*
61+
* @throws IOException if the channel cannot be started
62+
*/
63+
public void start() throws IOException;
64+
65+
/**
66+
* Stop the gateway service channel forcefully.
67+
*/
68+
public void stop();
69+
70+
5771
// TODO: remove the following 2 apis in future changes
5872
/**
5973
* Gateway service close connection with error. This function is called when manager wants to close client
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
package org.apache.helix.gateway.channel;
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one
5+
* or more contributor license agreements. See the NOTICE file
6+
* distributed with this work for additional information
7+
* regarding copyright ownership. The ASF licenses this file
8+
* to you under the Apache License, Version 2.0 (the
9+
* "License"); you may not use this file except in compliance
10+
* with the License. You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing,
15+
* software distributed under the License is distributed on an
16+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
* KIND, either express or implied. See the License for the
18+
* specific language governing permissions and limitations
19+
* under the License.
20+
*/
21+
22+
23+
import static org.apache.helix.gateway.api.constant.GatewayServiceConfigConstant.*;
24+
25+
26+
public class GatewayServiceChannelConfig {
27+
// Mode to get helix participant information (inbound information). This included health check and shard state transition response
28+
// We do not support hybrid mode as of now, (i.e. have push mode for participant liveness detection and pull mode for shard state)
29+
public enum ChannelMode {
30+
PUSH_MODE, // The gateway service passively receives participant information
31+
POLL_MODE // The gateway service actively polls participant information
32+
}
33+
34+
// NOTE:
35+
// For outbound information - stateTransition request, Gateway service will always push the state transition message.
36+
// We do not support participant poll mode for stateTransition request as of now.
37+
38+
// channel type for the following 3 information - participant liveness detection, shard state transition request and response
39+
// By default, they are all grpc server, user could define them separately.
40+
public enum ChannelType {
41+
GRPC_SERVER,
42+
GRPC_CLIENT,
43+
FILE
44+
}
45+
46+
// service configs
47+
48+
// service mode for inbound information.
49+
private ChannelMode _channelMode;
50+
// channel type for participant liveness detection
51+
private ChannelType _participantConnectionChannelType;
52+
// channel for sending and receiving shard state transition request and shard state response
53+
private ChannelType _shardStateChannelType;
54+
55+
// grpc server configs
56+
private final int _grpcServerPort;
57+
private final int _serverHeartBeatInterval;
58+
private final int _maxAllowedClientHeartBeatInterval;
59+
private final int _clientTimeout;
60+
private final boolean _enableReflectionService;
61+
62+
// poll mode config
63+
private final int _pollIntervalSec;
64+
// TODO: configs for pull mode grpc client
65+
66+
// TODO: configs for pull mode with file
67+
68+
// getters
69+
70+
public ChannelMode getChannelMode() {
71+
return _channelMode;
72+
}
73+
public ChannelType getParticipantConnectionChannelType() {
74+
return _participantConnectionChannelType;
75+
}
76+
77+
public ChannelType getShardStateChannelType() {
78+
return _shardStateChannelType;
79+
}
80+
81+
public int getGrpcServerPort() {
82+
return _grpcServerPort;
83+
}
84+
85+
public int getServerHeartBeatInterval() {
86+
return _serverHeartBeatInterval;
87+
}
88+
89+
public int getMaxAllowedClientHeartBeatInterval() {
90+
return _maxAllowedClientHeartBeatInterval;
91+
}
92+
93+
public int getClientTimeout() {
94+
return _clientTimeout;
95+
}
96+
97+
public boolean getEnableReflectionService() {
98+
return _enableReflectionService;
99+
}
100+
101+
public int getPollIntervalSec() {
102+
return _pollIntervalSec;
103+
}
104+
105+
private GatewayServiceChannelConfig(int grpcServerPort, ChannelMode channelMode, ChannelType participantConnectionChannelType,
106+
ChannelType shardStateChannelType, int serverHeartBeatInterval, int maxAllowedClientHeartBeatInterval,
107+
int clientTimeout, boolean enableReflectionService, int pollIntervalSec) {
108+
_grpcServerPort = grpcServerPort;
109+
_channelMode = channelMode;
110+
_participantConnectionChannelType = participantConnectionChannelType;
111+
_shardStateChannelType = shardStateChannelType;
112+
_serverHeartBeatInterval = serverHeartBeatInterval;
113+
_maxAllowedClientHeartBeatInterval = maxAllowedClientHeartBeatInterval;
114+
_clientTimeout = clientTimeout;
115+
_enableReflectionService = enableReflectionService;
116+
_pollIntervalSec = pollIntervalSec;
117+
}
118+
119+
public static class GatewayServiceProcessorConfigBuilder {
120+
121+
// service configs
122+
private ChannelMode _channelMode = ChannelMode.PUSH_MODE;
123+
private ChannelType _participantConnectionChannelType = ChannelType.GRPC_SERVER;
124+
private ChannelType _shardStatenChannelType = ChannelType.GRPC_SERVER;
125+
126+
// grpc server configs
127+
private int _grpcServerPort;
128+
private int _serverHeartBeatInterval = DEFAULT_SERVER_HEARTBEAT_INTERVAL;
129+
private int _maxAllowedClientHeartBeatInterval = DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL;
130+
private int _clientTimeout = DEFAULT_CLIENT_TIMEOUT;
131+
private boolean _enableReflectionService = true;
132+
133+
// poll mode config
134+
private int _pollIntervalSec = DEFAULT_POLL_INTERVAL_SEC;
135+
// poll mode grpc client configs
136+
137+
// poll mode file configs
138+
139+
140+
public GatewayServiceProcessorConfigBuilder setChannelMode(ChannelMode channelMode) {
141+
_channelMode = channelMode;
142+
return this;
143+
}
144+
145+
public GatewayServiceProcessorConfigBuilder setParticipantConnectionChannelType(ChannelType channelMode) {
146+
_participantConnectionChannelType = channelMode;
147+
return this;
148+
}
149+
150+
public GatewayServiceProcessorConfigBuilder setShardStateProcessorType(ChannelType channelMode) {
151+
_shardStatenChannelType = channelMode;
152+
return this;
153+
}
154+
155+
public GatewayServiceProcessorConfigBuilder setGrpcServerPort(int grpcServerPort) {
156+
_grpcServerPort = grpcServerPort;
157+
return this;
158+
}
159+
160+
public GatewayServiceProcessorConfigBuilder setServerHeartBeatInterval(int serverHeartBeatInterval) {
161+
_serverHeartBeatInterval = serverHeartBeatInterval;
162+
return this;
163+
}
164+
165+
public GatewayServiceProcessorConfigBuilder setMaxAllowedClientHeartBeatInterval(
166+
int maxAllowedClientHeartBeatInterval) {
167+
_maxAllowedClientHeartBeatInterval = maxAllowedClientHeartBeatInterval;
168+
return this;
169+
}
170+
171+
public GatewayServiceProcessorConfigBuilder setClientTimeout(int clientTimeout) {
172+
_clientTimeout = clientTimeout;
173+
return this;
174+
}
175+
176+
public GatewayServiceProcessorConfigBuilder setEnableReflectionService(boolean enableReflectionService) {
177+
_enableReflectionService = enableReflectionService;
178+
return this;
179+
}
180+
181+
public GatewayServiceProcessorConfigBuilder setPollIntervalSec(int pollIntervalSec) {
182+
_pollIntervalSec = pollIntervalSec;
183+
return this;
184+
}
185+
186+
public void validate() {
187+
if ((_participantConnectionChannelType == ChannelType.GRPC_SERVER
188+
&& _shardStatenChannelType != ChannelType.GRPC_SERVER) || (
189+
_participantConnectionChannelType != ChannelType.GRPC_SERVER
190+
&& _shardStatenChannelType == ChannelType.GRPC_SERVER)) {
191+
throw new IllegalArgumentException(
192+
"In caas of GRPC server, Participant connection channel type and shard state channel type must be the same");
193+
}
194+
if (_participantConnectionChannelType == ChannelType.GRPC_SERVER && _grpcServerPort == 0) {
195+
throw new IllegalArgumentException("Grpc server port must be set for grpc server channel type");
196+
}
197+
}
198+
199+
public GatewayServiceChannelConfig build() {
200+
validate();
201+
return new GatewayServiceChannelConfig(_grpcServerPort, _channelMode, _participantConnectionChannelType,
202+
_shardStatenChannelType, _serverHeartBeatInterval, _maxAllowedClientHeartBeatInterval, _clientTimeout,
203+
_enableReflectionService, _pollIntervalSec);
204+
}
205+
}
206+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package org.apache.helix.gateway.channel;
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one
5+
* or more contributor license agreements. See the NOTICE file
6+
* distributed with this work for additional information
7+
* regarding copyright ownership. The ASF licenses this file
8+
* to you under the Apache License, Version 2.0 (the
9+
* "License"); you may not use this file except in compliance
10+
* with the License. You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing,
15+
* software distributed under the License is distributed on an
16+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
* KIND, either express or implied. See the License for the
18+
* specific language governing permissions and limitations
19+
* under the License.
20+
*/
21+
22+
import org.apache.commons.lang3.NotImplementedException;
23+
import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
24+
import org.apache.helix.gateway.service.GatewayServiceManager;
25+
26+
27+
public class HelixGatewayServiceChannelFactory {
28+
29+
public static HelixGatewayServiceChannel createServiceChannel(GatewayServiceChannelConfig config,
30+
GatewayServiceManager manager) {
31+
32+
if (config.getChannelMode() == GatewayServiceChannelConfig.ChannelMode.PUSH_MODE) {
33+
if (config.getParticipantConnectionChannelType() == GatewayServiceChannelConfig.ChannelType.GRPC_SERVER) {
34+
return new HelixGatewayServiceGrpcService(manager, config);
35+
}
36+
} else {
37+
return new HelixGatewayServicePollModeChannel(config);
38+
}
39+
throw new IllegalArgumentException(
40+
"Unsupported channel mode and type combination: " + config.getChannelMode() + " , "
41+
+ config.getParticipantConnectionChannelType());
42+
}
43+
}

0 commit comments

Comments
 (0)