Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions topic/src/main/java/tech/ydb/topic/impl/DebugTools.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package tech.ydb.topic.impl;

import java.util.concurrent.ThreadLocalRandom;

/**
*
* @author Aleksandr Gorshenin
*/
public class DebugTools {
private static final int ID_LENGTH = 6;
private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"
.toCharArray();

private DebugTools() { }

public static String createDebugId(String id) {
if (id != null) {
return id;
}

return ThreadLocalRandom.current().ints(0, ID_ALPHABET.length)
.limit(ID_LENGTH)
.map(charId -> ID_ALPHABET[charId])
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void execute(Runnable task) {
}

private void tryRun() {
if (isExecuted.compareAndSet(false, true)) {
if (!tasks.isEmpty() && isExecuted.compareAndSet(false, true)) {
try {
executor.execute(this);
} catch (RuntimeException ex) {
Expand Down
154 changes: 154 additions & 0 deletions topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package tech.ydb.topic.impl;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import com.google.protobuf.Message;
import org.slf4j.Logger;

import tech.ydb.common.retry.RetryConfig;
import tech.ydb.common.retry.RetryPolicy;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;

public abstract class TopicRetryableStream<R extends Message, W extends Message> {
private final Logger logger;
private final String debugId;
private final RetryConfig retryConfig;
private final ScheduledExecutorService scheduler;

private final AtomicReference<TopicStream<R, W>> realStream = new AtomicReference<>();
private final AtomicInteger streamCount = new AtomicInteger(0);
private final RetryState state = new RetryState();

private volatile boolean isClosed = false;

public TopicRetryableStream(Logger logger, String debugId, RetryConfig config, ScheduledExecutorService scheduler) {
this.debugId = debugId;
this.logger = logger;
this.retryConfig = config;
this.scheduler = scheduler;
}

@Override
public String toString() {
return "Session[" + debugId + "]";
}

protected abstract TopicStream<R, W> createNewStream(String debugId);
protected abstract W getInitRequest();

protected abstract void onNext(R message);

protected abstract void onRetry(Status status);
protected abstract void onClose(Status status);

public void start() {
if (isClosed) {
return;
}

String streamID = debugId + '.' + streamCount.incrementAndGet();
TopicStream<R, W> stream = createNewStream(streamID);

if (!realStream.compareAndSet(null, stream)) {
logger.warn("{} double start of stream, skipping", this);
stream.close();
return;
}

stream.start(getInitRequest(), this::onNext).whenComplete((status, th) -> {
realStream.compareAndSet(stream, null);
if (status != null) {
onStreamStop(status, retryConfig.getStatusRetryPolicy(status));
}
if (th != null) {
Status wrapped = Status.of(StatusCode.CLIENT_INTERNAL_ERROR, th);
onStreamStop(wrapped, retryConfig.getThrowableRetryPolicy(th));
}
});
}

protected void resetRetries() {
state.reset();
}

public void send(W msg) {
TopicStream<R, W> stream = realStream.get();
if (stream == null) {
logger.warn("{} send message before stream is ready", this);
return;
}
stream.send(msg);
}

public void close() {
isClosed = true;
TopicStream<R, W> stream = realStream.getAndSet(null);
if (stream != null) {
stream.close();
}
}
Comment thread
alex268 marked this conversation as resolved.

private void onStreamStop(Status status, RetryPolicy policy) {
if (isClosed) { // stream was already closed (usually with success)
onClose(status);
return;
}

if (policy == null) {
logger.warn("{} stopped by non-retryable status {}", this, status);
onClose(status);
return;
}

long nextRetryMs = state.nextRetryMs(policy);

if (nextRetryMs < 0) {
logger.warn("{} stopped after retry policy evaluation for status {}", this, status);
onClose(status);
return;
}

if (nextRetryMs == 0) { // retry immediately
logger.warn("{} retry #{}. Retry immediately...", this, state.retryNumber());
onRetry(status);
start();
return;
}

// retry scheduling
logger.warn("{} retry #{}. Scheduling reconnect in {}ms...", debugId, state.retryNumber(), nextRetryMs);
onRetry(status);

try {
scheduler.schedule(this::start, nextRetryMs, TimeUnit.MILLISECONDS);
} catch (Exception ex) {
logger.error("{} cannot schedule reconnect, stopping", debugId, ex);
onClose(status);
}
Comment thread
alex268 marked this conversation as resolved.
}

private static class RetryState {
private final AtomicInteger count = new AtomicInteger();
private volatile long startedAt = 0;

public long nextRetryMs(RetryPolicy policy) {
int retryNumber = count.getAndIncrement();
if (retryNumber == 0) {
startedAt = System.currentTimeMillis();
}
return policy.nextRetryMs(retryNumber, System.currentTimeMillis() - startedAt);
}

public int retryNumber() {
return count.get();
}

public void reset() {
count.set(0);
}
}
}
79 changes: 79 additions & 0 deletions topic/src/main/java/tech/ydb/topic/impl/TopicStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package tech.ydb.topic.impl;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import com.google.protobuf.Message;
import org.slf4j.Logger;

import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcReadWriteStream;

public abstract class TopicStream<R extends Message, W extends Message> {
private final Logger logger;
private final String debugId;
private final GrpcReadWriteStream<R, W> stream;
private final CompletableFuture<Status> streamStatus = new CompletableFuture<>();
private volatile String token;

public TopicStream(Logger logger, String debugId, GrpcReadWriteStream<R, W> stream) {
this.logger = logger;
this.debugId = debugId;
this.stream = stream;
this.token = stream.authToken();
}

protected abstract W updateTokenMessage(String token);
protected abstract Status parseMessageStatus(R message);

public CompletableFuture<Status> start(W initReq, Consumer<R> messageHandler) {
this.logger.debug("[{}] is about to start", debugId);
this.stream.start((R msg) -> {
Status messageStatus = parseMessageStatus(msg);
if (messageStatus.isSuccess()) {
messageHandler.accept(msg);
} else {
logger.warn("[{}] stopped by getting status {}", debugId, messageStatus);
if (streamStatus.complete(messageStatus)) {
stream.close();
}
}
}).whenComplete((st, th) -> {
Status status = st != null ? st : Status.of(StatusCode.CLIENT_INTERNAL_ERROR, th);
logger.debug("[{}] finished with status {}", debugId, status);
streamStatus.complete(status);
});

if (!streamStatus.isDone()) {
stream.sendNext(initReq);
}

return streamStatus;
}

public void close() {
logger.debug("[{}] closed by app", debugId);
if (!streamStatus.isDone()) {
stream.close();
}
}

public void send(W req) {
if (streamStatus.isDone()) {
logger.warn("[{}] is already closed. Next message with type {} was NOT sent", debugId,
req.getDescriptorForType().getName());
return;
}

String currentToken = stream.authToken();
if (!Objects.equals(token, currentToken)) {
token = currentToken;
logger.info("{} sends new token", this);
stream.sendNext(updateTokenMessage(token));
}

stream.sendNext(req);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ public void startAndInitialize() {

private void onInitResponse(YdbTopic.StreamWriteMessage.InitResponse response) {
long lastSeqNo = response.getLastSeqNo();
writer.onInit(lastSeqNo);
sessionId = response.getSessionId();
logger.info("[{}] Session with id {} (partition {}) initialized for topic \"{}\", lastSeqNo {}",
streamId, sessionId, response.getPartitionId(), initRequest.getPath(), lastSeqNo);
writer.onInit(streamId, lastSeqNo);
writer.onStart(lastSeqNo);
}

// Shouldn't be called more than once at a time due to grpc guarantees
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,13 @@ protected void onShutdown(String reason) {
}
}

void onInit(String streamId, long lastSeqNo) {
void onInit(long lastSeqNo) {
reconnectCounter.set(0);
Iterator<SentMessage> resend = writeQueue.updateSeqNo(lastSeqNo);
session.sendAll(() -> resend.hasNext() ? resend.next() : null);
}

void onStart(long lastSeqNo) {
if (initResultFutureRef.get() != null) {
initResultFutureRef.get().complete(new InitResult(lastSeqNo));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ SentMessage nextMessageToSend() {
}

SentMessage sentMsg = new SentMessage(next, seqNo);
logger.debug("[{}] prepare sent message with seqNo {}", id, seqNo);
logger.trace("[{}] prepare sent message with seqNo {}", id, seqNo);
sent.offer(sentMsg);
return sentMsg;
}
Expand Down
19 changes: 19 additions & 0 deletions topic/src/test/java/tech/ydb/topic/impl/DebugToolsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package tech.ydb.topic.impl;

import org.junit.Assert;
import org.junit.Test;

/**
*
* @author Aleksandr Gorshenin
*/
public class DebugToolsTest {
@Test
public void createDebugIdTest() {
Assert.assertEquals("custom-id", DebugTools.createDebugId("custom-id"));

String newID = DebugTools.createDebugId(null);
Assert.assertNotNull(newID);
Assert.assertEquals(6, newID.length());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,14 @@ public void wrongExecuterTest() throws InterruptedException {

@Test
public void wrongTaskTest() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(4);
CountDownLatch latch = new CountDownLatch(5);
Queue<Throwable> problems = new ConcurrentLinkedQueue<>();
ExecutorService pool = Executors.newCachedThreadPool((task) -> {
Thread t = new Thread(task);
t.setUncaughtExceptionHandler((th, ex) -> problems.add(ex));
t.setUncaughtExceptionHandler((th, ex) -> {
problems.add(ex);
latch.countDown();
});
return t;
});

Expand Down
Loading
Loading