[GOBBLIN-XXXX] Expose DagAction insert time on LaunchDagProc FlowSpec for downstream latency instrumentation#4190
Closed
agam-99 wants to merge 1 commit intoapache:masterfrom
Closed
Conversation
Add a default DagActionStore.getDagActionInsertTimeMillis(DagAction) method backed by a MysqlDagActionStore SELECT against the existing modified_time column, plus a companion forwarder on DagManagementStateStore. LaunchDagProc.initialize() now stamps the insert time onto the FlowSpec under a new generic ConfigurationKey, so SpecProducer implementations can compute end-to-end LAUNCH-to-submission latency without re-querying the store. Trigger type (scheduled vs ad-hoc) can be inferred downstream from the existing JOB_SCHEDULE_KEY, so no companion key is added. Best-effort: if the underlying store cannot supply a timestamp or the lookup fails, the stamp is skipped and a warning is logged so the launch path is unaffected.
ec69e6d to
8f1c86f
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
When the
DagProcessingEnginepicks up a LAUNCHDagActionand routes it throughLaunchDagProcfor compilation and submission, downstreamSpecProducerimplementations have no way to know when the action was originally inserted in thedag_actiontable. This is the natural anchor for measuring end-to-end "user request to job submitted" latency in a multi-instance GaaS deployment, where one instance may insert the action and a different instance may process it.The MySQL row already carries this information in its
modified_timecolumn (set server-side at INSERT, never UPDATEd in practice — theON UPDATE CURRENT_TIMESTAMPclause is inert because no UPDATE statement targets this table), but it is not surfaced anywhere on the in-memory model.Change
This PR makes the insert timestamp available to downstream code via two small, additive, generically-useful pieces:
DagActionStore.getDagActionInsertTimeMillis(DagAction)— new default interface method returningOptional<Long>.MysqlDagActionStoreimplements it viaSELECT UNIX_TIMESTAMP(modified_time) * 1000. Other implementations (e.g., test mocks) keep working without change because the default returnsOptional.empty(). Mirrored onDagManagementStateStorefor callers that go through the higher-level abstraction.LaunchDagProc.initialize()looks up the insert timestamp and stamps it onto theFlowSpecunder one newConfigurationKeys.DAG_ACTION_INSERT_TIME_MILLIS_KEY = "flow.dagAction.insertTimeMillis"(Long). The compilation pipeline naturally propagates this onto each compiledJobSpec, so anySpecProducercan read it without reaching back to the action store.Trigger type (scheduled vs ad-hoc) can be inferred downstream from the existing
JOB_SCHEDULE_KEY("job.schedule"), so no companion flowType key is added — that would be redundant.The lookup-and-stamp step is best-effort: any failure (lookup error, missing row, etc.) is logged and swallowed so the LAUNCH path is unaffected.
Why this is OSS-appropriate
The exposure is fully generic — anyone running
MysqlDagActionStoremay want to know when a row was inserted, and anyone writing aSpecProducermay want to compute submit latency. There is no organization-specific code in this PR. Downstream metric emission lives outside this repo and consumes only public, framework-level config keys.Tests
MysqlDagActionStoreTest:testGetDagActionInsertTimeMillisReturnsTimestampForExistingRow— inserts a fresh row, asserts the returned timestamp falls within[insertWallClock - 1s, insertWallClock + 1s](MySQLTIMESTAMPis second-precision).testGetDagActionInsertTimeMillisReturnsEmptyForMissingRow— assertsOptional.empty()for a key that is not present../gradlew :gobblin-service:test --tests "MysqlDagActionStoreTest"reports 8/8 passing.FlowSpecTest::testAddPropertycontinues to pass; no FlowSpec API change in this PR.stampDagActionInsertTimeOnFlowSpechelper degrades gracefully when the underlying store cannot supply a timestamp.Compatibility
defaultimplementations on interfaces.ConfigurationKeyis not consumed; downstream code that does not read it is unaffected.