Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b96f558
feat(phase4): add retry infrastructure to EventPipeline
didiergarcia Mar 11, 2026
ce25d7f
test(phase4): add Step 1 verification tests
didiergarcia Mar 11, 2026
4d66c96
feat(phase4): add upload gate to skip uploads when rate-limited
didiergarcia Mar 11, 2026
ca347d6
feat(phase4): add per-batch retry logic in upload loop
didiergarcia Mar 12, 2026
8683777
feat: Phase 4 - EventPipeline integration with smart retry system
didiergarcia Mar 12, 2026
68da12a
refactor: Rename EventPipelinePhase4Test to EventPipelineRetryInfrast…
didiergarcia Mar 13, 2026
80dbe31
feat: Add timeProvider parameter to EventPipeline for testability
didiergarcia Mar 16, 2026
f6291ac
test: Add retry chain tests using FakeTimeProvider
didiergarcia Mar 16, 2026
c87da75
Merge branch 'main' into feat/tapi-retry-phase4-pipeline-integration
didiergarcia Mar 24, 2026
ec8132f
Fix SDK bugs found by e2e retry tests (#302)
MichaelGHSeg Mar 24, 2026
19de1c6
fix: Reduce e2e-cli initial delay from 5s to 500ms
didiergarcia Mar 25, 2026
4f78189
fix: Increase e2e-cli initial delay to 2s for settings error tolerance
didiergarcia Mar 25, 2026
eb8028c
fix: Use adaptive polling (1.5s initial + 200ms intervals) for e2e ti…
didiergarcia Mar 25, 2026
6c5bfb3
fix: Remove fixed initial delay, use aggressive polling from start
didiergarcia Mar 25, 2026
da14a34
fix: Revert to 1.2s initial delay for SDK initialization
didiergarcia Mar 25, 2026
9ef871b
fix: Reduce initial delay to 900ms to meet <1s timing requirement
didiergarcia Mar 25, 2026
fd7d1c6
fix: Use 2s HTTP timeouts for fast settings failure recovery
didiergarcia Mar 25, 2026
998d230
fix: Track batch file lifecycle instead of assuming empty means done
MichaelGHSeg Mar 25, 2026
3f42d7f
feat: read httpConfig from CDN settings and fix retry enforcement (#304)
MichaelGHSeg Mar 27, 2026
3ee672c
test: Fix HttpConfigTest to use lowercase RetryBehavior values
didiergarcia Mar 30, 2026
de79472
fix: test isolation and timing flakiness (#305)
MichaelGHSeg Mar 30, 2026
d3ffcef
Adding more detailed logging to the tests to help with debugging test…
MichaelGHSeg Apr 1, 2026
31fb465
Fix CI test failures by updating WaitingPlugin to use setup instead o…
MichaelGHSeg Apr 1, 2026
788536b
fix: update waiting plugin test to use two manual resumes as automati…
MichaelGHSeg Apr 1, 2026
391b2db
Setting test timeout to investigate hanging test
MichaelGHSeg Apr 2, 2026
fba8a2c
fix: address CI failures in WaitingTests and e2e 400 status test
MichaelGHSeg Apr 15, 2026
c5d8201
fix: use analyticsDispatcher for StartupQueue state subscription
MichaelGHSeg Apr 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ compileKotlin {

test {
useJUnitPlatform()

if (System.getenv("CI") != null) {
// Prevent indefinite hangs in CI and surface the exact timed-out test.
systemProperty "junit.jupiter.execution.timeout.default", "2 m"
}

testLogging {
events "failed", "skipped"
exceptionFormat "full"
showExceptions true
showCauses true
showStackTraces true
}
}

dependencies {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.segment.analytics.kotlin.core
import com.segment.analytics.kotlin.core.Constants.DEFAULT_API_HOST
import com.segment.analytics.kotlin.core.Constants.DEFAULT_CDN_HOST
import com.segment.analytics.kotlin.core.platform.policies.FlushPolicy
import com.segment.analytics.kotlin.core.retry.HttpConfig
import com.segment.analytics.kotlin.core.utilities.ConcreteStorageProvider
import kotlinx.coroutines.*
import sovran.kotlin.Store
Expand All @@ -21,6 +22,7 @@ import sovran.kotlin.Store
* @property defaultSettings Settings object that will be used as fallback in case of network failure, defaults to empty
* @property autoAddSegmentDestination automatically add SegmentDestination plugin, defaults to `true`
* @property apiHost set a default apiHost to which Segment sends events, defaults to `api.segment.io/v1`
* @property httpConfig HTTP retry configuration for rate limiting and exponential backoff, defaults to `null` (legacy mode)
*/
data class Configuration(
val writeKey: String,
Expand All @@ -38,7 +40,8 @@ data class Configuration(
var apiHost: String = DEFAULT_API_HOST,
var cdnHost: String = DEFAULT_CDN_HOST,
var requestFactory: RequestFactory = RequestFactory(),
var errorHandler: ErrorHandler? = null
var errorHandler: ErrorHandler? = null,
var httpConfig: HttpConfig? = null
) {
fun isValid(): Boolean {
return writeKey.isNotBlank() && application != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ object Telemetry: Subscriber {
errorHandler?.invoke(e)
if (e.responseCode == 429) {
val headers = e.responseHeaders
val rateLimit = headers["Retry-After"]?.firstOrNull()?.toLongOrNull()
val rateLimit = headers["retry-after"]?.firstOrNull()?.toLongOrNull()
if (rateLimit != null) {
rateLimitEndTime = rateLimit + (System.currentTimeMillis() / 1000)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.segment.analytics.kotlin.core.platform.plugins.logger.LogKind
import com.segment.analytics.kotlin.core.platform.plugins.logger.log
import com.segment.analytics.kotlin.core.platform.plugins.logger.segmentLog
import com.segment.analytics.kotlin.core.platform.policies.FlushPolicy
import com.segment.analytics.kotlin.core.retry.*
import com.segment.analytics.kotlin.core.utilities.EncodeDefaultsJson
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
Expand All @@ -22,7 +23,9 @@ open class EventPipeline(
private val logTag: String,
apiKey: String,
private val flushPolicies: List<FlushPolicy>,
var apiHost: String = Constants.DEFAULT_API_HOST
var apiHost: String = Constants.DEFAULT_API_HOST,
private val httpConfig: HttpConfig? = null,
private val timeProvider: TimeProvider = SystemTimeProvider()
) {

private var writeChannel: Channel<BaseEvent>
Expand All @@ -39,6 +42,12 @@ open class EventPipeline(

protected open val networkIODispatcher get() = analytics.networkIODispatcher

// Retry state machine for smart retry logic
private var retryStateMachine: RetryStateMachine
private var retryState: RetryState



var running: Boolean
private set

Expand All @@ -53,6 +62,23 @@ open class EventPipeline(

writeChannel = Channel(UNLIMITED)
uploadChannel = Channel(UNLIMITED)

// Initialize retry state machine with config (or defaults if null)
// Convert HttpConfig to RetryConfig (they have the same structure)
val retryConfig = httpConfig?.let {
RetryConfig(
rateLimitConfig = it.rateLimitConfig,
backoffConfig = it.backoffConfig
)
} ?: RetryConfig()

retryStateMachine = RetryStateMachine(
retryConfig,
timeProvider
)

// Load persisted retry state (or start with defaults)
retryState = storage.loadRetryState()
}

fun put(event: BaseEvent) {
Expand Down Expand Up @@ -87,6 +113,18 @@ open class EventPipeline(
unschedule()
}

/**
* Update the retry configuration from CDN settings.
* Recreates the RetryStateMachine with the new config while preserving retry state.
*/
fun updateHttpConfig(newConfig: HttpConfig) {
val retryConfig = RetryConfig(
rateLimitConfig = newConfig.rateLimitConfig,
backoffConfig = newConfig.backoffConfig
)
retryStateMachine = RetryStateMachine(retryConfig, timeProvider)
}

open fun stringifyBaseEvent(payload: BaseEvent): String {
val finalPayload = EncodeDefaultsJson.encodeToJsonElement(payload)
.jsonObject.filterNot { (k, v) ->
Expand Down Expand Up @@ -129,13 +167,74 @@ open class EventPipeline(
storage.rollover()
}

// Upload Gate - Check if pipeline is rate-limited
val currentTime = timeProvider.currentTimeMillis()
if (retryState.isRateLimited(currentTime)) {
analytics.log("$logTag skipping uploads: pipeline is rate-limited until ${retryState.waitUntilTime}")
return@consumeEach // Skip all uploads for this flush
}

// Clear RATE_LIMITED state if wait time has passed
val waitTime = retryState.waitUntilTime
if (retryState.pipelineState == PipelineState.RATE_LIMITED &&
waitTime != null &&
currentTime >= waitTime) {
retryState = retryState.copy(
pipelineState = PipelineState.READY,
waitUntilTime = null
)
}
val fileUrlList = parseFilePaths(storage.read(Storage.Constants.Events))
for (url in fileUrlList) {
// Load batch metadata and check if we should upload
val (decision, updatedState) = retryStateMachine.shouldUploadBatch(
retryState,
url
)
retryState = updatedState

// Check if we should skip this batch
when (decision) {
UploadDecision.SkipAllBatches -> {
// This shouldn't happen here (caught by upload gate), but handle it
analytics.log("$logTag skipping remaining uploads")
break // Stop processing remaining files
}
UploadDecision.SkipThisBatch -> {
analytics.log("$logTag skipping batch $url: not ready for retry")
continue // Skip this file, continue with next
}
is UploadDecision.DropBatch -> {
// Batch exceeded retry limits - delete it
val reason = decision.reason
analytics.log("$logTag dropping batch $url: $reason")
analytics.reportInternalError(
Exception("Batch dropped: $reason")
)
storage.removeFile(url)
continue
}
UploadDecision.Proceed -> {
// Continue with upload
}
}
// Get retry count for X-Retry-Count header
val retryCount = retryStateMachine.getRetryCount(retryState, url)

// upload event file
var shouldCleanup = true
var statusCode = 0
var retryAfterSeconds: Int? = null

storage.readAsStream(url)?.use { data ->
try {
val connection = httpClient.upload(apiHost)

// Add X-Retry-Count header (only on retries, not first attempt)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it hurts anything, but I had asked Val about whether we should just always include it even when it's zero as a way to distinguish new non-retries from old retires-and-non-retries. Buuut if we're going to be moving to v2 anyway, that makes it entirely moot. The test will probably need to be changed to look for no retry count = 0 and I can fix the other SDKs.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be consistent across the libraries. We can just always add it or do it only after the first one?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's go with only after first attempt, so no x-retry-count=0. Neater, less traffic most of the time.

if (retryCount > 0) {
connection.connection.setRequestProperty("X-Retry-Count", retryCount.toString())
}

connection.outputStream?.let {
// Write the payloads into the OutputStream
data.copyTo(connection.outputStream)
Expand All @@ -144,21 +243,43 @@ open class EventPipeline(
// Upload the payloads.
connection.close()
}
// Cleanup uploaded payloads

// Success!
statusCode = 200
analytics.log("$logTag uploaded $url")
} catch (e: Exception) {
analytics.reportInternalError(e)

// Extract status code and retry-after from exception
if (e is HTTPException) {
statusCode = e.responseCode
retryAfterSeconds = (e.responseHeaders["retry-after"])?.firstOrNull()?.toIntOrNull()
}

shouldCleanup = handleUploadException(e, url)
}
}

// Update retry state based on response
val responseInfo = ResponseInfo(
statusCode = if (statusCode > 0) statusCode else 500, // Default to 500 for unknown errors
retryAfterSeconds = retryAfterSeconds,
batchFile = url,
currentTime = timeProvider.currentTimeMillis()
)
retryState = retryStateMachine.handleResponse(retryState, responseInfo)

// Persist updated retry state
withContext(fileIODispatcher) {
storage.saveRetryState(retryState)
}

if (shouldCleanup) {
storage.removeFile(url)
}
}
}
}

private fun schedule() {
flushPolicies.forEach { it.schedule(analytics) }
}
Expand All @@ -173,12 +294,15 @@ open class EventPipeline(
var shouldCleanup = false
if (e is HTTPException) {
analytics.log("$logTag exception while uploading, ${e.message}")
if (e.is4xx() && e.responseCode != 429) {
if (retryStateMachine.shouldDeleteBatch(e.responseCode)) {
// Simply log and proceed to remove the rejected payloads from the queue.
Analytics.segmentLog(
message = "Payloads were rejected by server. Marked for removal.",
kind = LogKind.ERROR
)
analytics.reportInternalError(
Exception("Batch dropped due to non-retryable HTTP ${e.responseCode}")
)
shouldCleanup = true
} else {
Analytics.segmentLog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import com.segment.analytics.kotlin.core.platform.policies.FrequencyFlushPolicy
import com.segment.analytics.kotlin.core.retry.HttpConfig
import kotlinx.coroutines.launch
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.jsonObject
import kotlinx.serialization.json.jsonPrimitive
import kotlinx.serialization.json.booleanOrNull
import sovran.kotlin.Subscriber

@Serializable
Expand Down Expand Up @@ -85,7 +88,8 @@ class SegmentDestination: DestinationPlugin(), VersionedPlugin, Subscriber {
key,
configuration.writeKey,
flushPolicies,
configuration.apiHost
configuration.apiHost,
configuration.httpConfig
)

analyticsScope.launch(analyticsDispatcher) {
Expand All @@ -102,10 +106,33 @@ class SegmentDestination: DestinationPlugin(), VersionedPlugin, Subscriber {
override fun update(settings: Settings, type: Plugin.UpdateType) {
super.update(settings, type)
if (settings.hasIntegrationSettings(this)) {
// only populate the apiHost value if it exists
settings.destinationSettings<SegmentSettings>(key)?.apiHost?.let {
val segmentSettings = settings.destinationSettings<SegmentSettings>(key)

// Update apiHost if it exists
segmentSettings?.apiHost?.let {
pipeline?.apiHost = it
}

// Read httpConfig from CDN settings and apply to pipeline
segmentSettings?.httpConfig?.let { cdnConfig ->
// CDN-sourced config defaults enabled to true (presence implies active).
// Only honor explicit enabled: false from CDN.
val rawJson = settings.integrations[key]?.jsonObject
val httpConfigJson = rawJson?.get("httpConfig")?.jsonObject

val rlEnabled = httpConfigJson?.get("rateLimitConfig")?.jsonObject
?.get("enabled")?.jsonPrimitive?.booleanOrNull
val boEnabled = httpConfigJson?.get("backoffConfig")?.jsonObject
?.get("enabled")?.jsonPrimitive?.booleanOrNull

val adjustedConfig = HttpConfig(
rateLimitConfig = cdnConfig.rateLimitConfig.copy(enabled = rlEnabled ?: true),
backoffConfig = cdnConfig.backoffConfig.copy(enabled = boEnabled ?: true)
)

analytics.configuration.httpConfig = adjustedConfig
pipeline?.updateHttpConfig(adjustedConfig)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class StartupQueue : Plugin, Subscriber {
subscriber = this@StartupQueue,
stateClazz = System::class,
initialState = true,
queue = analyticsDispatcher,
handler = this@StartupQueue::runningUpdate
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,42 @@ import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder
import kotlinx.serialization.json.*

/**
* Retry configuration for smart retry logic.
* Both rate limiting and exponential backoff default to disabled (legacy mode).
*/
@Serializable
data class RetryConfig(
val rateLimitConfig: RateLimitConfig = RateLimitConfig(),
/**
* Configuration for rate limiting (429 responses).
* Default: disabled for backward compatibility.
*/
val backoffConfig: BackoffConfig = BackoffConfig()
)

@Serializable
data class RateLimitConfig(
val enabled: Boolean = true,
val enabled: Boolean = false,
val maxRetryCount: Int = 100,
val maxRetryInterval: Int = 300,
val maxRateLimitDuration: Long = 43200
val maxRetryInterval: Int = 300
) {
/**
* Validate and clamp all numeric values to safe ranges.
*/
fun validated(): RateLimitConfig = copy(
maxRetryCount = maxRetryCount.coerceIn(0, 1000),
maxRetryInterval = maxRetryInterval.coerceIn(1, 3600),
maxRateLimitDuration = maxRateLimitDuration.coerceIn(0, 604800)
maxRetryInterval = maxRetryInterval.coerceIn(1, 3600)
)
/**
* Configuration for exponential backoff on retryable errors.
* Default: disabled for backward compatibility.
*/
}

@Serializable
data class BackoffConfig(
val enabled: Boolean = true,
val enabled: Boolean = false,
val maxRetryCount: Int = 100,
val baseBackoffInterval: Double = 0.5,
val maxBackoffInterval: Int = 300,
Expand Down
Loading
Loading