GOBBLIN-2263: Plumb DagAction store-insert time onto JobSpec for downstream latency metrics #4191
Conversation
Adds `dbUpdateTimeMillis` as an optional field on `DagActionStoreChangeEvent` so MySQL CDC producers can carry the binlog `EventTimestamp` (true row modification time) through the Kafka hop into change monitors. Change monitors set the value as `storeInsertTimeMillis` on `LeaseParams` when constructing them. `LaunchDagProc` stamps the value onto the `FlowSpec` config under the new `ConfigurationKeys.DAG_ACTION_LAUNCH_STORE_INSERT_TIME_MILLIS_KEY` so downstream executors can measure true LAUNCH-to-submission latency, including the CDC propagation that `flow.executionId`-based anchors miss (REST-handler time for ad-hoc, consensus DB time for scheduled). `DagTask` exposes the `LeaseParams` via a new getter so `DagProc` subclasses can read per-event metadata. Defaults to -1 (`UNKNOWN_STORE_INSERT_TIME_MILLIS`) when the source timestamp isn't available, in which case `LaunchDagProc` skips the stamp and downstream code falls back to its existing anchor.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
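A hedged sketch of the schema change (the field name and union type are from this PR's description; the surrounding record is abbreviated and other fields are omitted):

```json
{
  "type" : "record",
  "name" : "DagActionStoreChangeEvent",
  "fields" : [ {
    "name" : "dbUpdateTimeMillis",
    "type" : [ "null", "long" ],
    "default" : null,
    "doc" : "Binlog EventTimestamp of the source row modification; null when the producer cannot supply it."
  } ]
}
```

Because the union defaults to `null`, records from older producers that never set the field still deserialize cleanly against the new reader schema.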
```json
    }
  ]
}, {
  "name" : "dbUpdateTimeMillis",
```
Where are we setting this field?
Good question. The setter lives in the LinkedIn-internal MySQL CDC producer (binlog reader) that publishes onto Kafka — storeInsertTimeMillis is populated from the binlog event timestamp at row-INSERT time. We're keeping this PR schema-only/plumbing here so the optional field can flow through the existing pipeline. On the read side, the new MysqlMultiActiveLeaseArbiter + DagActionReminderScheduler changes in b1ae982 now propagate the value through consensus and Quartz reminders so LaunchDagProc actually stamps the JobSpec.
For any other open-source producer that wants to populate it, the Avro builder exposes .setStoreInsertTimeMillis(long) after regeneration; omitting it falls back to null and consumers see UNKNOWN — fully backwards-compatible.
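The null-to-UNKNOWN fallback mentioned above can be sketched like this (the class and method names here are hypothetical stand-ins, not the PR's actual consumer code):

```java
// Hypothetical stand-in for the consumer-side mapping: a nullable Long decoded
// from the Avro record becomes the UNKNOWN sentinel when the producer omitted it.
public class StoreInsertTimeFallback {
  public static final long UNKNOWN_STORE_INSERT_TIME_MILLIS = -1L;

  public static long resolve(Long avroValue) {
    return avroValue != null ? avroValue : UNKNOWN_STORE_INSERT_TIME_MILLIS;
  }

  public static void main(String[] args) {
    System.out.println(resolve(1700000000000L)); // newer producer set the field
    System.out.println(resolve(null));           // older producer: falls back to -1
  }
}
```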
```json
    }
  ]
}, {
  "name" : "dbUpdateTimeMillis",
```
Why not name the avro field storeInsertTimeMillis as well?
Renamed to storeInsertTimeMillis end-to-end in b1ae982 — Avro field, JobSpec key, and LeaseParams field now share the same name across the chain.
```java
 * arbiter may rewrite via consensus. A value of {@link #UNKNOWN_STORE_INSERT_TIME_MILLIS} signals "not provided"
 * and is the default for callers that do not have access to the source timestamp.
 */
long UNKNOWN_STORE_INSERT_TIME_MILLIS = -1L;
```
Nit: Can be moved to LeaseParams
Moved into LeaseParams as public static final long UNKNOWN_STORE_INSERT_TIME_MILLIS = -1L; in b1ae982. Updated the two external references (DagActionReminderScheduler, LaunchDagProc) to DagActionStore.LeaseParams.UNKNOWN_STORE_INSERT_TIME_MILLIS and adjusted the surrounding javadoc link.
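A simplified sketch of the resulting shape (a stand-in for the real `DagActionStore.LeaseParams`, which also carries a `DagAction` and an `isReminderEvent` flag): the sentinel lives on the params class, and the legacy-style constructor defaults to it so existing callers are unaffected.

```java
// Simplified stand-in for DagActionStore.LeaseParams after the refactor.
public class LeaseParams {
  public static final long UNKNOWN_STORE_INSERT_TIME_MILLIS = -1L;

  private final long eventTimeMillis;
  private final long storeInsertTimeMillis;

  // Legacy-style constructor: callers without the source timestamp default to UNKNOWN.
  public LeaseParams(long eventTimeMillis) {
    this(eventTimeMillis, UNKNOWN_STORE_INSERT_TIME_MILLIS);
  }

  // New-style constructor accepting the source-DB row-insert time.
  public LeaseParams(long eventTimeMillis, long storeInsertTimeMillis) {
    this.eventTimeMillis = eventTimeMillis;
    this.storeInsertTimeMillis = storeInsertTimeMillis;
  }

  public long getEventTimeMillis() {
    return eventTimeMillis;
  }

  public long getStoreInsertTimeMillis() {
    return storeInsertTimeMillis;
  }
}
```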
Review fixes (DaisyModi):
- Rename Avro field `dbUpdateTimeMillis` -> `storeInsertTimeMillis` on `DagActionStoreChangeEvent` for naming consistency with `LeaseParams`.
- Move `UNKNOWN_STORE_INSERT_TIME_MILLIS` from `DagActionStore` interface scope into `LeaseParams`, where it logically belongs. Update the two external references (`DagActionReminderScheduler`, `LaunchDagProc`) and the surrounding javadoc link.

Carry the timestamp through consensus + reminders so the JobSpec stamp fires on the primary path:
- `MysqlMultiActiveLeaseArbiter`: at the three call sites that build the consensus `LeaseParams` (CASE 2 within-epsilon, CASE 3 distinct-event, fresh-lease acquisition), switch from the 2-arg constructor to the 4-arg one and pass through `leaseParams.getStoreInsertTimeMillis()`. Without this, consensus silently defaults the field to UNKNOWN and `LaunchDagProc` skips the stamp.
- `DagActionReminderScheduler`: stash `storeInsertTimeMillis` on the Quartz JobDataMap when scheduling reminders and restore it when the reminder fires, so a host-failure-driven reattempt preserves the timestamp. Defaults to UNKNOWN for reminders scheduled by older code paths.
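The reminder stash/restore round-trip can be sketched like this (a plain `HashMap` stands in for Quartz's `JobDataMap`, and the key name is illustrative rather than the PR's actual constant):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the stash/restore described above, with a plain Map standing in for
// Quartz's JobDataMap. The key string is a hypothetical placeholder.
public class ReminderStashSketch {
  static final String STORE_INSERT_TIME_KEY = "flow.action.storeInsertTimeMillis";
  static final long UNKNOWN_STORE_INSERT_TIME_MILLIS = -1L;

  // Scheduling side: stash the timestamp so a reminder fired after a host failure keeps it.
  static void stash(Map<String, Object> jobDataMap, long storeInsertTimeMillis) {
    jobDataMap.put(STORE_INSERT_TIME_KEY, storeInsertTimeMillis);
  }

  // Firing side: restore, defaulting to UNKNOWN for reminders from older code paths.
  static long restore(Map<String, Object> jobDataMap) {
    return jobDataMap.containsKey(STORE_INSERT_TIME_KEY)
        ? (Long) jobDataMap.get(STORE_INSERT_TIME_KEY)
        : UNKNOWN_STORE_INSERT_TIME_MILLIS;
  }

  public static void main(String[] args) {
    Map<String, Object> map = new HashMap<>();
    stash(map, 1700000000000L);
    System.out.println(restore(map));             // the stashed value survives the round-trip
    System.out.println(restore(new HashMap<>())); // absent key: falls back to -1
  }
}
```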
Covers the three sites this PR plumbs the source-DB row-insert timestamp through:
- LaunchDagProcTest:
* launchDagStampsStoreInsertTimeMillisWhenProvided asserts initialize()
adds the DAG_ACTION_LAUNCH_STORE_INSERT_TIME_MILLIS_KEY to the FlowSpec
config when LeaseParams carries a non-UNKNOWN value, and that the value
is what was supplied.
* launchDagSkipsStoreInsertTimeMillisStampWhenUnknown asserts the key is
NOT added when the LeaseParams carries the UNKNOWN sentinel.
* Existing three tests updated to construct a real LeaseObtainedStatus
via the new buildLaunchDagTask helper (previously passed null, which
NPE'd after the PR added getLeaseParams() into initialize()).
- MysqlMultiActiveLeaseArbiterTest:
* testStoreInsertTimeMillisPreservedThroughConsensus exercises all three
arbiter consensus call sites — fresh-lease (selectInfoResult), CASE 2
(within-epsilon, same event valid), and CASE 3 (distinct event, lease
valid) — and asserts the consensus LeaseParams preserves the inbound
storeInsertTimeMillis instead of silently defaulting to UNKNOWN.
- DagActionReminderSchedulerTest:
* testCreateReminderJobDetailStashesStoreInsertTimeMillis verifies the
JobDataMap on the Quartz JobDetail captures storeInsertTimeMillis.
* testReminderJobExecuteCarriesStoreInsertTimeMillis exercises the full
round-trip — ReminderJob.execute() reconstructs LeaseParams from the
JobDataMap and the value survives.
* testReminderJobExecuteFallsBackToUnknownWhenKeyAbsent asserts the
backwards-compat path (reminder scheduled by older code without the
new key) falls back to UNKNOWN, not throws.
Added unit tests covering all three sites this PR plumbs: 7 new, 3 fixed.
Where CI runs them: the reminder-scheduler tests were verified to pass locally; the arbiter/launch-dag-proc tests require MySQL via testcontainers, so they'll run for the first time on CI.
```java
public static final String FLOW_EXECUTION_ID_KEY = "flow.executionId";
// Stamped onto the JobSpec config by LaunchDagProc, carrying the DagAction store row-insert time in millis
// (sourced upstream from the CDC binlog event timestamp). Enables downstream executors to measure end-to-end
// LAUNCH-to-submission latency including CDC propagation. Absent / -1 when the source timestamp is unknown.
```
If we're doing `if (storeInsertTimeMillis != DagActionStore.LeaseParams.UNKNOWN_STORE_INSERT_TIME_MILLIS)`, will this ever be -1L?
It's the default and fallback value:

```java
long storeInsertTimeMillis = jobDataMap.containsKey(FLOW_ACTION_STORE_INSERT_TIME_MILLIS_KEY)
    ? jobDataMap.getLong(FLOW_ACTION_STORE_INSERT_TIME_MILLIS_KEY)
    : DagActionStore.LeaseParams.UNKNOWN_STORE_INSERT_TIME_MILLIS;
```
Exclude io.grpc from the Cloudera repository so Gradle resolves grpc artifacts and version metadata from Maven Central instead of a flaky legacy mirror. Co-authored-by: Cursor <cursoragent@cursor.com>
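A sketch of that exclusion using Gradle's repository content filtering (the repository URL and block placement are illustrative, not copied from this build):

```groovy
repositories {
  maven {
    url "https://repository.cloudera.com/artifactory/cloudera-repos/"
    content {
      // Never resolve grpc from this mirror; Maven Central serves it instead.
      excludeGroup "io.grpc"
    }
  }
  mavenCentral()
}
```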
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Downstream executors (e.g., a GridGateway producer that submits jobs after `LaunchDagProc`) cannot today measure the true end-to-end latency from when a LAUNCH `DagAction` row landed in the `dag_action` store to when the executor accepts the job, because that source timestamp is not propagated onto the `JobSpec` config. The two existing anchors fall short:

- `flow.executionId` is set to `System.currentTimeMillis()` at REST-handler time for ad-hoc flows (close enough), but for scheduled flows it is rewritten by `MysqlMultiActiveLeaseArbiter` to the post-CDC consensus DB timestamp — silently skipping CDC propagation latency.
- `LeaseParams.eventTimeMillis` is stamped at change-monitor consume time (`System.currentTimeMillis()` in the one-arg constructor at `DagActionStore.java:118-120`) — also after CDC.

Change:
- Optional `dbUpdateTimeMillis` field added to the `DagActionStoreChangeEvent` Avro schema so MySQL CDC producers can carry the binlog event timestamp (true row-modification time) across the Kafka hop. Optional (`["null","long"]`, default `null`) for backwards compatibility — older producers continue to work unchanged.
- `storeInsertTimeMillis` added to `DagActionStore.LeaseParams` with a sentinel `UNKNOWN_STORE_INSERT_TIME_MILLIS = -1L` for callers that do not have it. All existing constructors continue to work; a new four-arg constructor accepts the timestamp.
- `DAG_ACTION_LAUNCH_STORE_INSERT_TIME_MILLIS_KEY = "dagAction.launch.storeInsertTimeMillis"` added to `ConfigurationKeys`.
- `LaunchDagProc.initialize()` stamps the value onto the `FlowSpec` config when the carrier `LeaseParams` provides one (skipping it when `UNKNOWN_STORE_INSERT_TIME_MILLIS`), alongside the existing `FLOW_EXECUTION_ID_KEY` stamp.
- `DagTask` exposes the consensus `LeaseParams` via `getLeaseParams()` so `DagProc` subclasses can read per-event metadata.

The PR is plumbing-only: it does not change which timestamp is consumed by `MultiActiveLeaseArbiter`, lease consensus semantics, or any existing behavior. Downstream consumers can opt in by reading the new JobSpec key; absence is fully backwards-compatible.

Tests
This change is pure pass-through plumbing on an optional field. The Avro field is exercised by the generated builder/setter on schema regeneration; `LaunchDagProc` only writes the key when the value is non-sentinel, and existing tests construct `LeaseParams` via the legacy constructors, which default to `UNKNOWN_STORE_INSERT_TIME_MILLIS`, so they remain valid. Downstream behavioral tests (anchor-precedence, large-latency, scheduled vs ad-hoc) live on the consumer side (LinkedIn-internal ddm-common). I'm happy to add a small `LaunchDagProcTest` case asserting the stamp is added iff the lease params carry a non-sentinel value if reviewers prefer.

Commits
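The conditional stamp described above can be sketched like this (a mutable `Map` stands in for the `FlowSpec`'s immutable Typesafe `Config`, which the real code would rebuild rather than mutate; only the key name is from the PR):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of LaunchDagProc.initialize()'s conditional stamp, simplified onto a Map.
public class LaunchStampSketch {
  static final String DAG_ACTION_LAUNCH_STORE_INSERT_TIME_MILLIS_KEY =
      "dagAction.launch.storeInsertTimeMillis";
  static final long UNKNOWN_STORE_INSERT_TIME_MILLIS = -1L;

  static void stampIfKnown(Map<String, Object> flowSpecConfig, long storeInsertTimeMillis) {
    // Skip the stamp entirely when the source timestamp never reached us; downstream
    // readers then fall back to their existing anchor (e.g. flow.executionId).
    if (storeInsertTimeMillis != UNKNOWN_STORE_INSERT_TIME_MILLIS) {
      flowSpecConfig.put(DAG_ACTION_LAUNCH_STORE_INSERT_TIME_MILLIS_KEY, storeInsertTimeMillis);
    }
  }

  public static void main(String[] args) {
    Map<String, Object> known = new HashMap<>();
    stampIfKnown(known, 1700000000000L);
    System.out.println(known.containsKey(DAG_ACTION_LAUNCH_STORE_INSERT_TIME_MILLIS_KEY)); // true

    Map<String, Object> unknown = new HashMap<>();
    stampIfKnown(unknown, UNKNOWN_STORE_INSERT_TIME_MILLIS);
    System.out.println(unknown.isEmpty()); // true: stamp skipped for the sentinel
  }
}
```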