Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send will message as data frame + reject large packets #363

Merged
merged 17 commits into from
Aug 24, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import io.aklivity.zilla.specs.binding.mqtt.internal.types.Array32FW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttBinaryFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttEndReasonCode;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttMessageFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttPayloadFormat;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttPayloadFormatFW;
Expand All @@ -46,7 +45,6 @@
import io.aklivity.zilla.specs.binding.mqtt.internal.types.Varuint32FW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttBeginExFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttDataExFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttEndExFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttExtensionKind;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttFlushExFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttPublishBeginExFW;
Expand Down Expand Up @@ -103,12 +101,6 @@ public static MqttFlushExBuilder flushEx()
return new MqttFlushExBuilder();
}

@Function
public static MqttEndExBuilder endEx()
{
return new MqttEndExBuilder();
}

@Function
public static MqttResetExBuilder resetEx()
{
Expand Down Expand Up @@ -664,39 +656,6 @@ public byte[] build()
}
}

public static final class MqttEndExBuilder
{
private final MqttEndExFW.Builder endExRW;

private MqttEndExBuilder()
{
MutableDirectBuffer writeBuffer = new UnsafeBuffer(new byte[1024 * 8]);
this.endExRW = new MqttEndExFW.Builder().wrap(writeBuffer, 0, writeBuffer.capacity());
}

public MqttEndExBuilder typeId(
int typeId)
{
endExRW.typeId(typeId);
return this;
}

public MqttEndExBuilder reason(
String reason)
{
endExRW.reasonCode(r -> r.set(MqttEndReasonCode.valueOf(reason)));
return this;
}

public byte[] build()
{
final MqttEndExFW endEx = endExRW.build();
final byte[] array = new byte[endEx.sizeof()];
endEx.buffer().getBytes(endEx.offset(), array);
return array;
}
}

public static final class MqttResetExBuilder
{
private final MqttResetExFW.Builder resetExRW;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,6 @@ scope mqtt
SESSION(3)
}

enum MqttEndReasonCode (uint8)
{
DISCONNECT(0),
KEEP_ALIVE_EXPIRY(1),
DISCONNECT_WITH_WILL(2)
}

enum MqttPayloadFormat
{
BINARY,
Expand Down Expand Up @@ -190,11 +183,6 @@ scope mqtt
string16 serverRef = null;
}

struct MqttEndEx extends core::stream::Extension
{
MqttEndReasonCode reasonCode = DISCONNECT;
}

union MqttFlushEx switch (uint8) extends core::stream::Extension
{
case 1: mqtt::stream::MqttSubscribeFlushEx subscribe;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,4 @@ write flush

read zilla:data.empty

write zilla:end.ext ${mqtt:endEx()
.typeId(zilla:id("mqtt"))
.reason("KEEP_ALIVE_EXPIRY")
.build()}

write close
read closed
write abort
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,4 @@ read ${mqtt:will()
write zilla:data.empty
write flush

read zilla:end.ext ${mqtt:endEx()
.typeId(zilla:id("mqtt"))
.reason("KEEP_ALIVE_EXPIRY")
.build()}

read closed
write close
read aborted

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,5 @@ write flush

read zilla:data.empty

write zilla:end.ext ${mqtt:endEx()
.typeId(zilla:id("mqtt"))
.reason("DISCONNECT")
.build()}

write close
read closed
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,5 @@ read ${mqtt:will()
write zilla:data.empty
write flush

read zilla:end.ext ${mqtt:endEx()
.typeId(zilla:id("mqtt"))
.reason("DISCONNECT")
.build()}

read closed
write close
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@
import org.junit.Test;
import org.kaazing.k3po.lang.el.BytesMatcher;

import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttEndReasonCode;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttMessageFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttPayloadFormat;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttSessionStateFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttBeginExFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttDataExFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttEndExFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttFlushExFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttResetExFW;

Expand Down Expand Up @@ -1170,20 +1168,6 @@ public void shouldEncodeMqttSubscribeFlushEx()
0b0001 == f.flags()));
}

@Test
public void shouldEncodeMqttAbortExAsUnsubscribe()
{
final byte[] array = MqttFunctions.endEx()
.typeId(0)
.reason("KEEP_ALIVE_EXPIRY")
.build();

DirectBuffer buffer = new UnsafeBuffer(array);
MqttEndExFW mqttEndEx = new MqttEndExFW().wrap(buffer, 0, buffer.capacity());
assertEquals(0, mqttEndEx.typeId());
assertEquals(MqttEndReasonCode.KEEP_ALIVE_EXPIRY, mqttEndEx.reasonCode().get());
}

@Test
public void shouldEncodeMqttResetEx()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,9 @@ public void shouldRemoveSessionAtCleanStart() throws Exception

@Test
@Specification({
"${app}/session.will.message.disconnect.with.will.message/client",
"${app}/session.will.message.disconnect.with.will.message/server"})
public void shouldSendReasonForEndAfterDisconnectWithWillMessage() throws Exception
{
k3po.finish();
}

