Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT 3.1.1 implementation #582

Merged
merged 17 commits into from
Dec 1, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public final class MqttReasonCodes
public static final byte UNSPECIFIED_ERROR = (byte) 0x80;
public static final byte MALFORMED_PACKET = (byte) 0x81;
public static final byte PROTOCOL_ERROR = (byte) 0x82;
public static final byte IDENTIFIER_REJECTED = (byte) 0x02;
public static final byte SUBACK_FAILURE_CODE_V4 = (byte) 0x80;
public static final byte PACKET_TOO_LARGE = (byte) 0x95;
public static final byte WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED = (byte) 0xa2;
public static final byte SHARED_SUBSCRIPTION_NOT_SUPPORTED = (byte) 0x9e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.Varuint32FW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttConnackV5FW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttConnectHeaderFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttConnectV5FW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttDisconnectV5FW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttPacketHeaderFW;
Expand Down Expand Up @@ -274,6 +275,7 @@ public final class MqttClientFactory implements MqttStreamFactory
private final MqttPublishHeader mqttPublishHeaderRO = new MqttPublishHeader();

private final MqttConnectV5FW.Builder mqttConnectV5RW = new MqttConnectV5FW.Builder();
private final MqttConnectHeaderFW.Builder mqttConnectHeaderRW = new MqttConnectHeaderFW.Builder();
private final MqttSubscribeV5FW.Builder mqttSubscribeV5RW = new MqttSubscribeV5FW.Builder();
private final MqttUnsubscribeV5FW.Builder mqttUnsubscribeV5RW = new MqttUnsubscribeV5FW.Builder();
private final MqttPublishV5FW.Builder mqttPublishV5RW = new MqttPublishV5FW.Builder();
Expand Down Expand Up @@ -2282,14 +2284,21 @@ private void doEncodeConnect(
final int propertiesSize0 = propertiesSize;
final int willSize = will != null ? will.sizeof() : 0;
flags |= will != null ? (WILL_FLAG_MASK | ((willMessage.flags() & RETAIN_MASK) != 0 ? WILL_RETAIN_MASK : 0)) : 0;
final MqttConnectV5FW connect =
mqttConnectV5RW.wrap(writeBuffer, FIELD_OFFSET_PAYLOAD, writeBuffer.capacity())

final MqttConnectHeaderFW connectHeader =
mqttConnectHeaderRW.wrap(writeBuffer, FIELD_OFFSET_PAYLOAD, writeBuffer.capacity())
.typeAndFlags(0x10)
.remainingLength(11 + propertiesSize0 + clientId.length() + 2 + willSize)
.protocolName(MQTT_PROTOCOL_NAME)
.protocolVersion(MQTT_PROTOCOL_VERSION)
.flags(flags)
.keepAlive((int) MILLISECONDS.toSeconds(keepAliveMillis))
.build();

doNetworkData(traceId, authorization, 0L, connectHeader);

final MqttConnectV5FW connect =
mqttConnectV5RW.wrap(writeBuffer, FIELD_OFFSET_PAYLOAD, writeBuffer.capacity())
.properties(p -> p.length(propertiesSize0)
.value(propertyBuffer, 0, propertiesSize0))
.clientId(clientId)
Expand Down

Large diffs are not rendered by default.

36 changes: 18 additions & 18 deletions runtime/binding-mqtt/src/main/zilla/protocol.idl
Original file line number Diff line number Diff line change
Expand Up @@ -94,26 +94,26 @@
case 0x2A: uint8 sharedSubscriptionAvailable;
}

struct MqttConnectV3 extends MqttPacketHeader
struct MqttConnectHeader extends MqttPacketHeader
{
string16 protocolName;
uint8 protocolVersion;
uint8 flags;
uint16 keepAlive;
}

struct MqttConnectV4
{
string16 clientId;
}

struct MqttConnectV5 extends MqttPacketHeader
struct MqttConnectV5
{
string16 protocolName;
uint8 protocolVersion;
uint8 flags;
uint16 keepAlive;
MqttProperties properties;
string16 clientId;
}

struct MqttWillV3
struct MqttWillV4
{
string16 topic;
Binary payload; //TODO: data fragmentation
Expand All @@ -126,7 +126,7 @@
Binary payload; //TODO: data fragmentation
}

struct MqttConnackV3 extends MqttPacketHeader
struct MqttConnackV4 extends MqttPacketHeader
{
uint8 flags;
uint8 reasonCode;
Expand All @@ -139,7 +139,7 @@
MqttProperties properties;
}

struct MqttPublishV3 extends MqttPacketHeader
struct MqttPublishV4 extends MqttPacketHeader
{
string16 topicName;
octets payload;
Expand All @@ -152,7 +152,7 @@
octets payload;
}

struct MqttPubackV3 extends MqttPacketHeader
struct MqttPubackV4 extends MqttPacketHeader
{
uint16 packetId;
uint8 reasonCode;
Expand All @@ -165,7 +165,7 @@
MqttProperties properties;
}

struct MqttPubrecV3 extends MqttPacketHeader
struct MqttPubrecV4 extends MqttPacketHeader
{
uint16 packetId;
uint8 reasonCode;
Expand All @@ -178,7 +178,7 @@
MqttProperties properties;
}

struct MqttPubrelV3 extends MqttPacketHeader
struct MqttPubrelV4 extends MqttPacketHeader
{
uint16 packetId;
uint8 reasonCode;
Expand All @@ -191,7 +191,7 @@
MqttProperties properties;
}

struct MqttPubcompV3 extends MqttPacketHeader
struct MqttPubcompV4 extends MqttPacketHeader
{
uint16 packetId;
uint8 reasonCode;
Expand All @@ -204,7 +204,7 @@
MqttProperties properties;
}

struct MqttSubscribeV3 extends MqttPacketHeader
struct MqttSubscribeV4 extends MqttPacketHeader
{
uint16 packetId;
octets payload;
Expand All @@ -223,7 +223,7 @@
uint8 options;
}

struct MqttSubackV3 extends MqttPacketHeader
struct MqttSubackV4 extends MqttPacketHeader
{
uint16 packetId;
octets payload;
Expand All @@ -241,7 +241,7 @@
uint8 reasonCode;
}

struct MqttUnsubscribeV3 extends MqttPacketHeader
struct MqttUnsubscribeV4 extends MqttPacketHeader
{
uint16 packetId;
octets payload;
Expand All @@ -259,7 +259,7 @@
string16 filter;
}

struct MqttUnsubackV3 extends MqttPacketHeader
struct MqttUnsubackV4 extends MqttPacketHeader
{
uint16 packetId;
}
Expand All @@ -284,7 +284,7 @@
{
}

struct MqttDisconnectV3 extends MqttPacketHeader
struct MqttDisconnectV4 extends MqttPacketHeader
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ public void shouldSendMultipleMessages() throws Exception
@Test
@Configuration("client.yaml")
@Specification({
"${net}/publish.one.message/server",
"${app}/publish.one.message/client"})
"${net}/publish.one.message.properties/server",
"${app}/publish.one.message.properties/client"})
public void shouldSendOneMessage() throws Exception
{
k3po.finish();
Expand Down
Loading