refactor(sync): cron-jobs followups — retention sweep, quota init, drop synchronous syncs #1308

Open: absorbb wants to merge 9 commits into newjitsu from cron_jobs_followups

@absorbb absorbb (Contributor) commented May 19, 2026

Summary

Three followups to the autonomous CronJob path landed in #1287.

1. Move source_task retention from per-request to syncctl ticker

The legacy cleanupTasksLogs ran on every /sources/run hit. With autonomous CronJobs most syncs no longer touch that endpoint, so retention stopped firing for them. Replaced with an hourly sweep inside task_manager.listenTaskStatus.

  • New SQL in `bulker/sync-sidecar/db/db.go` (`CleanupTaskLogs`): single global DELETE with a window-function CTE that preserves the original semantics — keep newest N rows OR rows from the last 60 days per sync, whichever set is larger. `WHERE rn <= N+1` lets Postgres short-circuit the ranking once the boundary is found.
  • New `@@index([sync_id, started_at(sort: Desc)])` on `source_task` so the ranking is index-only.
  • Dropped console-side `cleanupTasksLogs` and the `finally` block in `run.ts`.

2. Drop the synchronous in-process sync path

The only `destinationType.syncs` entry (Mixpanel) was already commented out in 871fa24, so `runSyncSynchronously` was dead code.

Removed:

  • `runSyncSynchronously`, `getImplemetingFunction`, `createDatabaseLogger`, `createDatabaseStore`, `SaasSyncState` from `lib/server/sync.ts`
  • `lib/server/syncs/mixpanel.ts` entirely
  • The `runSynchronously` OR-clause in the manual-trigger admission gate
  • Now-unused imports (`SyncFunction`, `FunctionLogger`, `Store`, `SetOpts`, `DestinationType`, `ServiceConfig`)

3. New `quota-check` sidecar init container

Autonomous CronJob pods previously bypassed the EE quota check that the legacy `scheduleSync` ran inline.

  • New sidecar subcommand `quota-check` (`bulker/sync-sidecar/quota_check.go`): HTTP call to console with the syncctl bearer.
  • New console admin endpoint `/api/admin/sync-quota-check`: authenticates the bearer, calls the existing `checkQuota` (extended to accept a caller-supplied `taskId` so the SKIPPED row uses the pod's real id), and returns 200, 403, or 401.
  • CronJob template adds the init container right after `lease-acquire` so we fail fast before spending pod minutes. Skipped at template-build time when `CONSOLE_URL` / `CONSOLE_TOKEN` aren't configured.
  • Fail-open on transport errors (matches existing posture: billing outage shouldn't paralyze syncs).
  • Bumped `cronTemplateRevision` to 3 so existing CronJobs re-patch.

EE private key stays in the console process — sidecars only need the shared HTTP bearer.

Test plan

  • Apply the new index migration via `pnpm codegen` + `prisma db push`.
  • Verify hourly cleanup ticker in syncctl logs: `source_task retention sweep: deleted N row(s)`.
  • Manual sync triggers still admission-gate on existing RUNNING source_task row; chained discover→read still works via `skipRefresh=true` carveout.
  • Trigger an autonomous CronJob fire for a workspace at quota: `quota-check` init container exits 1, pod fails, console writes SKIPPED `source_task` row with the pod-name task id.
  • Trigger an autonomous CronJob fire for a workspace under quota: `quota-check` exits 0, the rest of the init chain runs.
  • Confirm legacy Mixpanel syncs (if any still exist) gracefully fall through — they should be filtered out of the syncs export and the manual `/sources/run` path will return "Sync function not found" via the existing destination guard.

🤖 Generated with Claude Code

absorbb and others added 2 commits May 19, 2026 18:27
…nit, drop sync runs

Three followups to the autonomous CronJob path (#1287):

1. Move source_task retention from per-/sources/run cleanup to an hourly
   sweep in syncctl. Most syncs no longer hit /sources/run after the
   autonomous-path rollout, so the per-request cleanup stopped firing for
   them. Single global DELETE with a window-function CTE preserves the
   exact "keep newest N rows OR last 60 days, whichever keeps more"
   semantics. Adds a (sync_id, started_at DESC) index so the partition
   ranking is index-only.

2. Drop runSyncSynchronously and the in-process mixpanel sync helpers.
   The only destinationType.syncs entry (Mixpanel) was already commented
   out in 871fa24, so this code path was dead. Removes lib/server/sync.ts
   helpers (createDatabaseLogger / createDatabaseStore /
   getImplemetingFunction / runSyncSynchronously) and
   lib/server/syncs/mixpanel.ts. Manual-trigger "already running" gate is
   simplified to a single condition (no more runSynchronously OR-clause).

3. New `quota-check` sidecar init container for the autonomous CronJob
   pod. Calls a new /api/admin/sync-quota-check endpoint on console which
   signs the EE JWT and forwards to the billing service — the EE private
   key stays in the console process. Runs right after lease-acquire so we
   fail fast before spending pod minutes. Fail-open on transport errors
   to match the existing in-process checkQuota posture. Bumps
   cronTemplateRevision to force re-patch of existing CronJobs.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…it container

Saves one container start in the autonomous CronJob pod and keeps the two
admission gates adjacent. New sidecar subcommand `admission` runs
runQuotaCheck() then runLeaseAcquire() in one process; both helpers
already handle their own os.Exit on terminal outcomes so composition is
trivial.

Original `quota-check` and `lease-acquire` subcommands remain callable
for debugging. cronTemplateRevision bumped to 4.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

@jitsu-code-review jitsu-code-review Bot left a comment

I found one correctness regression that should be fixed before merge.

Comment thread bulker/sync-controller/task_manager.go Outdated
t.Errorf("Unable to close stale tasks: %v", err)
}
case <-cleanupTicker.C:
deleted, err := db.CleanupTaskLogs(t.dbpool, 3000, 60*24*time.Hour)

CleanupTaskLogs is now hardcoded to 3000 rows / 60 days. Before this change, retention was configurable via SYNC_TASK_LOG_SIZE and SYNC_TASK_LOG_AGE (see webapps/console/lib/server/serverEnv.ts). This removes an existing operator control and can silently change retention behavior in installations that tuned those envs. Please plumb retention settings into sync-controller config (or another shared config source) instead of hardcoding here.

@jitsu-code-review jitsu-code-review Bot left a comment

Thanks for the refactor toward autonomous admission/retention. I found a few blocking correctness issues before this can ship.

Command: []string{"/app/sidecar", "admission"},
Env: admissionEnv,
Resources: smallResources(),
})

This currently does not compile: the admission init container literal closes with }), leaving an extra ). Running go test ./sync-controller fails with syntax error: unexpected ) in composite literal around this line.

Name: "admission",
Image: c.config.SidecarImage,
Command: []string{"sleep", strconv.Itoa(jitterSec)},
Command: []string{"/app/sidecar", "admission"},

admission already executes runLeaseAcquire(), but this pod template still appends a separate lease-acquire init container below. That means lease acquire runs twice; the second call sees a fresh existing lease and returns false (leaseClient.Acquire treats non-stale lease as held), so the pod will fail during init after admission passed.

Comment thread bulker/sync-sidecar/db/db.go Outdated
USING cutoffs c
WHERE t.sync_id = c.sync_id
AND t.started_at < GREATEST(
COALESCE(c.nth_started_at, '-infinity'::timestamptz),

This changes retention semantics from the previous per-sync cleanup. With COALESCE(nth_started_at, '-infinity'), syncs that have fewer than keepPerSync rows will still delete rows older than maxAge. Previously those syncs retained all rows (NULL cutoff), which is what "keep at least the newest N rows per sync" implies.

absorbb and others added 2 commits May 20, 2026 16:18
…p-sweep null guard

- admission init now does jitter → quota-check → lease-acquire in one
  process. JITTER_MAX_SECONDS env (default 60s, set to 0 to disable)
  controls the deterministic sub-minute spread; value is plumbed through
  syncctl JitterMaxSeconds config so operators can override at deploy.
- task_manager retention sweep no longer hardcodes 3000/60d. New config
  fields TaskLogKeepPerSync (SYNC_TASK_LOG_SIZE) and TaskLogMaxAgeDays
  (SYNC_TASK_LOG_AGE) mirror the previous console-side env knobs.
- Cleanup SQL adds an `AND nth_started_at IS NOT NULL` guard so syncs
  with ≤ keepPerSync rows retain all their rows regardless of age — the
  COALESCE-to-(-infinity) shortcut silently broke that legacy guarantee.
- Drops the stray `lease-acquire` init container that the merge from
  newjitsu left behind (admission already runs it); same edit fixes the
  unbalanced braces that broke the build.
- Bumps cronTemplateRevision 4 → 5 so existing CronJobs get the new
  admission env on the next reconcile.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

@jitsu-code-review jitsu-code-review Bot left a comment

Reviewed the diff in this PR (syncctl admission/quota path, retention migration, and console API changes). I didn’t find actionable bugs, security issues, or user-visible regressions in the changed code.

t.Errorf("Unable to close stale tasks: %v", err)
}
case <-cleanupTicker.C:
deleted, err := db.CleanupTaskLogs(t.dbpool, t.config.TaskLogKeepPerSync, time.Duration(t.config.TaskLogMaxAgeDays)*24*time.Hour)

Running CleanupTaskLogs inline in listenTaskStatus() blocks the same select loop that consumes TaskStatusChannel. This sweep can be expensive (global windowed delete), and while it runs, status updates back up; JobRunner.sendStatus drops updates after 5s when the channel is full (job_runner.go:216-221). That can lose status transitions and leave tasks stuck in stale states. Please move retention sweeping off this hot path (separate goroutine/worker) so status consumption stays continuous.

@jitsu-code-review jitsu-code-review Bot left a comment

One blocking issue: the new hourly retention sweep runs inline in the same loop that consumes task status updates. Because status publication is lossy under backpressure (JobRunner.sendStatus drops updates after 5s when the channel is full), a long-running cleanup query can cause missed status transitions and stale task states. Please move retention cleanup off that hot path (separate goroutine/worker or async trigger) and keep the status-consumer loop responsive.

absorbb and others added 3 commits May 21, 2026 18:42
…shed

- successHistory 1 → 2 so the GKE UI / kubectl can show the most recent
  completed Job alongside the previous one. failureHistory left at 3.
- New config JOB_TTL_SECONDS_AFTER_FINISHED (default 0 = disabled) sets
  Job.Spec.TTLSecondsAfterFinished on every reconciled Job. TTL is a
  ceiling, not a floor: it bounds how long a finished Job sticks around
  but cannot extend retention past successfulJobsHistoryLimit. Useful
  for orphan cleanup (when the parent CronJob is deleted) and for
  low-frequency syncs that don't naturally cycle out via history limit.
- Bump cronTemplateRevision 5 → 6 so existing CronJobs pick up the new
  TTL field on the next reconcile.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…mp is enough

TTL only sets a ceiling on Job lifetime, it can't extend retention past
successfulJobsHistoryLimit. The history bump 1 → 2 already covers the
"see more past runs" use case; the TTL knob added complexity without
solving the underlying need. successHistory=2 and cronTemplateRevision
bump are kept.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
jitsu-code-review Bot previously approved these changes May 21, 2026

@jitsu-code-review jitsu-code-review Bot left a comment

Reviewed the changes in this PR across sync-controller, sync-sidecar, and console API paths. I did not find actionable bugs, security issues, or correctness regressions in the diff. The admission init flow, quota-check endpoint/auth wiring, and retention migration look coherent.

@jitsu-code-review jitsu-code-review Bot left a comment

The refactor is heading in the right direction, but there is one manual-run regression that needs to be fixed before merge.

const msInMin = 1000 * 60;
if (ignoreRunning || (runSynchronously && Date.now() - running.updated_at.getTime() >= 2 * msInMin)) {
await dbLog({
if (running && !ignoreRunning) {

When ignoreRunning=true, this branch now bypasses the early return but leaves the previous RUNNING row untouched. If that older task was stale/crashed, it remains RUNNING until the periodic stale sweep (up to ~1h+), so later manual runs without ignoreRunning keep failing with "Sync is already running" even though nothing is actually running.

Before this change, the ignoreRunning path marked the existing task as FAILED immediately. Please preserve that behavior (or otherwise clear stale RUNNING rows) when honoring ignoreRunning.

@absorbb absorbb force-pushed the cron_jobs_followups branch from 2dd10b9 to 2d15dc3 on May 22, 2026 17:53

@jitsu-code-review jitsu-code-review Bot left a comment

Severity: critical.

TaskManager now depends on syncsRepo (bulker/sync-controller/task_manager.go:32), but syncsRepo is initialized only later in InitContext (bulker/sync-controller/app.go:42-50). That means t.syncsRepo stays nil and resolveSyncEntry (task_manager.go:76) calls WaitForSyncEntry with a nil repository.

Concrete impact: manual /read and sync-bound /discover fail at runtime instead of starting the sync (nil repo path), so sync runs regress for this change set.

Please wire repository initialization before creating TaskManager (or make TaskManager fetch repo lazily and guard nil).
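
One possible shape for the first option, sketched with a constructor that refuses a nil repository so the ordering mistake surfaces at startup rather than on the first request. `TaskManager` and `syncsRepo` are names from this review; the `SyncsRepo` interface, its `Lookup` method, `mapRepo`, and the `NewTaskManager` signature are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// SyncsRepo is a hypothetical minimal view of the syncs repository.
type SyncsRepo interface {
	Lookup(syncID string) (string, bool)
}

// mapRepo is a toy in-memory implementation used only for this example.
type mapRepo map[string]string

func (m mapRepo) Lookup(syncID string) (string, bool) {
	v, ok := m[syncID]
	return v, ok
}

// TaskManager holds the repository it needs to resolve sync entries.
type TaskManager struct {
	syncsRepo SyncsRepo
}

// NewTaskManager fails fast when the repository has not been initialized yet.
func NewTaskManager(repo SyncsRepo) (*TaskManager, error) {
	if repo == nil {
		return nil, errors.New("task manager: syncs repository must be initialized first")
	}
	return &TaskManager{syncsRepo: repo}, nil
}

func main() {
	if _, err := NewTaskManager(nil); err != nil {
		fmt.Println("startup error:", err)
	}
	tm, _ := NewTaskManager(mapRepo{"sync-1": "entry"})
	entry, ok := tm.syncsRepo.Lookup("sync-1")
	fmt.Println(entry, ok)
}
```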

@jitsu-code-review jitsu-code-review Bot left a comment

Found 2 critical issues in this diff that can cause user-visible task orchestration failures, so this should stay blocked until fixed.

  1. ignoreRunning no longer clears stale RUNNING rows before launching a manual run, which can leave old crashed tasks in RUNNING and make later manual runs fail with Sync is already running.
  2. The hourly CleanupTaskLogs sweep now runs inline in the same loop that consumes TaskStatusChannel; when the sweep is slow, status updates back up and sendStatus drops updates after 5s, which can leave task state stale/incorrect.

const msInMin = 1000 * 60;
if (ignoreRunning || (runSynchronously && Date.now() - running.updated_at.getTime() >= 2 * msInMin)) {
await dbLog({
if (running && !ignoreRunning) {

Critical regression: when ignoreRunning=true, this now just bypasses the early return and leaves the previous RUNNING row untouched. If that row is stale/crashed, it can continue to block future manual runs (Sync is already running) until stale cleanup eventually closes it. The previous behavior explicitly failed the old row before proceeding; we should preserve that (or otherwise clear stale RUNNING rows here) when honoring ignoreRunning.

t.Errorf("Unable to close stale tasks: %v", err)
}
case <-cleanupTicker.C:
deleted, err := db.CleanupTaskLogs(t.dbpool, t.config.TaskLogKeepPerSync, time.Duration(t.config.TaskLogMaxAgeDays)*24*time.Hour)

Critical: this retention sweep runs inline on the same goroutine/select loop that drains TaskStatusChannel. CleanupTaskLogs is a global windowed delete and can be slow; while it runs, status messages queue and job_runner.sendStatus drops updates once blocked for 5s (job_runner.go:216-221). That can lose state transitions and leave source_task rows stale. Please move the sweep off this hot path (separate goroutine/worker/ticker) so status consumption remains continuous.