@Test
@Specification({
"${app}/session.will.message.no.ping.within.keep.alive/client",
"${app}/session.will.message.no.ping.within.keep.alive/server"})
public void shouldSendReasonForEndAfterKeepAliveTimeout() throws Exception
"${app}/session.will.message.abort/client",
"${app}/session.will.message.abort/server"})
public void shouldAbortSessionStreamWhenWillDelivery() throws Exception
{
k3po.finish();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.Flyweight;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.MqttBinaryFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.MqttCapabilities;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.MqttEndReasonCode;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.MqttMessageFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.MqttPayloadFormat;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.MqttQoS;
Expand Down Expand Up @@ -148,7 +147,6 @@
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.stream.FlushFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.stream.MqttBeginExFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.stream.MqttDataExFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.stream.MqttEndExFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.stream.MqttFlushExFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.stream.MqttPublishDataExFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.stream.MqttResetExFW;
Expand Down Expand Up @@ -244,7 +242,6 @@ public final class MqttServerFactory implements MqttStreamFactory
private final MqttBeginExFW.Builder mqttPublishBeginExRW = new MqttBeginExFW.Builder();
private final MqttBeginExFW.Builder mqttSubscribeBeginExRW = new MqttBeginExFW.Builder();
private final MqttBeginExFW.Builder mqttSessionBeginExRW = new MqttBeginExFW.Builder();
private final MqttEndExFW.Builder mqttEndExRW = new MqttEndExFW.Builder();
private final MqttDataExFW.Builder mqttPublishDataExRW = new MqttDataExFW.Builder();
private final MqttDataExFW.Builder mqttSubscribeDataExRW = new MqttDataExFW.Builder();
private final MqttDataExFW.Builder mqttSessionDataExRW = new MqttDataExFW.Builder();
Expand Down Expand Up @@ -887,6 +884,7 @@ private int decodeConnectPayload(
int progress = offset;

progress = server.onDecodeConnectPayload(traceId, authorization, buffer, progress, limit);
decodableRemainingBytes -= progress - offset;
if (decodableRemainingBytes == 0)
{
server.decoder = decodePacketType;
Expand Down Expand Up @@ -1520,10 +1518,7 @@ private void onKeepAliveTimeoutSignal(
{
if (session)
{
final MqttEndExFW.Builder builder = mqttEndExRW.wrap(sessionExtBuffer, 0, sessionExtBuffer.capacity())
.typeId(mqttTypeId)
.reasonCode(r -> r.set(MqttEndReasonCode.KEEP_ALIVE_EXPIRY));
sessionStream.doSessionAppEnd(traceId, builder.build());
sessionStream.doSessionAbort(traceId);
}
onDecodeError(traceId, authorization, KEEP_ALIVE_TIMEOUT);
decoder = decodeIgnoreAll;
Expand Down Expand Up @@ -1804,7 +1799,6 @@ else if (this.authField.equals(MqttConnectProperty.PASSWORD))
}
sessionStream.doSessionData(traceId, willPayloadSize, sessionDataExBuilder.build(), will);
}
decodableRemainingBytes -= connectPayloadLimit - progress;
progress = connectPayloadLimit;
}

Expand Down Expand Up @@ -2253,13 +2247,14 @@ private void onDecodeDisconnect(
state = MqttState.closingInitial(state);
if (session)
{
final MqttEndExFW.Builder builder = mqttEndExRW.wrap(sessionExtBuffer, 0, sessionExtBuffer.capacity())
.typeId(mqttTypeId)
.reasonCode(r -> r.set(
disconnect.reasonCode() == DISCONNECT_WITH_WILL_MESSAGE ?
MqttEndReasonCode.DISCONNECT_WITH_WILL :
MqttEndReasonCode.DISCONNECT));
sessionStream.doSessionAppEnd(traceId, builder.build());
if (disconnect.reasonCode() == DISCONNECT_WITH_WILL_MESSAGE)
{
sessionStream.doSessionAbort(traceId);
}
else
{
sessionStream.doSessionAppEnd(traceId, EMPTY_OCTETS);
}
}
closeStreams(traceId, authorization);
doNetworkEnd(traceId, authorization);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public void shouldCloseSessionNormalDisconnect() throws Exception
@Configuration("server.yaml")
@Specification({
"${net}/session.will.message.disconnect.with.will.message/client",
"${app}/session.will.message.disconnect.with.will.message/server"})
"${app}/session.will.message.abort/server"})
@Configure(name = WILDCARD_SUBSCRIPTION_AVAILABLE_NAME, value = "true")
@Configure(name = SHARED_SUBSCRIPTION_AVAILABLE_NAME, value = "true")
@Configure(name = MAXIMUM_QOS_NAME, value = "2")
Expand All @@ -219,7 +219,7 @@ public void shouldCloseSessionDisconnectWithWill() throws Exception
@Configuration("server.yaml")
@Specification({
"${net}/session.will.message.no.ping.within.keep.alive/client",
"${app}/session.will.message.no.ping.within.keep.alive/server"})
"${app}/session.will.message.abort/server"})
@Configure(name = WILDCARD_SUBSCRIPTION_AVAILABLE_NAME, value = "true")
@Configure(name = SHARED_SUBSCRIPTION_AVAILABLE_NAME, value = "true")
@Configure(name = MAXIMUM_QOS_NAME, value = "2")
Expand Down
Loading