Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ interface OutboxStore {
*/
fun removeDeliveredBefore(time: Instant, limit: Int): Int

/** Returns the oldest createdAt per status (useful for lag metrics). */
/**
* For each of the given [statuses] that has at least one entry, maps that status to the
* [OutboxEntry.createdAt] of its oldest entry (the minimum `createdAt` among entries in
* that status). Statuses with no entries are omitted from the result -- callers rely on
* absence to mean "no backlog" (e.g. the lag gauge reports 0 for an omitted status).
* Useful for lag metrics.
*/
fun findOldestCreatedAt(statuses: Set<OutboxStatus>): Map<OutboxStatus, Instant>

/** Returns entry count per status. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,27 @@ fun FunSpec.outboxStoreContractTests(
counts shouldContain (OutboxStatus.FAILED to 1L)
}

test("[$dbName] findOldestCreatedAt returns the oldest entry only for statuses that have rows") {
// Regression guard: a status with no rows must be ABSENT from the result map, not defaulted
// to "now". The lag gauge (okapi-micrometer) relies on absence to report 0 for empty statuses;
// if the store returns a near-now timestamp for an empty status the gauge shows a tiny false lag.
val older = Instant.parse("2024-01-01T00:00:00Z")
val newer = Instant.parse("2024-01-02T00:00:00Z")
jdbc.withTransaction {
store.persist(createTestEntry(now = older, messageType = "type.older"))
store.persist(createTestEntry(now = newer, messageType = "type.newer"))
}

val oldest = jdbc.withTransaction {
store.findOldestCreatedAt(setOf(OutboxStatus.PENDING, OutboxStatus.DELIVERED, OutboxStatus.FAILED))
}

// PENDING has two entries -> oldest createdAt is the earlier one.
oldest shouldContain (OutboxStatus.PENDING to older)
// DELIVERED / FAILED have no rows -> omitted entirely, not seeded with the current time.
oldest.keys shouldBe setOf(OutboxStatus.PENDING)
}

test("[$dbName] claimPending returns empty when no PENDING entries") {
val claimed = jdbc.withTransaction { store.claimPending(10) }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class MysqlOutboxStore(
}

override fun findOldestCreatedAt(statuses: Set<OutboxStatus>): Map<OutboxStatus, Instant> {
val result = statuses.associateWith { clock.instant() }.toMutableMap()
val result = mutableMapOf<OutboxStatus, Instant>()
val placeholders = statuses.joinToString(",") { "?" }
val sql = "SELECT status, MIN(created_at) AS min_created_at FROM okapi_outbox WHERE status IN ($placeholders) GROUP BY status"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class PostgresOutboxStore(
}

override fun findOldestCreatedAt(statuses: Set<OutboxStatus>): Map<OutboxStatus, Instant> {
val result = statuses.associateWith { clock.instant() }.toMutableMap()
val result = mutableMapOf<OutboxStatus, Instant>()
val placeholders = statuses.joinToString(",") { "?" }
val sql = "SELECT status, MIN(created_at) AS min_created_at FROM okapi_outbox WHERE status IN ($placeholders) GROUP BY status"

Expand Down