Skip to content

[GOBBLIN-XXXX] Expose DagAction insert time on LaunchDagProc FlowSpec for downstream latency instrumentation#4190

Closed
agam-99 wants to merge 1 commit intoapache:masterfrom
agam-99:agsingh/dagaction-insert-time-exposure
Closed

[GOBBLIN-XXXX] Expose DagAction insert time on LaunchDagProc FlowSpec for downstream latency instrumentation#4190
agam-99 wants to merge 1 commit intoapache:masterfrom
agam-99:agsingh/dagaction-insert-time-exposure

Conversation

@agam-99
Copy link
Copy Markdown
Contributor

@agam-99 agam-99 commented May 5, 2026

Problem

When the DagProcessingEngine picks up a LAUNCH DagAction and routes it through LaunchDagProc for compilation and submission, downstream SpecProducer implementations have no way to know when the action was originally inserted in the dag_action table. This is the natural anchor for measuring end-to-end "user request to job submitted" latency in a multi-instance GaaS deployment, where one instance may insert the action and a different instance may process it.

The MySQL row already carries this information in its modified_time column (set server-side at INSERT, never UPDATEd in practice — the ON UPDATE CURRENT_TIMESTAMP clause is inert because no UPDATE statement targets this table), but it is not surfaced anywhere on the in-memory model.

Change

This PR makes the insert timestamp available to downstream code via two small, additive, generically-useful pieces:

  1. DagActionStore.getDagActionInsertTimeMillis(DagAction) — new default interface method returning Optional<Long>. MysqlDagActionStore implements it via SELECT UNIX_TIMESTAMP(modified_time) * 1000. Other implementations (e.g., test mocks) keep working without change because the default returns Optional.empty(). Mirrored on DagManagementStateStore for callers that go through the higher-level abstraction.

  2. LaunchDagProc.initialize() looks up the insert timestamp and stamps it onto the FlowSpec under one new ConfigurationKeys.DAG_ACTION_INSERT_TIME_MILLIS_KEY = "flow.dagAction.insertTimeMillis" (Long). The compilation pipeline naturally propagates this onto each compiled JobSpec, so any SpecProducer can read it without reaching back to the action store.

Trigger type (scheduled vs ad-hoc) can be inferred downstream from the existing JOB_SCHEDULE_KEY ("job.schedule"), so no companion flowType key is added — that would be redundant.

The lookup-and-stamp step is best-effort: any failure (lookup error, missing row, etc.) is logged and swallowed so the LAUNCH path is unaffected.

Why this is OSS-appropriate

The exposure is fully generic — anyone running MysqlDagActionStore may want to know when a row was inserted, and anyone writing a SpecProducer may want to compute submit latency. There is no organization-specific code in this PR. Downstream metric emission lives outside this repo and consumes only public, framework-level config keys.

Tests

  • MysqlDagActionStoreTest:
    • Added testGetDagActionInsertTimeMillisReturnsTimestampForExistingRow — inserts a fresh row, asserts the returned timestamp falls within [insertWallClock - 1s, insertWallClock + 1s] (MySQL TIMESTAMP is second-precision).
    • Added testGetDagActionInsertTimeMillisReturnsEmptyForMissingRow — asserts Optional.empty() for a key that is not present.
    • All existing tests continue to pass: ./gradlew :gobblin-service:test --tests "MysqlDagActionStoreTest" reports 8/8 passing.
  • FlowSpecTest::testAddProperty continues to pass; no FlowSpec API change in this PR.
  • Verified that the LAUNCH path still compiles and the new stampDagActionInsertTimeOnFlowSpec helper degrades gracefully when the underlying store cannot supply a timestamp.

Compatibility

  • No schema change.
  • No breaking changes — all new methods have default implementations on interfaces.
  • No behavior change when the new ConfigurationKey is not consumed; downstream code that does not read it is unaffected.

Add a default DagActionStore.getDagActionInsertTimeMillis(DagAction) method
backed by a MysqlDagActionStore SELECT against the existing modified_time
column, plus a companion forwarder on DagManagementStateStore.
LaunchDagProc.initialize() now stamps the insert time onto the FlowSpec under
a new generic ConfigurationKey, so SpecProducer implementations can compute
end-to-end LAUNCH-to-submission latency without re-querying the store. Trigger
type (scheduled vs ad-hoc) can be inferred downstream from the existing
JOB_SCHEDULE_KEY, so no companion key is added.

Best-effort: if the underlying store cannot supply a timestamp or the lookup
fails, the stamp is skipped and a warning is logged so the launch path is
unaffected.
@agam-99 agam-99 force-pushed the agsingh/dagaction-insert-time-exposure branch from ec69e6d to 8f1c86f Compare May 5, 2026 06:02
@agam-99 agam-99 closed this May 7, 2026
@agam-99 agam-99 deleted the agsingh/dagaction-insert-time-exposure branch May 7, 2026 06:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant