diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/config/MqttKafkaOptionsConfigAdapter.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/config/MqttKafkaOptionsConfigAdapter.java index 02498608eb..c87d1ccc5e 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/config/MqttKafkaOptionsConfigAdapter.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/config/MqttKafkaOptionsConfigAdapter.java @@ -30,10 +30,10 @@ import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaPublishConfig; import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaTopicsConfig; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.MqttKafkaBinding; +import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttQoS; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.String16FW; import io.aklivity.zilla.runtime.engine.config.OptionsConfig; import io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi; -import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttQoS; public class MqttKafkaOptionsConfigAdapter implements OptionsConfigAdapterSpi, JsonbAdapter { diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java index 956d8cf1a9..da34e303b0 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java @@ -61,7 +61,6 @@ import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.KafkaSkip; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttPayloadFormat; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttQoS; -import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttSubscribeOffsetMetadataFW; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttTopicFilterFW; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.OctetsFW; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.String16FW; @@ -93,6 +92,7 @@ import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer; import io.aklivity.zilla.runtime.engine.buffer.BufferPool; import io.aklivity.zilla.runtime.engine.concurrent.Signaler; +import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttSubscribeOffsetMetadataFW; public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory { @@ -109,7 +109,7 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory private static final int DATA_FLAG_FIN = 0x01; private static final OctetsFW EMPTY_OCTETS = new OctetsFW().wrap(new UnsafeBuffer(new byte[0]), 0, 0); private static final String16FW EMPTY_STRING = new String16FW(""); - private static final int OFFSET_METADATA_VERSION = 1; + private static final int OFFSET_METADATA_VERSION = 2; private final OctetsFW emptyRO = new OctetsFW().wrap(new UnsafeBuffer(0L, 0), 0, 0); private final BeginFW beginRO = new BeginFW(); @@ -467,93 +467,118 @@ private void onMqttFlush( binding.resolveAll(authorization, filters) :
null; final int packetId = mqttSubscribeFlushEx.packetId(); - if (!filters.isEmpty()) + if (!MqttKafkaState.replyClosed(state)) { - if (routes != null) + if (!filters.isEmpty()) { - routes.forEach(r -> - { - final long routeOrder = r.order; - if (!messages.containsKey(routeOrder)) - { - KafkaMessagesProxy messagesProxy = new KafkaMessagesProxy(originId, r, this); - messages.put(routeOrder, messagesProxy); - messagesPerTopicKey.put(messagesProxy.topicKey, r.order); - messagesProxy.doKafkaBegin(traceId, authorization, 0, filters); - } - else - { - messages.get(routeOrder).doKafkaFlush(traceId, authorization, budgetId, reserved, qos, filters); - } - }); + onFiltersChanged(traceId, authorization, budgetId, reserved, filters, routes); } - - if (retainAvailable) + else if (packetId > 0) { - final List retainedFilters = new ArrayList<>(); - filters.forEach(filter -> - { - final boolean sendRetained = (filter.flags() & SEND_RETAIN_FLAG) != 0; - if (sendRetained) - { - retainedFilters.add(new Subscription( - (int) filter.subscriptionId(), newString16FW(filter.pattern()), filter.qos(), filter.flags())); - final boolean rap = (filter.flags() & RETAIN_AS_PUBLISHED_FLAG) != 0; - retainAsPublished.put((int) filter.subscriptionId(), rap); - } - }); - - retainedSubscriptions.removeIf(rf -> !filters.anyMatch(f -> f.pattern().equals(rf.filter))); - if (!retainedFilters.isEmpty()) - { - if (MqttKafkaState.initialOpened(retained.state) && !MqttKafkaState.initialClosed(retained.state)) - { - retained.doKafkaFlush(traceId, authorization, budgetId, reserved, qos, retainedFilters); - } - else - { - final List newRetainedFilters = new ArrayList<>(); - retainedFilters.forEach(subscription -> - { - if (!retainedSubscriptions.contains(subscription)) - { - newRetainedFilters.add(subscription); - } - }); - retained.doKafkaBegin(traceId, authorization, 0, newRetainedFilters); - } - } + onMessageAcked(traceId, authorization, budgetId, reserved, packetId, mqttSubscribeFlushEx); } } - else if (packetId > 0) - { - final int qos = mqttSubscribeFlushEx.qos(); - final MqttOffsetStateFlags state = MqttOffsetStateFlags.valueOf(mqttSubscribeFlushEx.state()); - final PartitionOffset offset = state == MqttOffsetStateFlags.INCOMPLETE ? - offsetsPerPacketId.get(packetId) : offsetsPerPacketId.remove(packetId); + } + + private void onMessageAcked( + long traceId, + long authorization, + long budgetId, + int reserved, + int packetId, + MqttSubscribeFlushExFW mqttSubscribeFlushEx) + { + final int qos = mqttSubscribeFlushEx.qos(); + final MqttOffsetStateFlags state = MqttOffsetStateFlags.valueOf(mqttSubscribeFlushEx.state()); + final PartitionOffset offset = state == MqttOffsetStateFlags.INCOMPLETE ? + offsetsPerPacketId.get(packetId) : offsetsPerPacketId.remove(packetId); - final long topicPartitionKey = topicPartitionKey(offset.topicKey, offset.partitionId); - final long messagesId = messagesPerTopicKey.get(offset.topicKey); + final long topicPartitionKey = topicPartitionKey(offset.topicKey, offset.partitionId); + final long messagesId = messagesPerTopicKey.get(offset.topicKey); - OffsetCommit offsetCommit = new OffsetCommit(offset, qos, state, packetId); + OffsetCommit offsetCommit = new OffsetCommit(offset, qos, state, packetId); - final OffsetHighWaterMark highWaterMark = highWaterMarks.get(topicPartitionKey); + final OffsetHighWaterMark highWaterMark = highWaterMarks.get(topicPartitionKey); - final KafkaProxy proxy = messagesId != -1 ? messages.get(messagesId) : retained; + final KafkaProxy proxy = messagesId != -1 ? 
messages.get(messagesId) : retained; + + if (highWaterMark.offset >= offset.offset) + { + highWaterMark.increase(); + commitOffset(traceId, authorization, budgetId, reserved, proxy, offsetCommit); + commitDeferredOffsets(traceId, authorization, budgetId, reserved, highWaterMark); + } + else if (qos == MqttQoS.EXACTLY_ONCE.value() && state != MqttOffsetStateFlags.INCOMPLETE) + { + commitOffset(traceId, authorization, budgetId, reserved, proxy, offsetCommit); + } + else + { + highWaterMark.deferOffsetCommit(offsetCommit, proxy); + } + } - if (highWaterMark.offset >= offset.offset) + private void onFiltersChanged( + long traceId, + long authorization, + long budgetId, + int reserved, + Array32FW filters, + List routes) + { + if (routes != null) + { + routes.forEach(r -> { - highWaterMark.increase(); - commitOffset(traceId, authorization, budgetId, reserved, proxy, offsetCommit); - commitDeferredOffsets(traceId, authorization, budgetId, reserved, highWaterMark); - } - else if (qos == MqttQoS.EXACTLY_ONCE.value() && state != MqttOffsetStateFlags.INCOMPLETE) + final long routeOrder = r.order; + if (!messages.containsKey(routeOrder)) + { + KafkaMessagesProxy messagesProxy = new KafkaMessagesProxy(originId, r, this); + messages.put(routeOrder, messagesProxy); + messagesPerTopicKey.put(messagesProxy.topicKey, r.order); + messagesProxy.doKafkaBegin(traceId, authorization, 0, filters); + } + else + { + messages.get(routeOrder).doKafkaFlush(traceId, authorization, budgetId, reserved, qos, filters); + } + }); + } + + if (retainAvailable) + { + final List retainedFilters = new ArrayList<>(); + filters.forEach(filter -> { - commitOffset(traceId, authorization, budgetId, reserved, proxy, offsetCommit); - } - else + final boolean sendRetained = (filter.flags() & SEND_RETAIN_FLAG) != 0; + if (sendRetained) + { + retainedFilters.add(new Subscription((int) filter.subscriptionId(), + newString16FW(filter.pattern()), filter.qos(), filter.flags())); + final boolean rap = (filter.flags() & RETAIN_AS_PUBLISHED_FLAG) != 0; + retainAsPublished.put((int) filter.subscriptionId(), rap); + } + }); + + retainedSubscriptions.removeIf(rf -> !filters.anyMatch(f -> f.pattern().equals(rf.filter))); + if (!retainedFilters.isEmpty()) { - highWaterMark.deferOffsetCommit(offsetCommit, proxy); + if (MqttKafkaState.initialOpened(retained.state) && !MqttKafkaState.initialClosed(retained.state)) + { + retained.doKafkaFlush(traceId, authorization, budgetId, reserved, qos, retainedFilters); + } + else + { + final List newRetainedFilters = new ArrayList<>(); + retainedFilters.forEach(subscription -> + { + if (!retainedSubscriptions.contains(subscription)) + { + newRetainedFilters.add(subscription); + } + }); + retained.doKafkaBegin(traceId, authorization, 0, newRetainedFilters); + } } } } @@ -1846,7 +1871,16 @@ private IntArrayList stringToOffsetMetadataList( final IntArrayList metadataList = new IntArrayList(); UnsafeBuffer buffer = new UnsafeBuffer(BitUtil.fromHex(metadata.asString())); final MqttSubscribeOffsetMetadataFW offsetMetadata = mqttOffsetMetadataRO.wrap(buffer, 0, buffer.capacity()); - offsetMetadata.packetIds().forEachRemaining((IntConsumer) metadataList::add); + switch (offsetMetadata.kind()) + { + case V1: + offsetMetadata.subscribeMetadataV1().packetIds().forEachRemaining((IntConsumer) metadataList::add); + break; + case V2: + offsetMetadata.subscribeMetadataV2().packetIds().forEachRemaining((IntConsumer) metadataList::add); + break; + } + return metadataList; } @@ -1854,8 +1888,7 @@ private String16FW 
offsetMetadataListToString( IntArrayList metadataList) { mqttOffsetMetadataRW.wrap(offsetBuffer, 0, offsetBuffer.capacity()); - mqttOffsetMetadataRW.version(OFFSET_METADATA_VERSION); - metadataList.forEach(p -> mqttOffsetMetadataRW.appendPacketIds(p.shortValue())); + mqttOffsetMetadataRW.subscribeMetadataV2(m -> metadataList.forEach(p -> m.appendPacketIds(p.shortValue()))); final MqttSubscribeOffsetMetadataFW offsetMetadata = mqttOffsetMetadataRW.build(); return new String16FW(BitUtil.toHex(offsetMetadata.buffer().byteArray(), offsetMetadata.offset(), offsetMetadata.limit())); diff --git a/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeProxyIT.java b/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeProxyIT.java index eb92ff949f..3df88ab1d3 100644 --- a/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeProxyIT.java +++ b/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeProxyIT.java @@ -526,6 +526,17 @@ public void shouldReceiveMessageQoS2() throws Exception k3po.finish(); } + @Test + @Configuration("proxy.yaml") + @Configure(name = WILL_AVAILABLE_NAME, value = "false") + @Specification({ + "${mqtt}/subscribe.qos2.version1.offset.metadata/client", + "${kafka}/subscribe.qos2.version1.offset.metadata/server"}) + public void shouldReceiveMessageQoS2WithVersion1OffsetMetadata() throws Exception + { + k3po.finish(); + } + @Test @Configuration("proxy.yaml") @Configure(name = WILL_AVAILABLE_NAME, value = "false") diff --git a/specs/binding-mqtt-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/mqtt/kafka/internal/MqttKafkaFunctions.java b/specs/binding-mqtt-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/mqtt/kafka/internal/MqttKafkaFunctions.java index 3b67a1554e..e9df4a5040 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/mqtt/kafka/internal/MqttKafkaFunctions.java +++ b/specs/binding-mqtt-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/mqtt/kafka/internal/MqttKafkaFunctions.java @@ -23,6 +23,9 @@ import io.aklivity.k3po.runtime.lang.el.spi.FunctionMapperSpi; import io.aklivity.zilla.specs.binding.mqtt.kafka.internal.types.MqttPublishOffsetMetadataFW; import io.aklivity.zilla.specs.binding.mqtt.kafka.internal.types.MqttSubscribeOffsetMetadataFW; +import io.aklivity.zilla.specs.binding.mqtt.kafka.internal.types.MqttSubscribeOffsetMetadataV1FW; +import io.aklivity.zilla.specs.binding.mqtt.kafka.internal.types.MqttSubscribeOffsetMetadataV2FW; +import io.aklivity.zilla.specs.binding.mqtt.kafka.internal.types.MqttSubscribeOffsetMetadataVersion; public final class MqttKafkaFunctions { @@ -40,30 +43,84 @@ public static MqttPublishOffsetMetadataBuilder publishMetadata() public static final class MqttSubscribeOffsetMetadataBuilder { - private final MqttSubscribeOffsetMetadataFW.Builder offsetMetadataRW = new MqttSubscribeOffsetMetadataFW.Builder(); - - byte version = 1; + private final MqttSubscribeOffsetMetadataFW.Builder offsetMetadataRW = + new MqttSubscribeOffsetMetadataFW.Builder(); + private final MqttSubscribeOffsetMetadataFW offsetMetadataRO = new MqttSubscribeOffsetMetadataFW(); + private final MutableDirectBuffer writeBuffer = new UnsafeBuffer(new byte[1024 * 8]); private MqttSubscribeOffsetMetadataBuilder() { - MutableDirectBuffer writeBuffer = new 
UnsafeBuffer(new byte[1024 * 8]); offsetMetadataRW.wrap(writeBuffer, 0, writeBuffer.capacity()); - offsetMetadataRW.version(version); } - public MqttSubscribeOffsetMetadataBuilder metadata( - int packetId) + public MqttSubscribeOffsetMetadataV1Builder v1() { - offsetMetadataRW.appendPacketIds((short) packetId); - return this; + offsetMetadataRW.kind(MqttSubscribeOffsetMetadataVersion.V1); + return new MqttSubscribeOffsetMetadataV1Builder(); + } + + public MqttSubscribeOffsetMetadataV2Builder v2() + { + offsetMetadataRW.kind(MqttSubscribeOffsetMetadataVersion.V2); + return new MqttSubscribeOffsetMetadataV2Builder(); } public String build() { - final MqttSubscribeOffsetMetadataFW offsetMetadata = offsetMetadataRW.build(); + final MqttSubscribeOffsetMetadataFW offsetMetadata = offsetMetadataRO; return BitUtil.toHex(offsetMetadata.buffer().byteArray(), offsetMetadata.offset(), offsetMetadata.limit()); } + + public final class MqttSubscribeOffsetMetadataV1Builder + { + private final MqttSubscribeOffsetMetadataV1FW.Builder offsetMetadataV1RW = + new MqttSubscribeOffsetMetadataV1FW.Builder(); + + private MqttSubscribeOffsetMetadataV1Builder() + { + offsetMetadataV1RW.wrap(writeBuffer, 1, writeBuffer.capacity()); + } + + public MqttSubscribeOffsetMetadataV1Builder metadata( + int packetId) + { + offsetMetadataV1RW.appendPacketIds((short) packetId); + return this; + } + + public MqttSubscribeOffsetMetadataBuilder build() + { + final MqttSubscribeOffsetMetadataV1FW offsetMetadataV1 = offsetMetadataV1RW.build(); + offsetMetadataRO.wrap(writeBuffer, 0, offsetMetadataV1.limit()); + return MqttSubscribeOffsetMetadataBuilder.this; + } + } + + public final class MqttSubscribeOffsetMetadataV2Builder + { + private final MqttSubscribeOffsetMetadataV2FW.Builder offsetMetadataV2RW = + new MqttSubscribeOffsetMetadataV2FW.Builder(); + + private MqttSubscribeOffsetMetadataV2Builder() + { + offsetMetadataV2RW.wrap(writeBuffer, 1, writeBuffer.capacity()); + } + + public MqttSubscribeOffsetMetadataV2Builder metadata( + int packetId) + { + offsetMetadataV2RW.appendPacketIds((short) packetId); + return this; + } + + public MqttSubscribeOffsetMetadataBuilder build() + { + final MqttSubscribeOffsetMetadataV2FW offsetMetadataV2 = offsetMetadataV2RW.build(); + offsetMetadataRO.wrap(writeBuffer, 0, offsetMetadataV2.limit()); + return MqttSubscribeOffsetMetadataBuilder.this; + } + } } public static final class MqttPublishOffsetMetadataBuilder diff --git a/specs/binding-mqtt-kafka.spec/src/main/resources/META-INF/zilla/mqtt_kafka.idl b/specs/binding-mqtt-kafka.spec/src/main/resources/META-INF/zilla/mqtt_kafka.idl index ac11492a6b..a81acbcb3c 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/resources/META-INF/zilla/mqtt_kafka.idl +++ b/specs/binding-mqtt-kafka.spec/src/main/resources/META-INF/zilla/mqtt_kafka.idl @@ -15,13 +15,30 @@ scope mqtt_kafka { - struct MqttSubscribeOffsetMetadata + enum MqttSubscribeOffsetMetadataVersion (uint8) + { + V1 (1), + V2 (2) + } + + struct MqttSubscribeOffsetMetadataV1 { - uint8 version = 1; int8 length; int16[length] packetIds = null; } + struct MqttSubscribeOffsetMetadataV2 + { + int16 length; + int16[length] packetIds = null; + } + + union MqttSubscribeOffsetMetadata switch (MqttSubscribeOffsetMetadataVersion) + { + case V1: MqttSubscribeOffsetMetadataV1 subscribeMetadataV1; + case V2: MqttSubscribeOffsetMetadataV2 subscribeMetadataV2; + } + struct MqttPublishOffsetMetadata { uint8 version = 1; diff --git 
a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.qos2.version1.offset.metadata/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.qos2.version1.offset.metadata/client.rpt new file mode 100644 index 0000000000..f78727ec19 --- /dev/null +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.qos2.version1.offset.metadata/client.rpt @@ -0,0 +1,116 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("mqtt-messages") + .groupId("zilla:test-mqtt0-client") + .filter() + .headers("zilla:filter") + .sequence("sensor") + .sequence("one") + .build() + .headerNot("zilla:qos", "0") + .headerNot("zilla:qos", "1") + .build() + .evaluation("EAGER") + .build() + .build()} + +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("mqtt-messages") + .partition(0, 0, 1, 1, mqtt_kafka:subscribeMetadata() + .v1() + .metadata(10) + .metadata(20) + .build() + .build()) + .build() + .build()} + +connected + + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .fetch() + .filters(1) + .partition(0, 2, 2) + .progress(0, 3) + .progress(1, 1) + .key("sensor/one") + .header("zilla:filter", "sensor") + .header("zilla:filter", "one") + .header("zilla:local", "client") + .header("zilla:format", "TEXT") + .header("zilla:qos", "2") + .build() + .build()} +read "message" + +write advise zilla:flush ${kafka:flushEx() + .typeId(zilla:id("kafka")) + .merged() + .consumer() + .progress(0, 3, mqtt_kafka:subscribeMetadata() + .v2() + .metadata(10) + .metadata(20) + .metadata(1) + .build() + .build()) + .correlationId(1) + .build() + .build()} + +read advised zilla:flush ${kafka:matchFlushEx() + .typeId(zilla:id("kafka")) + .merged() + .consumer() + .progress(0, 3) + .correlationId(1) + .build() + .build()} + +write advise zilla:flush ${kafka:flushEx() + .typeId(zilla:id("kafka")) + .merged() + .consumer() + .progress(0, 3, mqtt_kafka:subscribeMetadata() + .v2() + .metadata(10) + .metadata(20) + .build() + .build()) + .build() + .build()} + +read advised zilla:flush ${kafka:matchFlushEx() + .typeId(zilla:id("kafka")) + .merged() + .consumer() + .progress(0, 3) + .build() + .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.qos2.version1.offset.metadata/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.qos2.version1.offset.metadata/server.rpt new file mode 100644 index 0000000000..901b145ea7 --- /dev/null +++ 
b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.qos2.version1.offset.metadata/server.rpt @@ -0,0 +1,120 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("mqtt-messages") + .groupId("zilla:test-mqtt0-client") + .filter() + .headers("zilla:filter") + .sequence("sensor") + .sequence("one") + .build() + .headerNot("zilla:qos", "0") + .headerNot("zilla:qos", "1") + .build() + .evaluation("EAGER") + .build() + .build()} + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("mqtt-messages") + .partition(0, 0, 1, 1,mqtt_kafka:subscribeMetadata() + .v1() + .metadata(10) + .metadata(20) + .build() + .build()) + .build() + .build()} + +connected + + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .fetch() + .timestamp(kafka:timestamp()) + .filters(1) + .partition(0, 2, 2) + .progress(0, 3) + .progress(1, 1) + .key("sensor/one") + .header("zilla:filter", "sensor") + .header("zilla:filter", "one") + .header("zilla:local", "client") + .header("zilla:format", "TEXT") + .header("zilla:qos", "2") + .build() + .build()} +write "message" +write flush + +read advised zilla:flush ${kafka:matchFlushEx() + .typeId(zilla:id("kafka")) + .merged() + .consumer() + .progress(0, 3,mqtt_kafka:subscribeMetadata() + .v2() + .metadata(10) + .metadata(20) + .metadata(1) + .build() + .build()) + .correlationId(1) + .build() + .build()} + +write advise zilla:flush ${kafka:flushEx() + .typeId(zilla:id("kafka")) + .merged() + .consumer() + .progress(0, 3) + .correlationId(1) + .build() + .build()} + +read advised zilla:flush ${kafka:matchFlushEx() + .typeId(zilla:id("kafka")) + .merged() + .consumer() + .progress(0, 3, mqtt_kafka:subscribeMetadata() + .v2() + .metadata(10) + .metadata(20) + .build() + .build()) + .build() + .build()} + +write advise zilla:flush ${kafka:flushEx() + .typeId(zilla:id("kafka")) + .merged() + .consumer() + .progress(0, 3) + .build() + .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.message.overlapping.wildcard.mixed.qos/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.message.overlapping.wildcard.mixed.qos/client.rpt index de153e4569..0b108b3488 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.message.overlapping.wildcard.mixed.qos/client.rpt +++ 
b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.message.overlapping.wildcard.mixed.qos/client.rpt @@ -226,8 +226,10 @@ write advise zilla:flush ${kafka:flushEx() .consumer() .progress(0, 3, mqtt_kafka:subscribeMetadata() - .metadata(2) - .build()) + .v2() + .metadata(2) + .build() + .build()) .correlationId(2) .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.message.overlapping.wildcard.mixed.qos/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.message.overlapping.wildcard.mixed.qos/server.rpt index bcbc9570bd..d03a367483 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.message.overlapping.wildcard.mixed.qos/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.message.overlapping.wildcard.mixed.qos/server.rpt @@ -230,8 +230,10 @@ read advised zilla:flush ${kafka:matchFlushEx() .consumer() .progress(0, 3, mqtt_kafka:subscribeMetadata() - .metadata(2) - .build()) + .v2() + .metadata(2) + .build() + .build()) .correlationId(2) .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.message.qos2/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.message.qos2/client.rpt index 2e02f04bc9..416256b334 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.message.qos2/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.message.qos2/client.rpt @@ -138,8 +138,10 @@ write advise zilla:flush ${kafka:flushEx() .consumer() .progress(0, 3, mqtt_kafka:subscribeMetadata() + .v2() .metadata(1) - .build()) + .build() + .build()) .correlationId(1) .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.message.qos2/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.message.qos2/server.rpt index bb3cf4daba..b0fc95ef7a 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.message.qos2/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.message.qos2/server.rpt @@ -138,8 +138,10 @@ read advised zilla:flush ${kafka:matchFlushEx() .consumer() .progress(0, 3, mqtt_kafka:subscribeMetadata() + .v2() .metadata(1) - .build()) + .build() + .build()) .correlationId(1) .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.messages.mixture.qos/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.messages.mixture.qos/client.rpt index 34ca570c6f..bd47cb6410 100644 --- 
a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.messages.mixture.qos/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.messages.mixture.qos/client.rpt @@ -190,8 +190,10 @@ write advise zilla:flush ${kafka:flushEx() .consumer() .progress(0, 5, mqtt_kafka:subscribeMetadata() - .metadata(2) - .build()) + .v2() + .metadata(2) + .build() + .build()) .correlationId(2) .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.messages.mixture.qos/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.messages.mixture.qos/server.rpt index 89fd7a8a61..bd52691d33 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.messages.mixture.qos/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.receive.messages.mixture.qos/server.rpt @@ -196,8 +196,10 @@ read advised zilla:flush ${kafka:matchFlushEx() .consumer() .progress(0, 5, mqtt_kafka:subscribeMetadata() - .metadata(2) - .build()) + .v2() + .metadata(2) + .build() + .build()) .correlationId(2) .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.reconnect.replay.qos2.incomplete.message/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.reconnect.replay.qos2.incomplete.message/client.rpt index 77e749c090..c74893d15b 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.reconnect.replay.qos2.incomplete.message/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.reconnect.replay.qos2.incomplete.message/client.rpt @@ -142,8 +142,10 @@ write advise zilla:flush ${kafka:flushEx() .consumer() .progress(0, 3, mqtt_kafka:subscribeMetadata() - .metadata(1) - .build()) + .v2() + .metadata(1) + .build() + .build()) .correlationId(1) .build() .build()} @@ -250,8 +252,10 @@ read zilla:begin.ext ${kafka:matchBeginEx() .capabilities("FETCH_ONLY") .topic("mqtt-messages") .partition(0, 2, 3, 3, mqtt_kafka:subscribeMetadata() - .metadata(1) - .build()) + .v2() + .metadata(1) + .build() + .build()) .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.reconnect.replay.qos2.incomplete.message/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.reconnect.replay.qos2.incomplete.message/server.rpt index 052bff1ecc..fd729f39a0 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.reconnect.replay.qos2.incomplete.message/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.reconnect.replay.qos2.incomplete.message/server.rpt @@ -142,8 +142,10 @@ read advised zilla:flush ${kafka:matchFlushEx() .consumer() .progress(0, 3, mqtt_kafka:subscribeMetadata() - 
.metadata(1) - .build()) + .v2() + .metadata(1) + .build() + .build()) .correlationId(1) .build() .build()} @@ -240,8 +242,10 @@ write zilla:begin.ext ${kafka:beginEx() .capabilities("FETCH_ONLY") .topic("mqtt-messages") .partition(0, 2, 3, 3, mqtt_kafka:subscribeMetadata() - .metadata(1) - .build()) + .v2() + .metadata(1) + .build() + .build()) .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.reconnect.replay.qos2.unreceived.message/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.reconnect.replay.qos2.unreceived.message/client.rpt index 1183e456e7..ff5b94c545 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.reconnect.replay.qos2.unreceived.message/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.reconnect.replay.qos2.unreceived.message/client.rpt @@ -268,8 +268,10 @@ write advise zilla:flush ${kafka:flushEx() .consumer() .progress(0, 3, mqtt_kafka:subscribeMetadata() - .metadata(2) - .build()) + .v2() + .metadata(2) + .build() + .build()) .correlationId(2) .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.reconnect.replay.qos2.unreceived.message/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.reconnect.replay.qos2.unreceived.message/server.rpt index 8e881de11b..5432b60077 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.reconnect.replay.qos2.unreceived.message/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.reconnect.replay.qos2.unreceived.message/server.rpt @@ -260,8 +260,10 @@ read advised zilla:flush ${kafka:matchFlushEx() .consumer() .progress(0, 3, mqtt_kafka:subscribeMetadata() - .metadata(2) - .build()) + .v2() + .metadata(2) + .build() + .build()) .correlationId(2) .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.replay.retained.message.qos2/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.replay.retained.message.qos2/client.rpt index faaed22e95..486622e4d6 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.replay.retained.message.qos2/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.replay.retained.message.qos2/client.rpt @@ -218,8 +218,10 @@ write advise zilla:flush ${kafka:flushEx() .consumer() .progress(0, 3, mqtt_kafka:subscribeMetadata() - .metadata(1) - .build()) + .v2() + .metadata(1) + .build() + .build()) .correlationId(1) .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.replay.retained.message.qos2/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.replay.retained.message.qos2/server.rpt index 
9ce6f74566..83b365e3c0 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.replay.retained.message.qos2/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.replay.retained.message.qos2/server.rpt @@ -212,8 +212,10 @@ read advised zilla:flush ${kafka:matchFlushEx() .consumer() .progress(0, 3, mqtt_kafka:subscribeMetadata() - .metadata(1) - .build()) + .v2() + .metadata(1) + .build() + .build()) .correlationId(1) .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.qos2.version1.offset.metadata/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.qos2.version1.offset.metadata/client.rpt new file mode 100644 index 0000000000..57bd5f0435 --- /dev/null +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.qos2.version1.offset.metadata/client.rpt @@ -0,0 +1,65 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/mqtt0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .subscribe() + .clientId("client") + .qos("EXACTLY_ONCE") + .filter("sensor/one", 1, "EXACTLY_ONCE") + .build() + .build()} + +connected + +read zilla:data.ext ${mqtt:matchDataEx() + .typeId(zilla:id("mqtt")) + .subscribe() + .topic("sensor/one") + .packetId(1) + .qos("EXACTLY_ONCE") + .subscriptionId(1) + .format("TEXT") + .build() + .build()} +read "message" + +write advise zilla:flush ${mqtt:flushEx() + .typeId(zilla:id("mqtt")) + .subscribe() + .qos("EXACTLY_ONCE") + .packetId(1) + .state("INCOMPLETE") + .build() + .build()} + +read advised zilla:flush ${mqtt:flushEx() + .typeId(zilla:id("mqtt")) + .subscribe() + .packetId(1) + .build() + .build()} + +write advise zilla:flush ${mqtt:flushEx() + .typeId(zilla:id("mqtt")) + .subscribe() + .qos("EXACTLY_ONCE") + .packetId(1) + .build() + .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.qos2.version1.offset.metadata/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.qos2.version1.offset.metadata/server.rpt new file mode 100644 index 0000000000..a8ac75fd8c --- /dev/null +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.qos2.version1.offset.metadata/server.rpt @@ -0,0 +1,67 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. 
You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/mqtt0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .subscribe() + .clientId("client") + .qos("EXACTLY_ONCE") + .filter("sensor/one", 1, "EXACTLY_ONCE") + .build() + .build()} + +connected + +write zilla:data.ext ${mqtt:dataEx() + .typeId(zilla:id("mqtt")) + .subscribe() + .topic("sensor/one") + .packetId(1) + .qos("EXACTLY_ONCE") + .subscriptionId(1) + .format("TEXT") + .build() + .build()} +write "message" + +read advised zilla:flush ${mqtt:flushEx() + .typeId(zilla:id("mqtt")) + .subscribe() + .qos("EXACTLY_ONCE") + .packetId(1) + .state("INCOMPLETE") + .build() + .build()} + +write advise zilla:flush ${mqtt:flushEx() + .typeId(zilla:id("mqtt")) + .subscribe() + .packetId(1) + .build() + .build()} + +read advised zilla:flush ${mqtt:flushEx() + .typeId(zilla:id("mqtt")) + .subscribe() + .qos("EXACTLY_ONCE") + .packetId(1) + .build() + .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/internal/MqttKafkaFunctionsTest.java b/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/internal/MqttKafkaFunctionsTest.java index d8a121a61f..d15ea33c29 100644 --- a/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/internal/MqttKafkaFunctionsTest.java +++ b/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/internal/MqttKafkaFunctionsTest.java @@ -37,21 +37,45 @@ public void shouldGetMapper() assertEquals("mqtt_kafka", mapper.getPrefixName()); } @Test - public void shouldEncodeMqttOffsetMetadata() + public void shouldEncodeMqttOffsetMetadataV1() { final String state = MqttKafkaFunctions.subscribeMetadata() - .metadata(1) - .metadata(2) + .v1() + .metadata(10) + .metadata(14) + .metadata(15) + .build() .build(); final IntArrayList metadataList = new IntArrayList(); UnsafeBuffer buffer = new UnsafeBuffer(BitUtil.fromHex(state)); MqttSubscribeOffsetMetadataFW offsetMetadata = new MqttSubscribeOffsetMetadataFW().wrap(buffer, 0, buffer.capacity()); - offsetMetadata.packetIds().forEachRemaining((IntConsumer) metadataList::add); + offsetMetadata.subscribeMetadataV1().packetIds().forEachRemaining((IntConsumer) metadataList::add); - assertEquals(1, offsetMetadata.version()); - assertEquals(1, (int) metadataList.get(0)); - assertEquals(2, (int) metadataList.get(1)); + assertEquals(10, (int) metadataList.get(0)); + assertEquals(14, (int) metadataList.get(1)); + assertEquals(15, (int) metadataList.get(2)); + } + + @Test + public void shouldEncodeMqttOffsetMetadataV2() + { + final String state = MqttKafkaFunctions.subscribeMetadata() + .v2() + .metadata(10) + .metadata(14) + .metadata(15) + .build() + .build(); + + final IntArrayList metadataList = new IntArrayList(); + UnsafeBuffer buffer = new UnsafeBuffer(BitUtil.fromHex(state)); + MqttSubscribeOffsetMetadataFW offsetMetadata = new MqttSubscribeOffsetMetadataFW().wrap(buffer, 0, buffer.capacity()); + 
offsetMetadata.subscribeMetadataV2().packetIds().forEachRemaining((IntConsumer) metadataList::add); + + assertEquals(10, (int) metadataList.get(0)); + assertEquals(14, (int) metadataList.get(1)); + assertEquals(15, (int) metadataList.get(2)); } @Test diff --git a/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/KafkaIT.java b/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/KafkaIT.java index bcfa362d58..4bb2e16b56 100644 --- a/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/KafkaIT.java +++ b/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/KafkaIT.java @@ -1006,6 +1006,15 @@ public void shouldReceiveMessageQoS2() throws Exception k3po.finish(); } + @Test + @Specification({ + "${kafka}/subscribe.qos2.version1.offset.metadata/client", + "${kafka}/subscribe.qos2.version1.offset.metadata/server"}) + public void shouldReceiveMessageQoS2WithVersion1OffsetMetadata() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${kafka}/subscribe.receive.messages.mixture.qos/client", diff --git a/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/MqttIT.java b/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/MqttIT.java index 982482d72e..2ff6efcb53 100644 --- a/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/MqttIT.java +++ b/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/MqttIT.java @@ -792,6 +792,15 @@ public void shouldReceiveMessageQoS2() throws Exception k3po.finish(); } + @Test + @Specification({ + "${mqtt}/subscribe.qos2.version1.offset.metadata/client", + "${mqtt}/subscribe.qos2.version1.offset.metadata/server"}) + public void shouldReceiveMessageQoS2WithVersion1OffsetMetadata() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${mqtt}/subscribe.receive.messages.mixture.qos/client",
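For reference, the versioned offset metadata introduced above round-trips as follows: the writer now always emits V2 (kind byte 2, int16 count, int16 packet ids), while readers branch on the leading kind byte so V1 metadata committed by an older broker session still decodes. The helper below is an illustrative sketch only, not part of this patch; the class and method names are hypothetical, and it assumes the generated flyweight accessors exercised above (kind(), subscribeMetadataV1(), subscribeMetadataV2(), packetIds()), mirroring the stringToOffsetMetadataList change in MqttKafkaSubscribeFactory.

import java.util.function.IntConsumer;

import org.agrona.BitUtil;
import org.agrona.collections.IntArrayList;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttSubscribeOffsetMetadataFW;

public final class OffsetMetadataDecoder
{
    // Decodes the hex string stored as Kafka consumer offset metadata into packet ids,
    // accepting either the legacy V1 layout or the new V2 layout.
    public static IntArrayList decodePacketIds(
        String offsetMetadataHex)
    {
        final IntArrayList packetIds = new IntArrayList();
        final UnsafeBuffer buffer = new UnsafeBuffer(BitUtil.fromHex(offsetMetadataHex));
        final MqttSubscribeOffsetMetadataFW offsetMetadata =
            new MqttSubscribeOffsetMetadataFW().wrap(buffer, 0, buffer.capacity());

        switch (offsetMetadata.kind())
        {
        case V1:
            // legacy layout: uint8 kind, int8 length, int16[length] packetIds
            offsetMetadata.subscribeMetadataV1().packetIds().forEachRemaining((IntConsumer) packetIds::add);
            break;
        case V2:
            // current layout: uint8 kind, int16 length, int16[length] packetIds
            offsetMetadata.subscribeMetadataV2().packetIds().forEachRemaining((IntConsumer) packetIds::add);
            break;
        }
        return packetIds;
    }
}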