Skip to content

wolfMQTT broker: ordering, persistence, offline queue, AES-GCM at rest#538

Open
dgarske wants to merge 8 commits into
wolfSSL:masterfrom
dgarske:broker_features
Open

wolfMQTT broker: ordering, persistence, offline queue, AES-GCM at rest#538
dgarske wants to merge 8 commits into
wolfSSL:masterfrom
dgarske:broker_features

Conversation

@dgarske
Copy link
Copy Markdown
Member

@dgarske dgarske commented May 13, 2026

  • Per-subscriber outbound queue with MQTT v5 Receive Maximum and inflight cap
  • Persistence hooks API + POSIX backend for sessions, subscriptions, retained messages (schema-wipe-on-mismatch, static-memory restore)
  • Offline message queue with cross-restart replay (orphan sessions, NS_OUTQ persist, v5 Session Expiry Interval)
  • Optional AES-GCM encryption at rest for persisted records
  • Fixes: CI flake, retransmit DUP=1, key cache, schema header, mqtt-sub non-blocking MQTT_CODE_CONTINUE handling

@dgarske dgarske self-assigned this May 13, 2026
Copilot AI review requested due to automatic review settings May 13, 2026 21:54
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Adds opt-in features to the wolfMQTT broker: per-subscriber outbound queueing with v5 Receive Maximum honoring, a hook-based persistence layer with a default POSIX backend, an offline message queue surviving reconnects/restarts, and optional AES-GCM encryption at rest. Also fixes a v5 CONNACK protocol error (Maximum QoS=2 was illegal).

Changes:

  • New per-subscriber outbound queue / ordering / inflight cap in mqtt_broker.c.
  • New persistence layer (mqtt_broker_persist.c, mqtt_broker_persist_posix.c) with schema-wipe-on-mismatch and optional AES-GCM.
  • New configure flags, broker.test cases, and CI matrix entries.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 17 comments.

Show a summary per file
File Description
wolfmqtt/mqtt_broker.h Public types/macros for outbound queue, orphan sessions, persistence hooks.
src/mqtt_broker.c Outbound queue, orphan session pool, persist shadow-write call sites, CONNACK fix.
src/mqtt_broker_persist.c Record encoders/decoders, restore, schema wipe, AES-GCM wrap/unwrap.
src/mqtt_broker_persist_posix.c Default POSIX file-based backend (kv_put/get/del/iter/sync).
src/include.am Build the new persistence sources into the broker binary.
scripts/broker.test New tests: ordering burst, persist round-trip, schema wipe, offline queue, AES-GCM.
configure.ac --enable-broker-persist and --enable-broker-persist-encrypt flags.
.github/workflows/broker-check.yml CI matrix entries for the new build configurations.
Comments suppressed due to low confidence (1)

src/mqtt_broker.c:1

  • The iterator advance sub = sub->next is now inside #ifndef WOLFMQTT_STATIC_MEMORY. This is presumably correct because the static-memory loop uses indexed iteration, but the asymmetry (and the fact that the dynamic-mode body lives just above the #ifndef and is already inside the linked-list loop) makes this very fragile to future edits. Consider keeping the sub = sub->next advance outside the orphan-handling guard, or refactoring so the loop structure is single-version per memory mode.
