diff --git a/README.md b/README.md index ccd7583..708e757 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,12 @@ fun outboxTransactionRunner(): TransactionRunner = object : TransactionRunner { } ``` +Without a `TransactionRunner` each scheduler tick runs in auto-commit, which can cause duplicate delivery across instances — see Advanced below. + +Advanced setups — multiple DataSources, JTA/Exposed PTMs, qualifier precedence — see [Advanced: transactions & multi-DataSource](#advanced-transactions--multi-datasource) below. + +## Advanced: transactions & multi-DataSource + Without bracketing, `FOR UPDATE SKIP LOCKED` collapses to the single SELECT statement under JDBC auto-commit, which silently allows duplicate delivery across processor instances. This opt-in is intentionally manual to keep accidental misconfiguration out of multi-instance deployments. **Multi-DataSource contexts.** If your application has multiple `DataSource` beans and uses a `PlatformTransactionManager` from which okapi cannot extract a `DataSource` (JTA, Exposed's `SpringTransactionManager`, JPA without a JDBC `DataSource`), the autoconfiguration refuses to start until you set `okapi.transaction-manager-qualifier` to the bean name of the PTM that brackets the outbox `DataSource`. `okapi.datasource-qualifier` alone is not sufficient: it picks the outbox `DataSource` but does not constrain which PTM brackets it. Alternative escape hatch: supply your own `@Bean TransactionRunner`. Single-DataSource setups and PTMs whose `DataSource` can be introspected (`DataSourceTransactionManager`, `JpaTransactionManager`, `HibernateTransactionManager`) are unaffected. diff --git a/benchmarks/README.md b/benchmarks/README.md index 867bab5..1ddcadc 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -13,13 +13,15 @@ Default JMH config in `okapi-benchmarks/build.gradle.kts` uses: - `fork = 2` — isolated JVMs to neutralize JIT-profile variance - `warmupIterations = 3`, `warmup = 10s` — let JIT C2 settle - `iterations = 5`, `timeOnIteration = 30s` — statistically meaningful sample -- `-Xms2g -Xmx2g -XX:+UseG1GC` — pinned memory and GC for reproducibility +- `-Xms8g -Xmx8g -XX:+UseG1GC` — pinned memory and GC for reproducibility ```sh ./gradlew :okapi-benchmarks:jmh ``` -Wall time: ~30 minutes (Testcontainers spin-up + 2 transports × 3 batchSize values × 8 iterations). +Wall time ~30 min: Testcontainers spin-up plus the full JMH matrix — Kafka (3 batchSize combos) and HTTP +(3 batchSize × 3 httpLatencyMs = 9 combos) throughput benchmarks plus DelivererMicroBenchmark, each with +fork=2, 3 warmup + 5 measurement iterations. Result JSON: `okapi-benchmarks/build/reports/jmh/results.json` @@ -87,7 +89,7 @@ investigate variability sources (background processes, thermal throttling, GC). benchmarks suggest. Treat numbers as **upper bounds** for the library's processing capacity. - **HTTP benchmark uses WireMock in-JVM**, which adds ~0.3 ms overhead per request (Jetty servlet pipeline). At `httpLatencyMs=0` the measurement reflects "library + DB + WireMock - overhead", not pure library throughput. For tighter pomiar consider replacing WireMock + overhead", not pure library throughput. For tighter measurement consider replacing WireMock with `MockWebServer` (Square) — see [`results-baseline-2026-04.md`](results-baseline-2026-04.md) notes on benchmark methodology. - **`httpLatencyMs` is server-side delay**, not network RTT. Real production webhook latency diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/DeliveryInfo.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/DeliveryInfo.kt index b6c5a80..46e9071 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/DeliveryInfo.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/DeliveryInfo.kt @@ -8,8 +8,10 @@ package com.softwaremill.okapi.core * the storage layer never interprets it. * * [type] is a stable, unique identifier for the transport (e.g. "http", "kafka"). - * It must be included in [serialize] so that [CompositeMessageDeliverer] can - * route entries to the correct [MessageDeliverer] without deserializing the full metadata. + * It must be included in [serialize] so that the concrete [MessageDeliverer]'s + * `deserialize` can reconstruct and validate this metadata. + * (Routing in [CompositeMessageDeliverer] uses the separate [OutboxEntry.deliveryType] + * field, not the serialized JSON.) * * Implementors are responsible for: * - declaring a unique [type] constant diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/TransactionContextValidator.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/TransactionContextValidator.kt index 3b30582..91db363 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/TransactionContextValidator.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/TransactionContextValidator.kt @@ -6,7 +6,7 @@ package com.softwaremill.okapi.core * Framework-specific modules provide implementations: * - `okapi-spring-boot`: `SpringTransactionContextValidator` — checks via `TransactionSynchronizationManager` * - `okapi-exposed`: `ExposedTransactionContextValidator` — checks via Exposed's `TransactionManager.currentOrNull()` - * - Standalone: no-op (always returns true) + * - Standalone: user-provided implementation */ interface TransactionContextValidator { fun isInActiveReadWriteTransaction(): Boolean diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/TransactionRunner.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/TransactionRunner.kt index 0be93b0..bc17a92 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/TransactionRunner.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/TransactionRunner.kt @@ -4,8 +4,8 @@ package com.softwaremill.okapi.core * Executes a block of code within a transaction. * * Framework-specific modules provide implementations: - * - `okapi-spring`: wraps Spring's `TransactionTemplate` - * - `okapi-ktor`: wraps Exposed's `transaction {}` + * - `okapi-spring-boot`: wraps Spring's `TransactionTemplate` via `SpringTransactionRunner` + * - `okapi-exposed`: wraps Exposed's `transaction {}` via `ExposedTransactionRunner` * - Standalone: user-provided lambda */ interface TransactionRunner { diff --git a/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedConnectionProvider.kt b/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedConnectionProvider.kt index cf1851f..926bb7f 100644 --- a/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedConnectionProvider.kt +++ b/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedConnectionProvider.kt @@ -7,7 +7,7 @@ import java.sql.Connection /** * Exposed implementation of [ConnectionProvider]. * - * Reads the JDBC [Connection] from Exposed's active `TransactionManager.current()` and + * Reads the JDBC [Connection] from Exposed's active transaction (via `TransactionManager.currentOrNull()`) and * passes it to the caller's block. Exposed owns the connection's lifecycle — it commits * or rolls back, and returns the connection to the pool when the enclosing * `transaction(database) { }` block completes — so this provider performs no cleanup. diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/ObservabilityEndToEndTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/ObservabilityEndToEndTest.kt index 2945040..48ee38f 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/ObservabilityEndToEndTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/ObservabilityEndToEndTest.kt @@ -102,7 +102,6 @@ class ObservabilityEndToEndTest : FunSpec({ registry.counter("okapi.entries.retry.scheduled").count() shouldBe 1.0 // still 1 from before registry.timer("okapi.batch.duration").count() shouldBe 2 - // Refresh again and verify new gauge snapshot metrics.refresh() registry.find("okapi.entries.count").tag("status", "pending").gauge()!!.value() shouldBe 0.0 registry.find("okapi.entries.count").tag("status", "delivered").gauge()!!.value() shouldBe 1.0 diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/ConnectionLeakProofTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/ConnectionLeakProofTest.kt index ee57036..78b7877 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/ConnectionLeakProofTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/ConnectionLeakProofTest.kt @@ -19,10 +19,12 @@ import java.time.Clock /** * Proves that [SpringConnectionProvider] releases every JDBC connection it borrows from - * the pool when store methods are called outside a Spring transaction — the exact path - * that previously leaked because the old `getConnection(): Connection` contract had no - * release hook. Every physical `Connection.close()` is counted via a wrapping DataSource, - * and the assertion `opened == closed` must hold at the end of each test. + * the pool when store methods are called outside a Spring transaction. This guards against + * the design pitfall of a getter-style connection API: a `getConnection(): Connection` + * contract would leak because callers can forget to release the borrowed connection. + * `withConnection` prevents this by scoping the borrow so the connection is always released + * when the lambda returns. Every physical `Connection.close()` is counted via a wrapping + * DataSource, and the assertion `opened == closed` must hold at the end of each test. */ class ConnectionLeakProofTest : FunSpec({ diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/MultiDataSourceTransactionTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/MultiDataSourceTransactionTest.kt index c96e3a6..3638954 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/MultiDataSourceTransactionTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/MultiDataSourceTransactionTest.kt @@ -108,9 +108,11 @@ class MultiDataSourceTransactionTest : FunSpec({ } } - test("publish succeeds with nested transaction (savepoint) on outbox DataSource") { + test("publish succeeds with re-entrant (PROPAGATION_REQUIRED) transaction on outbox DataSource") { val outboxId = outboxTxTemplate.execute { - // Nested call creates a savepoint (PROPAGATION_REQUIRED by default nests via savepoints) + // The re-entrant call uses PROPAGATION_REQUIRED (the TransactionTemplate default), which joins/participates + // in the existing outbox transaction, so publish runs inside the active tx — no savepoint involved + // (savepoints would require PROPAGATION_NESTED). outboxTxTemplate.execute { publisher.publish(testMessage, stubDeliveryInfo) } diff --git a/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetricsTest.kt b/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetricsTest.kt index 7fcd241..3cd28d4 100644 --- a/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetricsTest.kt +++ b/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetricsTest.kt @@ -149,9 +149,9 @@ class MicrometerOutboxMetricsTest : FunSpec({ registry.find("okapi.entries.lag.seconds").tag("status", "pending").gauge()!!.value().isNaN() shouldBe true } - test("gauges are registered eagerly with NaN before first refresh") { - // Ensures Prometheus scrape between bean construction and first refresh sees the metric - // (with NaN value), not a missing series. + test("no store query happens on construction (refresh is lazy)") { + // MultiGauge registers no rows until the first refresh, so no series exists before then; + // construction must not query the store either. This verifies refresh is lazy. val store = CountingStubStore(counts = mapOf(OutboxStatus.PENDING to 5L)) val registry = SimpleMeterRegistry() MicrometerOutboxMetrics(store, registry) diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiLiquibaseAutoConfiguration.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiLiquibaseAutoConfiguration.kt index 1818280..4c7b520 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiLiquibaseAutoConfiguration.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiLiquibaseAutoConfiguration.kt @@ -25,7 +25,7 @@ private val LIQUIBASE_DISABLED_LOGGER = LoggerFactory.getLogger("com.softwaremil * `@AutoConfiguration` pass, `@ConditionalOnBean` cannot reliably observe sibling beans defined * in the same auto-config — Spring's `OnBeanCondition` runs at REGISTER_BEAN phase and is * evaluated together with the conditions of the bean it is supposed to observe. Splitting - * Liquibase into a downstream auto-config (`@AutoConfigureAfter(OutboxAutoConfiguration)`) + * Liquibase into a downstream auto-config (via `@AutoConfiguration(after = [OutboxAutoConfiguration::class])`) * guarantees that by the time Liquibase conditions are evaluated, the chosen `*OutboxStore` * bean has already been fully registered and is visible to `@ConditionalOnBean`. * @@ -63,7 +63,7 @@ class OkapiLiquibaseAutoConfiguration { * okapi-spring-boot, e.g. Flyway-only consumers do not pull it in). * * **Why class-level [ConditionalOnBean]:** this auto-config is processed AFTER - * [OutboxAutoConfiguration] (see `@AutoConfigureAfter` on the outer class), so at the + * [OutboxAutoConfiguration] (see `@AutoConfiguration(after = ...)` on the outer class), so at the * time `@ConditionalOnBean(PostgresOutboxStore)` is evaluated, the chosen `*OutboxStore` * bean is already registered and visible. When MySQL wins precedence instead, this gate * skips the entire class — preventing the dual-Liquibase / wrong-engine-DDL startup diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiMetricsProperties.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiMetricsProperties.kt index e532790..d885cdc 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiMetricsProperties.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiMetricsProperties.kt @@ -7,7 +7,8 @@ import java.time.Duration * Configuration for Okapi Micrometer metrics. * * @property refreshInterval How often gauge metrics (`okapi.entries.count`, `okapi.entries.lag.seconds`) - * are refreshed from the outbox store. Each refresh runs one transaction with two queries. + * are refreshed from the outbox store. Each refresh runs two store queries, wrapped in a single + * read-only transaction when a transaction manager is available. * Set under property `okapi.metrics.refresh-interval`, e.g. `PT15S`, `30s`, `1m`. Default: 15 seconds. */ @ConfigurationProperties(prefix = "okapi.metrics") diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt index dd9cc77..a79b704 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt @@ -38,7 +38,8 @@ import javax.sql.DataSource /** * Spring Boot autoconfiguration for the outbox processing pipeline. * - * Requires a [TransactionRunner] bean, or a [PlatformTransactionManager] from which one can be derived. + * Requires a [TransactionRunner] bean (or a [PlatformTransactionManager] from which one can be derived) only when at + * least one scheduler (processor or purger) is enabled. * * Required beans (must be provided by the application): * - One or more [MessageDeliverer] beans — transport implementations diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/LiquibaseAutoConfigurationTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/LiquibaseAutoConfigurationTest.kt index 80ae010..8d97834 100644 --- a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/LiquibaseAutoConfigurationTest.kt +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/LiquibaseAutoConfigurationTest.kt @@ -394,7 +394,8 @@ class LiquibaseAutoConfigurationTest : FunSpec({ test("dual-module classpath: only ONE okapi*Liquibase bean activates — matching OutboxStore winner") { // Pins the OutboxStore-precedence contract for Liquibase auto-config (issue #38 // / KOJAK-80). Both `okapi-postgres` and `okapi-mysql` are on the test classpath - // (build.gradle.kts:35-36). The `*OutboxStore` factories share + // (see okapi-spring-boot/build.gradle.kts testImplementation declarations). The + // `*OutboxStore` factories share // `@ConditionalOnMissingBean(OutboxStore::class)`, so exactly ONE store bean wins. // The Liquibase configs MUST mirror that precedence: registering both // `okapiPostgresLiquibase` and `okapiMysqlLiquibase` against the same DataSource diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfigurationTransactionRunnerTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfigurationTransactionRunnerTest.kt index f7dc346..f23b042 100644 --- a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfigurationTransactionRunnerTest.kt +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfigurationTransactionRunnerTest.kt @@ -558,7 +558,7 @@ class OutboxAutoConfigurationTransactionRunnerTest : FunSpec({ // pins that the cast failure now falls through to the WARN branch instead. // ----------------------------------------------------------------------------------------- test( - "BUG C1: RTM with non-DataSource resourceFactory + okapi.datasource-qualifier set: WARN should be logged but currently is silent", + "BUG C1: non-DataSource RTM resourceFactory + datasource-qualifier set falls through to WARN (not a silent early-return)", ) { val ds: DataSource = SimpleDriverDataSource() val jpaLikeTm = JpaLikeRtmTransactionManager(resourceFactory = Any())