Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ fun outboxTransactionRunner(): TransactionRunner = object : TransactionRunner {
}
```

Without a `TransactionRunner` each scheduler tick runs in auto-commit, which can cause duplicate delivery across instances — see Advanced below.

Advanced setups — multiple DataSources, JTA/Exposed PTMs, qualifier precedence — see [Advanced: transactions & multi-DataSource](#advanced-transactions--multi-datasource) below.

## Advanced: transactions & multi-DataSource

Without bracketing, `FOR UPDATE SKIP LOCKED` collapses to the single SELECT statement under JDBC auto-commit, which silently allows duplicate delivery across processor instances. This opt-in is intentionally manual to keep accidental misconfiguration out of multi-instance deployments.

**Multi-DataSource contexts.** If your application has multiple `DataSource` beans and uses a `PlatformTransactionManager` from which okapi cannot extract a `DataSource` (JTA, Exposed's `SpringTransactionManager`, JPA without a JDBC `DataSource`), the autoconfiguration refuses to start until you set `okapi.transaction-manager-qualifier` to the bean name of the PTM that brackets the outbox `DataSource`. `okapi.datasource-qualifier` alone is not sufficient: it picks the outbox `DataSource` but does not constrain which PTM brackets it. Alternative escape hatch: supply your own `@Bean TransactionRunner`. Single-DataSource setups and PTMs whose `DataSource` can be introspected (`DataSourceTransactionManager`, `JpaTransactionManager`, `HibernateTransactionManager`) are unaffected.
Expand Down
8 changes: 5 additions & 3 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ Default JMH config in `okapi-benchmarks/build.gradle.kts` uses:
- `fork = 2` — isolated JVMs to neutralize JIT-profile variance
- `warmupIterations = 3`, `warmup = 10s` — let JIT C2 settle
- `iterations = 5`, `timeOnIteration = 30s` — statistically meaningful sample
- `-Xms2g -Xmx2g -XX:+UseG1GC` — pinned memory and GC for reproducibility
- `-Xms8g -Xmx8g -XX:+UseG1GC` — pinned memory and GC for reproducibility

```sh
./gradlew :okapi-benchmarks:jmh
```

Wall time: ~30 minutes (Testcontainers spin-up + 2 transports × 3 batchSize values × 8 iterations).
Wall time ~30 min: Testcontainers spin-up plus the full JMH matrix — Kafka (3 batchSize combos) and HTTP
(3 batchSize × 3 httpLatencyMs = 9 combos) throughput benchmarks plus DelivererMicroBenchmark, each with
fork=2, 3 warmup + 5 measurement iterations.

Result JSON: `okapi-benchmarks/build/reports/jmh/results.json`

Expand Down Expand Up @@ -87,7 +89,7 @@ investigate variability sources (background processes, thermal throttling, GC).
benchmarks suggest. Treat numbers as **upper bounds** for the library's processing capacity.
- **HTTP benchmark uses WireMock in-JVM**, which adds ~0.3 ms overhead per request (Jetty
servlet pipeline). At `httpLatencyMs=0` the measurement reflects "library + DB + WireMock
overhead", not pure library throughput. For tighter pomiar consider replacing WireMock
overhead", not pure library throughput. For tighter measurement consider replacing WireMock
with `MockWebServer` (Square) — see [`results-baseline-2026-04.md`](results-baseline-2026-04.md)
notes on benchmark methodology.
- **`httpLatencyMs` is server-side delay**, not network RTT. Real production webhook latency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ package com.softwaremill.okapi.core
* the storage layer never interprets it.
*
* [type] is a stable, unique identifier for the transport (e.g. "http", "kafka").
* It must be included in [serialize] so that [CompositeMessageDeliverer] can
* route entries to the correct [MessageDeliverer] without deserializing the full metadata.
* It must be included in [serialize] so that the concrete [MessageDeliverer]'s
* `deserialize` can reconstruct and validate this metadata.
* (Routing in [CompositeMessageDeliverer] uses the separate [OutboxEntry.deliveryType]
* field, not the serialized JSON.)
*
* Implementors are responsible for:
* - declaring a unique [type] constant
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package com.softwaremill.okapi.core
* Framework-specific modules provide implementations:
* - `okapi-spring-boot`: `SpringTransactionContextValidator` — checks via `TransactionSynchronizationManager`
* - `okapi-exposed`: `ExposedTransactionContextValidator` — checks via Exposed's `TransactionManager.currentOrNull()`
* - Standalone: no-op (always returns true)
* - Standalone: user-provided implementation
*/
interface TransactionContextValidator {
fun isInActiveReadWriteTransaction(): Boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ package com.softwaremill.okapi.core
* Executes a block of code within a transaction.
*
* Framework-specific modules provide implementations:
* - `okapi-spring`: wraps Spring's `TransactionTemplate`
* - `okapi-ktor`: wraps Exposed's `transaction {}`
* - `okapi-spring-boot`: wraps Spring's `TransactionTemplate` via `SpringTransactionRunner`
* - `okapi-exposed`: wraps Exposed's `transaction {}` via `ExposedTransactionRunner`
* - Standalone: user-provided lambda
*/
interface TransactionRunner {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import java.sql.Connection
/**
* Exposed implementation of [ConnectionProvider].
*
* Reads the JDBC [Connection] from Exposed's active `TransactionManager.current()` and
* Reads the JDBC [Connection] from Exposed's active transaction (via `TransactionManager.currentOrNull()`) and
* passes it to the caller's block. Exposed owns the connection's lifecycle — it commits
* or rolls back, and returns the connection to the pool when the enclosing
* `transaction(database) { }` block completes — so this provider performs no cleanup.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ class ObservabilityEndToEndTest : FunSpec({
registry.counter("okapi.entries.retry.scheduled").count() shouldBe 1.0 // still 1 from before
registry.timer("okapi.batch.duration").count() shouldBe 2

// Refresh again and verify new gauge snapshot
metrics.refresh()
registry.find("okapi.entries.count").tag("status", "pending").gauge()!!.value() shouldBe 0.0
registry.find("okapi.entries.count").tag("status", "delivered").gauge()!!.value() shouldBe 1.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import java.time.Clock

/**
* Proves that [SpringConnectionProvider] releases every JDBC connection it borrows from
* the pool when store methods are called outside a Spring transaction — the exact path
* that previously leaked because the old `getConnection(): Connection` contract had no
* release hook. Every physical `Connection.close()` is counted via a wrapping DataSource,
* and the assertion `opened == closed` must hold at the end of each test.
* the pool when store methods are called outside a Spring transaction. This guards against
* the design pitfall of a getter-style connection API: a `getConnection(): Connection`
* contract would leak because callers can forget to release the borrowed connection.
* `withConnection` prevents this by scoping the borrow so the connection is always released
* when the lambda returns. Every physical `Connection.close()` is counted via a wrapping
* DataSource, and the assertion `opened == closed` must hold at the end of each test.
*/
class ConnectionLeakProofTest : FunSpec({

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,11 @@ class MultiDataSourceTransactionTest : FunSpec({
}
}

test("publish succeeds with nested transaction (savepoint) on outbox DataSource") {
test("publish succeeds with re-entrant (PROPAGATION_REQUIRED) transaction on outbox DataSource") {
val outboxId = outboxTxTemplate.execute {
// Nested call creates a savepoint (PROPAGATION_REQUIRED by default nests via savepoints)
// The re-entrant call uses PROPAGATION_REQUIRED (the TransactionTemplate default), which joins/participates
// in the existing outbox transaction, so publish runs inside the active tx — no savepoint involved
// (savepoints would require PROPAGATION_NESTED).
outboxTxTemplate.execute {
publisher.publish(testMessage, stubDeliveryInfo)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ class MicrometerOutboxMetricsTest : FunSpec({
registry.find("okapi.entries.lag.seconds").tag("status", "pending").gauge()!!.value().isNaN() shouldBe true
}

test("gauges are registered eagerly with NaN before first refresh") {
// Ensures Prometheus scrape between bean construction and first refresh sees the metric
// (with NaN value), not a missing series.
test("no store query happens on construction (refresh is lazy)") {
// MultiGauge registers no rows until the first refresh, so no series exists before then;
// construction must not query the store either. This verifies refresh is lazy.
val store = CountingStubStore(counts = mapOf(OutboxStatus.PENDING to 5L))
val registry = SimpleMeterRegistry()
MicrometerOutboxMetrics(store, registry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ private val LIQUIBASE_DISABLED_LOGGER = LoggerFactory.getLogger("com.softwaremil
* `@AutoConfiguration` pass, `@ConditionalOnBean` cannot reliably observe sibling beans defined
* in the same auto-config — Spring's `OnBeanCondition` runs at REGISTER_BEAN phase and is
* evaluated together with the conditions of the bean it is supposed to observe. Splitting
* Liquibase into a downstream auto-config (`@AutoConfigureAfter(OutboxAutoConfiguration)`)
* Liquibase into a downstream auto-config (via `@AutoConfiguration(after = [OutboxAutoConfiguration::class])`)
* guarantees that by the time Liquibase conditions are evaluated, the chosen `*OutboxStore`
* bean has already been fully registered and is visible to `@ConditionalOnBean`.
*
Expand Down Expand Up @@ -63,7 +63,7 @@ class OkapiLiquibaseAutoConfiguration {
* okapi-spring-boot, e.g. Flyway-only consumers do not pull it in).
*
* **Why class-level [ConditionalOnBean]:** this auto-config is processed AFTER
* [OutboxAutoConfiguration] (see `@AutoConfigureAfter` on the outer class), so at the
* [OutboxAutoConfiguration] (see `@AutoConfiguration(after = ...)` on the outer class), so at the
* time `@ConditionalOnBean(PostgresOutboxStore)` is evaluated, the chosen `*OutboxStore`
* bean is already registered and visible. When MySQL wins precedence instead, this gate
* skips the entire class — preventing the dual-Liquibase / wrong-engine-DDL startup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import java.time.Duration
* Configuration for Okapi Micrometer metrics.
*
* @property refreshInterval How often gauge metrics (`okapi.entries.count`, `okapi.entries.lag.seconds`)
* are refreshed from the outbox store. Each refresh runs one transaction with two queries.
* are refreshed from the outbox store. Each refresh runs two store queries, wrapped in a single
* read-only transaction when a transaction manager is available.
* Set under property `okapi.metrics.refresh-interval`, e.g. `PT15S`, `30s`, `1m`. Default: 15 seconds.
*/
@ConfigurationProperties(prefix = "okapi.metrics")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ import javax.sql.DataSource
/**
* Spring Boot autoconfiguration for the outbox processing pipeline.
*
* Requires a [TransactionRunner] bean, or a [PlatformTransactionManager] from which one can be derived.
* Requires a [TransactionRunner] bean (or a [PlatformTransactionManager] from which one can be derived) only when at
* least one scheduler (processor or purger) is enabled.
*
* Required beans (must be provided by the application):
* - One or more [MessageDeliverer] beans — transport implementations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,8 @@ class LiquibaseAutoConfigurationTest : FunSpec({
test("dual-module classpath: only ONE okapi*Liquibase bean activates — matching OutboxStore winner") {
// Pins the OutboxStore-precedence contract for Liquibase auto-config (issue #38
// / KOJAK-80). Both `okapi-postgres` and `okapi-mysql` are on the test classpath
// (build.gradle.kts:35-36). The `*OutboxStore` factories share
// (see okapi-spring-boot/build.gradle.kts testImplementation declarations). The
// `*OutboxStore` factories share
// `@ConditionalOnMissingBean(OutboxStore::class)`, so exactly ONE store bean wins.
// The Liquibase configs MUST mirror that precedence: registering both
// `okapiPostgresLiquibase` and `okapiMysqlLiquibase` against the same DataSource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ class OutboxAutoConfigurationTransactionRunnerTest : FunSpec({
// pins that the cast failure now falls through to the WARN branch instead.
// -----------------------------------------------------------------------------------------
test(
"BUG C1: RTM with non-DataSource resourceFactory + okapi.datasource-qualifier set: WARN should be logged but currently is silent",
"BUG C1: non-DataSource RTM resourceFactory + datasource-qualifier set falls through to WARN (not a silent early-return)",
) {
val ds: DataSource = SimpleDriverDataSource()
val jpaLikeTm = JpaLikeRtmTransactionManager(resourceFactory = Any())
Expand Down