diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxStore.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxStore.kt index b9252fb..d0d7ee2 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxStore.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxStore.kt @@ -19,7 +19,13 @@ interface OutboxStore { */ fun removeDeliveredBefore(time: Instant, limit: Int): Int - /** Returns the oldest createdAt per status (useful for lag metrics). */ + /** + * For each of the given [statuses] that has at least one entry, maps that status to the + * [OutboxEntry.createdAt] of its oldest entry (the minimum `createdAt` among entries in + * that status). Statuses with no entries are omitted from the result -- callers rely on + * absence to mean "no backlog" (e.g. the lag gauge reports 0 for an omitted status). + * Useful for lag metrics. + */ fun findOldestCreatedAt(statuses: Set): Map /** Returns entry count per status. */ diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/OutboxStoreContractTests.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/OutboxStoreContractTests.kt index 04ce1a2..2af0948 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/OutboxStoreContractTests.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/OutboxStoreContractTests.kt @@ -225,6 +225,27 @@ fun FunSpec.outboxStoreContractTests( counts shouldContain (OutboxStatus.FAILED to 1L) } + test("[$dbName] findOldestCreatedAt returns the oldest entry only for statuses that have rows") { + // Regression guard: a status with no rows must be ABSENT from the result map, not defaulted + // to "now". The lag gauge (okapi-micrometer) relies on absence to report 0 for empty statuses; + // if the store returns a near-now timestamp for an empty status the gauge shows a tiny false lag. + val older = Instant.parse("2024-01-01T00:00:00Z") + val newer = Instant.parse("2024-01-02T00:00:00Z") + jdbc.withTransaction { + store.persist(createTestEntry(now = older, messageType = "type.older")) + store.persist(createTestEntry(now = newer, messageType = "type.newer")) + } + + val oldest = jdbc.withTransaction { + store.findOldestCreatedAt(setOf(OutboxStatus.PENDING, OutboxStatus.DELIVERED, OutboxStatus.FAILED)) + } + + // PENDING has two entries -> oldest createdAt is the earlier one. + oldest shouldContain (OutboxStatus.PENDING to older) + // DELIVERED / FAILED have no rows -> omitted entirely, not seeded with the current time. + oldest.keys shouldBe setOf(OutboxStatus.PENDING) + } + test("[$dbName] claimPending returns empty when no PENDING entries") { val claimed = jdbc.withTransaction { store.claimPending(10) } diff --git a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt index 39eeb48..dcdcbfc 100644 --- a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt +++ b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt @@ -107,7 +107,7 @@ class MysqlOutboxStore( } override fun findOldestCreatedAt(statuses: Set): Map { - val result = statuses.associateWith { clock.instant() }.toMutableMap() + val result = mutableMapOf() val placeholders = statuses.joinToString(",") { "?" } val sql = "SELECT status, MIN(created_at) AS min_created_at FROM okapi_outbox WHERE status IN ($placeholders) GROUP BY status" diff --git a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt index 2c4036d..a1af83c 100644 --- a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt +++ b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt @@ -101,7 +101,7 @@ class PostgresOutboxStore( } override fun findOldestCreatedAt(statuses: Set): Map { - val result = statuses.associateWith { clock.instant() }.toMutableMap() + val result = mutableMapOf() val placeholders = statuses.joinToString(",") { "?" } val sql = "SELECT status, MIN(created_at) AS min_created_at FROM okapi_outbox WHERE status IN ($placeholders) GROUP BY status"