Skip to content

Commit 5bd9fb6

Browse files
committed
de-interfac-ify Subscriber
1 parent 19acb03 commit 5bd9fb6

File tree

3 files changed

+291
-354
lines changed

3 files changed

+291
-354
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

Lines changed: 289 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,35 @@
1717
package com.google.cloud.pubsub.spi.v1;
1818

1919
import com.google.api.gax.bundling.FlowController;
20+
import com.google.api.stats.Distribution;
2021
import com.google.auth.Credentials;
2122
import com.google.auth.oauth2.GoogleCredentials;
2223
import com.google.cloud.Clock;
2324
import com.google.cloud.pubsub.spi.v1.MessageReceiver.AckReply;
25+
import com.google.common.annotations.VisibleForTesting;
2426
import com.google.common.base.Optional;
2527
import com.google.common.base.Preconditions;
28+
import com.google.common.primitives.Ints;
29+
import com.google.common.util.concurrent.AbstractService;
2630
import com.google.common.util.concurrent.Service;
31+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2732
import io.grpc.ManagedChannelBuilder;
33+
import io.grpc.Status;
34+
import io.grpc.StatusRuntimeException;
35+
import io.grpc.netty.GrpcSslContexts;
36+
import io.grpc.netty.NegotiationType;
37+
import io.grpc.netty.NettyChannelBuilder;
2838
import java.io.IOException;
39+
import java.util.ArrayList;
40+
import java.util.List;
41+
import java.util.concurrent.CountDownLatch;
42+
import java.util.concurrent.Executors;
2943
import java.util.concurrent.ScheduledExecutorService;
44+
import java.util.concurrent.ScheduledFuture;
45+
import java.util.concurrent.TimeUnit;
3046
import org.joda.time.Duration;
47+
import org.slf4j.Logger;
48+
import org.slf4j.LoggerFactory;
3149

