Skip to content

GOBBLIN-2263: Plumb DagAction store-insert time onto JobSpec for downstream latency metrics#4191

Open
agam-99 wants to merge 4 commits into
apache:masterfrom
agam-99:agsingh/dagaction-store-insert-time-millis-jobspec-stamp
Open

GOBBLIN-2263: Plumb DagAction store-insert time onto JobSpec for downstream latency metrics#4191
agam-99 wants to merge 4 commits into
apache:masterfrom
agam-99:agsingh/dagaction-store-insert-time-millis-jobspec-stamp

Conversation

@agam-99
Copy link
Copy Markdown
Contributor

@agam-99 agam-99 commented May 11, 2026

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

  • My PR addresses the following Gobblin JIRA issues and references them in the PR title. For example, "[GOBBLIN-2263] My Gobblin PR"
    • JIRA to be filed; happy to retitle once assigned.

Description

  • Here are some details about my PR, including screenshots (if applicable):

Downstream executors (e.g., a GridGateway producer that submits jobs after LaunchDagProc) cannot today measure the true end-to-end latency from when a LAUNCH DagAction row landed in the dag_action store to when the executor accepts the job, because that source timestamp is not propagated onto the JobSpec config. The two existing anchors fall short:

  • flow.executionId is set to System.currentTimeMillis() at REST-handler time for ad-hoc flows (close enough), but for scheduled flows it is rewritten by MysqlMultiActiveLeaseArbiter to the post-CDC consensus DB timestamp — silently skipping CDC propagation latency.
  • LeaseParams.eventTimeMillis is stamped at change-monitor consume time (System.currentTimeMillis() in the one-arg constructor at DagActionStore.java:118-120) — also after CDC.

Change:

  • Adds an optional dbUpdateTimeMillis field to the DagActionStoreChangeEvent Avro schema so MySQL CDC producers can carry the binlog event timestamp (true row-modification time) across the kafka hop. Optional (["null","long"], default null) for backwards compatibility — older producers continue to work unchanged.
  • Adds storeInsertTimeMillis to DagActionStore.LeaseParams with a sentinel UNKNOWN_STORE_INSERT_TIME_MILLIS = -1L for callers that do not have it. All existing constructors continue to work; a new four-arg constructor accepts the timestamp.
  • Adds DAG_ACTION_LAUNCH_STORE_INSERT_TIME_MILLIS_KEY = "dagAction.launch.storeInsertTimeMillis" to ConfigurationKeys.
  • LaunchDagProc.initialize() stamps the value onto the FlowSpec config when the carrier LeaseParams provides one (skipping it when UNKNOWN_STORE_INSERT_TIME_MILLIS), alongside the existing FLOW_EXECUTION_ID_KEY stamp.
  • DagTask exposes the consensus LeaseParams via getLeaseParams() so DagProc subclasses can read per-event metadata.

The PR is plumbing-only: it does not change which timestamp is consumed by MultiActiveLeaseArbiter, lease consensus semantics, or any existing behavior. Downstream consumers can opt in by reading the new JobSpec key; absence is fully backwards-compatible.

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

This change is pure pass-through plumbing on an optional field. The Avro field is exercised by the generated builder/setter on schema regeneration; LaunchDagProc only writes the key when the value is non-sentinel, and existing tests construct LeaseParams via the legacy constructors which default to UNKNOWN_STORE_INSERT_TIME_MILLIS, so they remain valid. Downstream behavioral tests (anchor-precedence, large-latency, scheduled vs ad-hoc) live in the consumer side (LinkedIn-internal ddm-common). I'm happy to add a small LaunchDagProcTest case asserting the stamp is added iff the lease params carry a non-sentinel value if reviewers prefer.

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

… metrics

Adds dbUpdateTimeMillis as an optional field on DagActionStoreChangeEvent
so MySQL CDC producers can carry the binlog EventTimestamp (true row
modification time) through the kafka hop into change monitors.

Change monitors set the value as storeInsertTimeMillis on LeaseParams
when constructing them. LaunchDagProc stamps the value onto the FlowSpec
config under the new ConfigurationKeys.DAG_ACTION_LAUNCH_STORE_INSERT_TIME_MILLIS_KEY
so downstream executors can measure true LAUNCH-to-submission latency,
including CDC propagation that flow.executionId-based anchors miss
(REST-handler time for ad-hoc, consensus DB time for scheduled).