/* mqtt_broker.c

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/mqtt_broker_persist.c Outdated
Comment thread src/mqtt_broker_persist.c Outdated
Comment thread src/mqtt_broker_persist.c
Comment thread src/mqtt_broker_persist.c Outdated
Comment thread src/mqtt_broker.c
Comment thread src/mqtt_broker_persist.c
Comment thread src/mqtt_broker_persist_posix.c Outdated
Comment thread src/mqtt_broker.c
Comment thread configure.ac
Comment thread src/mqtt_broker_persist.c
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 28 comments.

Comment thread src/mqtt_broker.c Outdated
Comment thread src/mqtt_broker.c
Comment thread src/mqtt_broker.c
Comment thread src/mqtt_broker.c
Comment thread src/mqtt_broker_persist.c
Comment thread src/mqtt_broker.c
Comment thread src/mqtt_broker.c
Comment thread src/mqtt_broker_persist.c Outdated
Comment thread .github/workflows/broker-check.yml
Comment thread src/mqtt_broker.c
@dgarske dgarske force-pushed the broker_features branch 5 times, most recently from daa75fe to 524bfae Compare May 15, 2026 18:22
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 18 comments.

Comments suppressed due to low confidence (2)

src/mqtt_broker_persist.c:1380

  • Same word16 + 1 overflow concern as elsewhere: if the persisted record has flen == 0xFFFF or key_len == 0xFFFF (e.g., corrupted/malicious file), flen + 1 and key_len + 1 wrap to 0 and the size check is bypassed, leading to over-large XMEMCPY into fixed-size buffers (BROKER_MAX_FILTER_LEN/BROKER_MAX_CLIENT_ID_LEN). Use >= or widen the arithmetic.
            if (flen + 1 > BROKER_MAX_FILTER_LEN ||
                    key_len + 1 > BROKER_MAX_CLIENT_ID_LEN) {
                rc = MQTT_CODE_ERROR_OUT_OF_BUFFER;
                goto rollback;
            }

src/mqtt_broker.c:1802

  • BrokerClient_OnPubAck/BrokerClient_OnPubComp remove the in-memory queue entry but never call BrokerPersist_DelOutPub to remove the corresponding on-disk record. As soon as the entry was sent (transitioning to PUBLISH_SENT/PUBREL_SENT) it was already written to disk by the orphan path during disconnect (and may have been restored as such on a previous boot). After a successful PUBACK/PUBCOMP, the broker correctly drops the in-memory state but the persisted record lingers; if the broker restarts before the entry is ever re-orphaned, the record will be replayed as if delivery never completed, causing duplicate delivery to the subscriber. Both ack handlers should call BrokerPersist_DelOutPub(broker, bc->client_id, packet_id) after BrokerClient_UnlinkOutPub.
/* PUBACK from subscriber - completes a QoS 1 delivery. */
static void BrokerClient_OnPubAck(BrokerClient* bc, word16 packet_id)
{
    BrokerOutPub* prev = NULL;
    BrokerOutPub* e;

    if (bc == NULL) {
        return;
    }
    e = BrokerClient_FindOutPub(bc, packet_id, BROKER_OUTQ_PUBLISH_SENT,
            &prev);
    if (e == NULL) {
        WBLOG_DBG(bc->broker,
            "broker: spurious PUBACK sock=%d packet_id=%u",
            (int)bc->sock, (unsigned)packet_id);
        return;
    }
    BrokerClient_UnlinkOutPub(bc, prev, e);
    BrokerClient_DrainOutQueue(bc);
}

/* PUBREC from subscriber - advance the QoS 2 entry to PUBREL_SENT.
 * Returns 1 if a matching entry was found (so the caller knows whether
 * the PUBREL we send is correlated to a real outbound message), 0
 * otherwise. The wire response is still sent in both cases to remain
 * idempotent for buggy peers. */
static int BrokerClient_OnPubRec(BrokerClient* bc, word16 packet_id)
{
    BrokerOutPub* prev = NULL;
    BrokerOutPub* e;

    if (bc == NULL) {
        return 0;
    }
    e = BrokerClient_FindOutPub(bc, packet_id, BROKER_OUTQ_PUBLISH_SENT,
            &prev);
    if (e == NULL) {
        WBLOG_DBG(bc->broker,
            "broker: spurious PUBREC sock=%d packet_id=%u",
            (int)bc->sock, (unsigned)packet_id);
        return 0;
    }
    e->state = BROKER_OUTQ_PUBREL_SENT;
    /* Inflight stays counted - the delivery is still outstanding until
     * PUBCOMP returns. */
    return 1;
}

/* PUBCOMP from subscriber - completes a QoS 2 delivery. */
static void BrokerClient_OnPubComp(BrokerClient* bc, word16 packet_id)
{
    BrokerOutPub* prev = NULL;
    BrokerOutPub* e;

    if (bc == NULL) {
        return;
    }
    e = BrokerClient_FindOutPub(bc, packet_id, BROKER_OUTQ_PUBREL_SENT,
            &prev);
    if (e == NULL) {
        WBLOG_DBG(bc->broker,
            "broker: spurious PUBCOMP sock=%d packet_id=%u",
            (int)bc->sock, (unsigned)packet_id);
        return;
    }
    BrokerClient_UnlinkOutPub(bc, prev, e);
    BrokerClient_DrainOutQueue(bc);
}

Comment thread src/mqtt_broker.c Outdated
Comment thread src/mqtt_broker.c
Comment thread src/mqtt_broker_persist.c Outdated
Comment thread src/mqtt_broker_persist.c Outdated
Comment thread src/mqtt_broker.c
Comment thread src/mqtt_broker.c
Comment thread scripts/broker.test Outdated
Comment thread scripts/broker.test
Comment thread scripts/broker.test
Comment thread src/mqtt_broker_persist_posix.c Outdated
@dgarske dgarske force-pushed the broker_features branch from a706738 to aadf2a6 Compare May 18, 2026 05:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants