diff --git a/src/github.go b/src/github.go index 028f17e..45f3e06 100644 --- a/src/github.go +++ b/src/github.go @@ -93,6 +93,25 @@ type runMetricDetails struct { endedAt string } +type runMetricSet struct { + statusCounter *prometheus.CounterVec + queuedGauge *prometheus.GaugeVec + inProgressGauge *prometheus.GaugeVec + completedGauge *prometheus.GaugeVec + durationHistogram *prometheus.HistogramVec +} + +type runStoreMethods struct { + get func(context.Context, int) (RunState, bool, error) + update func(context.Context, int, RunState) error +} + +const ( + statusQueued = "queued" + statusInProgress = "in_progress" + statusCompleted = "completed" +) + var stateStore StateStore func validateHMAC(body []byte, signature string, secret []byte) bool { @@ -150,115 +169,240 @@ func githubEventsHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } -func observeRunMetrics( - details runMetricDetails, - statusCounter *prometheus.CounterVec, - queuedGauge *prometheus.GaugeVec, - inProgressGauge *prometheus.GaugeVec, - completedGauge *prometheus.GaugeVec, - durationHistogram *prometheus.HistogramVec, -) { - statusCounter.WithLabelValues( - details.repository, - details.branch, - details.name, - details.status, - details.conclusion, - ).Inc() +func normalizeRunState(details runMetricDetails) RunState { + return RunState{ + Repository: details.repository, + Branch: details.branch, + Name: details.name, + Status: normalizeStatus(details.status), + Conclusion: normalizeConclusion(details.conclusion), + StartedAt: details.startedAt, + EndedAt: details.endedAt, + } +} + +func normalizeStatus(status string) string { + return strings.ToLower(strings.TrimSpace(status)) +} + +func normalizeConclusion(conclusion string) string { + return strings.ToLower(strings.TrimSpace(conclusion)) +} + +func stateRank(status string) int { + switch normalizeStatus(status) { + case statusQueued: + return 1 + case statusInProgress: + return 2 + case statusCompleted: + return 3 + default: + return 0 + } +} + +func parseMetricTime(value string) (time.Time, bool) { + value = strings.TrimSpace(value) + if value == "" { + return time.Time{}, false + } - switch strings.ToLower(details.status) { - case "queued": - queuedGauge.WithLabelValues( - details.repository, - details.branch, - details.name, - ).Inc() - case "in_progress": - inProgressGauge.WithLabelValues( - details.repository, - details.branch, - details.name, - ).Inc() - queuedGauge.WithLabelValues( - details.repository, - details.branch, - details.name, - ).Dec() - case "completed": - completedGauge.WithLabelValues( - details.repository, - details.branch, - details.conclusion, - details.name, - ).Inc() - inProgressGauge.WithLabelValues( - details.repository, - details.branch, - details.name, - ).Dec() - - startedAt, err1 := time.Parse(time.RFC3339, details.startedAt) - endedAt, err2 := time.Parse(time.RFC3339, details.endedAt) - if err1 == nil && err2 == nil { - duration := endedAt.Sub(startedAt).Seconds() - durationHistogram.WithLabelValues( - details.repository, - details.branch, - details.name, - details.status, - details.conclusion, - ).Observe(duration) + parsed, err := time.Parse(time.RFC3339, value) + if err != nil { + return time.Time{}, false + } + + return parsed, true +} + +func shouldApplyStateTransition(previous, next RunState) bool { + previousRank := stateRank(previous.Status) + nextRank := stateRank(next.Status) + if nextRank < previousRank { + return false + } + + if nextRank == previousRank { + if next.Status == previous.Status && next.Conclusion == previous.Conclusion { + return false + } + + previousEndedAt, previousHasEndedAt := parseMetricTime(previous.EndedAt) + nextEndedAt, nextHasEndedAt := parseMetricTime(next.EndedAt) + if previousHasEndedAt && nextHasEndedAt && nextEndedAt.Before(previousEndedAt) { + return false + } + + if previousHasEndedAt && !nextHasEndedAt { + return false } } + + return true } -func updateRunState(ctx context.Context, id int, details runMetricDetails, updateFn func(context.Context, int, RunState) error, entityName string) { - if stateStore == nil { +func applyGaugeDelta(details RunState, delta float64, queuedGauge, inProgressGauge, completedGauge *prometheus.GaugeVec) { + switch normalizeStatus(details.Status) { + case statusQueued: + queuedGauge.WithLabelValues(details.Repository, details.Branch, details.Name).Add(delta) + case statusInProgress: + inProgressGauge.WithLabelValues(details.Repository, details.Branch, details.Name).Add(delta) + case statusCompleted: + completedGauge.WithLabelValues(details.Repository, details.Branch, details.Conclusion, details.Name).Add(delta) + } +} + +func observeDuration(details RunState, durationHistogram *prometheus.HistogramVec) { + if normalizeStatus(details.Status) != statusCompleted { return } - if err := updateFn(ctx, id, RunState{ - Repository: details.repository, - Branch: details.branch, - Name: details.name, - Status: details.status, - Conclusion: details.conclusion, - StartedAt: details.startedAt, - EndedAt: details.endedAt, - }); err != nil { + startedAt, startedOK := parseMetricTime(details.StartedAt) + endedAt, endedOK := parseMetricTime(details.EndedAt) + if !startedOK || !endedOK || endedAt.Before(startedAt) { + return + } + + durationHistogram.WithLabelValues( + details.Repository, + details.Branch, + details.Name, + details.Status, + details.Conclusion, + ).Observe(endedAt.Sub(startedAt).Seconds()) +} + +func applyStatefulMetrics(details RunState, previous *RunState, metrics runMetricSet) { + metrics.statusCounter.WithLabelValues( + details.Repository, + details.Branch, + details.Name, + details.Status, + details.Conclusion, + ).Inc() + + if previous != nil { + applyGaugeDelta(*previous, -1, metrics.queuedGauge, metrics.inProgressGauge, metrics.completedGauge) + } + applyGaugeDelta(details, 1, metrics.queuedGauge, metrics.inProgressGauge, metrics.completedGauge) + + if previous == nil || normalizeStatus(previous.Status) != statusCompleted { + observeDuration(details, metrics.durationHistogram) + } +} + +func getPreviousState(ctx context.Context, id int, getFn func(context.Context, int) (RunState, bool, error), entityName string) (*RunState, bool) { + if stateStore == nil { + return nil, true + } + + previous, found, err := getFn(ctx, id) + if err != nil { + logger.Error("Failed to load run state from redis", zap.String("entity", entityName), zap.Int("id", id), zap.Error(err)) + return nil, false + } + if !found { + return nil, true + } + + return &previous, true +} + +func persistRunState(ctx context.Context, id int, next RunState, updateFn func(context.Context, int, RunState) error, entityName string) bool { + if stateStore == nil { + return true + } + + if err := updateFn(ctx, id, next); err != nil { logger.Error("Failed to update run state in redis", zap.String("entity", entityName), zap.Int("id", id), zap.Error(err)) + return false } + + return true } -func updateWorkflowMetrics(ctx context.Context, body []byte) { - var payload GithubWorkflow +func updateTrackedRunMetrics( + ctx context.Context, + id int, + details runMetricDetails, + store runStoreMethods, + entityName string, + metrics runMetricSet, +) { + nextState := normalizeRunState(details) - if err := json.Unmarshal(body, &payload); err != nil { - logger.Error("Failed to unmarshal workflow_run payload", zap.Error(err)) + if stateStore == nil { + applyStatefulMetrics(nextState, nil, metrics) return } - details := runMetricDetails{ - repository: payload.Workflow.Repository.FullName, - branch: payload.Workflow.Branch, - name: payload.Workflow.Name, - status: payload.Workflow.Status, - conclusion: payload.Workflow.Conclusion, - startedAt: payload.Workflow.CreatedAt, - endedAt: payload.Workflow.UpdatedAt, + previousState, ok := getPreviousState(ctx, id, store.get, entityName) + if !ok { + return + } + if previousState != nil && !shouldApplyStateTransition(*previousState, nextState) { + logger.Debug("Skipping stale or duplicate run transition", zap.String("entity", entityName), zap.Int("id", id), zap.String("status", nextState.Status), zap.String("conclusion", nextState.Conclusion)) + return + } + if !persistRunState(ctx, id, nextState, store.update, entityName) { + return } - if stateStore != nil { - updateRunState(ctx, payload.Workflow.RunID, details, stateStore.UpdateWorkflowRun, "workflow_run") + applyStatefulMetrics(nextState, previousState, metrics) +} + +func workflowRunStoreMethods() runStoreMethods { + return runStoreMethods{ + get: func(ctx context.Context, id int) (RunState, bool, error) { + return stateStore.GetWorkflowRun(ctx, id) + }, + update: func(ctx context.Context, id int, state RunState) error { + return stateStore.UpdateWorkflowRun(ctx, id, state) + }, + } +} + +func workflowJobStoreMethods() runStoreMethods { + return runStoreMethods{ + get: func(ctx context.Context, id int) (RunState, bool, error) { + return stateStore.GetWorkflowJob(ctx, id) + }, + update: func(ctx context.Context, id int, state RunState) error { + return stateStore.UpdateWorkflowJob(ctx, id, state) + }, } +} - observeRunMetrics( - details, - workflowStatusCounter, - workflowQueuedGauge, - workflowInProgressGauge, - workflowCompletedGauge, - workflowDurationHistogram, +func updateWorkflowMetrics(ctx context.Context, body []byte) { + var payload GithubWorkflow + + if err := json.Unmarshal(body, &payload); err != nil { + logger.Error("Failed to unmarshal workflow_run payload", zap.Error(err)) + return + } + + updateTrackedRunMetrics( + ctx, + payload.Workflow.RunID, + runMetricDetails{ + repository: payload.Workflow.Repository.FullName, + branch: payload.Workflow.Branch, + name: payload.Workflow.Name, + status: payload.Workflow.Status, + conclusion: payload.Workflow.Conclusion, + startedAt: payload.Workflow.CreatedAt, + endedAt: payload.Workflow.UpdatedAt, + }, + workflowRunStoreMethods(), + "workflow_run", + runMetricSet{ + statusCounter: workflowStatusCounter, + queuedGauge: workflowQueuedGauge, + inProgressGauge: workflowInProgressGauge, + completedGauge: workflowCompletedGauge, + durationHistogram: workflowDurationHistogram, + }, ) } @@ -270,27 +414,27 @@ func updateJobMetrics(ctx context.Context, body []byte) { return } - details := runMetricDetails{ - repository: payload.Job.Repository.FullName, - branch: payload.Job.Branch, - name: payload.Job.WorkflowName, - status: payload.Job.Status, - conclusion: payload.Job.Conclusion, - startedAt: payload.Job.StartedAt, - endedAt: payload.Job.CompletedAt, - } - - if stateStore != nil { - updateRunState(ctx, payload.Job.ID, details, stateStore.UpdateWorkflowJob, "workflow_job") - } - - observeRunMetrics( - details, - jobStatusCounter, - jobQueuedGauge, - jobInProgressGauge, - jobCompletedGauge, - jobDurationHistogram, + updateTrackedRunMetrics( + ctx, + payload.Job.ID, + runMetricDetails{ + repository: payload.Job.Repository.FullName, + branch: payload.Job.Branch, + name: payload.Job.WorkflowName, + status: payload.Job.Status, + conclusion: payload.Job.Conclusion, + startedAt: payload.Job.StartedAt, + endedAt: payload.Job.CompletedAt, + }, + workflowJobStoreMethods(), + "workflow_job", + runMetricSet{ + statusCounter: jobStatusCounter, + queuedGauge: jobQueuedGauge, + inProgressGauge: jobInProgressGauge, + completedGauge: jobCompletedGauge, + durationHistogram: jobDurationHistogram, + }, ) } diff --git a/src/metrics_test.go b/src/metrics_test.go index 06ad2ad..e04b1e4 100644 --- a/src/metrics_test.go +++ b/src/metrics_test.go @@ -10,13 +10,14 @@ import ( "github.com/prometheus/client_golang/prometheus/testutil" ) -const ( - statusQueued = "queued" - statusInProgress = "in_progress" - statusCompleted = "completed" -) +func withInMemoryStateStore(t *testing.T) { + oldStore := stateStore + stateStore = newInMemoryStateStore() + t.Cleanup(func() { stateStore = oldStore }) +} func TestWorkflowStatusCounter(t *testing.T) { + withInMemoryStateStore(t) workflowStatusCounter.Reset() reg.MustRegister(workflowStatusCounter) body, err := os.ReadFile("../test_data/workflow_run.json") @@ -25,7 +26,6 @@ func TestWorkflowStatusCounter(t *testing.T) { } updateWorkflowMetrics(context.Background(), body) - // Test counter if err := testutil.CollectAndCompare(workflowStatusCounter, strings.NewReader(` # HELP promgithub_workflow_status Total number of workflow runs with status # TYPE promgithub_workflow_status counter @@ -36,6 +36,7 @@ func TestWorkflowStatusCounter(t *testing.T) { } func TestJobStatusCounter(t *testing.T) { + withInMemoryStateStore(t) jobStatusCounter.Reset() reg.MustRegister(jobStatusCounter) body, err := os.ReadFile("../test_data/workflow_job.json") @@ -44,7 +45,6 @@ func TestJobStatusCounter(t *testing.T) { } updateJobMetrics(context.Background(), body) - // Test counter if err := testutil.CollectAndCompare(jobStatusCounter, strings.NewReader(` # HELP promgithub_job_status Total number of jobs with status # TYPE promgithub_job_status counter @@ -63,7 +63,6 @@ func TestCommitsPushedCounter(t *testing.T) { } updateCommitMetrics(body) - // Test counter if err := testutil.CollectAndCompare(commitPushedCounter, strings.NewReader(` # HELP promgithub_commit_pushed Total number of commits pushed # TYPE promgithub_commit_pushed counter @@ -82,7 +81,6 @@ func TestPullRequestsCounter(t *testing.T) { } updatePullRequestMetrics(body) - // Test counter if err := testutil.CollectAndCompare(pullRequestCounter, strings.NewReader(` # HELP promgithub_pull_request Total number of pull requests # TYPE promgithub_pull_request counter @@ -93,6 +91,7 @@ func TestPullRequestsCounter(t *testing.T) { } func TestWorkflowDurationHistogram(t *testing.T) { + withInMemoryStateStore(t) workflowDurationHistogram.Reset() reg.MustRegister(workflowDurationHistogram) body, err := os.ReadFile("../test_data/workflow_run.json") @@ -101,7 +100,6 @@ func TestWorkflowDurationHistogram(t *testing.T) { } updateWorkflowMetrics(context.Background(), body) - // Test histogram if err := testutil.CollectAndCompare(workflowDurationHistogram, strings.NewReader(` # HELP promgithub_workflow_duration Duration of workflow runs # TYPE promgithub_workflow_duration histogram @@ -125,6 +123,7 @@ func TestWorkflowDurationHistogram(t *testing.T) { } func TestJobDurationHistogram(t *testing.T) { + withInMemoryStateStore(t) jobDurationHistogram.Reset() reg.MustRegister(jobDurationHistogram) body, err := os.ReadFile("../test_data/workflow_job.json") @@ -133,7 +132,6 @@ func TestJobDurationHistogram(t *testing.T) { } updateJobMetrics(context.Background(), body) - // Test histogram if err := testutil.CollectAndCompare(jobDurationHistogram, strings.NewReader(` # HELP promgithub_job_duration Duration of jobs runs in seconds # TYPE promgithub_job_duration histogram @@ -157,6 +155,7 @@ func TestJobDurationHistogram(t *testing.T) { } func TestWorkflowQueuedGauge(t *testing.T) { + withInMemoryStateStore(t) workflowQueuedGauge.Reset() reg.MustRegister(workflowQueuedGauge) body, err := os.ReadFile("../test_data/workflow_run.json") @@ -165,16 +164,12 @@ func TestWorkflowQueuedGauge(t *testing.T) { } var payload GithubWorkflow - - // Unmarshal the JSON data into the struct if err := json.Unmarshal(body, &payload); err != nil { t.Fatalf("Failed to unmarshal JSON data: %v", err) } - - // Modify the status field payload.Workflow.Status = statusQueued + payload.Workflow.Conclusion = "" - // Marshal the modified struct back to JSON if needed modifiedBody, err := json.Marshal(payload) if err != nil { t.Fatalf("Failed to marshal modified JSON data: %v", err) @@ -182,17 +177,17 @@ func TestWorkflowQueuedGauge(t *testing.T) { updateWorkflowMetrics(context.Background(), modifiedBody) - // Test gauge if err := testutil.CollectAndCompare(workflowQueuedGauge, strings.NewReader(` - # HELP promgithub_workflow_queued Number of workflow runs queued - # TYPE promgithub_workflow_queued gauge - promgithub_workflow_queued{branch="main",repository="user/repo",workflow_name="CI"} 1 - `)); err != nil { + # HELP promgithub_workflow_queued Number of workflow runs queued + # TYPE promgithub_workflow_queued gauge + promgithub_workflow_queued{branch="main",repository="user/repo",workflow_name="CI"} 1 + `)); err != nil { t.Errorf("unexpected metrics: %v", err) } } func TestWorkflowInProgressGauge(t *testing.T) { + withInMemoryStateStore(t) workflowInProgressGauge.Reset() reg.MustRegister(workflowInProgressGauge) body, err := os.ReadFile("../test_data/workflow_run.json") @@ -201,16 +196,13 @@ func TestWorkflowInProgressGauge(t *testing.T) { } var payload GithubWorkflow - - // Unmarshal the JSON data into the struct if err := json.Unmarshal(body, &payload); err != nil { t.Fatalf("Failed to unmarshal JSON data: %v", err) } - - // Modify the status field payload.Workflow.Status = statusInProgress + payload.Workflow.Conclusion = "" + payload.Workflow.UpdatedAt = "" - // Marshal the modified struct back to JSON if needed modifiedBody, err := json.Marshal(payload) if err != nil { t.Fatalf("Failed to marshal modified JSON data: %v", err) @@ -218,17 +210,17 @@ func TestWorkflowInProgressGauge(t *testing.T) { updateWorkflowMetrics(context.Background(), modifiedBody) - // Test gauge if err := testutil.CollectAndCompare(workflowInProgressGauge, strings.NewReader(` - # HELP promgithub_workflow_in_progress Number of workflow runs in progress - # TYPE promgithub_workflow_in_progress gauge - promgithub_workflow_in_progress{branch="main",repository="user/repo",workflow_name="CI"} 1 - `)); err != nil { + # HELP promgithub_workflow_in_progress Number of workflow runs in progress + # TYPE promgithub_workflow_in_progress gauge + promgithub_workflow_in_progress{branch="main",repository="user/repo",workflow_name="CI"} 1 + `)); err != nil { t.Errorf("unexpected metrics: %v", err) } } func TestWorkflowCompletedGauge(t *testing.T) { + withInMemoryStateStore(t) workflowCompletedGauge.Reset() reg.MustRegister(workflowCompletedGauge) body, err := os.ReadFile("../test_data/workflow_run.json") @@ -237,7 +229,6 @@ func TestWorkflowCompletedGauge(t *testing.T) { } updateWorkflowMetrics(context.Background(), body) - // Test gauge if err := testutil.CollectAndCompare(workflowCompletedGauge, strings.NewReader(` # HELP promgithub_workflow_completed Number of workflow runs completed # TYPE promgithub_workflow_completed gauge @@ -248,6 +239,7 @@ func TestWorkflowCompletedGauge(t *testing.T) { } func TestJobQueuedGauge(t *testing.T) { + withInMemoryStateStore(t) jobQueuedGauge.Reset() reg.MustRegister(jobQueuedGauge) body, err := os.ReadFile("../test_data/workflow_job.json") @@ -256,16 +248,13 @@ func TestJobQueuedGauge(t *testing.T) { } var payload GithubJob - - // Unmarshal the JSON data into the struct if err := json.Unmarshal(body, &payload); err != nil { t.Fatalf("Failed to unmarshal JSON data: %v", err) } - - // Modify the status field payload.Job.Status = statusQueued + payload.Job.Conclusion = "" + payload.Job.CompletedAt = "" - // Marshal the modified struct back to JSON modifiedBody, err := json.Marshal(payload) if err != nil { t.Fatalf("Failed to marshal modified JSON data: %v", err) @@ -273,7 +262,6 @@ func TestJobQueuedGauge(t *testing.T) { updateJobMetrics(context.Background(), modifiedBody) - // Test gauge if err := testutil.CollectAndCompare(jobQueuedGauge, strings.NewReader(` # HELP promgithub_job_queued Number of jobs queued # TYPE promgithub_job_queued gauge @@ -284,6 +272,7 @@ func TestJobQueuedGauge(t *testing.T) { } func TestJobInProgressGauge(t *testing.T) { + withInMemoryStateStore(t) jobInProgressGauge.Reset() reg.MustRegister(jobInProgressGauge) body, err := os.ReadFile("../test_data/workflow_job.json") @@ -292,16 +281,13 @@ func TestJobInProgressGauge(t *testing.T) { } var payload GithubJob - - // Unmarshal the JSON data into the struct if err := json.Unmarshal(body, &payload); err != nil { t.Fatalf("Failed to unmarshal JSON data: %v", err) } - - // Modify the status field payload.Job.Status = statusInProgress + payload.Job.Conclusion = "" + payload.Job.CompletedAt = "" - // Marshal the modified struct back to JSON modifiedBody, err := json.Marshal(payload) if err != nil { t.Fatalf("Failed to marshal modified JSON data: %v", err) @@ -309,7 +295,6 @@ func TestJobInProgressGauge(t *testing.T) { updateJobMetrics(context.Background(), modifiedBody) - // Test gauge if err := testutil.CollectAndCompare(jobInProgressGauge, strings.NewReader(` # HELP promgithub_job_in_progress Number of jobs in progress # TYPE promgithub_job_in_progress gauge @@ -320,38 +305,113 @@ func TestJobInProgressGauge(t *testing.T) { } func TestJobCompletedGauge(t *testing.T) { + withInMemoryStateStore(t) jobCompletedGauge.Reset() reg.MustRegister(jobCompletedGauge) - body, err := os.ReadFile("../test_data/workflow_job.json") if err != nil { t.Fatalf("Failed to read test data file: %v", err) } + updateJobMetrics(context.Background(), body) - var payload GithubJob + if err := testutil.CollectAndCompare(jobCompletedGauge, strings.NewReader(` + # HELP promgithub_job_completed Number of jobs completed + # TYPE promgithub_job_completed gauge + promgithub_job_completed{branch="main",job_conclusion="success",repository="user/repo",workflow_name="CI"} 1 + `)); err != nil { + t.Errorf("unexpected metrics: %v", err) + } +} + +func TestWorkflowGaugeTransitionIsIdempotent(t *testing.T) { + withInMemoryStateStore(t) + workflowQueuedGauge.Reset() + workflowInProgressGauge.Reset() + workflowCompletedGauge.Reset() - // Unmarshal the JSON data into the struct + body, err := os.ReadFile("../test_data/workflow_run.json") + if err != nil { + t.Fatalf("Failed to read test data file: %v", err) + } + + var payload GithubWorkflow if err := json.Unmarshal(body, &payload); err != nil { t.Fatalf("Failed to unmarshal JSON data: %v", err) } - // Modify the status field - payload.Job.Status = statusCompleted + payload.Workflow.Status = statusQueued + payload.Workflow.Conclusion = "" + payload.Workflow.UpdatedAt = payload.Workflow.CreatedAt + queuedBody, _ := json.Marshal(payload) + updateWorkflowMetrics(context.Background(), queuedBody) + updateWorkflowMetrics(context.Background(), queuedBody) - // Marshal the modified struct back to JSON - modifiedBody, err := json.Marshal(payload) + payload.Workflow.Status = statusInProgress + payload.Workflow.UpdatedAt = "2024-11-21T11:30:00Z" + inProgressBody, _ := json.Marshal(payload) + updateWorkflowMetrics(context.Background(), inProgressBody) + + payload.Workflow.Status = statusCompleted + payload.Workflow.Conclusion = "success" + payload.Workflow.UpdatedAt = "2024-11-21T12:00:00Z" + completedBody, _ := json.Marshal(payload) + updateWorkflowMetrics(context.Background(), completedBody) + updateWorkflowMetrics(context.Background(), inProgressBody) + + if got := testutil.ToFloat64(workflowQueuedGauge.WithLabelValues("user/repo", "main", "CI")); got != 0 { + t.Fatalf("expected queued gauge to be 0, got %v", got) + } + if got := testutil.ToFloat64(workflowInProgressGauge.WithLabelValues("user/repo", "main", "CI")); got != 0 { + t.Fatalf("expected in progress gauge to be 0, got %v", got) + } + if got := testutil.ToFloat64(workflowCompletedGauge.WithLabelValues("user/repo", "main", "success", "CI")); got != 1 { + t.Fatalf("expected completed gauge to be 1, got %v", got) + } +} + +func TestJobGaugeTransitionIsIdempotent(t *testing.T) { + withInMemoryStateStore(t) + jobQueuedGauge.Reset() + jobInProgressGauge.Reset() + jobCompletedGauge.Reset() + + body, err := os.ReadFile("../test_data/workflow_job.json") if err != nil { - t.Fatalf("Failed to marshal modified JSON data: %v", err) + t.Fatalf("Failed to read test data file: %v", err) } - updateJobMetrics(context.Background(), modifiedBody) + var payload GithubJob + if err := json.Unmarshal(body, &payload); err != nil { + t.Fatalf("Failed to unmarshal JSON data: %v", err) + } - // Test gauge - if err := testutil.CollectAndCompare(jobCompletedGauge, strings.NewReader(` - # HELP promgithub_job_completed Number of jobs completed - # TYPE promgithub_job_completed gauge - promgithub_job_completed{branch="main",job_conclusion="success",repository="user/repo",workflow_name="CI"} 1 - `)); err != nil { - t.Errorf("unexpected metrics: %v", err) + payload.Job.Status = statusQueued + payload.Job.Conclusion = "" + payload.Job.StartedAt = "" + payload.Job.CompletedAt = "" + queuedBody, _ := json.Marshal(payload) + updateJobMetrics(context.Background(), queuedBody) + updateJobMetrics(context.Background(), queuedBody) + + payload.Job.Status = statusInProgress + payload.Job.StartedAt = "2024-11-21T11:00:00Z" + inProgressBody, _ := json.Marshal(payload) + updateJobMetrics(context.Background(), inProgressBody) + + payload.Job.Status = statusCompleted + payload.Job.Conclusion = "success" + payload.Job.CompletedAt = "2024-11-21T12:00:00Z" + completedBody, _ := json.Marshal(payload) + updateJobMetrics(context.Background(), completedBody) + updateJobMetrics(context.Background(), inProgressBody) + + if got := testutil.ToFloat64(jobQueuedGauge.WithLabelValues("user/repo", "main", "CI")); got != 0 { + t.Fatalf("expected queued gauge to be 0, got %v", got) + } + if got := testutil.ToFloat64(jobInProgressGauge.WithLabelValues("user/repo", "main", "CI")); got != 0 { + t.Fatalf("expected in progress gauge to be 0, got %v", got) + } + if got := testutil.ToFloat64(jobCompletedGauge.WithLabelValues("user/repo", "main", "success", "CI")); got != 1 { + t.Fatalf("expected completed gauge to be 1, got %v", got) } } diff --git a/src/redis.go b/src/redis.go index 624bfe8..9752a84 100644 --- a/src/redis.go +++ b/src/redis.go @@ -91,6 +91,14 @@ func (s *RedisStateStore) MarkDeliveryProcessed(ctx context.Context, deliveryID return created, nil } +func (s *RedisStateStore) GetWorkflowRun(ctx context.Context, runID int) (RunState, bool, error) { + if runID == 0 { + return RunState{}, false, errors.New("workflow run id is required") + } + + return s.readState(ctx, s.key("workflow_run", fmt.Sprintf("%d", runID))) +} + func (s *RedisStateStore) UpdateWorkflowRun(ctx context.Context, runID int, state RunState) error { if runID == 0 { return errors.New("workflow run id is required") @@ -99,6 +107,14 @@ func (s *RedisStateStore) UpdateWorkflowRun(ctx context.Context, runID int, stat return s.writeState(ctx, s.key("workflow_run", fmt.Sprintf("%d", runID)), state) } +func (s *RedisStateStore) GetWorkflowJob(ctx context.Context, jobID int) (RunState, bool, error) { + if jobID == 0 { + return RunState{}, false, errors.New("workflow job id is required") + } + + return s.readState(ctx, s.key("workflow_job", fmt.Sprintf("%d", jobID))) +} + func (s *RedisStateStore) UpdateWorkflowJob(ctx context.Context, jobID int, state RunState) error { if jobID == 0 { return errors.New("workflow job id is required") @@ -114,6 +130,23 @@ func (s *RedisStateStore) Close() error { return s.client.Close() } +func (s *RedisStateStore) readState(ctx context.Context, key string) (RunState, bool, error) { + payload, err := s.client.Get(ctx, key).Result() + if err != nil { + if errors.Is(err, redis.Nil) { + return RunState{}, false, nil + } + return RunState{}, false, err + } + + var state RunState + if err := json.Unmarshal([]byte(payload), &state); err != nil { + return RunState{}, false, err + } + + return state, true, nil +} + func (s *RedisStateStore) writeState(ctx context.Context, key string, state RunState) error { payload, err := json.Marshal(state) if err != nil { diff --git a/src/redis_test.go b/src/redis_test.go index 6c3a5f1..e81b7c4 100644 --- a/src/redis_test.go +++ b/src/redis_test.go @@ -29,11 +29,21 @@ func (s *inMemoryStateStore) MarkDeliveryProcessed(_ context.Context, deliveryID return true, nil } +func (s *inMemoryStateStore) GetWorkflowRun(_ context.Context, runID int) (RunState, bool, error) { + state, ok := s.workflow[runID] + return state, ok, nil +} + func (s *inMemoryStateStore) UpdateWorkflowRun(_ context.Context, runID int, state RunState) error { s.workflow[runID] = state return nil } +func (s *inMemoryStateStore) GetWorkflowJob(_ context.Context, jobID int) (RunState, bool, error) { + state, ok := s.jobs[jobID] + return state, ok, nil +} + func (s *inMemoryStateStore) UpdateWorkflowJob(_ context.Context, jobID int, state RunState) error { s.jobs[jobID] = state return nil diff --git a/src/state_store.go b/src/state_store.go index 208b9e3..43c7e29 100644 --- a/src/state_store.go +++ b/src/state_store.go @@ -4,7 +4,9 @@ import "context" type StateStore interface { MarkDeliveryProcessed(ctx context.Context, deliveryID string) (bool, error) + GetWorkflowRun(ctx context.Context, runID int) (RunState, bool, error) UpdateWorkflowRun(ctx context.Context, runID int, state RunState) error + GetWorkflowJob(ctx context.Context, jobID int) (RunState, bool, error) UpdateWorkflowJob(ctx context.Context, jobID int, state RunState) error Close() error }