refactor(sync): cron-jobs followups — retention sweep, quota init, drop synchronous syncs#1308
absorbb wants to merge 9 commits into
…nit, drop sync runs

Three followups to the autonomous CronJob path (#1287):

1. Move source_task retention from per-/sources/run cleanup to an hourly sweep in syncctl. Most syncs no longer hit /sources/run after the autonomous-path rollout, so the per-request cleanup stopped firing for them. Single global DELETE with a window-function CTE preserves the exact "keep newest N rows OR last 60 days, whichever keeps more" semantics. Adds a (sync_id, started_at DESC) index so the partition ranking is index-only.

2. Drop runSyncSynchronously and the in-process mixpanel sync helpers. The only destinationType.syncs entry (Mixpanel) was already commented out in 871fa24, so this code path was dead. Removes lib/server/sync.ts helpers (createDatabaseLogger / createDatabaseStore / getImplemetingFunction / runSyncSynchronously) and lib/server/syncs/mixpanel.ts. Manual-trigger "already running" gate is simplified to a single condition (no more runSynchronously OR-clause).

3. New `quota-check` sidecar init container for the autonomous CronJob pod. Calls a new /api/admin/sync-quota-check endpoint on console which signs the EE JWT and forwards to the billing service; the EE private key stays in the console process. Runs right after lease-acquire so we fail fast before spending pod minutes. Fail-open on transport errors to match the existing in-process checkQuota posture.

Bumps cronTemplateRevision to force re-patch of existing CronJobs.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…it container

Saves one container start in the autonomous CronJob pod and keeps the two admission gates adjacent. New sidecar subcommand `admission` runs runQuotaCheck() then runLeaseAcquire() in one process; both helpers already handle their own os.Exit on terminal outcomes so composition is trivial. Original `quota-check` and `lease-acquire` subcommands remain callable for debugging. cronTemplateRevision bumped to 4.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
| t.Errorf("Unable to close stale tasks: %v", err) | ||
| } | ||
| case <-cleanupTicker.C: | ||
| deleted, err := db.CleanupTaskLogs(t.dbpool, 3000, 60*24*time.Hour) |
CleanupTaskLogs is now hardcoded to 3000 rows / 60 days. Before this change, retention was configurable via SYNC_TASK_LOG_SIZE and SYNC_TASK_LOG_AGE (see webapps/console/lib/server/serverEnv.ts). This removes an existing operator control and can silently change retention behavior in installations that tuned those envs. Please plumb retention settings into sync-controller config (or another shared config source) instead of hardcoding here.
| Command: []string{"/app/sidecar", "admission"}, | ||
| Env: admissionEnv, | ||
| Resources: smallResources(), | ||
| }) |
This currently does not compile: the admission init container literal closes with }), leaving an extra ). Running go test ./sync-controller fails with syntax error: unexpected ) in composite literal around this line.
| Name: "admission", | ||
| Image: c.config.SidecarImage, | ||
| Command: []string{"sleep", strconv.Itoa(jitterSec)}, | ||
| Command: []string{"/app/sidecar", "admission"}, |
admission already executes runLeaseAcquire(), but this pod template still appends a separate lease-acquire init container below. That means lease acquire runs twice; the second call sees a fresh existing lease and returns false (leaseClient.Acquire treats non-stale lease as held), so the pod will fail during init after admission passed.
| USING cutoffs c | ||
| WHERE t.sync_id = c.sync_id | ||
| AND t.started_at < GREATEST( | ||
| COALESCE(c.nth_started_at, '-infinity'::timestamptz), |
This changes retention semantics from the previous per-sync cleanup. With COALESCE(nth_started_at, '-infinity'), syncs that have fewer than keepPerSync rows will still delete rows older than maxAge. Previously those syncs retained all rows (NULL cutoff), which is what keep at least newest N rows per sync implies.
…p-sweep null guard

- admission init now does jitter → quota-check → lease-acquire in one process. JITTER_MAX_SECONDS env (default 60s, set to 0 to disable) controls the deterministic sub-minute spread; value is plumbed through syncctl JitterMaxSeconds config so operators can override at deploy.
- task_manager retention sweep no longer hardcodes 3000/60d. New config fields TaskLogKeepPerSync (SYNC_TASK_LOG_SIZE) and TaskLogMaxAgeDays (SYNC_TASK_LOG_AGE) mirror the previous console-side env knobs.
- Cleanup SQL adds an `AND nth_started_at IS NOT NULL` guard so syncs with ≤ keepPerSync rows retain all their rows regardless of age; the COALESCE-to-(-infinity) shortcut silently broke that legacy guarantee.
- Drops the stray `lease-acquire` init container that the merge from newjitsu left behind (admission already runs it); same edit fixes the unbalanced braces that broke the build.
- Bumps cronTemplateRevision 4 → 5 so existing CronJobs get the new admission env on the next reconcile.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
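A deterministic sub-minute spread could be derived as in the Go sketch below. How the sidecar actually computes its jitter is not shown in this conversation, so hashing the sync ID with FNV-1a is an assumption, and `jitterSeconds` is a hypothetical name.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// jitterSeconds maps a sync ID to a stable delay in [0, maxSeconds).
// ASSUMPTION: the spread is derived from a hash of the sync ID, so the same
// sync always waits the same amount. maxSeconds <= 0 disables jitter.
func jitterSeconds(syncID string, maxSeconds int) int {
	if maxSeconds <= 0 {
		return 0
	}
	h := fnv.New32a()
	h.Write([]byte(syncID)) // hash.Hash writes never return an error
	return int(h.Sum32() % uint32(maxSeconds))
}

func main() {
	fmt.Println(jitterSeconds("sync-1", 0)) // 0: jitter disabled

	j := jitterSeconds("sync-1", 60)
	fmt.Println(j >= 0 && j < 60, j == jitterSeconds("sync-1", 60)) // true true
}
```

A hash-based delay spreads pods that share a cron minute without introducing run-to-run randomness, which keeps start times reproducible when debugging.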
| t.Errorf("Unable to close stale tasks: %v", err) | ||
| } | ||
| case <-cleanupTicker.C: | ||
| deleted, err := db.CleanupTaskLogs(t.dbpool, t.config.TaskLogKeepPerSync, time.Duration(t.config.TaskLogMaxAgeDays)*24*time.Hour) |
Running CleanupTaskLogs inline in listenTaskStatus() blocks the same select loop that consumes TaskStatusChannel. This sweep can be expensive (global windowed delete), and while it runs, status updates back up; JobRunner.sendStatus drops updates after 5s when the channel is full (job_runner.go:216-221). That can lose status transitions and leave tasks stuck in stale states. Please move retention sweeping off this hot path (separate goroutine/worker) so status consumption stays continuous.
One blocking issue: the new hourly retention sweep runs inline in the same loop that consumes task status updates. Because status publication is lossy under backpressure (JobRunner.sendStatus drops updates after 5s when the channel is full), a long-running cleanup query can cause missed status transitions and stale task states. Please move retention cleanup off that hot path (separate goroutine/worker or async trigger) and keep the status-consumer loop responsive.
…shed

- successHistory 1 → 2 so the GKE UI / kubectl can show the most recent completed Job alongside the previous one. failureHistory left at 3.
- New config JOB_TTL_SECONDS_AFTER_FINISHED (default 0 = disabled) sets Job.Spec.TTLSecondsAfterFinished on every reconciled Job. TTL is a ceiling, not a floor: it bounds how long a finished Job sticks around but cannot extend retention past successfulJobsHistoryLimit. Useful for orphan cleanup (when the parent CronJob is deleted) and for low-frequency syncs that don't naturally cycle out via history limit.
- Bump cronTemplateRevision 5 → 6 so existing CronJobs pick up the new TTL field on the next reconcile.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…mp is enough

TTL only sets a ceiling on Job lifetime, it can't extend retention past successfulJobsHistoryLimit. The history bump 1 → 2 already covers the "see more past runs" use case; the TTL knob added complexity without solving the underlying need. successHistory=2 and cronTemplateRevision bump are kept.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Reviewed the changes in this PR across sync-controller, sync-sidecar, and console API paths. I did not find actionable bugs, security issues, or correctness regressions in the diff. The admission init flow, quota-check endpoint/auth wiring, and retention migration look coherent.
| const msInMin = 1000 * 60; | ||
| if (ignoreRunning || (runSynchronously && Date.now() - running.updated_at.getTime() >= 2 * msInMin)) { | ||
| await dbLog({ | ||
| if (running && !ignoreRunning) { |
When ignoreRunning=true, this branch now bypasses the early return but leaves the previous RUNNING row untouched. If that older task was stale/crashed, it remains RUNNING until the periodic stale sweep (up to ~1h+), so later manual runs without ignoreRunning keep failing with "Sync is already running" even though nothing is actually running.
Before this change, the ignoreRunning path marked the existing task as FAILED immediately. Please preserve that behavior (or otherwise clear stale RUNNING rows) when honoring ignoreRunning.
2dd10b9 to 2d15dc3
Severity: critical.
TaskManager now depends on syncsRepo (bulker/sync-controller/task_manager.go:32), but syncsRepo is initialized only later in InitContext (bulker/sync-controller/app.go:42-50). That means t.syncsRepo stays nil and resolveSyncEntry (task_manager.go:76) calls WaitForSyncEntry with a nil repository.
Concrete impact: manual /read and sync-bound /discover fail at runtime instead of starting the sync (nil repo path), so sync runs regress for this change set.
Please wire repository initialization before creating TaskManager (or make TaskManager fetch repo lazily and guard nil).
Found 2 critical issues in this diff that can cause user-visible task orchestration failures, so this should stay blocked until fixed.
- `ignoreRunning` no longer clears stale `RUNNING` rows before launching a manual run, which can leave old crashed tasks in `RUNNING` and make later manual runs fail with `Sync is already running`.
- The hourly `CleanupTaskLogs` sweep now runs inline in the same loop that consumes `TaskStatusChannel`; when the sweep is slow, status updates back up and `sendStatus` drops updates after 5s, which can leave task state stale/incorrect.
| const msInMin = 1000 * 60; | ||
| if (ignoreRunning || (runSynchronously && Date.now() - running.updated_at.getTime() >= 2 * msInMin)) { | ||
| await dbLog({ | ||
| if (running && !ignoreRunning) { |
Critical regression: when ignoreRunning=true, this now just bypasses the early return and leaves the previous RUNNING row untouched. If that row is stale/crashed, it can continue to block future manual runs (Sync is already running) until stale cleanup eventually closes it. The previous behavior explicitly failed the old row before proceeding; we should preserve that (or otherwise clear stale RUNNING rows here) when honoring ignoreRunning.
| t.Errorf("Unable to close stale tasks: %v", err) | ||
| } | ||
| case <-cleanupTicker.C: | ||
| deleted, err := db.CleanupTaskLogs(t.dbpool, t.config.TaskLogKeepPerSync, time.Duration(t.config.TaskLogMaxAgeDays)*24*time.Hour) |
Critical: this retention sweep runs inline on the same goroutine/select loop that drains TaskStatusChannel. CleanupTaskLogs is a global windowed delete and can be slow; while it runs, status messages queue and job_runner.sendStatus drops updates once blocked for 5s (job_runner.go:216-221). That can lose state transitions and leave source_task rows stale. Please move the sweep off this hot path (separate goroutine/worker/ticker) so status consumption remains continuous.
Summary
Three followups to the autonomous CronJob path landed in #1287.
1. Move `source_task` retention from per-request to syncctl ticker
The legacy `cleanupTasksLogs` ran on every `/sources/run` hit. With autonomous CronJobs most syncs no longer touch that endpoint, so retention stopped firing for them. Replaced with an hourly sweep inside `task_manager.listenTaskStatus`.
2. Drop the synchronous in-process sync path
The only `destinationType.syncs` entry (Mixpanel) was already commented out in 871fa24, so `runSyncSynchronously` was dead code.
Removed:
3. New `quota-check` sidecar init container
Autonomous CronJob pods previously bypassed the EE quota check that the legacy `scheduleSync` ran inline.
EE private key stays in the console process — sidecars only need the shared HTTP bearer.
Test plan
🤖 Generated with Claude Code