DagTask exposes the LeaseParams via a new getter so DagProc subclasses
can read per-event metadata. Defaults to -1 (UNKNOWN_STORE_INSERT_TIME_MILLIS)
when the source timestamp isn't available, in which case LaunchDagProc
skips the stamp and downstream code falls back to its existing anchor.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@agam-99 agam-99 changed the title Plumb DagAction store-insert time onto JobSpec for downstream latency metrics GOBBLIN-2263: Plumb DagAction store-insert time onto JobSpec for downstream latency metrics May 13, 2026
}
]
}, {
"name" : "dbUpdateTimeMillis",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are we setting this field?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. The setter lives in the LinkedIn-internal MySQL CDC producer (binlog reader) that publishes onto Kafka — storeInsertTimeMillis is populated from the binlog event timestamp at row-INSERT time. We're keeping this PR schema-only/plumbing here so the optional field can flow through the existing pipeline. On the read side, the new MysqlMultiActiveLeaseArbiter + DagActionReminderScheduler changes in b1ae982 now propagate the value through consensus and Quartz reminders so LaunchDagProc actually stamps the JobSpec.

For any other open-source producer that wants to populate it, the Avro builder exposes .setStoreInsertTimeMillis(long) after regeneration; omitting it falls back to null and consumers see UNKNOWN — fully backwards-compatible.

}
]
}, {
"name" : "dbUpdateTimeMillis",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not name the avro field storeInsertTimeMillis as well?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to storeInsertTimeMillis end-to-end in b1ae982 — Avro field, JobSpec key, and LeaseParams field now share the same name across the chain.

* arbiter may rewrite via consensus. A value of {@link #UNKNOWN_STORE_INSERT_TIME_MILLIS} signals "not provided"
* and is the default for callers that do not have access to the source timestamp.
*/
long UNKNOWN_STORE_INSERT_TIME_MILLIS = -1L;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Can be moved to LeaseParams

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved into LeaseParams as public static final long UNKNOWN_STORE_INSERT_TIME_MILLIS = -1L; in b1ae982. Updated the two external references (DagActionReminderScheduler, LaunchDagProc) to DagActionStore.LeaseParams.UNKNOWN_STORE_INSERT_TIME_MILLIS and adjusted the surrounding javadoc link.

agam-99 added 2 commits May 13, 2026 14:18
Review fixes (DaisyModi):
- Rename Avro field dbUpdateTimeMillis -> storeInsertTimeMillis on
  DagActionStoreChangeEvent for naming consistency with LeaseParams.
- Move UNKNOWN_STORE_INSERT_TIME_MILLIS from DagActionStore interface
  scope into LeaseParams, where it logically belongs. Update the two
  external references (DagActionReminderScheduler, LaunchDagProc) and
  the surrounding javadoc link.

Carry the timestamp through consensus + reminders so the JobSpec stamp
fires on the primary path:
- MysqlMultiActiveLeaseArbiter: at the three call sites that build the
  consensus LeaseParams (CASE 2 within-epsilon, CASE 3 distinct-event,
  fresh-lease acquisition), switch from the 2-arg constructor to the
  4-arg one and pass through leaseParams.getStoreInsertTimeMillis().
  Without this, consensus silently defaults the field to UNKNOWN and
  LaunchDagProc skips the stamp.
- DagActionReminderScheduler: stash storeInsertTimeMillis on the Quartz
  JobDataMap when scheduling reminders and restore it when the reminder
  fires, so a host-failure-driven reattempt preserves the timestamp.
  Defaults to UNKNOWN for reminders scheduled by older code paths.
Covers the three sites this PR plumbs the source-DB row-insert timestamp through:

- LaunchDagProcTest:
  * launchDagStampsStoreInsertTimeMillisWhenProvided asserts initialize()
    adds the DAG_ACTION_LAUNCH_STORE_INSERT_TIME_MILLIS_KEY to the FlowSpec
    config when LeaseParams carries a non-UNKNOWN value, and that the value
    is what was supplied.
  * launchDagSkipsStoreInsertTimeMillisStampWhenUnknown asserts the key is
    NOT added when the LeaseParams carries the UNKNOWN sentinel.
  * Existing three tests updated to construct a real LeaseObtainedStatus
    via the new buildLaunchDagTask helper (previously passed null, which
    NPE'd after the PR added getLeaseParams() into initialize()).

- MysqlMultiActiveLeaseArbiterTest:
  * testStoreInsertTimeMillisPreservedThroughConsensus exercises all three
    arbiter consensus call sites — fresh-lease (selectInfoResult), CASE 2
    (within-epsilon, same event valid), and CASE 3 (distinct event, lease
    valid) — and asserts the consensus LeaseParams preserves the inbound
    storeInsertTimeMillis instead of silently defaulting to UNKNOWN.

- DagActionReminderSchedulerTest:
  * testCreateReminderJobDetailStashesStoreInsertTimeMillis verifies the
    JobDataMap on the Quartz JobDetail captures storeInsertTimeMillis.
  * testReminderJobExecuteCarriesStoreInsertTimeMillis exercises the full
    round-trip — ReminderJob.execute() reconstructs LeaseParams from the
    JobDataMap and the value survives.
  * testReminderJobExecuteFallsBackToUnknownWhenKeyAbsent asserts the
    backwards-compat path (reminder scheduled by older code without the
    new key) falls back to UNKNOWN, not throws.
@agam-99
Copy link
Copy Markdown
Contributor Author

agam-99 commented May 13, 2026

Added unit tests covering all three sites this PR plumbs storeInsertTimeMillis through (3300098a9).

Tests added (7 new, 3 fixed):

  • LaunchDagProcTest

    • launchDagStampsStoreInsertTimeMillisWhenProvided — asserts initialize() stamps DAG_ACTION_LAUNCH_STORE_INSERT_TIME_MILLIS_KEY onto the FlowSpec config when LeaseParams provides a non-UNKNOWN value
    • launchDagSkipsStoreInsertTimeMillisStampWhenUnknown — asserts the key is not added when the sentinel is in play
    • Existing 3 tests updated to construct a real LeaseObtainedStatus via a buildLaunchDagTask helper (previously they passed null, which NPE'd after this PR introduced getLeaseParams() into initialize())
  • MysqlMultiActiveLeaseArbiterTest

    • testStoreInsertTimeMillisPreservedThroughConsensus exercises all three arbiter consensus construction sites: fresh-lease (selectInfoResult path), CASE 2 (within-epsilon, same event valid), CASE 3 (distinct event, lease valid). Asserts the consensus LeaseParams preserves the inbound storeInsertTimeMillis instead of defaulting to UNKNOWN.
  • DagActionReminderSchedulerTest

    • testCreateReminderJobDetailStashesStoreInsertTimeMillisJobDataMap captures the timestamp when scheduling
    • testReminderJobExecuteCarriesStoreInsertTimeMillis — full Quartz round-trip: ReminderJob.execute() reconstructs LeaseParams from the JobDataMap and the value survives
    • testReminderJobExecuteFallsBackToUnknownWhenKeyAbsent — backwards-compat: reminders scheduled by older code without the new key fall back to UNKNOWN rather than throwing

Where CI runs them:
.github/workflows/build_and_test.yaml run_tests job, Service Tests matrix slot — exercises everything under gobblin-service against a real MySQL service container. (My current run shows action_required since this is a first-time contributor PR — would appreciate a maintainer triggering it so reviewers can see the results.)

Locally verified the reminder-scheduler tests pass; arbiter/launch-dag-proc tests require MySQL via testcontainers so they'll run for the first time on CI.

public static final String FLOW_EXECUTION_ID_KEY = "flow.executionId";
// Stamped onto the JobSpec config by LaunchDagProc, carrying the DagAction store row-insert time in millis
// (sourced upstream from the CDC binlog event timestamp). Enables downstream executors to measure end-to-end
// LAUNCH-to-submission latency including CDC propagation. Absent / -1 when the source timestamp is unknown.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will r doing this if (storeInsertTimeMillis != DagActionStore.LeaseParams.UNKNOWN_STORE_INSERT_TIME_MILLIS), will this be ever -1L

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's the default and fallback value -

      long storeInsertTimeMillis = jobDataMap.containsKey(FLOW_ACTION_STORE_INSERT_TIME_MILLIS_KEY)
          ? jobDataMap.getLong(FLOW_ACTION_STORE_INSERT_TIME_MILLIS_KEY)
          : DagActionStore.LeaseParams.UNKNOWN_STORE_INSERT_TIME_MILLIS;
`

Exclude io.grpc from the Cloudera repository so Gradle resolves grpc
artifacts and version metadata from Maven Central instead of a flaky
legacy mirror.

Co-authored-by: Cursor <cursoragent@cursor.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants