wolfMQTT broker: ordering, persistence, offline queue, AES-GCM at rest#538
wolfMQTT broker: ordering, persistence, offline queue, AES-GCM at rest#538dgarske wants to merge 8 commits into
Conversation
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds opt-in features to the wolfMQTT broker: per-subscriber outbound queueing with v5 Receive Maximum honoring, a hook-based persistence layer with a default POSIX backend, an offline message queue surviving reconnects/restarts, and optional AES-GCM encryption at rest. Also fixes a v5 CONNACK protocol error (Maximum QoS=2 was illegal).
Changes:
- New per-subscriber outbound queue / ordering / inflight cap in
mqtt_broker.c. - New persistence layer (
mqtt_broker_persist.c,mqtt_broker_persist_posix.c) with schema-wipe-on-mismatch and optional AES-GCM. - New configure flags, broker.test cases, and CI matrix entries.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 17 comments.
Show a summary per file
| File | Description |
|---|---|
| wolfmqtt/mqtt_broker.h | Public types/macros for outbound queue, orphan sessions, persistence hooks. |
| src/mqtt_broker.c | Outbound queue, orphan session pool, persist shadow-write call sites, CONNACK fix. |
| src/mqtt_broker_persist.c | Record encoders/decoders, restore, schema wipe, AES-GCM wrap/unwrap. |
| src/mqtt_broker_persist_posix.c | Default POSIX file-based backend (kv_put/get/del/iter/sync). |
| src/include.am | Build the new persistence sources into the broker binary. |
| scripts/broker.test | New tests: ordering burst, persist round-trip, schema wipe, offline queue, AES-GCM. |
| configure.ac | --enable-broker-persist and --enable-broker-persist-encrypt flags. |
| .github/workflows/broker-check.yml | CI matrix entries for the new build configurations. |
Comments suppressed due to low confidence (1)
src/mqtt_broker.c:1
- The iterator advance
sub = sub->nextis now inside#ifndef WOLFMQTT_STATIC_MEMORY. This is presumably correct because the static-memory loop uses indexed iteration, but the asymmetry (and the fact that the dynamic-mode body lives just above the#ifndefand is already inside the linked-list loop) makes this very fragile to future edits. Consider keeping thesub = sub->nextadvance outside the orphan-handling guard, or refactoring so the loop structure is single-version per memory mode.
/* mqtt_broker.c
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
daa75fe to
524bfae
Compare
…scriptions, retained messages, schema-wipe-on-mismatch, static-memory restore)
… NS_OUTQ persist, v5 Session Expiry Interval)
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 18 comments.
Comments suppressed due to low confidence (2)
src/mqtt_broker_persist.c:1380
- Same
word16 + 1overflow concern as elsewhere: if the persisted record hasflen == 0xFFFForkey_len == 0xFFFF(e.g., corrupted/malicious file),flen + 1andkey_len + 1wrap to 0 and the size check is bypassed, leading to over-largeXMEMCPYinto fixed-size buffers (BROKER_MAX_FILTER_LEN/BROKER_MAX_CLIENT_ID_LEN). Use>=or widen the arithmetic.
if (flen + 1 > BROKER_MAX_FILTER_LEN ||
key_len + 1 > BROKER_MAX_CLIENT_ID_LEN) {
rc = MQTT_CODE_ERROR_OUT_OF_BUFFER;
goto rollback;
}
src/mqtt_broker.c:1802
BrokerClient_OnPubAck/BrokerClient_OnPubCompremove the in-memory queue entry but never callBrokerPersist_DelOutPubto remove the corresponding on-disk record. As soon as the entry was sent (transitioning toPUBLISH_SENT/PUBREL_SENT) it was already written to disk by the orphan path during disconnect (and may have been restored as such on a previous boot). After a successful PUBACK/PUBCOMP, the broker correctly drops the in-memory state but the persisted record lingers; if the broker restarts before the entry is ever re-orphaned, the record will be replayed as if delivery never completed, causing duplicate delivery to the subscriber. Both ack handlers should callBrokerPersist_DelOutPub(broker, bc->client_id, packet_id)afterBrokerClient_UnlinkOutPub.
/* PUBACK from subscriber - completes a QoS 1 delivery. */
static void BrokerClient_OnPubAck(BrokerClient* bc, word16 packet_id)
{
BrokerOutPub* prev = NULL;
BrokerOutPub* e;
if (bc == NULL) {
return;
}
e = BrokerClient_FindOutPub(bc, packet_id, BROKER_OUTQ_PUBLISH_SENT,
&prev);
if (e == NULL) {
WBLOG_DBG(bc->broker,
"broker: spurious PUBACK sock=%d packet_id=%u",
(int)bc->sock, (unsigned)packet_id);
return;
}
BrokerClient_UnlinkOutPub(bc, prev, e);
BrokerClient_DrainOutQueue(bc);
}
/* PUBREC from subscriber - advance the QoS 2 entry to PUBREL_SENT.
* Returns 1 if a matching entry was found (so the caller knows whether
* the PUBREL we send is correlated to a real outbound message), 0
* otherwise. The wire response is still sent in both cases to remain
* idempotent for buggy peers. */
static int BrokerClient_OnPubRec(BrokerClient* bc, word16 packet_id)
{
BrokerOutPub* prev = NULL;
BrokerOutPub* e;
if (bc == NULL) {
return 0;
}
e = BrokerClient_FindOutPub(bc, packet_id, BROKER_OUTQ_PUBLISH_SENT,
&prev);
if (e == NULL) {
WBLOG_DBG(bc->broker,
"broker: spurious PUBREC sock=%d packet_id=%u",
(int)bc->sock, (unsigned)packet_id);
return 0;
}
e->state = BROKER_OUTQ_PUBREL_SENT;
/* Inflight stays counted - the delivery is still outstanding until
* PUBCOMP returns. */
return 1;
}
/* PUBCOMP from subscriber - completes a QoS 2 delivery. */
static void BrokerClient_OnPubComp(BrokerClient* bc, word16 packet_id)
{
BrokerOutPub* prev = NULL;
BrokerOutPub* e;
if (bc == NULL) {
return;
}
e = BrokerClient_FindOutPub(bc, packet_id, BROKER_OUTQ_PUBREL_SENT,
&prev);
if (e == NULL) {
WBLOG_DBG(bc->broker,
"broker: spurious PUBCOMP sock=%d packet_id=%u",
(int)bc->sock, (unsigned)packet_id);
return;
}
BrokerClient_UnlinkOutPub(bc, prev, e);
BrokerClient_DrainOutQueue(bc);
}
MQTT_CODE_CONTINUEhandling