From f409bb420818e77663ded71d5ae830bf1b77ab05 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Fri, 27 Mar 2026 12:07:25 +0000 Subject: [PATCH 1/8] Added base TopicStream --- .../java/tech/ydb/topic/impl/TopicStream.java | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 topic/src/main/java/tech/ydb/topic/impl/TopicStream.java diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java b/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java new file mode 100644 index 000000000..a4b62cd5f --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java @@ -0,0 +1,74 @@ +package tech.ydb.topic.impl; + +import java.util.Objects; +import java.util.concurrent.CompletableFuture; + +import org.slf4j.Logger; + +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.grpc.GrpcReadWriteStream; + +public abstract class TopicStream { + private final Logger logger; + private final GrpcReadWriteStream stream; + private final String id; + private final CompletableFuture streamStatus = new CompletableFuture<>(); + private volatile String token; + + public TopicStream(Logger logger, String id, GrpcReadWriteStream stream) { + this.logger = logger; + this.id = id; + this.stream = stream; + this.token = stream.authToken(); + } + + @Override + public String toString() { + return "Stream[" + id + "]"; + } + + protected abstract W updateTokenMessage(String token); + protected abstract void handleMessage(R message); + + protected void start() { + logger.info("{} is about to start", this); + stream.start((R msg) -> { + handleMessage(msg); + }).whenComplete((st, th) -> { + Status status = st != null ? st : Status.of(StatusCode.INTERNAL_ERROR, th); + logger.info("{} finished with status {}", this, st); + streamStatus.complete(status); + }); + } + + public void fail(Status error) { + logger.warn("{} stopped with problem {}", this, error); + if (streamStatus.complete(error)) { + stream.close(); + } + } + + public void stop() { + logger.info("{} stop", this); + if (!streamStatus.isDone()) { + stream.close(); + } + } + + public void send(W request) { + if (streamStatus.isDone()) { + logger.trace("{} is already closed. This message is NOT sent:\n{}", this, request); + return; + } + + String currentToken = stream.authToken(); + if (!Objects.equals(token, currentToken)) { + token = currentToken; + logger.info("{} sends new token", id); + stream.sendNext(updateTokenMessage(token)); + } + + stream.sendNext(request); + } +} From f0020fe901b5609ec25bde3fccf389d002822c0e Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 13 Apr 2026 11:05:21 +0100 Subject: [PATCH 2/8] Added TopicRetryableStream --- .../ydb/topic/impl/TopicRetryableStream.java | 158 ++++++++++++++++++ .../java/tech/ydb/topic/impl/TopicStream.java | 45 ++--- 2 files changed, 182 insertions(+), 21 deletions(-) create mode 100644 topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java b/topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java new file mode 100644 index 000000000..06cd82277 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java @@ -0,0 +1,158 @@ +package tech.ydb.topic.impl; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.common.retry.RetryConfig; +import tech.ydb.common.retry.RetryPolicy; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; + +public class TopicRetryableStream { + private static final Logger logger = LoggerFactory.getLogger(TopicRetryableStream.class); + + public interface Handler { + TopicStream createNewStream(String id); + W buildInitRequest(); + + void onRetry(Status status); + void onStop(Status status); + + void onNext(R message); + } + + private static final int ID_LENGTH = 6; + private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABSDEFGHIJKLMNOPQRSTUVWXYZ1234567890" + .toCharArray(); + + private final String id; + private final Handler handler; + + private final RetryConfig config; + private final ScheduledExecutorService scheduler; + + private final AtomicReference> streamRef = new AtomicReference<>(); + private final AtomicInteger streamCount = new AtomicInteger(0); + private final RetryState state = new RetryState(); + + public TopicRetryableStream(String id, RetryConfig retryConfig, ScheduledExecutorService scheduler, + Handler handler) { + this.id = id == null ? generateRandomId(ID_LENGTH) : id; + this.handler = handler; + this.config = retryConfig; + this.scheduler = scheduler; + } + + public void start() { + String streamID = id + '.' + streamCount.incrementAndGet(); + TopicStream stream = handler.createNewStream(streamID); + + if (!streamRef.compareAndSet(null, stream)) { + logger.warn("[{}] Double start of stream retrier, skippingdouble start", id); + stream.close(); + return; + } + + stream.start(handler.buildInitRequest(), handler::onNext).whenComplete((status, th) -> { + if (status != null) { + onStop(stream, status, config.getStatusRetryPolicy(status)); + } + if (th != null) { + onStop(stream, Status.of(StatusCode.CLIENT_INTERNAL_ERROR, th), config.getThrowableRetryPolicy(th)); + } + }); + } + + public void resetRetries() { + state.reset(); + } + + public void close() { + TopicStream stream = streamRef.getAndSet(null); + if (stream != null) { + stream.close(); + } + } + + public void send(W msg) { + TopicStream stream = streamRef.get(); + if (stream == null) { + logger.warn("[{}] send message before stream is ready", id); + return; + } + stream.send(msg); + } + + private void onStop(TopicStream stream, Status status, RetryPolicy policy) { + if (!streamRef.compareAndSet(stream, null)) { // stream was already closed + return; + } + + if (policy == null) { + logger.warn("[{}] stream stopped by non-retryable status {}", id, status); + handler.onStop(status); + return; + } + + long nextRetryMs = state.nextRetryMs(policy); + + if (nextRetryMs < 0) { + logger.warn("[{}] stream stopped by retry policy {}", id, status); + handler.onStop(status); + return; + } + + if (nextRetryMs == 0) { // retry immediatelly + logger.warn("[{}] stream retry #{}. Retry immediatelly...", id, state.retryNumber()); + handler.onRetry(status); + start(); + return; + } + + // retry scheduling + logger.warn("[{}] stream retry #{}. Scheduling reconnect in {}ms...", id, state.retryNumber(), nextRetryMs); + handler.onRetry(status); + + try { + scheduler.schedule(this::start, nextRetryMs, TimeUnit.MILLISECONDS); + } catch (Exception ex) { + logger.error("[{}] stream cannot schedule reconnect, stopping", id, ex); + handler.onStop(status); + } + } + + private static class RetryState { + private final AtomicInteger count = new AtomicInteger(); + private volatile long startedAt = 0; + + public long nextRetryMs(RetryPolicy policy) { + int retryNumber = count.getAndIncrement(); + if (retryNumber == 0) { + startedAt = System.currentTimeMillis(); + } + return policy.nextRetryMs(retryNumber, System.currentTimeMillis() - startedAt); + } + + public int retryNumber() { + return count.get(); + } + + public void reset() { + count.set(0); + } + } + + private static String generateRandomId(int length) { + return ThreadLocalRandom.current().ints(0, ID_ALPHABET.length) + .limit(length) + .map(charId -> ID_ALPHABET[charId]) + .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append) + .toString(); + } +} diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java b/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java index a4b62cd5f..953e49ffe 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java +++ b/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java @@ -2,6 +2,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import org.slf4j.Logger; @@ -23,49 +24,51 @@ public TopicStream(Logger logger, String id, GrpcReadWriteStream stream) { this.token = stream.authToken(); } - @Override - public String toString() { - return "Stream[" + id + "]"; - } - protected abstract W updateTokenMessage(String token); - protected abstract void handleMessage(R message); + protected abstract Status parseMessageStatus(R message); - protected void start() { - logger.info("{} is about to start", this); - stream.start((R msg) -> { - handleMessage(msg); + public CompletableFuture start(W initReq, Consumer messageHandler) { + this.logger.debug("[{}] is about to start", id); + this.stream.start((R msg) -> { + Status messageStatus = parseMessageStatus(msg); + if (messageStatus.isSuccess()) { + messageHandler.accept(msg); + } else { + logger.warn("[{}] stopped by getting status {}", this, messageStatus); + if (streamStatus.complete(messageStatus)) { + stream.close(); + } + } }).whenComplete((st, th) -> { Status status = st != null ? st : Status.of(StatusCode.INTERNAL_ERROR, th); - logger.info("{} finished with status {}", this, st); + logger.debug("{} finished with status {}", id, st); streamStatus.complete(status); }); - } - public void fail(Status error) { - logger.warn("{} stopped with problem {}", this, error); - if (streamStatus.complete(error)) { - stream.close(); + if (!streamStatus.isDone()) { + stream.sendNext(initReq); } + + return streamStatus; } - public void stop() { - logger.info("{} stop", this); + public void close() { + logger.info("[{}] closed by app", id); if (!streamStatus.isDone()) { stream.close(); } } public void send(W request) { - if (streamStatus.isDone()) { - logger.trace("{} is already closed. This message is NOT sent:\n{}", this, request); + if (streamStatus != null) { + logger.warn("[{}] is already closed. This message is NOT sent:\n{}", id, request); return; } String currentToken = stream.authToken(); if (!Objects.equals(token, currentToken)) { token = currentToken; - logger.info("{} sends new token", id); + logger.info("[{}] sends new token", id); stream.sendNext(updateTokenMessage(token)); } From 1a3afd0eb0d1262e7320999af81944a9c78c9da8 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 13 Apr 2026 17:21:09 +0100 Subject: [PATCH 3/8] Updated streams base implementation --- .../java/tech/ydb/topic/impl/DebugTools.java | 27 ++++ .../ydb/topic/impl/TopicRetryableStream.java | 120 ++++++++---------- .../java/tech/ydb/topic/impl/TopicStream.java | 28 ++-- 3 files changed, 96 insertions(+), 79 deletions(-) create mode 100644 topic/src/main/java/tech/ydb/topic/impl/DebugTools.java diff --git a/topic/src/main/java/tech/ydb/topic/impl/DebugTools.java b/topic/src/main/java/tech/ydb/topic/impl/DebugTools.java new file mode 100644 index 000000000..3bfff236a --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/impl/DebugTools.java @@ -0,0 +1,27 @@ +package tech.ydb.topic.impl; + +import java.util.concurrent.ThreadLocalRandom; + +/** + * + * @author Aleksandr Gorshenin + */ +public class DebugTools { + private static final int ID_LENGTH = 6; + private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABSDEFGHIJKLMNOPQRSTUVWXYZ1234567890" + .toCharArray(); + + private DebugTools() { } + + public static String createDebugId(String id) { + if (id != null) { + return id; + } + + return ThreadLocalRandom.current().ints(0, ID_ALPHABET.length) + .limit(ID_LENGTH) + .map(charId -> ID_ALPHABET[charId]) + .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append) + .toString(); + } +} diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java b/topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java index 06cd82277..1ee69ffb5 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java +++ b/topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java @@ -1,129 +1,125 @@ package tech.ydb.topic.impl; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import com.google.protobuf.Message; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import tech.ydb.common.retry.RetryConfig; import tech.ydb.common.retry.RetryPolicy; import tech.ydb.core.Status; import tech.ydb.core.StatusCode; -public class TopicRetryableStream { - private static final Logger logger = LoggerFactory.getLogger(TopicRetryableStream.class); - - public interface Handler { - TopicStream createNewStream(String id); - W buildInitRequest(); - - void onRetry(Status status); - void onStop(Status status); - - void onNext(R message); - } - - private static final int ID_LENGTH = 6; - private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABSDEFGHIJKLMNOPQRSTUVWXYZ1234567890" - .toCharArray(); - - private final String id; - private final Handler handler; - - private final RetryConfig config; +public abstract class TopicRetryableStream { + private final Logger logger; + private final String debugId; + private final RetryConfig retryConfig; private final ScheduledExecutorService scheduler; - private final AtomicReference> streamRef = new AtomicReference<>(); + private final AtomicReference> realStream = new AtomicReference<>(); private final AtomicInteger streamCount = new AtomicInteger(0); private final RetryState state = new RetryState(); - public TopicRetryableStream(String id, RetryConfig retryConfig, ScheduledExecutorService scheduler, - Handler handler) { - this.id = id == null ? generateRandomId(ID_LENGTH) : id; - this.handler = handler; - this.config = retryConfig; + public TopicRetryableStream(Logger logger, String debugId, RetryConfig config, ScheduledExecutorService scheduler) { + this.debugId = debugId; + this.logger = logger; + this.retryConfig = config; this.scheduler = scheduler; } + @Override + public String toString() { + return "Session[" + debugId + "]"; + } + + protected abstract TopicStream createNewStream(String debugId); + protected abstract W getInitRequest(); + + protected abstract void onNext(R message); + + protected abstract void onRetry(Status status); + protected abstract void onClose(Status status); + public void start() { - String streamID = id + '.' + streamCount.incrementAndGet(); - TopicStream stream = handler.createNewStream(streamID); + String streamID = debugId + '.' + streamCount.incrementAndGet(); + TopicStream stream = createNewStream(streamID); - if (!streamRef.compareAndSet(null, stream)) { - logger.warn("[{}] Double start of stream retrier, skippingdouble start", id); + if (!realStream.compareAndSet(null, stream)) { + logger.warn("{} double start of stream, skipping", this); stream.close(); return; } - stream.start(handler.buildInitRequest(), handler::onNext).whenComplete((status, th) -> { + stream.start(getInitRequest(), this::onNext).whenComplete((status, th) -> { if (status != null) { - onStop(stream, status, config.getStatusRetryPolicy(status)); + onStreamStop(stream, status, retryConfig.getStatusRetryPolicy(status)); } if (th != null) { - onStop(stream, Status.of(StatusCode.CLIENT_INTERNAL_ERROR, th), config.getThrowableRetryPolicy(th)); + Status wrapped = Status.of(StatusCode.CLIENT_INTERNAL_ERROR, th); + onStreamStop(stream, wrapped, retryConfig.getThrowableRetryPolicy(th)); } }); } - public void resetRetries() { + protected void resetRetries() { state.reset(); } - public void close() { - TopicStream stream = streamRef.getAndSet(null); - if (stream != null) { - stream.close(); - } - } - public void send(W msg) { - TopicStream stream = streamRef.get(); + TopicStream stream = realStream.get(); if (stream == null) { - logger.warn("[{}] send message before stream is ready", id); + logger.warn("{} send message before stream is ready", this); return; } stream.send(msg); } - private void onStop(TopicStream stream, Status status, RetryPolicy policy) { - if (!streamRef.compareAndSet(stream, null)) { // stream was already closed + public void close() { + TopicStream stream = realStream.getAndSet(null); + if (stream != null) { + stream.close(); + } + } + + private void onStreamStop(TopicStream stream, Status status, RetryPolicy policy) { + if (!realStream.compareAndSet(stream, null)) { // stream was already closed (usally with success) + onClose(status); return; } if (policy == null) { - logger.warn("[{}] stream stopped by non-retryable status {}", id, status); - handler.onStop(status); + logger.warn("{} stopped by non-retryable status {}", this, status); + onClose(status); return; } long nextRetryMs = state.nextRetryMs(policy); if (nextRetryMs < 0) { - logger.warn("[{}] stream stopped by retry policy {}", id, status); - handler.onStop(status); + logger.warn("{} stopped by retry policy {}", this, status); + onClose(status); return; } if (nextRetryMs == 0) { // retry immediatelly - logger.warn("[{}] stream retry #{}. Retry immediatelly...", id, state.retryNumber()); - handler.onRetry(status); + logger.warn("{} retry #{}. Retry immediatelly...", this, state.retryNumber()); + onRetry(status); start(); return; } // retry scheduling - logger.warn("[{}] stream retry #{}. Scheduling reconnect in {}ms...", id, state.retryNumber(), nextRetryMs); - handler.onRetry(status); + logger.warn("{} retry #{}. Scheduling reconnect in {}ms...", debugId, state.retryNumber(), nextRetryMs); + onRetry(status); try { scheduler.schedule(this::start, nextRetryMs, TimeUnit.MILLISECONDS); } catch (Exception ex) { - logger.error("[{}] stream cannot schedule reconnect, stopping", id, ex); - handler.onStop(status); + logger.error("{} cannot schedule reconnect, stopping", debugId, ex); + onClose(status); } } @@ -147,12 +143,4 @@ public void reset() { count.set(0); } } - - private static String generateRandomId(int length) { - return ThreadLocalRandom.current().ints(0, ID_ALPHABET.length) - .limit(length) - .map(charId -> ID_ALPHABET[charId]) - .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append) - .toString(); - } } diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java b/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java index 953e49ffe..d217f3421 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java +++ b/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java @@ -4,22 +4,24 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; +import com.google.protobuf.Message; +import com.google.protobuf.TextFormat; import org.slf4j.Logger; import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.core.grpc.GrpcReadWriteStream; -public abstract class TopicStream { +public abstract class TopicStream { private final Logger logger; + private final String debugId; private final GrpcReadWriteStream stream; - private final String id; private final CompletableFuture streamStatus = new CompletableFuture<>(); private volatile String token; - public TopicStream(Logger logger, String id, GrpcReadWriteStream stream) { + public TopicStream(Logger logger, String debugId, GrpcReadWriteStream stream) { this.logger = logger; - this.id = id; + this.debugId = debugId; this.stream = stream; this.token = stream.authToken(); } @@ -28,20 +30,20 @@ public TopicStream(Logger logger, String id, GrpcReadWriteStream stream) { protected abstract Status parseMessageStatus(R message); public CompletableFuture start(W initReq, Consumer messageHandler) { - this.logger.debug("[{}] is about to start", id); + this.logger.debug("[{}] is about to start", debugId); this.stream.start((R msg) -> { Status messageStatus = parseMessageStatus(msg); if (messageStatus.isSuccess()) { messageHandler.accept(msg); } else { - logger.warn("[{}] stopped by getting status {}", this, messageStatus); + logger.warn("[{}] stopped by getting status {}", debugId, messageStatus); if (streamStatus.complete(messageStatus)) { stream.close(); } } }).whenComplete((st, th) -> { Status status = st != null ? st : Status.of(StatusCode.INTERNAL_ERROR, th); - logger.debug("{} finished with status {}", id, st); + logger.debug("[{}] finished with status {}", debugId, st); streamStatus.complete(status); }); @@ -53,25 +55,25 @@ public CompletableFuture start(W initReq, Consumer messageHandler) { } public void close() { - logger.info("[{}] closed by app", id); + logger.debug("[{}] closed by app", debugId); if (!streamStatus.isDone()) { stream.close(); } } - public void send(W request) { - if (streamStatus != null) { - logger.warn("[{}] is already closed. This message is NOT sent:\n{}", id, request); + public void send(W req) { + if (streamStatus.isDone()) { + logger.warn("[{}] is already closed. This message is NOT sent:\n{}", debugId, TextFormat.shortDebugString(req)); return; } String currentToken = stream.authToken(); if (!Objects.equals(token, currentToken)) { token = currentToken; - logger.info("[{}] sends new token", id); + logger.info("{} sends new token", this); stream.sendNext(updateTokenMessage(token)); } - stream.sendNext(request); + stream.sendNext(req); } } From b60e3df7a58111d16fad34a489b55523dbb7a2cc Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Tue, 14 Apr 2026 12:26:37 +0100 Subject: [PATCH 4/8] Small fixes --- topic/src/main/java/tech/ydb/topic/impl/SerialExecutor.java | 2 +- .../main/java/tech/ydb/topic/impl/TopicRetryableStream.java | 2 +- topic/src/main/java/tech/ydb/topic/impl/TopicStream.java | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/impl/SerialExecutor.java b/topic/src/main/java/tech/ydb/topic/impl/SerialExecutor.java index 1adb6b562..756676300 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/SerialExecutor.java +++ b/topic/src/main/java/tech/ydb/topic/impl/SerialExecutor.java @@ -43,7 +43,7 @@ public void execute(Runnable task) { } private void tryRun() { - if (isExecuted.compareAndSet(false, true)) { + if (!tasks.isEmpty() && isExecuted.compareAndSet(false, true)) { try { executor.execute(this); } catch (RuntimeException ex) { diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java b/topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java index 1ee69ffb5..c9b63716c 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java +++ b/topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java @@ -85,7 +85,7 @@ public void close() { } private void onStreamStop(TopicStream stream, Status status, RetryPolicy policy) { - if (!realStream.compareAndSet(stream, null)) { // stream was already closed (usally with success) + if (!realStream.compareAndSet(stream, null)) { // stream was already closed (usually with success) onClose(status); return; } diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java b/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java index d217f3421..882034ed5 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java +++ b/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java @@ -5,7 +5,6 @@ import java.util.function.Consumer; import com.google.protobuf.Message; -import com.google.protobuf.TextFormat; import org.slf4j.Logger; import tech.ydb.core.Status; @@ -63,7 +62,8 @@ public void close() { public void send(W req) { if (streamStatus.isDone()) { - logger.warn("[{}] is already closed. This message is NOT sent:\n{}", debugId, TextFormat.shortDebugString(req)); + logger.warn("[{}] is already closed. Next message with type {} was NOT sent:", debugId, + req.getDescriptorForType().getName()); return; } From 6a2c087c602eb4dc1c816d3f8b39d2b95a4346a7 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Tue, 14 Apr 2026 12:26:59 +0100 Subject: [PATCH 5/8] Added tests for TopicStream & TopicRetryableStream --- .../topic/impl/TopicRetryableStreamTest.java | 298 ++++++++++++++++++ .../tech/ydb/topic/impl/TopicStreamTest.java | 181 +++++++++++ 2 files changed, 479 insertions(+) create mode 100644 topic/src/test/java/tech/ydb/topic/impl/TopicRetryableStreamTest.java create mode 100644 topic/src/test/java/tech/ydb/topic/impl/TopicStreamTest.java diff --git a/topic/src/test/java/tech/ydb/topic/impl/TopicRetryableStreamTest.java b/topic/src/test/java/tech/ydb/topic/impl/TopicRetryableStreamTest.java new file mode 100644 index 000000000..835b30054 --- /dev/null +++ b/topic/src/test/java/tech/ydb/topic/impl/TopicRetryableStreamTest.java @@ -0,0 +1,298 @@ +package tech.ydb.topic.impl; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import com.google.protobuf.Empty; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.common.retry.RetryConfig; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.grpc.GrpcReadWriteStream; + +public class TopicRetryableStreamTest { + private static final Logger logger = LoggerFactory.getLogger(TopicStreamTest.class); + private static final Empty EMPTY = Empty.getDefaultInstance(); + + /** + * Pairs a mock GrpcReadWriteStream with a concrete TopicStream backed by it. + * Completing grpcFuture simulates the underlying gRPC stream finishing. + */ + private static class StreamHandle { + @SuppressWarnings("unchecked") + private final GrpcReadWriteStream grpc = Mockito.mock(GrpcReadWriteStream.class); + + private final CompletableFuture grpcFuture = new CompletableFuture<>(); + private final TopicStream stream; + + StreamHandle(TopicStream mocked) { + this.stream = mocked; + Mockito.when(mocked.start(Mockito.any(), Mockito.any())).thenReturn(grpcFuture); + } + + StreamHandle() { + Mockito.when(grpc.authToken()).thenReturn("token"); + Mockito.when(grpc.start(Mockito.any())).thenReturn(grpcFuture); + + stream = new TopicStream(logger, "inner", grpc) { + @Override + protected Empty updateTokenMessage(String token) { + return EMPTY; + } + + @Override + protected Status parseMessageStatus(Empty message) { + return Status.SUCCESS; + } + }; + } + + void complete(Status status) { + grpcFuture.complete(status); + } + + void fail(Throwable th) { + grpcFuture.completeExceptionally(th); + } + } + + private static class TestStream extends TopicRetryableStream { + private final List handles; + private int handleIndex = 0; + + final List retryStatuses = new ArrayList<>(); + final List closeStatuses = new ArrayList<>(); + final List receivedMessages = new ArrayList<>(); + + TestStream(List handles, RetryConfig retryConfig, ScheduledExecutorService scheduler) { + super(logger, "test", retryConfig, scheduler); + this.handles = handles; + } + + @Override + protected TopicStream createNewStream(String debugId) { + return handles.get(handleIndex++).stream; + } + + @Override + protected Empty getInitRequest() { + return EMPTY; + } + + @Override + protected void onNext(Empty message) { + receivedMessages.add(message); + } + + @Override + protected void onRetry(Status status) { + retryStatuses.add(status); + } + + @Override + protected void onClose(Status status) { + closeStatuses.add(status); + } + } + + private ScheduledExecutorService mockScheduler() { + return Mockito.mock(ScheduledExecutorService.class); + } + + @Test + public void simpleStartAndCloseTest() { + StreamHandle h = new StreamHandle(); + TestStream retryable = new TestStream(Arrays.asList(h), RetryConfig.noRetries(), mockScheduler()); + + retryable.start(); + retryable.send(EMPTY); + + Mockito.verify(h.grpc).start(Mockito.any()); + Mockito.verify(h.grpc, Mockito.times(2)).sendNext(EMPTY); // init + sent request + + retryable.close(); + + h.complete(Status.SUCCESS); + + Mockito.verify(h.grpc).close(); + Mockito.verify(h.grpc, Mockito.never()).cancel(); + + Assert.assertEquals(Arrays.asList(Status.SUCCESS), retryable.closeStatuses); + } + + @Test + public void doubleStartTest() { + StreamHandle h1 = new StreamHandle(); + StreamHandle h2 = new StreamHandle(); + TestStream retryable = new TestStream(Arrays.asList(h1, h2), RetryConfig.noRetries(), mockScheduler()); + + retryable.start(); // sets realStream = h1.topicStream + retryable.start(); // compareAndSet fails → h2.topicStream is closed + + Mockito.verify(h1.grpc).start(Mockito.any()); + Mockito.verify(h2.grpc, Mockito.never()).start(Mockito.any()); // h2 was never started + Mockito.verify(h2.grpc).close(); // h2 was closed immediately + } + + @Test + public void sendBeforeStartIsIgnoredTest() { + StreamHandle h = new StreamHandle(); + TestStream retryable = new TestStream(Arrays.asList(h), RetryConfig.noRetries(), mockScheduler()); + + retryable.send(EMPTY); // just skipping + + Mockito.verify(h.grpc, Mockito.never()).sendNext(Mockito.any()); + } + + @Test + public void closeBeforeStartIsNoOpTest() { + StreamHandle h = new StreamHandle(); + TestStream retryable = new TestStream(Arrays.asList(h), RetryConfig.noRetries(), mockScheduler()); + + retryable.close(); // no stream yet, should not throw + + Mockito.verify(h.grpc, Mockito.never()).close(); + } + + @Test + public void noRetriesErrorStatusTest() { + StreamHandle h = new StreamHandle(); + TestStream retryable = new TestStream(Arrays.asList(h), RetryConfig.noRetries(), mockScheduler()); + + retryable.start(); + h.complete(Status.of(StatusCode.ABORTED)); + + Assert.assertEquals(Arrays.asList(Status.of(StatusCode.ABORTED)), retryable.closeStatuses); + Assert.assertTrue(retryable.retryStatuses.isEmpty()); + } + + @Test + public void noRetriesExceptionStatusTest() { + @SuppressWarnings("unchecked") + StreamHandle h = new StreamHandle(Mockito.mock(TopicStream.class)); + TestStream retryable = new TestStream(Arrays.asList(h), RetryConfig.noRetries(), mockScheduler()); + + retryable.start(); + RuntimeException ex = new RuntimeException("fail"); + h.fail(ex); + + Assert.assertEquals(Arrays.asList(Status.of(StatusCode.CLIENT_INTERNAL_ERROR, ex)), retryable.closeStatuses); + Assert.assertTrue(retryable.retryStatuses.isEmpty()); + } + + @Test + public void immediateRetryTest() { + StreamHandle h1 = new StreamHandle(); + StreamHandle h2 = new StreamHandle(); + StreamHandle h3 = new StreamHandle(); + + Status s1 = Status.of(StatusCode.UNAVAILABLE); + Status s2 = Status.of(StatusCode.BAD_SESSION); + Status s3 = Status.of(StatusCode.BAD_REQUEST); + + // Policy: immediate retry (0ms) on all attempts, then no more + RetryConfig config = status -> (retryCount, elapsed) -> (status.getCode() != StatusCode.BAD_REQUEST) ? 0 : -1; + + TestStream retryable = new TestStream(Arrays.asList(h1, h2, h3), config, mockScheduler()); + + retryable.start(); + + Mockito.verify(h1.grpc).start(Mockito.any()); // first stream was started + + retryable.send(EMPTY); + h1.complete(s1); + + Mockito.verify(h2.grpc).start(Mockito.any()); // second stream was started + retryable.send(EMPTY); + retryable.send(EMPTY); + h2.complete(s2); + + Mockito.verify(h3.grpc).start(Mockito.any()); // third stream was started + retryable.send(EMPTY); + h3.complete(s3); + + retryable.close(); // no effect + + Mockito.verify(h1.grpc, Mockito.times(2)).sendNext(EMPTY); // init req + send + Mockito.verify(h1.grpc, Mockito.never()).close(); // stream was closed by error + + Mockito.verify(h2.grpc, Mockito.times(3)).sendNext(EMPTY); // init req + 2 * send + Mockito.verify(h2.grpc, Mockito.never()).close(); // stream was closed by error + + Mockito.verify(h3.grpc, Mockito.times(2)).sendNext(EMPTY); // init req + send + Mockito.verify(h3.grpc, Mockito.never()).close(); // stream was closed by error + + Assert.assertEquals(Arrays.asList(s1, s2), retryable.retryStatuses); + Assert.assertEquals(Arrays.asList(s3), retryable.closeStatuses); + } + + @Test + public void closeOnWrongSchedulerTest() { + StreamHandle h = new StreamHandle(); + long delayMs = 500L; + RetryConfig config = status -> (retryCount, elapsed) -> delayMs; + + TestStream retryable = new TestStream(Arrays.asList(h), config, null); + + retryable.start(); + h.complete(Status.of(StatusCode.UNAVAILABLE)); + + Assert.assertEquals(1, retryable.retryStatuses.size()); + Assert.assertEquals(1, retryable.closeStatuses.size()); + } + + @Test + public void scheduledRetryWithCorrectDelayTest() { + StreamHandle h = new StreamHandle(); + ScheduledExecutorService scheduler = mockScheduler(); + long delayMs = 500L; + RetryConfig config = status -> (retryCount, elapsed) -> delayMs; + + TestStream retryable = new TestStream( + Arrays.asList(h), config, scheduler); + + retryable.start(); + h.complete(Status.of(StatusCode.UNAVAILABLE)); + + Assert.assertEquals(Arrays.asList(Status.of(StatusCode.UNAVAILABLE)), retryable.retryStatuses); + Assert.assertTrue(retryable.closeStatuses.isEmpty()); + Mockito.verify(scheduler) + .schedule(Mockito.any(Runnable.class), Mockito.eq(delayMs), Mockito.eq(TimeUnit.MILLISECONDS)); + } + + @Test + public void testResetRetriesAllowsRetryingAgainFromZero() { + StreamHandle h1 = new StreamHandle(); + StreamHandle h2 = new StreamHandle(); + StreamHandle h3 = new StreamHandle(); + // Policy: one immediate retry (retryCount 0), then no more + RetryConfig config = status -> (retryCount, elapsed) -> retryCount == 0 ? 0 : -1; + + TestStream retryable = new TestStream(Arrays.asList(h1, h2, h3), config, mockScheduler()); + + Status error = Status.of(StatusCode.UNAVAILABLE); + retryable.start(); + h1.complete(error); // retry fires (retryCount 0 → 0ms) + + Assert.assertEquals(Arrays.asList(error), retryable.retryStatuses); + Assert.assertTrue(retryable.closeStatuses.isEmpty()); + + // Reset the retry counter so we get another chance + retryable.resetRetries(); + h2.complete(error); // retry fires again (retryCount reset to 0) + + Assert.assertEquals(Arrays.asList(error, error), retryable.retryStatuses); + Assert.assertTrue(retryable.closeStatuses.isEmpty()); + + Mockito.verify(h3.grpc).start(Mockito.any()); + } +} diff --git a/topic/src/test/java/tech/ydb/topic/impl/TopicStreamTest.java b/topic/src/test/java/tech/ydb/topic/impl/TopicStreamTest.java new file mode 100644 index 000000000..41ec23b18 --- /dev/null +++ b/topic/src/test/java/tech/ydb/topic/impl/TopicStreamTest.java @@ -0,0 +1,181 @@ +package tech.ydb.topic.impl; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import com.google.protobuf.StringValue; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.grpc.GrpcReadStream; +import tech.ydb.core.grpc.GrpcReadWriteStream; + + +public class TopicStreamTest { + private static final Logger logger = LoggerFactory.getLogger(TopicStreamTest.class); + + private interface MockedStream extends GrpcReadWriteStream { } + + private static class TestStream extends TopicStream { + TestStream(MockedStream mock) { + super(logger, "test", mock); + } + + @Override + protected StringValue updateTokenMessage(String token) { + return msg("new-token:" + token); + } + + @Override + protected Status parseMessageStatus(StringValue message) { + return "fail".equals(message.getValue()) ? Status.of(StatusCode.ABORTED) : Status.SUCCESS; + } + } + + private static StringValue msg(String value) { + return StringValue.newBuilder().setValue(value).build(); + } + + private MockedStream buildMockedStream(String authToken, CompletableFuture result) { + MockedStream grpc = Mockito.mock(MockedStream.class); + Mockito.when(grpc.authToken()).thenReturn(authToken); + Mockito.when(grpc.start(Mockito.any())).thenReturn(result); + return grpc; + } + + @SuppressWarnings("unchecked") + private ArgumentCaptor> buildObserver() { + return ArgumentCaptor.forClass(GrpcReadStream.Observer.class); + } + + @Test + public void baseTest() { + CompletableFuture streamFuture = new CompletableFuture<>(); + MockedStream mock = buildMockedStream("token", streamFuture); + ArgumentCaptor> observer = buildObserver(); + + List received = new ArrayList<>(); + + TestStream stream = new TestStream(mock); + CompletableFuture result = stream.start(msg("init"), received::add); + Mockito.verify(mock).start(observer.capture()); + + stream.send(msg("s1")); + observer.getValue().onNext(msg("r1")); + stream.send(msg("s2")); + stream.send(msg("s3")); + observer.getValue().onNext(msg("r2")); + observer.getValue().onNext(msg("r3")); + + Mockito.verify(mock, Mockito.never()).close(); + Mockito.verify(mock, Mockito.never()).cancel(); + + Mockito.verify(mock).sendNext(msg("init")); + Mockito.verify(mock).sendNext(msg("s1")); + Mockito.verify(mock).sendNext(msg("s2")); + Mockito.verify(mock).sendNext(msg("s3")); + Assert.assertEquals(Arrays.asList(msg("r1"), msg("r2"), msg("r3")), received); + + Assert.assertFalse(result.isDone()); + + stream.close(); + + Mockito.verify(mock).close(); + Mockito.verify(mock, Mockito.never()).cancel(); + + streamFuture.complete(Status.SUCCESS); + + stream.close(); // no effect + } + + @Test + public void startStreamAndImmediatelyFinishTest() { + // Stream may be closed immediately after start, for example if token is invalid + CompletableFuture status = new CompletableFuture<>(); + status.completeExceptionally(new RuntimeException("error")); + MockedStream mock = buildMockedStream("token", status); + + TestStream stream = new TestStream(mock); + stream.start(msg("init-req"), msg -> {}); + + Mockito.verify(mock).start(Mockito.any()); + stream.send(msg("s1")); + + Mockito.verify(mock, Mockito.never()).sendNext(Mockito.any()); + Mockito.verify(mock, Mockito.never()).close(); + Mockito.verify(mock, Mockito.never()).cancel(); + } + + @Test + public void nonSuccessMessageStopsStreamTest() { + CompletableFuture streamFuture = new CompletableFuture<>(); + MockedStream mock = buildMockedStream("token", streamFuture); + ArgumentCaptor> observer = buildObserver(); + + List received = new ArrayList<>(); + + TestStream stream = new TestStream(mock); + CompletableFuture result = stream.start(msg("init"), received::add); + + Mockito.verify(mock).start(observer.capture()); + + stream.send(msg("s1")); + observer.getValue().onNext(msg("r1")); + stream.send(msg("s2")); + observer.getValue().onNext(msg("r2")); + stream.send(msg("s3")); + observer.getValue().onNext(msg("fail")); + observer.getValue().onNext(msg("fail")); // no effect + + Assert.assertTrue(result.isDone()); + Mockito.verify(mock).close(); + Mockito.verify(mock, Mockito.never()).cancel(); + + Mockito.verify(mock).sendNext(msg("init")); + Mockito.verify(mock).sendNext(msg("s1")); + Mockito.verify(mock).sendNext(msg("s2")); + Assert.assertEquals(Arrays.asList(msg("r1"), msg("r2")), received); + } + + @Test + public void tokenUpdatesTest() { + CompletableFuture streamFuture = new CompletableFuture<>(); + MockedStream mock = Mockito.mock(MockedStream.class); + Mockito.when(mock.authToken()).thenReturn("t", "t", "t", "t2", "t2", "t3"); + Mockito.when(mock.start(Mockito.any())).thenReturn(streamFuture); + + TestStream stream = new TestStream(mock); + stream.start(msg("init"), msg -> {}); + + Mockito.verify(mock).start(Mockito.any()); + + stream.send(msg("s1")); + stream.send(msg("s2")); + stream.send(msg("s3")); + stream.send(msg("s4")); + stream.send(msg("s5")); + stream.send(msg("s6")); + + stream.close(); + + Mockito.verify(mock).close(); + Mockito.verify(mock, Mockito.never()).cancel(); + Mockito.verify(mock).sendNext(msg("init")); + Mockito.verify(mock).sendNext(msg("s1")); + Mockito.verify(mock).sendNext(msg("s2")); + Mockito.verify(mock).sendNext(msg("s3")); + Mockito.verify(mock).sendNext(msg("new-token:t2")); + Mockito.verify(mock).sendNext(msg("s4")); + Mockito.verify(mock).sendNext(msg("s5")); + Mockito.verify(mock).sendNext(msg("new-token:t3")); + Mockito.verify(mock).sendNext(msg("s6")); + } +} From d7c4eb8bbc1ff9062dff2548d4143ae83f85f167 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Tue, 14 Apr 2026 13:37:57 +0100 Subject: [PATCH 6/8] Fixed by copilot --- .../java/tech/ydb/topic/impl/DebugTools.java | 2 +- .../ydb/topic/impl/TopicRetryableStream.java | 22 ++++++++++++------ .../java/tech/ydb/topic/impl/TopicStream.java | 6 ++--- .../tech/ydb/topic/impl/DebugToolsTest.java | 19 +++++++++++++++ .../topic/impl/TopicRetryableStreamTest.java | 23 ++++++++++++++++++- 5 files changed, 60 insertions(+), 12 deletions(-) create mode 100644 topic/src/test/java/tech/ydb/topic/impl/DebugToolsTest.java diff --git a/topic/src/main/java/tech/ydb/topic/impl/DebugTools.java b/topic/src/main/java/tech/ydb/topic/impl/DebugTools.java index 3bfff236a..3f8b055fd 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/DebugTools.java +++ b/topic/src/main/java/tech/ydb/topic/impl/DebugTools.java @@ -8,7 +8,7 @@ */ public class DebugTools { private static final int ID_LENGTH = 6; - private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABSDEFGHIJKLMNOPQRSTUVWXYZ1234567890" + private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890" .toCharArray(); private DebugTools() { } diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java b/topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java index c9b63716c..50c873c03 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java +++ b/topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java @@ -23,6 +23,8 @@ public abstract class TopicRetryableStream private final AtomicInteger streamCount = new AtomicInteger(0); private final RetryState state = new RetryState(); + private volatile boolean isClosed = false; + public TopicRetryableStream(Logger logger, String debugId, RetryConfig config, ScheduledExecutorService scheduler) { this.debugId = debugId; this.logger = logger; @@ -44,6 +46,10 @@ public String toString() { protected abstract void onClose(Status status); public void start() { + if (isClosed) { + return; + } + String streamID = debugId + '.' + streamCount.incrementAndGet(); TopicStream stream = createNewStream(streamID); @@ -54,12 +60,13 @@ public void start() { } stream.start(getInitRequest(), this::onNext).whenComplete((status, th) -> { + realStream.compareAndSet(stream, null); if (status != null) { - onStreamStop(stream, status, retryConfig.getStatusRetryPolicy(status)); + onStreamStop(status, retryConfig.getStatusRetryPolicy(status)); } if (th != null) { Status wrapped = Status.of(StatusCode.CLIENT_INTERNAL_ERROR, th); - onStreamStop(stream, wrapped, retryConfig.getThrowableRetryPolicy(th)); + onStreamStop(wrapped, retryConfig.getThrowableRetryPolicy(th)); } }); } @@ -78,14 +85,15 @@ public void send(W msg) { } public void close() { + isClosed = true; TopicStream stream = realStream.getAndSet(null); if (stream != null) { stream.close(); } } - private void onStreamStop(TopicStream stream, Status status, RetryPolicy policy) { - if (!realStream.compareAndSet(stream, null)) { // stream was already closed (usually with success) + private void onStreamStop(Status status, RetryPolicy policy) { + if (isClosed) { // stream was already closed (usually with success) onClose(status); return; } @@ -99,13 +107,13 @@ private void onStreamStop(TopicStream stream, Status status, RetryPolicy p long nextRetryMs = state.nextRetryMs(policy); if (nextRetryMs < 0) { - logger.warn("{} stopped by retry policy {}", this, status); + logger.warn("{} stopped after retry policy evaluation for status {}", this, status); onClose(status); return; } - if (nextRetryMs == 0) { // retry immediatelly - logger.warn("{} retry #{}. Retry immediatelly...", this, state.retryNumber()); + if (nextRetryMs == 0) { // retry immediately + logger.warn("{} retry #{}. Retry immediately...", this, state.retryNumber()); onRetry(status); start(); return; diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java b/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java index 882034ed5..d2421fc6b 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java +++ b/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java @@ -41,8 +41,8 @@ public CompletableFuture start(W initReq, Consumer messageHandler) { } } }).whenComplete((st, th) -> { - Status status = st != null ? st : Status.of(StatusCode.INTERNAL_ERROR, th); - logger.debug("[{}] finished with status {}", debugId, st); + Status status = st != null ? st : Status.of(StatusCode.CLIENT_INTERNAL_ERROR, th); + logger.debug("[{}] finished with status {}", debugId, status); streamStatus.complete(status); }); @@ -62,7 +62,7 @@ public void close() { public void send(W req) { if (streamStatus.isDone()) { - logger.warn("[{}] is already closed. Next message with type {} was NOT sent:", debugId, + logger.warn("[{}] is already closed. Next message with type {} was NOT sent", debugId, req.getDescriptorForType().getName()); return; } diff --git a/topic/src/test/java/tech/ydb/topic/impl/DebugToolsTest.java b/topic/src/test/java/tech/ydb/topic/impl/DebugToolsTest.java new file mode 100644 index 000000000..b4cdae337 --- /dev/null +++ b/topic/src/test/java/tech/ydb/topic/impl/DebugToolsTest.java @@ -0,0 +1,19 @@ +package tech.ydb.topic.impl; + +import org.junit.Assert; +import org.junit.Test; + +/** + * + * @author Aleksandr Gorshenin + */ +public class DebugToolsTest { + @Test + public void createDebugIdTest() { + Assert.assertEquals("custom-id", DebugTools.createDebugId("custom-id")); + + String newID = DebugTools.createDebugId(null); + Assert.assertNotNull(newID); + Assert.assertEquals(6, newID.length()); + } +} diff --git a/topic/src/test/java/tech/ydb/topic/impl/TopicRetryableStreamTest.java b/topic/src/test/java/tech/ydb/topic/impl/TopicRetryableStreamTest.java index 835b30054..2ef55a3b3 100644 --- a/topic/src/test/java/tech/ydb/topic/impl/TopicRetryableStreamTest.java +++ b/topic/src/test/java/tech/ydb/topic/impl/TopicRetryableStreamTest.java @@ -20,7 +20,7 @@ import tech.ydb.core.grpc.GrpcReadWriteStream; public class TopicRetryableStreamTest { - private static final Logger logger = LoggerFactory.getLogger(TopicStreamTest.class); + private static final Logger logger = LoggerFactory.getLogger(TopicRetryableStreamTest.class); private static final Empty EMPTY = Empty.getDefaultInstance(); /** @@ -143,6 +143,27 @@ public void doubleStartTest() { Mockito.verify(h2.grpc).close(); // h2 was closed immediately } + @Test + public void doubleCloseTest() { + StreamHandle h1 = new StreamHandle(); + TestStream retryable = new TestStream(Arrays.asList(h1), RetryConfig.noRetries(), mockScheduler()); + + retryable.start(); + + retryable.close(); + retryable.close(); + + Mockito.verify(h1.grpc).start(Mockito.any()); + Mockito.verify(h1.grpc).close(); + } + + @Test + public void startAfterCloseTest() { + TestStream retryable = new TestStream(Arrays.asList(), RetryConfig.noRetries(), mockScheduler()); + retryable.close(); + retryable.start(); // nothing + } + @Test public void sendBeforeStartIsIgnoredTest() { StreamHandle h = new StreamHandle(); From c369fc683092b648c1a1165c97a0c2b2ef8a3c8d Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Tue, 14 Apr 2026 14:11:55 +0100 Subject: [PATCH 7/8] Fixed data race on writer onInit --- .../main/java/tech/ydb/topic/write/impl/WriteSession.java | 3 ++- .../src/main/java/tech/ydb/topic/write/impl/WriterImpl.java | 5 ++++- .../src/main/java/tech/ydb/topic/write/impl/WriterQueue.java | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriteSession.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriteSession.java index 2e21a9c9a..16d31f3f3 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriteSession.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriteSession.java @@ -81,10 +81,11 @@ public void startAndInitialize() { private void onInitResponse(YdbTopic.StreamWriteMessage.InitResponse response) { long lastSeqNo = response.getLastSeqNo(); + writer.onInit(lastSeqNo); sessionId = response.getSessionId(); logger.info("[{}] Session with id {} (partition {}) initialized for topic \"{}\", lastSeqNo {}", streamId, sessionId, response.getPartitionId(), initRequest.getPath(), lastSeqNo); - writer.onInit(streamId, lastSeqNo); + writer.onStart(lastSeqNo); } // Shouldn't be called more than once at a time due to grpc guarantees diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index f699030f2..85cf86012 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -142,10 +142,13 @@ protected void onShutdown(String reason) { } } - void onInit(String streamId, long lastSeqNo) { + void onInit(long lastSeqNo) { reconnectCounter.set(0); Iterator resend = writeQueue.updateSeqNo(lastSeqNo); session.sendAll(() -> resend.hasNext() ? resend.next() : null); + } + + void onStart(long lastSeqNo) { if (initResultFutureRef.get() != null) { initResultFutureRef.get().complete(new InitResult(lastSeqNo)); } diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterQueue.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterQueue.java index b4d38f40c..7c2b90459 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterQueue.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterQueue.java @@ -107,7 +107,7 @@ SentMessage nextMessageToSend() { } SentMessage sentMsg = new SentMessage(next, seqNo); - logger.debug("[{}] prepare sent message with seqNo {}", id, seqNo); + logger.trace("[{}] prepare sent message with seqNo {}", id, seqNo); sent.offer(sentMsg); return sentMsg; } From 2bcb0bcedcf2dce2229ffb0860c2dce3010d34c2 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Tue, 14 Apr 2026 14:27:54 +0100 Subject: [PATCH 8/8] Fixed flap test --- .../test/java/tech/ydb/topic/impl/SerialExecutorTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/topic/src/test/java/tech/ydb/topic/impl/SerialExecutorTest.java b/topic/src/test/java/tech/ydb/topic/impl/SerialExecutorTest.java index ee83b70cd..a60abca09 100644 --- a/topic/src/test/java/tech/ydb/topic/impl/SerialExecutorTest.java +++ b/topic/src/test/java/tech/ydb/topic/impl/SerialExecutorTest.java @@ -136,11 +136,14 @@ public void wrongExecuterTest() throws InterruptedException { @Test public void wrongTaskTest() throws InterruptedException { - CountDownLatch latch = new CountDownLatch(4); + CountDownLatch latch = new CountDownLatch(5); Queue problems = new ConcurrentLinkedQueue<>(); ExecutorService pool = Executors.newCachedThreadPool((task) -> { Thread t = new Thread(task); - t.setUncaughtExceptionHandler((th, ex) -> problems.add(ex)); + t.setUncaughtExceptionHandler((th, ex) -> { + problems.add(ex); + latch.countDown(); + }); return t; });