Skip to content

Commit 9397ec2

Browse files
committed
fixup: race condition with FATAL
Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
1 parent 39b59ea commit 9397ec2

7 files changed

Lines changed: 157 additions & 108 deletions

File tree

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ private void onFatal() {
287287
if (errorTask != null && !errorTask.isCancelled()) {
288288
errorTask.cancel(false);
289289
}
290+
this.syncResources.setFatal(true);
290291

291292
this.emitProviderError(ProviderEventDetails.builder()
292293
.errorCode(ErrorCode.PROVIDER_FATAL)

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResources.java

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import dev.openfeature.sdk.EvaluationContext;
44
import dev.openfeature.sdk.ImmutableContext;
55
import dev.openfeature.sdk.ProviderEvent;
6+
import dev.openfeature.sdk.exceptions.FatalError;
67
import dev.openfeature.sdk.exceptions.GeneralError;
78
import lombok.Getter;
89
import lombok.Setter;
@@ -16,8 +17,11 @@ class FlagdProviderSyncResources {
1617
@Setter
1718
private volatile ProviderEvent previousEvent = null;
1819

20+
@Setter
21+
private volatile boolean isFatal;
22+
1923
private volatile EvaluationContext enrichedContext = new ImmutableContext();
20-
private volatile boolean initialized;
24+
private volatile boolean isInitialized;
2125
private volatile boolean isShutDown;
2226

2327
public void setEnrichedContext(EvaluationContext context) {
@@ -31,32 +35,40 @@ public void setEnrichedContext(EvaluationContext context) {
3135
* @return true iff this was the first call to {@code initialize()}
3236
*/
3337
public synchronized boolean initialize() {
34-
if (this.initialized) {
38+
if (this.isInitialized) {
3539
return false;
3640
}
37-
this.initialized = true;
41+
this.isInitialized = true;
42+
this.isFatal = false;
3843
this.notifyAll();
3944
return true;
4045
}
4146

4247
/**
43-
* Blocks the calling thread until either {@link FlagdProviderSyncResources#initialize()} or
44-
* {@link FlagdProviderSyncResources#shutdown()} is called or the deadline is exceeded, whatever happens first. If
45-
* {@link FlagdProviderSyncResources#initialize()} has been executed before {@code waitForInitialization(long)} is
46-
* called, it will return instantly. If the deadline is exceeded, a GeneralError will be thrown.
47-
* If {@link FlagdProviderSyncResources#shutdown()} is called in the meantime, an {@link IllegalStateException} will
48+
* Blocks the calling thread until either
49+
* {@link FlagdProviderSyncResources#initialize()} or
50+
* {@link FlagdProviderSyncResources#shutdown()} is called or the deadline is
51+
* exceeded, whatever happens first. If
52+
* {@link FlagdProviderSyncResources#initialize()} has been executed before
53+
* {@code waitForInitialization(long)} is
54+
* called, it will return instantly. If the deadline is exceeded, a GeneralError
55+
* will be thrown.
56+
* If {@link FlagdProviderSyncResources#shutdown()} is called in the meantime,
57+
* a {@link GeneralError} (or a {@link FatalError}, if the provider has been marked as fatal) will
4858
* be thrown. Otherwise, the method will return cleanly.
4959
*
5060
* @param deadline the maximum time in ms to wait
51-
* @throws GeneralError when the deadline is exceeded before
52-
* {@link FlagdProviderSyncResources#initialize()} is called on this object
53-
* @throws IllegalStateException when {@link FlagdProviderSyncResources#shutdown()} is called or has been called on
54-
* this object
61+
* @throws GeneralError when the deadline is exceeded before
62+
* {@link FlagdProviderSyncResources#initialize()} is
63+
* called on this object, or when
64+
* {@link FlagdProviderSyncResources#shutdown()} is called or has been called on this object
65+
* @throws FatalError when {@link FlagdProviderSyncResources#shutdown()} is called or has been
66+
* called on this object while the provider is marked as fatal
5567
*/
5668
public void waitForInitialization(long deadline) {
5769
long start = System.currentTimeMillis();
5870
long end = start + deadline;
59-
while (!initialized && !isShutDown) {
71+
while (!isInitialized && !isShutDown) {
6072
long now = System.currentTimeMillis();
6173
// if wait(0) is called, the thread would wait forever, so we abort when this would happen
6274
if (now >= end) {
@@ -68,7 +80,7 @@ public void waitForInitialization(long deadline) {
6880
if (isShutDown) {
6981
break;
7082
}
71-
if (initialized) { // might have changed in the meantime
83+
if (isInitialized) { // might have changed in the meantime
7284
return;
7385
}
7486
try {
@@ -80,7 +92,11 @@ public void waitForInitialization(long deadline) {
8092
}
8193
}
8294
if (isShutDown) {
83-
throw new IllegalStateException("Already shut down");
95+
String msg = "Already shut down due to previous error.";
96+
if (isFatal) {
97+
throw new FatalError(msg);
98+
}
99+
throw new GeneralError(msg);
84100
}
85101
}
86102

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,14 +159,14 @@ public BlockingQueue<QueuePayload> getStreamQueue() {
159159
* @throws InterruptedException if stream can't be closed within deadline.
160160
*/
161161
public void shutdown() throws InterruptedException {
162+
// Do not enqueue errors from here, as this method can be called externally, causing multiple shutdown signals
162163
// Use atomic compareAndSet to ensure shutdown is only executed once
163164
// This prevents race conditions when shutdown is called from multiple threads
164165
if (!shutdown.compareAndSet(false, true)) {
165166
log.debug("Shutdown already in progress or completed");
166167
return;
167168
}
168169

169-
enqueue(QueuePayload.SHUTDOWN);
170170
grpcComponents.channelConnector.shutdown();
171171
}
172172

@@ -195,10 +195,11 @@ private void observeSyncStream() {
195195
observer.metadata = getMetadata();
196196
} catch (StatusRuntimeException metaEx) {
197197
if (fatalStatusCodes.contains(metaEx.getStatus().getCode().name())) {
198-
log.debug(
198+
log.info(
199199
"Fatal status code for metadata request: {}, not retrying",
200200
metaEx.getStatus().getCode());
201201
shutdown();
202+
enqueue(QueuePayload.SHUTDOWN);
202203
} else {
203204
// retry for other status codes
204205
String message = metaEx.getMessage();
@@ -213,11 +214,12 @@ private void observeSyncStream() {
213214
syncFlags(observer);
214215
handleObserverError(observer);
215216
} catch (StatusRuntimeException ex) {
216-
if (fatalStatusCodes.contains(ex.getStatus().getCode().toString())) {
217-
log.debug(
217+
if (fatalStatusCodes.contains(ex.getStatus().getCode().name())) {
218+
log.info(
218219
"Fatal status code during sync stream: {}, not retrying",
219220
ex.getStatus().getCode());
220221
shutdown();
222+
enqueue(QueuePayload.SHUTDOWN);
221223
} else {
222224
// retry for other status codes
223225
log.error("Unexpected sync stream exception, will restart.", ex);

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResourcesTest.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package dev.openfeature.contrib.providers.flagd;
22

3+
import dev.openfeature.sdk.exceptions.FatalError;
34
import dev.openfeature.sdk.exceptions.GeneralError;
45
import java.util.concurrent.atomic.AtomicBoolean;
56
import java.util.concurrent.atomic.AtomicLong;
@@ -111,14 +112,47 @@ void callingInitialize_wakesUpWaitingThread() throws InterruptedException {
111112

112113
@Timeout(2)
113114
@Test
114-
void callingShutdown_wakesUpWaitingThreadWithException() throws InterruptedException {
115+
void callingShutdownWithPreviousNonFatal_wakesUpWaitingThread_WithGeneralException() throws InterruptedException {
115116
final AtomicBoolean isWaiting = new AtomicBoolean();
116117
final AtomicBoolean successfulTest = new AtomicBoolean();
118+
flagdProviderSyncResources.setFatal(false);
119+
117120
Thread waitingThread = new Thread(() -> {
118121
long start = System.currentTimeMillis();
119122
isWaiting.set(true);
120-
Assertions.assertThrows(
121-
IllegalStateException.class, () -> flagdProviderSyncResources.waitForInitialization(10000));
123+
Assertions.assertThrows(GeneralError.class, () -> flagdProviderSyncResources.waitForInitialization(10000));
124+
125+
long end = System.currentTimeMillis();
126+
long duration = end - start;
127+
var wait = MAX_TIME_TOLERANCE * 3;
128+
successfulTest.set(duration < wait);
129+
});
130+
waitingThread.start();
131+
132+
while (!isWaiting.get()) {
133+
Thread.yield();
134+
}
135+
136+
Thread.sleep(MAX_TIME_TOLERANCE); // waitingThread should have started waiting in the meantime
137+
138+
flagdProviderSyncResources.shutdown();
139+
140+
waitingThread.join();
141+
142+
Assertions.assertTrue(successfulTest.get());
143+
}
144+
145+
@Timeout(2)
146+
@Test
147+
void callingShutdownWithPreviousFatal_wakesUpWaitingThread_WithFatalException() throws InterruptedException {
148+
final AtomicBoolean isWaiting = new AtomicBoolean();
149+
final AtomicBoolean successfulTest = new AtomicBoolean();
150+
flagdProviderSyncResources.setFatal(true);
151+
152+
Thread waitingThread = new Thread(() -> {
153+
long start = System.currentTimeMillis();
154+
isWaiting.set(true);
155+
Assertions.assertThrows(FatalError.class, () -> flagdProviderSyncResources.waitForInitialization(10000));
122156

123157
long end = System.currentTimeMillis();
124158
long duration = end - start;

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import static io.restassured.RestAssured.when;
44
import static org.assertj.core.api.Assertions.assertThat;
5-
import static org.awaitility.Awaitility.await;
65

76
import dev.openfeature.contrib.providers.flagd.Config;
87
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
@@ -24,7 +23,6 @@
2423
import java.nio.file.Path;
2524
import java.nio.file.Paths;
2625
import java.time.Duration;
27-
import java.util.concurrent.TimeUnit;
2826
import lombok.extern.slf4j.Slf4j;
2927
import org.apache.commons.io.FileUtils;
3028
import org.apache.commons.lang3.RandomStringUtils;
@@ -202,9 +200,6 @@ public void the_flag_was_modded() {
202200

203201
@Then("the client should be in {} state")
204202
public void the_client_should_be_in_fatal_state(String clientState) {
205-
await().pollDelay(100, TimeUnit.MILLISECONDS)
206-
.atMost(1000, TimeUnit.MILLISECONDS)
207-
.untilAsserted(() -> assertThat(state.client.getProviderState())
208-
.isEqualTo(ProviderState.valueOf(clientState.toUpperCase())));
203+
assertThat(state.client.getProviderState()).isEqualTo(ProviderState.valueOf(clientState.toUpperCase()));
209204
}
210205
}

0 commit comments

Comments
 (0)