3250
/**
3351
* A Cloud Pub/Sub <a href="https://cloud.google.com/pubsub/docs/subscriber">subscriber</a> that is
@@ -85,33 +103,278 @@
85103
* subscriber.stopAsync();
86104
* </code></pre>
87105
*/
88-
public interface Subscriber extends Service {
89-
/** Retrieves a snapshot of the current subscriber statistics. */
90-
SubscriberStats getStats();
91-
92-
/** Subscription for which the subscriber is streaming messages. */
93-
String getSubscription();
94-
95-
/**
96-
* Time before a message is to expire when the subscriber is going to attempt to renew its ack
97-
* deadline.
98-
*/
99-
Duration getAckExpirationPadding();
100-
101-
/**
102-
* Maximum number of outstanding messages before limits are enforced.
103-
*
104-
* <p><b>When limits are enforced, no more messages will be dispatched to the {@link
105-
* MessageReceiver} but due to the gRPC and HTTP/2 buffering and congestion control window
106-
* management, still some extra bytes could be kept at lower layers.</b>
107-
*/
108-
Optional<Integer> getMaxOutstandingElementCount();
109-
110-
/** Maximum number of outstanding bytes before limits are enforced. */
111-
Optional<Integer> getMaxOutstandingRequestBytes();
106+
public class Subscriber extends AbstractService {
107+
private static final int THREADS_PER_CHANNEL = 5;
108+
@VisibleForTesting static final int CHANNELS_PER_CORE = 10;
109+
private static final int MAX_INBOUND_MESSAGE_SIZE =
110+
20 * 1024 * 1024; // 20MB API maximum message size.
111+
private static final int INITIAL_ACK_DEADLINE_SECONDS = 10;
112+
private static final int MAX_ACK_DEADLINE_SECONDS = 600;
113+
private static final int MIN_ACK_DEADLINE_SECONDS = 10;
114+
private static final Duration ACK_DEADLINE_UPDATE_PERIOD = Duration.standardMinutes(1);
115+
private static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9;
116+
117+
private static final Logger logger = LoggerFactory.getLogger(Subscriber.class);
118+
119+
private final String subscription;
120+
private final FlowController.Settings flowControlSettings;
121+
private final Duration ackExpirationPadding;
122+
private final ScheduledExecutorService executor;
123+
private final Distribution ackLatencyDistribution =
124+
new Distribution(MAX_ACK_DEADLINE_SECONDS + 1);
125+
private final int numChannels;
126+
private final FlowController flowController;
127+
private final ManagedChannelBuilder<? extends ManagedChannelBuilder<?>> channelBuilder;
128+
private final Credentials credentials;
129+
private final MessageReceiver receiver;
130+
private final List<StreamingSubscriberConnection> streamingSubscriberConnections;
131+
private final List<PollingSubscriberConnection> pollingSubscriberConnections;
132+
private final Clock clock;
133+
private ScheduledFuture<?> ackDeadlineUpdater;
134+
private int streamAckDeadlineSeconds;
135+
136+
public Subscriber(Builder builder) throws IOException {
137+
receiver = builder.receiver;
138+
flowControlSettings = builder.flowControlSettings;
139+
subscription = builder.subscription;
140+
ackExpirationPadding = builder.ackExpirationPadding;
141+
streamAckDeadlineSeconds =
142+
Math.max(
143+
INITIAL_ACK_DEADLINE_SECONDS,
144+
Ints.saturatedCast(ackExpirationPadding.getStandardSeconds()));
145+
clock = builder.clock.isPresent() ? builder.clock.get() : Clock.defaultClock();
146+
147+
flowController = new FlowController(builder.flowControlSettings, false);
148+
149+
numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE;
150+
executor =
151+
builder.executor.isPresent()
152+
? builder.executor.get()
153+
: Executors.newScheduledThreadPool(
154+
numChannels * THREADS_PER_CHANNEL,
155+
new ThreadFactoryBuilder()
156+
.setDaemon(true)
157+
.setNameFormat("cloud-pubsub-subscriber-thread-%d")
158+
.build());
159+
160+
channelBuilder =
161+
builder.channelBuilder.isPresent()
162+
? builder.channelBuilder.get()
163+
: NettyChannelBuilder.forAddress(
164+
SubscriberSettings.getDefaultServiceAddress(),
165+
SubscriberSettings.getDefaultServicePort())
166+
.maxMessageSize(MAX_INBOUND_MESSAGE_SIZE)
167+
.flowControlWindow(5000000) // 2.5 MB
168+
.negotiationType(NegotiationType.TLS)
169+
.sslContext(GrpcSslContexts.forClient().ciphers(null).build())
170+
.executor(executor);
171+
172+
credentials =
173+
builder.credentials.isPresent()
174+
? builder.credentials.get()
175+
: GoogleCredentials.getApplicationDefault()
176+
.createScoped(SubscriberSettings.getDefaultServiceScopes());
177+
178+
streamingSubscriberConnections = new ArrayList<StreamingSubscriberConnection>(numChannels);
179+
pollingSubscriberConnections = new ArrayList<PollingSubscriberConnection>(numChannels);
180+
}
181+
182+
@Override
183+
protected void doStart() {
184+
logger.debug("Starting subscriber group.");
185+
startStreamingConnections();
186+
notifyStarted();
187+
}
188+
189+
@Override
190+
protected void doStop() {
191+
stopAllStreamingConnections();
192+
stopAllPollingConnections();
193+
notifyStopped();
194+
}
195+
196+
private void startStreamingConnections() {
197+
synchronized (streamingSubscriberConnections) {
198+
for (int i = 0; i < numChannels; i++) {
199+
streamingSubscriberConnections.add(
200+
new StreamingSubscriberConnection(
201+
subscription,
202+
credentials,
203+
receiver,
204+
ackExpirationPadding,
205+
streamAckDeadlineSeconds,
206+
ackLatencyDistribution,
207+
channelBuilder.build(),
208+
flowController,
209+
executor,
210+
clock));
211+
}
212+
startConnections(
213+
streamingSubscriberConnections,
214+
new Listener() {
215+
@Override
216+
public void failed(State from, Throwable failure) {
217+
// If a connection failed is because of a fatal error, we should fail the
218+
// whole subscriber.
219+
stopAllStreamingConnections();
220+
if (failure instanceof StatusRuntimeException
221+
&& ((StatusRuntimeException) failure).getStatus().getCode()
222+
== Status.Code.UNIMPLEMENTED) {
223+
logger.info("Unable to open streaming connections, falling back to polling.");
224+
startPollingConnections();
225+
return;
226+
}
227+
notifyFailed(failure);
228+
}
229+
});
230+
}
231+
232+
ackDeadlineUpdater =
233+
executor.scheduleAtFixedRate(
234+
new Runnable() {
235+
@Override
236+
public void run() {
237+
// It is guaranteed this will be <= MAX_ACK_DEADLINE_SECONDS, the max of the API.
238+
long ackLatency =
239+
ackLatencyDistribution.getNthPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES);
240+
if (ackLatency > 0) {
241+
int possibleStreamAckDeadlineSeconds =
242+
Math.max(
243+
MIN_ACK_DEADLINE_SECONDS,
244+
Ints.saturatedCast(
245+
Math.max(ackLatency, ackExpirationPadding.getStandardSeconds())));
246+
if (streamAckDeadlineSeconds != possibleStreamAckDeadlineSeconds) {
247+
streamAckDeadlineSeconds = possibleStreamAckDeadlineSeconds;
248+
logger.debug(
249+
"Updating stream deadline to {} seconds.", streamAckDeadlineSeconds);
250+
for (StreamingSubscriberConnection subscriberConnection :
251+
streamingSubscriberConnections) {
252+
subscriberConnection.updateStreamAckDeadline(streamAckDeadlineSeconds);
253+
}
254+
}
255+
}
256+
}
257+
},
258+
ACK_DEADLINE_UPDATE_PERIOD.getMillis(),
259+
ACK_DEADLINE_UPDATE_PERIOD.getMillis(),
260+
TimeUnit.MILLISECONDS);
261+
}
262+
263+
private void stopAllStreamingConnections() {
264+
stopConnections(streamingSubscriberConnections);
265+
ackDeadlineUpdater.cancel(true);
266+
}
267+
268+
private void startPollingConnections() {
269+
synchronized (pollingSubscriberConnections) {
270+
for (int i = 0; i < numChannels; i++) {
271+
pollingSubscriberConnections.add(
272+
new PollingSubscriberConnection(
273+
subscription,
274+
credentials,
275+
receiver,
276+
ackExpirationPadding,
277+
ackLatencyDistribution,
278+
channelBuilder.build(),
279+
flowController,
280+
executor,
281+
clock));
282+
}
283+
startConnections(
284+
pollingSubscriberConnections,
285+
new Listener() {
286+
@Override
287+
public void failed(State from, Throwable failure) {
288+
// If a connection failed is because of a fatal error, we should fail the
289+
// whole subscriber.
290+
stopAllPollingConnections();
291+
try {
292+
notifyFailed(failure);
293+
} catch (IllegalStateException e) {
294+
if (isRunning()) {
295+
throw e;
296+
}
297+
// It could happen that we are shutting down while some channels fail.
298+
}
299+
}
300+
});
301+
}
302+
}
303+
304+
private void stopAllPollingConnections() {
305+
stopConnections(pollingSubscriberConnections);
306+
}
307+
308+
private void startConnections(
309+
List<? extends Service> connections, final Listener connectionsListener) {
310+
final CountDownLatch subscribersStarting = new CountDownLatch(numChannels);
311+
for (final Service subscriber : connections) {
312+
executor.submit(
313+
new Runnable() {
314+
@Override
315+
public void run() {
316+
subscriber.startAsync().awaitRunning();
317+
subscribersStarting.countDown();
318+
subscriber.addListener(connectionsListener, executor);
319+
}
320+
});
321+
}
322+
try {
323+
subscribersStarting.await();
324+
} catch (InterruptedException e) {
325+
throw new RuntimeException(e);
326+
}
327+
}
328+
329+
private void stopConnections(List<? extends Service> connections) {
330+
ArrayList<Service> liveConnections;
331+
synchronized (connections) {
332+
liveConnections = new ArrayList<Service>(connections);
333+
connections.clear();
334+
}
335+
final CountDownLatch connectionsStopping = new CountDownLatch(liveConnections.size());
336+
for (final Service subscriberConnection : liveConnections) {
337+
executor.submit(
338+
new Runnable() {
339+
@Override
340+
public void run() {
341+
try {
342+
subscriberConnection.stopAsync().awaitTerminated();
343+
} catch (IllegalStateException ignored) {
344+
// It is expected for some connections to be already in state failed so stop will
345+
// throw this expection.
346+
}
347+
connectionsStopping.countDown();
348+
}
349+
});
350+
}
351+
try {
352+
connectionsStopping.await();
353+
} catch (InterruptedException e) {
354+
throw new IllegalStateException(e);
355+
}
356+
}
357+
358+
public SubscriberStats getStats() {
359+
// TODO: Implement me
360+
return null;
361+
}
362+
363+
public String getSubscription() {
364+
return subscription;
365+
}
366+
367+
public Duration getAckExpirationPadding() {
368+
return ackExpirationPadding;
369+
}
370+
371+
public FlowController.Settings getFlowControlSettings() {
372+
return flowControlSettings;
373+
}
374+
112375

113376
/** Builder of {@link Subscriber Subscribers}. */
114-
public final class Builder {
377+
public static final class Builder {
115378
private static final Duration MIN_ACK_EXPIRATION_PADDING = Duration.millis(100);
116379
private static final Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration.millis(500);
117380

@@ -207,7 +470,7 @@ Builder setClock(Clock clock) {
207470
}
208471

209472
public Subscriber build() throws IOException {
210-
return new SubscriberImpl(this);
473+
return new Subscriber(this);
211474
}
212475
}
213476
}

0 commit comments

Comments
 (0)