Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ void testBrokenSnapshotRecovery(@TempDir File snapshotKvTabletDir) throws Except
putRecordsToLeader(kvReplica, kvRecords);

// trigger first snapshot
scheduledExecutorService.triggerNonPeriodicScheduledTask();
triggerSnapshotTaskWithRetry(scheduledExecutorService, 5);
kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 0);

// put more data and create second snapshot
Expand All @@ -514,7 +514,7 @@ void testBrokenSnapshotRecovery(@TempDir File snapshotKvTabletDir) throws Except
putRecordsToLeader(kvReplica, kvRecords);

// trigger second snapshot
scheduledExecutorService.triggerNonPeriodicScheduledTask();
triggerSnapshotTaskWithRetry(scheduledExecutorService, 5);
kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 1);

// put more data and create third snapshot (this will be the broken one)
Expand All @@ -525,7 +525,7 @@ void testBrokenSnapshotRecovery(@TempDir File snapshotKvTabletDir) throws Except
putRecordsToLeader(kvReplica, kvRecords);

// trigger third snapshot
scheduledExecutorService.triggerNonPeriodicScheduledTask();
triggerSnapshotTaskWithRetry(scheduledExecutorService, 5);
CompletedSnapshot snapshot2 = kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 2);

// verify that snapshot2 is the latest one before we break it
Expand Down Expand Up @@ -766,4 +766,22 @@ public void reset() {
isScheduled = false;
}
}

/** A helper function with support for retries for flaky triggering operations. */
private static void triggerSnapshotTaskWithRetry(
ManuallyTriggeredScheduledExecutorService scheduledExecutorService, int maxRetries)
throws Exception {
for (int i = 0; i < maxRetries; i++) {
try {
scheduledExecutorService.triggerNonPeriodicScheduledTask();
return;
} catch (java.util.NoSuchElementException e) {
if (i == maxRetries - 1) {
throw e;
}

Thread.sleep(50);
}
}
}
}