Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ Until `1.0.0`, breaking changes may appear in any release and are flagged with *

### Changed (BREAKING)

- **`PostgresOutboxStore` / `MysqlOutboxStore` no longer take a `clock` constructor
parameter.** It became unused after the lag-gauge fix ([#58](https://github.com/softwaremill/okapi/pull/58)) —
the stores derive no timestamps from a clock. Code that passed an explicit clock
(`PostgresOutboxStore(connectionProvider, clock)`) must drop the second argument;
the usual `PostgresOutboxStore(connectionProvider)` form is unchanged. Spring Boot
users are unaffected.
- **Outbox domain table renamed `outbox` → `okapi_outbox`.** Indexes follow the rename
(`idx_outbox_*` → `idx_okapi_outbox_*`). Host applications with a pre-existing `outbox`
table are no longer affected — okapi creates its own table under the `okapi_` prefix.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ open class HttpThroughputBenchmark {
)

val clock = Clock.systemUTC()
val store = PostgresOutboxStore(postgres.jdbc, clock)
val store = PostgresOutboxStore(postgres.jdbc)
publisher = OutboxPublisher(store, clock)
val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" }
val deliverer = HttpMessageDeliverer(urlResolver)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ open class KafkaThroughputBenchmark {
topic = "bench-${UUID.randomUUID()}"

val clock = Clock.systemUTC()
val store = PostgresOutboxStore(postgres.jdbc, clock)
val store = PostgresOutboxStore(postgres.jdbc)
publisher = OutboxPublisher(store, clock)
val deliverer = KafkaMessageDeliverer(producer)
val entryProcessor = OutboxEntryProcessor(deliverer, RetryPolicy(maxRetries = 0), clock)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ class OutboxPurgerTest : FunSpec({
try {
val purger = OutboxPurger(
outboxStore = store,
transactionRunner = noOpTransactionRunner(),
config = OutboxPurgerConfig(interval = ofMillis(50), batchSize = 100),
clock = fixedClock,
)
Expand Down Expand Up @@ -192,6 +193,7 @@ class OutboxPurgerTest : FunSpec({
try {
val purger = OutboxPurger(
outboxStore = store,
transactionRunner = noOpTransactionRunner(),
config = OutboxPurgerConfig(interval = ofMillis(50), batchSize = 100),
clock = fixedClock,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,14 @@ package com.softwaremill.okapi.test.concurrency
import com.softwaremill.okapi.mysql.MysqlOutboxStore
import com.softwaremill.okapi.test.support.MysqlTestSupport
import io.kotest.core.spec.style.FunSpec
import java.time.Clock
import java.time.Instant
import java.time.ZoneOffset

class MysqlConcurrentClaimTest : FunSpec({
val db = MysqlTestSupport()

concurrentClaimTests(
dbName = "mysql",
jdbcProvider = { db.jdbc },
storeFactory = { MysqlOutboxStore(db.jdbc, Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) },
storeFactory = { MysqlOutboxStore(db.jdbc) },
startDb = { db.start() },
stopDb = { db.stop() },
truncate = { db.truncate() },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,14 @@ package com.softwaremill.okapi.test.concurrency
import com.softwaremill.okapi.postgres.PostgresOutboxStore
import com.softwaremill.okapi.test.support.PostgresTestSupport
import io.kotest.core.spec.style.FunSpec
import java.time.Clock
import java.time.Instant
import java.time.ZoneOffset

class PostgresConcurrentClaimTest : FunSpec({
val db = PostgresTestSupport()

concurrentClaimTests(
dbName = "postgres",
jdbcProvider = { db.jdbc },
storeFactory = { PostgresOutboxStore(db.jdbc, Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) },
storeFactory = { PostgresOutboxStore(db.jdbc) },
startDb = { db.start() },
stopDb = { db.stop() },
truncate = { db.truncate() },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class HttpEndToEndTest : FunSpec({

fun buildPipeline(maxRetries: Int = 3): Triple<OutboxPublisher, OutboxProcessor, PostgresOutboxStore> {
val clock = Clock.systemUTC()
val store = PostgresOutboxStore(db.jdbc, clock)
val store = PostgresOutboxStore(db.jdbc)
val publisher = OutboxPublisher(store, clock)
val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" }
val entryProcessor = OutboxEntryProcessor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class KafkaEndToEndTest : FunSpec({

test("full pipeline: publish to outbox -> processNext -> message on Kafka topic") {
val clock = Clock.systemUTC()
val store = PostgresOutboxStore(db.jdbc, clock)
val store = PostgresOutboxStore(db.jdbc)
val publisher = OutboxPublisher(store, clock)
val deliverer = KafkaMessageDeliverer(producer!!)
val entryProcessor = OutboxEntryProcessor(deliverer, RetryPolicy(maxRetries = 3), clock)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class MysqlHttpEndToEndTest : FunSpec({

fun buildPipeline(): Triple<OutboxPublisher, OutboxProcessor, MysqlOutboxStore> {
val clock = Clock.systemUTC()
val store = MysqlOutboxStore(db.jdbc, clock)
val store = MysqlOutboxStore(db.jdbc)
val publisher = OutboxPublisher(store, clock)
val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" }
val entryProcessor = OutboxEntryProcessor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class ObservabilityEndToEndTest : FunSpec({

test("full pipeline: publish, deliver, verify Micrometer counters and gauges") {
val registry = SimpleMeterRegistry()
val store = PostgresOutboxStore(db.jdbc, clock)
val store = PostgresOutboxStore(db.jdbc)
val publisher = OutboxPublisher(store, clock)
val listener = MicrometerOutboxListener(registry)
val metrics = MicrometerOutboxMetrics(store, registry, transactionRunner = jdbcTransactionRunner, clock = clock)
Expand Down Expand Up @@ -109,7 +109,7 @@ class ObservabilityEndToEndTest : FunSpec({

test("permanent failure: HTTP 400 → Failed counter incremented, gauge reflects FAILED") {
val registry = SimpleMeterRegistry()
val store = PostgresOutboxStore(db.jdbc, clock)
val store = PostgresOutboxStore(db.jdbc)
val publisher = OutboxPublisher(store, clock)
val listener = MicrometerOutboxListener(registry)
val metrics = MicrometerOutboxMetrics(store, registry, transactionRunner = jdbcTransactionRunner, clock = clock)
Expand All @@ -135,7 +135,7 @@ class ObservabilityEndToEndTest : FunSpec({

test("batch duration timer records realistic delivery time") {
val registry = SimpleMeterRegistry()
val store = PostgresOutboxStore(db.jdbc, clock)
val store = PostgresOutboxStore(db.jdbc)
val publisher = OutboxPublisher(store, clock)
val listener = MicrometerOutboxListener(registry)

Expand All @@ -158,7 +158,7 @@ class ObservabilityEndToEndTest : FunSpec({

test("lag gauge reflects real time difference for pending entries") {
val registry = SimpleMeterRegistry()
val store = PostgresOutboxStore(db.jdbc, clock)
val store = PostgresOutboxStore(db.jdbc)
val publisher = OutboxPublisher(store, clock)
val metrics = MicrometerOutboxMetrics(store, registry, transactionRunner = jdbcTransactionRunner, clock = clock)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package com.softwaremill.okapi.test.store
import com.softwaremill.okapi.mysql.MysqlOutboxStore
import com.softwaremill.okapi.test.support.MysqlTestSupport
import io.kotest.core.spec.style.FunSpec
import java.time.Clock
import java.time.Instant
import java.time.ZoneOffset

class MysqlOutboxStoreTest : FunSpec({
val db = MysqlTestSupport()
Expand All @@ -15,7 +12,6 @@ class MysqlOutboxStoreTest : FunSpec({
storeFactory = {
MysqlOutboxStore(
connectionProvider = db.jdbc,
clock = Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC),
)
},
jdbcProvider = { db.jdbc },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package com.softwaremill.okapi.test.store
import com.softwaremill.okapi.postgres.PostgresOutboxStore
import com.softwaremill.okapi.test.support.PostgresTestSupport
import io.kotest.core.spec.style.FunSpec
import java.time.Clock
import java.time.Instant
import java.time.ZoneOffset

class PostgresOutboxStoreTest : FunSpec({
val db = PostgresTestSupport()
Expand All @@ -15,7 +12,6 @@ class PostgresOutboxStoreTest : FunSpec({
storeFactory = {
PostgresOutboxStore(
connectionProvider = db.jdbc,
clock = Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC),
)
},
jdbcProvider = { db.jdbc },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class ConnectionLeakProofTest : FunSpec({
}
runLiquibase(container)
counter = CountingDataSource(raw)
store = PostgresOutboxStore(SpringConnectionProvider(counter), clock)
store = PostgresOutboxStore(SpringConnectionProvider(counter))
}

afterSpec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import org.springframework.boot.test.context.runner.ApplicationContextRunner
import org.springframework.transaction.PlatformTransactionManager
import org.springframework.transaction.support.TransactionTemplate
import org.testcontainers.containers.PostgreSQLContainer
import java.time.Clock
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.Executors
Expand Down Expand Up @@ -69,7 +68,7 @@ class ExposedSpringBridgeEndToEndTest : FunSpec({
.withBean(MessageDeliverer::class.java, { recorder })
.withBean(PlatformTransactionManager::class.java, { SpringTransactionManager(counter) })
.withBean(PostgresOutboxStore::class.java, {
PostgresOutboxStore(SpringConnectionProvider(counter), Clock.systemUTC())
PostgresOutboxStore(SpringConnectionProvider(counter))
})

test("publish inside Spring TX driven by Exposed-bridge PTM uses a single physical connection") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class JpaTransactionManagerFailFastTest : FunSpec({
.withBean("jpaTmA", PlatformTransactionManager::class.java, { JpaTransactionManager(emf) })
.withBean(MessageDeliverer::class.java, { JpaTestStubDeliverer })
.withBean(PostgresOutboxStore::class.java, {
PostgresOutboxStore(SpringConnectionProvider(dsB), java.time.Clock.systemUTC())
PostgresOutboxStore(SpringConnectionProvider(dsB))
})
.withPropertyValues("okapi.liquibase.enabled=false")
.run { ctx ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class JpaTransactionManagerMatchedDataSourceTest : FunSpec({
.withBean("jpaTm", PlatformTransactionManager::class.java, { jpaPtm })
.withBean(MessageDeliverer::class.java, { JpaMatchStubDeliverer })
.withBean(PostgresOutboxStore::class.java, {
PostgresOutboxStore(SpringConnectionProvider(ds), java.time.Clock.systemUTC())
PostgresOutboxStore(SpringConnectionProvider(ds))
})
.withPropertyValues("okapi.liquibase.enabled=false")
.run { ctx ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class MultiDataSourceTransactionTest : FunSpec({
val otherTxManager = DataSourceTransactionManager(otherDataSource)
otherTxTemplate = TransactionTemplate(otherTxManager)

store = PostgresOutboxStore(SpringConnectionProvider(outboxDataSource), clock)
store = PostgresOutboxStore(SpringConnectionProvider(outboxDataSource))
val corePublisher = OutboxPublisher(store, clock)
publisher = SpringOutboxPublisher(delegate = corePublisher, dataSource = outboxDataSource)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class MysqlConnectionLeakProofTest : FunSpec({
}
runLiquibase(container)
counter = CountingDataSource(raw)
store = MysqlOutboxStore(SpringConnectionProvider(counter), clock)
store = MysqlOutboxStore(SpringConnectionProvider(counter))
}

afterSpec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class WrongPtmMultiDataSourceFailFastTest : FunSpec({
.withBean("exposedTmA", PlatformTransactionManager::class.java, { SpringTransactionManager(dsA) })
.withBean(MessageDeliverer::class.java, { RecordingMessageDeliverer() })
.withBean(PostgresOutboxStore::class.java, {
PostgresOutboxStore(SpringConnectionProvider(dsB), java.time.Clock.systemUTC())
PostgresOutboxStore(SpringConnectionProvider(dsB))
})
.withPropertyValues("okapi.processor.enabled=false", "okapi.liquibase.enabled=false")
.run { ctx ->
Expand All @@ -61,7 +61,7 @@ class WrongPtmMultiDataSourceFailFastTest : FunSpec({
.withBean("exposedTmA", PlatformTransactionManager::class.java, { SpringTransactionManager(dsA) })
.withBean(MessageDeliverer::class.java, { RecordingMessageDeliverer() })
.withBean(PostgresOutboxStore::class.java, {
PostgresOutboxStore(SpringConnectionProvider(dsB), java.time.Clock.systemUTC())
PostgresOutboxStore(SpringConnectionProvider(dsB))
})
.withPropertyValues(
"okapi.processor.enabled=false",
Expand Down Expand Up @@ -90,7 +90,7 @@ class WrongPtmMultiDataSourceFailFastTest : FunSpec({
.withBean("exposedTmA", PlatformTransactionManager::class.java, { SpringTransactionManager(dsA) })
.withBean(MessageDeliverer::class.java, { RecordingMessageDeliverer() })
.withBean(PostgresOutboxStore::class.java, {
PostgresOutboxStore(SpringConnectionProvider(dsB), java.time.Clock.systemUTC())
PostgresOutboxStore(SpringConnectionProvider(dsB))
})
.withPropertyValues(
"okapi.processor.enabled=false",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@ import com.softwaremill.okapi.core.OutboxStatus
import com.softwaremill.okapi.core.OutboxStore
import java.sql.ResultSet
import java.sql.Timestamp
import java.time.Clock
import java.time.Instant
import java.util.UUID

/** MySQL [OutboxStore] implementation using plain JDBC. */
class MysqlOutboxStore(
private val connectionProvider: ConnectionProvider,
private val clock: Clock = Clock.systemUTC(),
) : OutboxStore {

override fun persist(entry: OutboxEntry): OutboxEntry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@ import com.softwaremill.okapi.core.OutboxStatus
import com.softwaremill.okapi.core.OutboxStore
import java.sql.ResultSet
import java.sql.Timestamp
import java.time.Clock
import java.time.Instant
import java.util.UUID

/** PostgreSQL [OutboxStore] implementation using plain JDBC. */
class PostgresOutboxStore(
private val connectionProvider: ConnectionProvider,
private val clock: Clock = Clock.systemUTC(),
) : OutboxStore {

override fun persist(entry: OutboxEntry): OutboxEntry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,8 @@ class OutboxAutoConfiguration(
) {
@Bean
@ConditionalOnMissingBean(OutboxStore::class)
fun outboxStore(clock: ObjectProvider<Clock>): PostgresOutboxStore = PostgresOutboxStore(
fun outboxStore(): PostgresOutboxStore = PostgresOutboxStore(
connectionProvider = SpringConnectionProvider(resolveDataSource(dataSources, primaryDataSource, okapiProperties)),
clock = clock.getIfAvailable { Clock.systemUTC() },
)
}

Expand All @@ -255,9 +254,8 @@ class OutboxAutoConfiguration(
) {
@Bean
@ConditionalOnMissingBean(OutboxStore::class)
fun outboxStore(clock: ObjectProvider<Clock>): MysqlOutboxStore = MysqlOutboxStore(
fun outboxStore(): MysqlOutboxStore = MysqlOutboxStore(
connectionProvider = SpringConnectionProvider(resolveDataSource(dataSources, primaryDataSource, okapiProperties)),
clock = clock.getIfAvailable { Clock.systemUTC() },
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class OutboxMysqlEndToEndTest :
connection.close()

val clock = Clock.systemUTC()
store = MysqlOutboxStore(jdbc, clock)
store = MysqlOutboxStore(jdbc)
publisher = OutboxPublisher(store, clock)

val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" }
Expand Down