From 7c95c9fd85935de50fff7c4a7baf3cbd406ff154 Mon Sep 17 00:00:00 2001 From: bmaidics Date: Tue, 2 Jan 2024 11:26:20 +0100 Subject: [PATCH] Send disconnect even without mqtt reset extension --- .../internal/stream/MqttServerFactory.java | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java index 242658a950..92437f398e 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java @@ -4922,29 +4922,35 @@ private void onSessionReset( final OctetsFW extension = reset.extension(); final MqttResetExFW mqttResetEx = extension.get(mqttResetExRO::tryWrap); + byte reasonCode = SESSION_TAKEN_OVER; + boolean serverRefExists = false; + String16FW serverRef = null; + String16FW reason = null; + if (mqttResetEx != null) { - String16FW serverRef = mqttResetEx.serverRef(); - byte reasonCode = (byte) mqttResetEx.reasonCode(); - String16FW reason = mqttResetEx.reason(); - boolean serverRefExists = serverRef != null && serverRef.asString() != null; + serverRef = mqttResetEx.serverRef(); + reasonCode = (byte) mqttResetEx.reasonCode(); + reason = mqttResetEx.reason(); + serverRefExists = serverRef != null && serverRef.asString() != null; if (reasonCode == SUCCESS) { reasonCode = serverRefExists ? SERVER_MOVED : SESSION_TAKEN_OVER; } + } - if (!connected) - { - doCancelConnectTimeout(); - doEncodeConnack(traceId, authorization, reasonCode, assignedClientId, - false, serverRefExists ? serverRef : null, reason, version); - } - else if (version == MQTT_PROTOCOL_VERSION_5) - { - doEncodeDisconnect(traceId, authorization, reasonCode, serverRefExists ? serverRef : null, reason); - } + if (!connected) + { + doCancelConnectTimeout(); + doEncodeConnack(traceId, authorization, reasonCode, assignedClientId, + false, serverRefExists ? serverRef : null, reason, version); } + else if (version == MQTT_PROTOCOL_VERSION_5) + { + doEncodeDisconnect(traceId, authorization, reasonCode, serverRefExists ? serverRef : null, reason); + } + doNetworkEnd(traceId, authorization); setInitialClosed(); decodeNetwork(traceId);