diff --git a/tidb-stmt-cache/Dockerfile b/tidb-stmt-cache/Dockerfile new file mode 100644 index 00000000..70599050 --- /dev/null +++ b/tidb-stmt-cache/Dockerfile @@ -0,0 +1,20 @@ +# Two-stage build so the runtime image does not carry Maven, the Maven +# local repo, or the source tree. The runtime image is what k8s pulls. +FROM maven:3.9-eclipse-temurin-17 AS build +WORKDIR /src +# Copy the POM first so dependency resolution caches when only source +# changes. Maven would otherwise re-fetch the entire dep set on every +# source edit, which makes iteration painful. +COPY pom.xml ./ +RUN mvn -B -q dependency:go-offline +COPY src ./src +RUN mvn -B -DskipTests package + +FROM eclipse-temurin:17-jre +WORKDIR /app +# Spring Boot's maven plugin emits the jar to target/. Pinning the jar +# name keeps the COPY deterministic (no wildcard depending on artifact +# version expansion). +COPY --from=build /src/target/tidb-stmt-cache-0.0.1-SNAPSHOT.jar /app/app.jar +EXPOSE 8080 +ENTRYPOINT ["java", "-jar", "/app/app.jar"] diff --git a/tidb-stmt-cache/README.md b/tidb-stmt-cache/README.md new file mode 100644 index 00000000..3c478cd2 --- /dev/null +++ b/tidb-stmt-cache/README.md @@ -0,0 +1,213 @@ +# tidb-stmt-cache + +A Spring Boot (Java 17) sample that drives two distinct keploy regressions +against TiDB and Apache Pulsar in a single app: + +| Endpoint | Exercises | +| --- | --- | +| `GET /api/kv/{v}` and `GET /api/kv/insert-select/{v}` | MySQL Connector/J prepared-statement cache + HikariCP LIFO pool → orphan `COM_STMT_EXECUTE` matcher path | +| `POST /events/patch` | Hibernate INSERT + Pulsar `SEND` on a **partitioned** topic with default `RoundRobinPartitionRouter` → the partition-routing replay regression | + +Both flows share the same `HikariDataSource` bean shape that drives both +regressions: `autoCommit=false`, `prepStmtCacheSize=500`, +`prepStmtCacheSqlLimit=2048`, JPA `provider_disables_autocommit=true`. + +## Why the Pulsar partitioned topic matters + +The Pulsar Java client's default `RoundRobinPartition` router picks a +**random starting partition** when a producer is constructed, then walks +through partitions in order. So: + +* During recording, the producer might start on partition 5. +* During replay, a freshly-constructed producer starts on partition 7. + +The recorded `SEND` mock targets `…events-partition-5`; the live `SEND` +during replay targets `…events-partition-7`. Without keploy's +`baseTopic()` matcher loosening +(`enterprise/pkg/core/proxy/integrations/pulsar/replayer/replayer.go`), +no recorded mock matches the live topic and replay fails with +`pulsar replay: payload-aware mock mismatch`. + +## Layout + +``` +. +├── docker-compose.yml local TiDB + Pulsar (+ partitioned-topic init) +├── Dockerfile two-stage build → tidb-pulsar-app:dev +├── k8s/ manifests for the k8s-proxy auto-replay path +│ ├── 00-namespace.yaml +│ ├── 10-tidb.yaml +│ ├── 20-pulsar.yaml includes a Job that pre-creates the partitioned topic +│ └── 30-app.yaml carries keploy.io/record-session=true for the webhook +├── pom.xml +└── src/main/java/com/example/tidbstmtcache/ + ├── DataSourceConfig.java HikariCP bean (autoCommit=false, …) + ├── EventsController.java POST /events/patch — JPA save + Pulsar send + ├── EventEntity.java JPA entity for the `events` table + ├── EventRepository.java + ├── PulsarConfig.java PulsarClient + Producer with RoundRobinPartition + ├── QueryController.java existing orphan-EXECUTE endpoints (unchanged) + ├── SchemaInitializer.java creates the `kv` table; Hibernate creates `events` + └── TidbStmtCacheApplication.java +``` + +## Quick path — local docker-compose smoke test + +Use this to confirm the app boots and the partition routing is +non-deterministic across producer creations. Does **not** drive keploy. + +```bash +cd samples-java/tidb-stmt-cache +docker compose up -d +# Wait for tidb (port 4000) and pulsar (port 6650) to be ready, and the +# pulsar-init container to exit 0. + +mvn -DskipTests spring-boot:run & # or run from your IDE +APP_PID=$! + +curl -s -X POST http://localhost:8080/events/patch \ + -H 'Content-Type: application/json' \ + -d '{ + "entity_id": "ENTITY-1001", + "event_name": "delivered", + "event_timestamp": "2026-05-23T17:07:22+05:30", + "task_orchestrator": "ORCH-A" + }' +# Expect: {"message":"Event patched"} + +kill $APP_PID +docker compose down -v +``` + +To see the round-robin in action, restart the app between curls and +diff `bin/pulsar-admin topics partitioned-stats persistent://public/default/events` +output — partition message counts will land on different partitions +each cold start. + +## Full path — k8s-proxy auto-replay + +The k8s-proxy controller watches the namespace for pods carrying +`keploy.io/record-session=true` and injects the keploy-agent sidecar. +After a recording is captured, an auto-replay session reconstructs the +app pod in isolation and feeds the recorded HTTP requests back through +it; the agent replays MySQL and Pulsar from mocks. + +### 1 · Build the patched enterprise agent image + +The matcher fix lives in +`enterprise/pkg/core/proxy/integrations/pulsar/replayer/replayer.go` +(`baseTopic` function + its callsites). Build a keploy-agent image that +includes it — the exact `make` target depends on your enterprise repo +layout; from the workspace root: + +```bash +cd ../enterprise +make docker-image AGENT_IMAGE=keploy-agent:partition-fix +kind load docker-image keploy-agent:partition-fix --name +``` + +### 2 · Install the k8s-proxy chart pointing at the patched agent + +```bash +cd ../k8s-proxy +helm upgrade --install k8s-proxy ./charts/k8s-proxy \ + --namespace k8s-proxy --create-namespace \ + --set agent.image=keploy-agent:partition-fix \ + --set webhook.watchNamespaces='{tidb-pulsar-replay}' +``` + +### 3 · Build and load the sample app image + +```bash +cd ../samples-java/tidb-stmt-cache +mvn -DskipTests package +docker build -t tidb-pulsar-app:dev . +kind load docker-image tidb-pulsar-app:dev --name +``` + +### 4 · Apply the manifests + +```bash +kubectl apply -f k8s/ +kubectl -n tidb-pulsar-replay wait deploy/tidb deploy/pulsar deploy/tidb-pulsar-app \ + --for=condition=Available --timeout=5m +kubectl -n tidb-pulsar-replay wait --for=condition=complete job/pulsar-init-topic --timeout=2m +``` + +### 5 · Record a session + +Drive a few `POST /events/patch` requests through the in-cluster Service. +The keploy-agent sidecar attached to `tidb-pulsar-app` will capture the +MySQL and Pulsar traffic. + +```bash +kubectl -n tidb-pulsar-replay port-forward svc/tidb-pulsar-app 8080:80 & +PF_PID=$! + +for i in $(seq 1 5); do + curl -s -X POST http://localhost:8080/events/patch \ + -H 'Content-Type: application/json' \ + -d "{\"entity_id\":\"ENTITY-$i\",\"event_name\":\"delivered\",\"event_timestamp\":\"2026-05-23T17:07:22+05:30\",\"task_orchestrator\":\"ORCH-A\"}" +done + +kill $PF_PID +``` + +Confirm a `SEND` mock landed on a specific partition: + +```bash +kubectl -n tidb-pulsar-replay logs deploy/tidb-pulsar-app -c keploy-agent \ + | grep -E 'commandType.*SEND|topic.*partition-' +``` + +### 6 · Trigger auto-replay + +Use the k8s-proxy `Replay` CR (or REST API, depending on your install). +Example via the openapi-described endpoint: + +```bash +kubectl -n k8s-proxy port-forward svc/k8s-proxy 8000:80 & +curl -s -X POST http://localhost:8000/api/v1/replays \ + -H 'Content-Type: application/json' \ + -d '{ + "namespace": "tidb-pulsar-replay", + "deployment": "tidb-pulsar-app", + "testSetIDs": [""] + }' +``` + +### 7 · Assert the regression is fixed + +```bash +kubectl -n tidb-pulsar-replay logs deploy/tidb-pulsar-app -c keploy-agent \ + | grep -E 'payload-aware mock mismatch|Test passed|result.*passed' +``` + +* **Without the patch** — at least one of the recorded sessions fails + with `pulsar replay: payload-aware mock mismatch for SEND (topic=…events-partition-)`, + the app returns HTTP 500, the testcase is marked failed. +* **With the patch** — the live `SEND` to `…events-partition-` matches + the recorded mock for `…events-partition-` (same base topic + `…events`, same payload), the synthetic `SEND_RECEIPT` is returned, + the app returns HTTP 200, the testcase passes. + +## Replaying an existing recording + +If you already have a `mocks.yaml` and a `tests/` directory captured +against a structurally similar Pulsar producer (partitioned topic, JPA +`INSERT` before the SEND, HTTP 200 response), this sample can serve as +the replay target: + +1. Override `k8s/30-app.yaml`'s `PULSAR_TOPIC` env to match the topic + name in your recorded mocks (`persistent:////`). +2. Load the existing `mocks.yaml` into the test-set storage backend the + k8s-proxy install is configured for, instead of running a fresh + recording. +3. Trigger the replay job with that test-set ID. + +Without the matcher fix in `keploy/enterprise`, a replay where the live +producer round-robins to a different partition than the recording will +log `payload-aware mock mismatch for SEND (topic=…partition-)` and +the testcase fails. With the fix, the SEND resolves against the +recorded mock for `…partition-` (same base topic, same payload) and +the testcase passes. diff --git a/tidb-stmt-cache/docker-compose.yml b/tidb-stmt-cache/docker-compose.yml index e642225a..7f1143f0 100644 --- a/tidb-stmt-cache/docker-compose.yml +++ b/tidb-stmt-cache/docker-compose.yml @@ -1,4 +1,4 @@ -# Minimal single-node TiDB for keploy e2e. +# Minimal TiDB + Apache Pulsar stack for keploy e2e. # # Why this stack and not the full pingcap/tidb-docker-compose with PD + TiKV? # - The sample only exercises the SQL layer (MySQL wire protocol on :4000). @@ -8,12 +8,55 @@ # process, in-memory) mode when no PD address is supplied, which is exactly # what we want for keploy CI: ~5s boot, volatile data, no extra containers. # -# Pin: v8.5.x is the LTS line current at the time this sample was added. -# Bump as new LTS lines ship; the matcher behaviour we're testing has been -# stable across TiDB versions because it depends on the MySQL wire protocol. +# Pulsar runs in standalone mode (one broker + embedded bookkeeper + zk). +# The pulsar-init sidecar exists for a single purpose: create the events +# topic as PARTITIONED with 8 partitions BEFORE the app boots its Java +# producer. Without that, the Java client would auto-create a +# non-partitioned topic on first SEND, and the bug we are testing +# (RoundRobinPartition cursor divergence across record/replay) would not +# reproduce because there is only one partition to route to. +# +# Pin: TiDB v8.5.x is the LTS line current at the time this sample was +# added. Pulsar 4.0.3 matches the broker version the regression was +# originally observed against (Pulsar Server 3.x client compatibility is +# preserved through the 4.x line). services: tidb: image: pingcap/tidb:v8.5.6 ports: - "4000:4000" # MySQL wire protocol - "10080:10080" # status / readiness + + pulsar: + image: apachepulsar/pulsar:4.0.3 + command: bin/pulsar standalone + ports: + - "6650:6650" # binary protocol — Java client connects here + - "8080:8080" # admin REST + healthcheck: + test: ["CMD", "bin/pulsar-admin", "brokers", "healthcheck"] + interval: 5s + timeout: 5s + retries: 30 + start_period: 30s + + # One-shot container that waits for the broker to be healthy, then + # pre-creates the events topic as partitioned. Exits 0 on success; the + # app does NOT depend_on it explicitly because Pulsar Java client + # resolves topic metadata lazily on first SEND, so as long as + # pulsar-init has finished before /events/patch is first hit, we are + # safe. In practice the lookup time exceeds Spring Boot startup time, + # so this ordering holds. + pulsar-init: + image: apachepulsar/pulsar:4.0.3 + depends_on: + pulsar: + condition: service_healthy + entrypoint: + - /bin/sh + - -c + - | + bin/pulsar-admin --admin-url http://pulsar:8080 \ + topics create-partitioned-topic \ + persistent://public/default/events --partitions 8 + restart: "no" diff --git a/tidb-stmt-cache/k8s/00-namespace.yaml b/tidb-stmt-cache/k8s/00-namespace.yaml new file mode 100644 index 00000000..f924fbd2 --- /dev/null +++ b/tidb-stmt-cache/k8s/00-namespace.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: tidb-pulsar-replay + labels: + # k8s-proxy's static webhook only mutates pods in namespaces that + # match the configured watch list (see chart values + # webhook.namespaceLabel + watchNamespace). The default label key is + # `kubernetes.io/metadata.name`, auto-stamped by the + # NamespaceDefaultLabelName admission plugin, so we just rely on + # the controller's chart to point at this namespace. If you flip + # the chart to a custom label key, add it here. + kubernetes.io/metadata.name: tidb-pulsar-replay diff --git a/tidb-stmt-cache/k8s/10-tidb.yaml b/tidb-stmt-cache/k8s/10-tidb.yaml new file mode 100644 index 00000000..c5ce8abe --- /dev/null +++ b/tidb-stmt-cache/k8s/10-tidb.yaml @@ -0,0 +1,54 @@ +# Single-node TiDB in unistore mode. No PD, no TiKV — fast boot, in-memory +# state, identical SQL surface to a real cluster. Matches the docker-compose +# topology so a recording captured locally and one captured in-cluster +# share the same MySQL wire protocol behaviour. +apiVersion: apps/v1 +kind: Deployment +metadata: + name: tidb + namespace: tidb-pulsar-replay +spec: + replicas: 1 + selector: + matchLabels: + app: tidb + template: + metadata: + labels: + app: tidb + spec: + containers: + - name: tidb + image: pingcap/tidb:v8.5.6 + imagePullPolicy: IfNotPresent + ports: + - name: mysql + containerPort: 4000 + - name: status + containerPort: 10080 + readinessProbe: + tcpSocket: + port: 4000 + initialDelaySeconds: 5 + periodSeconds: 2 + failureThreshold: 30 + resources: + requests: + memory: "256Mi" + cpu: "100m" + limits: + memory: "1Gi" + cpu: "1000m" +--- +apiVersion: v1 +kind: Service +metadata: + name: tidb + namespace: tidb-pulsar-replay +spec: + selector: + app: tidb + ports: + - name: mysql + port: 4000 + targetPort: 4000 diff --git a/tidb-stmt-cache/k8s/20-pulsar.yaml b/tidb-stmt-cache/k8s/20-pulsar.yaml new file mode 100644 index 00000000..453e9ce6 --- /dev/null +++ b/tidb-stmt-cache/k8s/20-pulsar.yaml @@ -0,0 +1,116 @@ +# Pulsar standalone (broker + bookie + zk in one process). NOT production +# topology, but the wire protocol the Java client sees on :6650 is +# identical to a real cluster — which is the only thing the keploy Pulsar +# parser cares about. +# +# The post-start hook inside the broker container creates the events +# topic as PARTITIONED with 8 partitions. Pre-creating as partitioned is +# load-bearing for this sample: if the broker auto-creates on first SEND +# it creates a NON-partitioned topic, and RoundRobinPartition routing +# collapses to a single partition — which would mask the regression. +apiVersion: apps/v1 +kind: Deployment +metadata: + name: pulsar + namespace: tidb-pulsar-replay +spec: + replicas: 1 + selector: + matchLabels: + app: pulsar + template: + metadata: + labels: + app: pulsar + spec: + containers: + - name: pulsar + image: apachepulsar/pulsar:4.0.3 + imagePullPolicy: IfNotPresent + command: ["bin/pulsar", "standalone"] + # Pulsar standalone is broker + bookie + zk in one JVM. With a + # default `-Xms256m -Xmx1g` and bookkeeper direct buffers on top, + # 2Gi total RSS is the floor before the kubelet starts OOM-killing + # it before zk even finishes electing itself. 4Gi limit + the JVM + # heap settings below clear the OOM and let the broker start in + # ~30s on a single kind node. + env: + - name: PULSAR_MEM + value: "-Xms512m -Xmx2g -XX:MaxDirectMemorySize=1g" + ports: + - name: binary + containerPort: 6650 + - name: admin + containerPort: 8080 + # tcpSocket on the binary port is what actually proves the + # broker is serving client traffic. `pulsar-admin brokers + # healthcheck` is a Java process — its boot + REST round-trip + # takes 3-7s, which blows past the default 1s timeoutSeconds. + # Bumping the timeout to 15s "works" but burns CPU forking + # the JVM every 5s for the life of the deployment. TCP probe + # is the cheaper, more accurate signal for "can the Java + # producer connect". + readinessProbe: + tcpSocket: + port: binary + initialDelaySeconds: 30 + periodSeconds: 5 + failureThreshold: 30 + resources: + requests: + memory: "2Gi" + cpu: "200m" + limits: + memory: "4Gi" + cpu: "2000m" +--- +apiVersion: v1 +kind: Service +metadata: + name: pulsar + namespace: tidb-pulsar-replay +spec: + selector: + app: pulsar + ports: + - name: binary + port: 6650 + targetPort: 6650 + - name: admin + port: 8080 + targetPort: 8080 +--- +# One-shot Job that pre-creates the partitioned topic. Runs after the +# Pulsar Service exists; the Job's pod blocks on a healthcheck loop +# rather than depend_on (which Jobs don't have) so it survives a slow +# broker boot. +apiVersion: batch/v1 +kind: Job +metadata: + name: pulsar-init-topic + namespace: tidb-pulsar-replay +spec: + backoffLimit: 30 + template: + spec: + restartPolicy: OnFailure + containers: + - name: pulsar-admin + image: apachepulsar/pulsar:4.0.3 + imagePullPolicy: IfNotPresent + command: + - /bin/sh + - -c + - | + set -eu + # Wait for the broker to answer healthcheck. The Service + # resolves once at least one pod is Ready, but the broker + # admin REST is what actually decides whether + # create-partitioned-topic succeeds. + until bin/pulsar-admin --admin-url http://pulsar:8080 brokers healthcheck >/dev/null 2>&1; do + echo "waiting for pulsar broker…" + sleep 2 + done + bin/pulsar-admin --admin-url http://pulsar:8080 \ + topics create-partitioned-topic \ + persistent://public/default/events --partitions 8 diff --git a/tidb-stmt-cache/k8s/30-app.yaml b/tidb-stmt-cache/k8s/30-app.yaml new file mode 100644 index 00000000..372d8c2b --- /dev/null +++ b/tidb-stmt-cache/k8s/30-app.yaml @@ -0,0 +1,77 @@ +# The Spring Boot app under test. Two things make this manifest specific +# to keploy auto-replay: +# +# 1. metadata.labels.keploy.io/record-session The opt-in label the +# k8s-proxy static webhook (charts/k8s-proxy/templates/static-webhook.yaml) +# selects on. With this label present, the webhook injects the +# keploy-agent sidecar, mounts the TLS-share emptyDir, and rewrites +# egress so MySQL/Pulsar traffic flows through the agent's proxy +# listener at :16789. +# +# 2. spec.template.spec.containers[0].env points app config at the +# in-cluster TiDB and Pulsar Services. JDBC URL keeps the same +# prepStmtCacheSize=500&prepStmtCacheSqlLimit=2048 params from +# application.properties so the in-cluster recording matches the +# local docker-compose recording structurally. +# +# Image: the app image is built locally (`mvn -DskipTests package && +# docker build -t tidb-pulsar-app:dev .`) and loaded into the cluster +# via `kind load docker-image tidb-pulsar-app:dev` for kind, or pushed +# to your registry for a real cluster. +apiVersion: apps/v1 +kind: Deployment +metadata: + name: tidb-pulsar-app + namespace: tidb-pulsar-replay +spec: + replicas: 1 + selector: + matchLabels: + app: tidb-pulsar-app + template: + metadata: + labels: + app: tidb-pulsar-app + keploy.io/record-session: "true" + spec: + containers: + - name: app + image: tidb-pulsar-app:dev + imagePullPolicy: IfNotPresent + ports: + - name: http + containerPort: 8080 + env: + - name: KEPLOY_DATASOURCE_JDBC_URL + value: "jdbc:mysql://tidb:4000/test?useSSL=false&allowPublicKeyRetrieval=true&useServerPrepStmts=true&cachePrepStmts=true&prepStmtCacheSize=500&prepStmtCacheSqlLimit=2048" + - name: PULSAR_BROKER_URL + value: "pulsar://pulsar:6650" + - name: PULSAR_TOPIC + value: "persistent://public/default/events" + readinessProbe: + httpGet: + path: /api/health + port: http + initialDelaySeconds: 10 + periodSeconds: 3 + failureThreshold: 20 + resources: + requests: + memory: "256Mi" + cpu: "100m" + limits: + memory: "1Gi" + cpu: "1000m" +--- +apiVersion: v1 +kind: Service +metadata: + name: tidb-pulsar-app + namespace: tidb-pulsar-replay +spec: + selector: + app: tidb-pulsar-app + ports: + - name: http + port: 80 + targetPort: 8080 diff --git a/tidb-stmt-cache/pom.xml b/tidb-stmt-cache/pom.xml index ae9d535d..78e29c55 100644 --- a/tidb-stmt-cache/pom.xml +++ b/tidb-stmt-cache/pom.xml @@ -15,10 +15,18 @@ tidb-stmt-cache 0.0.1-SNAPSHOT tidb-stmt-cache - E2E sample exercising keploy's MySQL prepared-statement orphan-EXECUTE matching against TiDB + E2E sample exercising keploy's MySQL prepared-statement orphan-EXECUTE matching against TiDB, plus Pulsar partitioned-topic routing for the round-robin replay regression 17 + + 3.1.1 @@ -30,6 +38,19 @@ org.springframework.boot spring-boot-starter-jdbc + + + org.springframework.boot + spring-boot-starter-data-jpa + + + org.apache.pulsar + pulsar-client + ${pulsar.client.version} + diff --git a/tidb-stmt-cache/src/main/java/com/example/tidbstmtcache/DataSourceConfig.java b/tidb-stmt-cache/src/main/java/com/example/tidbstmtcache/DataSourceConfig.java index 9c42aeaf..c1ccff36 100644 --- a/tidb-stmt-cache/src/main/java/com/example/tidbstmtcache/DataSourceConfig.java +++ b/tidb-stmt-cache/src/main/java/com/example/tidbstmtcache/DataSourceConfig.java @@ -3,75 +3,96 @@ import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; /** - * Single HikariCP pool against TiDB :4000 with MySQL Connector/J flags - * that force the orphan-EXECUTE scenario this sample is designed around: + * HikariCP pool against TiDB :4000. Two distinct keploy regressions are + * exercised by this single sample: * - * useServerPrepStmts=true -- server-side prepared statements (stmtIDs) - * cachePrepStmts=true -- per-Connection client-side PS cache - * prepStmtCacheSize >= 1 -- the cache must actually retain entries + * - Orphan COM_STMT_EXECUTE (TiDB prepared-statement cache): driven by + * useServerPrepStmts + cachePrepStmts + prepStmtCacheSize on the + * JDBC URL and HikariCP's LIFO pool — see the existing /api/kv/* and + * /api/kv/insert-select/* endpoints. * - * Pool sizing > 1 with HikariCP's LIFO eviction means sequential HTTP - * requests to /api/kv/{i} often land on the same physical connection. - * On a cache hit, Connector/J skips COM_STMT_PREPARE and emits only - * COM_STMT_EXECUTE using the cached server-side stmtID. The recorder's - * mock for that second EXECUTE is the orphan case keploy/keploy@b2e68adb - * is designed to handle (recordedPrepByConn miss -> expectedQuery="" -> - * param-alone fallback). + * - Pulsar partitioned-topic SEND round-robin (this file's setAutoCommit + * pairing with Hibernate provider_disables_autocommit=true) — driven + * by JPA persisting Event rows before publishing to a partitioned + * Pulsar topic; the partition the client routes to differs between + * record and replay, which is what keploy/enterprise's baseTopic() + * matcher loosening addresses. * - * TiDB is preferred over MySQL here because TiDB's prepared-statement - * cache semantics diverge subtly from MySQL across COM_RESET_CONNECTION, - * which is what surfaced this matcher bug downstream. MySQL 8 alone is - * unlikely to reproduce the orphan condition reliably in one record cycle. + * Why autoCommit(false) is wired here and not just left to the driver + * default: with Hibernate's provider_disables_autocommit=true, the + * provider expects autocommit to already be off when it acquires a + * connection. If autocommit defaults to on, Hibernate will issue a + * redundant SET autocommit=0 on every connection acquisition, which + * shows up as extra MySQL traffic in the recording and makes the mock + * stream noisier than it has to be. */ @Configuration public class DataSourceConfig { - @Value("${datasource.tidb.jdbc-url}") - private String jdbcUrl; + @Value("${keploy.datasource.jdbc-url}") + private String dbUrl; - @Value("${datasource.tidb.username}") - private String username; + @Value("${keploy.datasource.username}") + private String dbUsername; - @Value("${datasource.tidb.password}") - private String password; + @Value("${keploy.datasource.password}") + private String dbPassword; - @Value("${datasource.tidb.driver-class-name}") - private String driverClass; + @Value("${keploy.datasource.driver-class-name}") + private String driverClassName; + + @Value("${keploy.datasource.pool-name}") + private String poolName; + + @Value("${keploy.datasource.minimum-idle}") + private int minimumIdle; + + @Value("${keploy.datasource.maximum-pool-size}") + private int maximumPoolSize; + + @Value("${keploy.datasource.idle-timeout}") + private long idleTimeout; + + @Value("${keploy.datasource.max-lifetime}") + private long maxLifetime; + + @Value("${keploy.datasource.connection-timeout}") + private long connectionTimeout; + + @Value("${keploy.datasource.connection-test-query}") + private String connectionTestQuery; + + @Value("${keploy.datasource.validation-timeout}") + private long validationTimeout; @Bean(destroyMethod = "close") - public HikariDataSource tidbDataSource() { + public HikariDataSource dataSource() { + String resolvedPassword = dbPassword; HikariConfig config = new HikariConfig(); - config.setPoolName("tidb-dataSource"); - config.setUsername(username); - config.setPassword(password); - config.setJdbcUrl(jdbcUrl); - config.setDriverClassName(driverClass); - - // Small pool: enough to be realistic (not 1), small enough that - // sequential curls reliably hit the same physical connection and - // therefore the same Connector/J prepared-statement cache. - config.setMaximumPoolSize(3); - config.setMinimumIdle(1); - - // Keep connections alive long enough to span the whole record - // window so HikariCP doesn't churn the pool mid-test and flush - // the prep cache out from under us. - config.setKeepaliveTime(30_000); - config.setIdleTimeout(60_000); - config.setMaxLifetime(7_200_000); - config.setConnectionTimeout(10_000); - config.setValidationTimeout(5_000); - + config.setJdbcUrl(dbUrl); + config.setUsername(dbUsername); + config.setPassword(resolvedPassword); + config.setDriverClassName(driverClassName); + config.setPoolName(poolName); + config.setMinimumIdle(minimumIdle); + config.setMaximumPoolSize(maximumPoolSize); + config.setIdleTimeout(idleTimeout); + config.setMaxLifetime(maxLifetime); + config.setConnectionTimeout(connectionTimeout); + config.setConnectionTestQuery(connectionTestQuery); + config.setValidationTimeout(validationTimeout); + config.setAutoCommit(false); return new HikariDataSource(config); } @Bean - public JdbcTemplate tidbJdbcTemplate(HikariDataSource tidbDataSource) { - return new JdbcTemplate(tidbDataSource); + public JdbcTemplate jdbcTemplate(HikariDataSource dataSource) { + return new JdbcTemplate(dataSource); } } diff --git a/tidb-stmt-cache/src/main/java/com/example/tidbstmtcache/EventEntity.java b/tidb-stmt-cache/src/main/java/com/example/tidbstmtcache/EventEntity.java new file mode 100644 index 00000000..f5d650a4 --- /dev/null +++ b/tidb-stmt-cache/src/main/java/com/example/tidbstmtcache/EventEntity.java @@ -0,0 +1,89 @@ +package com.example.tidbstmtcache; + +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.GenerationType; +import jakarta.persistence.Id; +import jakarta.persistence.Table; + +import java.time.Instant; + +/** + * Event row that the /events/patch endpoint persists before fanning the + * event out to a partitioned Pulsar topic. The exact column types are + * deliberately boring — we are not validating Hibernate type mapping + * here, only that an INSERT + COMMIT goes through MySQL Connector/J in + * a way the keploy MySQL parser already covers. + */ +@Entity +@Table(name = "events") +public class EventEntity { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + @Column(name = "entity_id", nullable = false, length = 64) + private String entityId; + + @Column(name = "event_name", nullable = false, length = 64) + private String eventName; + + @Column(name = "event_timestamp", nullable = false) + private Instant eventTimestamp; + + @Column(name = "task_orchestrator", length = 32) + private String taskOrchestrator; + + @Column(name = "payload_json", columnDefinition = "TEXT") + private String payloadJson; + + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } + + public String getEntityId() { + return entityId; + } + + public void setEntityId(String entityId) { + this.entityId = entityId; + } + + public String getEventName() { + return eventName; + } + + public void setEventName(String eventName) { + this.eventName = eventName; + } + + public Instant getEventTimestamp() { + return eventTimestamp; + } + + public void setEventTimestamp(Instant eventTimestamp) { + this.eventTimestamp = eventTimestamp; + } + + public String getTaskOrchestrator() { + return taskOrchestrator; + } + + public void setTaskOrchestrator(String taskOrchestrator) { + this.taskOrchestrator = taskOrchestrator; + } + + public String getPayloadJson() { + return payloadJson; + } + + public void setPayloadJson(String payloadJson) { + this.payloadJson = payloadJson; + } +} diff --git a/tidb-stmt-cache/src/main/java/com/example/tidbstmtcache/EventRepository.java b/tidb-stmt-cache/src/main/java/com/example/tidbstmtcache/EventRepository.java new file mode 100644 index 00000000..bc8037f8 --- /dev/null +++ b/tidb-stmt-cache/src/main/java/com/example/tidbstmtcache/EventRepository.java @@ -0,0 +1,6 @@ +package com.example.tidbstmtcache; + +import org.springframework.data.jpa.repository.JpaRepository; + +public interface EventRepository extends JpaRepository { +} diff --git a/tidb-stmt-cache/src/main/java/com/example/tidbstmtcache/EventsController.java b/tidb-stmt-cache/src/main/java/com/example/tidbstmtcache/EventsController.java new file mode 100644 index 00000000..6d0f8c9f --- /dev/null +++ b/tidb-stmt-cache/src/main/java/com/example/tidbstmtcache/EventsController.java @@ -0,0 +1,66 @@ +package com.example.tidbstmtcache; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +import java.time.OffsetDateTime; +import java.util.HashMap; +import java.util.Map; + +/** + * POST /events/patch — persists an event row through Hibernate (TiDB) + * and fans it out to a partitioned Pulsar topic. The request/response + * shapes are intentionally generic so any recording captured against + * this sample can be replayed with the keploy enterprise agent. + * + * Why a synchronous .send() rather than sendAsync(): we want the + * record-time mock for SEND_RECEIPT and the in-transaction COMMIT to + * the database to be serialised in the order keploy's recorder + * observes them on the wire. Async send introduces an extra inflight + * queue that makes the relative ordering of the MySQL COMMIT and the + * Pulsar SEND non-deterministic across runs, which would muddy the + * regression repro for reasons unrelated to the partition router. + */ +@RestController +public class EventsController { + + private final EventRepository eventRepository; + private final Producer producer; + private final ObjectMapper objectMapper; + + public EventsController(EventRepository eventRepository, + Producer producer, + ObjectMapper objectMapper) { + this.eventRepository = eventRepository; + this.producer = producer; + this.objectMapper = objectMapper; + } + + @PostMapping("/events/patch") + @Transactional + public Map patchEvent(@RequestBody Map body) + throws PulsarClientException, JsonProcessingException { + EventEntity entity = new EventEntity(); + entity.setEntityId((String) body.get("entity_id")); + entity.setEventName((String) body.get("event_name")); + entity.setTaskOrchestrator((String) body.get("task_orchestrator")); + entity.setEventTimestamp(OffsetDateTime.parse((String) body.get("event_timestamp")).toInstant()); + entity.setPayloadJson(objectMapper.writeValueAsString(body)); + + eventRepository.save(entity); + + // Synchronous send so the SEND_RECEIPT lands inside the same HTTP + // request boundary. See class doc for why async is avoided. + producer.send(objectMapper.writeValueAsBytes(body)); + + Map out = new HashMap<>(); + out.put("message", "Event patched"); + return out; + } +} diff --git a/tidb-stmt-cache/src/main/java/com/example/tidbstmtcache/PulsarConfig.java b/tidb-stmt-cache/src/main/java/com/example/tidbstmtcache/PulsarConfig.java new file mode 100644 index 00000000..24739b81 --- /dev/null +++ b/tidb-stmt-cache/src/main/java/com/example/tidbstmtcache/PulsarConfig.java @@ -0,0 +1,63 @@ +package com.example.tidbstmtcache; + +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * Pulsar client + producer beans wired for the partitioned-topic + * round-robin scenario that triggers the keploy replay regression. + * + * Routing policy: MessageRoutingMode.RoundRobinPartition is the Java + * client default. The starting partition cursor is randomised per + * producer instance, so the partition the very first SEND in a session + * lands on is non-deterministic across runs. Sequential SENDs from + * the same producer then walk through partitions in order. Net effect: + * a recording captured against partition-N will, at replay time, see + * the live SEND target partition-(N+k mod P) for some k chosen at + * producer construction. + * + * Why we do NOT pin a single partition (e.g. SinglePartition + fixed + * messageKey): pinning would mask the bug, which is the whole reason + * this sample exists. The point is to reproduce the routing mismatch + * end-to-end so the matcher loosening in keploy/enterprise (baseTopic + * in pulsar/replayer/replayer.go) can be verified against a real Java + * producer. + * + * Why we accept a topic that may not yet exist on broker startup: + * docker-compose's pulsar-init container runs `pulsar-admin topics + * create-partitioned-topic` after the broker is healthy but before + * this app's healthcheck flips green. PulsarClient lazy-resolves the + * partitioned metadata on first SEND, so as long as the topic exists + * before /events/patch is hit, no extra synchronisation is needed. + */ +@Configuration +public class PulsarConfig { + + @Value("${pulsar.broker-url}") + private String brokerUrl; + + @Value("${pulsar.topic}") + private String topic; + + @Bean(destroyMethod = "close") + public PulsarClient pulsarClient() throws PulsarClientException { + return PulsarClient.builder() + .serviceUrl(brokerUrl) + .build(); + } + + @Bean(destroyMethod = "close") + public Producer eventProducer(PulsarClient client) throws PulsarClientException { + return client.newProducer(Schema.BYTES) + .topic(topic) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .blockIfQueueFull(true) + .create(); + } +} diff --git a/tidb-stmt-cache/src/main/resources/application.properties b/tidb-stmt-cache/src/main/resources/application.properties index 58b5ab6b..56b6e442 100644 --- a/tidb-stmt-cache/src/main/resources/application.properties +++ b/tidb-stmt-cache/src/main/resources/application.properties @@ -2,23 +2,59 @@ server.port=8080 # --- TiDB DataSource --- # TiDB :4000 speaks the MySQL wire protocol, so the MySQL Connector/J 8.x -# driver works as-is. The three JDBC parameters that matter for the -# orphan-EXECUTE scenario keploy/keploy@b2e68adb addresses: +# driver works as-is. JDBC URL parameters that matter: # -# useServerPrepStmts=true server-side prepared statements (stmtIDs -# come from the database, not the client). -# cachePrepStmts=true per-Connection PreparedStatement cache -# keyed by SQL text. This is what causes -# Connector/J to skip COM_STMT_PREPARE on -# cache hits and emit only COM_STMT_EXECUTE. -# prepStmtCacheSize=250 ensure the cache actually retains the few -# statements this sample exercises. +# useServerPrepStmts=true server-side prepared statements (stmtIDs +# come from the database, not the client). +# cachePrepStmts=true per-Connection PreparedStatement cache +# keyed by SQL text. Causes Connector/J to +# skip COM_STMT_PREPARE on cache hits and +# emit only COM_STMT_EXECUTE. +# prepStmtCacheSize=500 retain enough entries to span a record. +# prepStmtCacheSqlLimit=2048 enlarge max-SQL-len so longer Hibernate +# INSERTs/UPDATES still cache. # # root / no password matches the default TiDB bootstrap user in the -# pingcap/tidb:v8.5.x image started by docker-compose.yml -- TiDB does -# not require an init.sql step the way MySQL does because the SchemaInitializer -# CommandLineRunner creates the table from inside the app. -datasource.tidb.jdbc-url=jdbc:mysql://localhost:4000/test?useSSL=false&allowPublicKeyRetrieval=true&useServerPrepStmts=true&cachePrepStmts=true&prepStmtCacheSize=250 -datasource.tidb.username=root -datasource.tidb.password= -datasource.tidb.driver-class-name=com.mysql.cj.jdbc.Driver +# pingcap/tidb:v8.5.x image started by docker-compose.yml. +keploy.datasource.jdbc-url=jdbc:mysql://localhost:4000/test?useSSL=false&allowPublicKeyRetrieval=true&useServerPrepStmts=true&cachePrepStmts=true&prepStmtCacheSize=500&prepStmtCacheSqlLimit=2048 +keploy.datasource.username=root +keploy.datasource.password= +keploy.datasource.driver-class-name=com.mysql.cj.jdbc.Driver +keploy.datasource.pool-name=tidb-dataSource +keploy.datasource.minimum-idle=2 +keploy.datasource.maximum-pool-size=10 +keploy.datasource.idle-timeout=60000 +keploy.datasource.max-lifetime=1800000 +keploy.datasource.connection-timeout=10000 +keploy.datasource.connection-test-query=SELECT 1 +keploy.datasource.validation-timeout=5000 + +# --- JPA / Hibernate --- +# MySQLDialect: TiDB is wire-compatible, the dialect's SQL output is +# accepted by TiDB 8.5.x as long as the Hibernate version stays on the +# MySQL family (not MariaDB-specific syntax). +# +# open-in-view=false: critical for the test. With OSIV enabled, the JPA +# session is held open for the whole HTTP request, which would surface +# additional COMMIT traffic at request-end that does not match the +# Hibernate-managed COMMIT inside the @Transactional service method. +# +# provider_disables_autocommit=true: pairs with HikariCP's +# autoCommit=false. Hibernate explicitly turns autocommit off on +# connection acquire instead of issuing SET autocommit=0 once per +# transaction — fewer wire-level statements, more deterministic +# record/replay. +spring.jpa.hibernate.ddl-auto=update +spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQLDialect +spring.jpa.open-in-view=false +spring.jpa.properties.hibernate.connection.provider_disables_autocommit=true +spring.jpa.properties.hibernate.jdbc.time_zone=UTC + +# --- Apache Pulsar --- +# The Java client connects to a Pulsar broker on :6650 (binary protocol). +# The topic is pre-created as partitioned (events.partitions partitions) +# by the docker-compose pulsar-init container so the default +# RoundRobinPartitionRouter has real partitions to round-robin across. +pulsar.broker-url=pulsar://localhost:6650 +pulsar.topic=persistent://public/default/events +pulsar.partitions=8