Fix NegativeArraySizeException when receiving mqttFlush #1100

Merged · 2 commits · Jun 20, 2024
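This change makes the subscribe offset metadata that the MQTT-Kafka binding stores in Kafka version-aware: MqttSubscribeOffsetMetadataFW becomes a tagged union with V1 and V2 kinds, stringToOffsetMetadataList branches on the kind before reading packet ids, and the writer now emits version 2. onMqttFlush also ignores flush frames once the reply stream is closed, and its filter-change and acknowledgement paths are extracted into onFiltersChanged and onMessageAcked. Below is a minimal sketch of the failure mode a version tag guards against, assuming a simplified [kind:int8][packetId:int16 ...] layout; the real layout is owned by the generated flyweight types, and decodePacketIds is a hypothetical helper, not code from this PR.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public final class OffsetMetadataSketch
{
    // A reader that assumes one fixed layout can misread a blob written in a
    // different layout; a length field read from the wrong offset may come
    // back negative, and allocating with it then throws
    // NegativeArraySizeException. Tagging the blob with a kind byte lets the
    // reader pick the matching branch instead.
    static List<Integer> decodePacketIds(byte[] metadata)
    {
        ByteBuffer buf = ByteBuffer.wrap(metadata);
        byte kind = buf.get();
        List<Integer> packetIds = new ArrayList<>();
        switch (kind)
        {
        case 1: // V1 layout
        case 2: // V2 layout (identical packetId list in this sketch)
            while (buf.remaining() >= Short.BYTES)
            {
                packetIds.add((int) buf.getShort());
            }
            break;
        default:
            throw new IllegalArgumentException("unknown offset metadata kind: " + kind);
        }
        return packetIds;
    }
}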
MqttKafkaOptionsConfigAdapter.java
@@ -30,10 +30,10 @@
 import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaPublishConfig;
 import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaTopicsConfig;
 import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.MqttKafkaBinding;
-import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttQoS;
 import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.String16FW;
 import io.aklivity.zilla.runtime.engine.config.OptionsConfig;
 import io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi;
+import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttQoS;

 public class MqttKafkaOptionsConfigAdapter implements OptionsConfigAdapterSpi, JsonbAdapter<OptionsConfig, JsonObject>
 {
MqttKafkaSubscribeFactory.java
@@ -61,7 +61,6 @@
 import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.KafkaSkip;
 import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttPayloadFormat;
 import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttQoS;
-import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttSubscribeOffsetMetadataFW;
 import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttTopicFilterFW;
 import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.OctetsFW;
 import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.String16FW;
@@ -93,6 +92,7 @@
 import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer;
 import io.aklivity.zilla.runtime.engine.buffer.BufferPool;
 import io.aklivity.zilla.runtime.engine.concurrent.Signaler;
+import io.aklivity.zilla.specs.binding.mqtt.kafka.internal.types.MqttSubscribeOffsetMetadataFW;

 public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory
 {
@@ -109,7 +109,7 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory
     private static final int DATA_FLAG_FIN = 0x01;
     private static final OctetsFW EMPTY_OCTETS = new OctetsFW().wrap(new UnsafeBuffer(new byte[0]), 0, 0);
     private static final String16FW EMPTY_STRING = new String16FW("");
-    private static final int OFFSET_METADATA_VERSION = 1;
+    private static final int OFFSET_METADATA_VERSION = 2;

     private final OctetsFW emptyRO = new OctetsFW().wrap(new UnsafeBuffer(0L, 0), 0, 0);
     private final BeginFW beginRO = new BeginFW();
@@ -467,93 +467,118 @@ private void onMqttFlush(
             binding.resolveAll(authorization, filters) : null;
         final int packetId = mqttSubscribeFlushEx.packetId();

-        if (!filters.isEmpty())
+        if (!MqttKafkaState.replyClosed(state))
         {
-            if (routes != null)
+            if (!filters.isEmpty())
             {
-                routes.forEach(r ->
-                {
-                    final long routeOrder = r.order;
-                    if (!messages.containsKey(routeOrder))
-                    {
-                        KafkaMessagesProxy messagesProxy = new KafkaMessagesProxy(originId, r, this);
-                        messages.put(routeOrder, messagesProxy);
-                        messagesPerTopicKey.put(messagesProxy.topicKey, r.order);
-                        messagesProxy.doKafkaBegin(traceId, authorization, 0, filters);
-                    }
-                    else
-                    {
-                        messages.get(routeOrder).doKafkaFlush(traceId, authorization, budgetId, reserved, qos, filters);
-                    }
-                });
+                onFiltersChanged(traceId, authorization, budgetId, reserved, filters, routes);
             }
-
-            if (retainAvailable)
+            else if (packetId > 0)
             {
-                final List<Subscription> retainedFilters = new ArrayList<>();
-                filters.forEach(filter ->
-                {
-                    final boolean sendRetained = (filter.flags() & SEND_RETAIN_FLAG) != 0;
-                    if (sendRetained)
-                    {
-                        retainedFilters.add(new Subscription(
-                            (int) filter.subscriptionId(), newString16FW(filter.pattern()), filter.qos(), filter.flags()));
-                        final boolean rap = (filter.flags() & RETAIN_AS_PUBLISHED_FLAG) != 0;
-                        retainAsPublished.put((int) filter.subscriptionId(), rap);
-                    }
-                });
-
-                retainedSubscriptions.removeIf(rf -> !filters.anyMatch(f -> f.pattern().equals(rf.filter)));
-                if (!retainedFilters.isEmpty())
-                {
-                    if (MqttKafkaState.initialOpened(retained.state) && !MqttKafkaState.initialClosed(retained.state))
-                    {
-                        retained.doKafkaFlush(traceId, authorization, budgetId, reserved, qos, retainedFilters);
-                    }
-                    else
-                    {
-                        final List<Subscription> newRetainedFilters = new ArrayList<>();
-                        retainedFilters.forEach(subscription ->
-                        {
-                            if (!retainedSubscriptions.contains(subscription))
-                            {
-                                newRetainedFilters.add(subscription);
-                            }
-                        });
-                        retained.doKafkaBegin(traceId, authorization, 0, newRetainedFilters);
-                    }
-                }
+                onMessageAcked(traceId, authorization, budgetId, reserved, packetId, mqttSubscribeFlushEx);
             }
         }
-        else if (packetId > 0)
-        {
-            final int qos = mqttSubscribeFlushEx.qos();
-            final MqttOffsetStateFlags state = MqttOffsetStateFlags.valueOf(mqttSubscribeFlushEx.state());
-            final PartitionOffset offset = state == MqttOffsetStateFlags.INCOMPLETE ?
-                offsetsPerPacketId.get(packetId) : offsetsPerPacketId.remove(packetId);
+    }

-            final long topicPartitionKey = topicPartitionKey(offset.topicKey, offset.partitionId);
-            final long messagesId = messagesPerTopicKey.get(offset.topicKey);
+    private void onMessageAcked(
+        long traceId,
+        long authorization,
+        long budgetId,
+        int reserved,
+        int packetId,
+        MqttSubscribeFlushExFW mqttSubscribeFlushEx)
+    {
+        final int qos = mqttSubscribeFlushEx.qos();
+        final MqttOffsetStateFlags state = MqttOffsetStateFlags.valueOf(mqttSubscribeFlushEx.state());
+        final PartitionOffset offset = state == MqttOffsetStateFlags.INCOMPLETE ?
+            offsetsPerPacketId.get(packetId) : offsetsPerPacketId.remove(packetId);

-            OffsetCommit offsetCommit = new OffsetCommit(offset, qos, state, packetId);
+        final long topicPartitionKey = topicPartitionKey(offset.topicKey, offset.partitionId);
+        final long messagesId = messagesPerTopicKey.get(offset.topicKey);

-            final OffsetHighWaterMark highWaterMark = highWaterMarks.get(topicPartitionKey);
+        OffsetCommit offsetCommit = new OffsetCommit(offset, qos, state, packetId);

-            final KafkaProxy proxy = messagesId != -1 ? messages.get(messagesId) : retained;
+        final OffsetHighWaterMark highWaterMark = highWaterMarks.get(topicPartitionKey);

-            if (highWaterMark.offset >= offset.offset)
-            {
-                highWaterMark.increase();
-                commitOffset(traceId, authorization, budgetId, reserved, proxy, offsetCommit);
-                commitDeferredOffsets(traceId, authorization, budgetId, reserved, highWaterMark);
-            }
-            else if (qos == MqttQoS.EXACTLY_ONCE.value() && state != MqttOffsetStateFlags.INCOMPLETE)
-            {
-                commitOffset(traceId, authorization, budgetId, reserved, proxy, offsetCommit);
-            }
-            else
-            {
-                highWaterMark.deferOffsetCommit(offsetCommit, proxy);
-            }
+        final KafkaProxy proxy = messagesId != -1 ? messages.get(messagesId) : retained;
+
+        if (highWaterMark.offset >= offset.offset)
+        {
+            highWaterMark.increase();
+            commitOffset(traceId, authorization, budgetId, reserved, proxy, offsetCommit);
+            commitDeferredOffsets(traceId, authorization, budgetId, reserved, highWaterMark);
+        }
+        else if (qos == MqttQoS.EXACTLY_ONCE.value() && state != MqttOffsetStateFlags.INCOMPLETE)
+        {
+            commitOffset(traceId, authorization, budgetId, reserved, proxy, offsetCommit);
+        }
+        else
+        {
+            highWaterMark.deferOffsetCommit(offsetCommit, proxy);
+        }
+    }
+
+    private void onFiltersChanged(
+        long traceId,
+        long authorization,
+        long budgetId,
+        int reserved,
+        Array32FW<MqttTopicFilterFW> filters,
+        List<MqttKafkaRouteConfig> routes)
+    {
+        if (routes != null)
+        {
+            routes.forEach(r ->
+            {
+                final long routeOrder = r.order;
+                if (!messages.containsKey(routeOrder))
+                {
+                    KafkaMessagesProxy messagesProxy = new KafkaMessagesProxy(originId, r, this);
+                    messages.put(routeOrder, messagesProxy);
+                    messagesPerTopicKey.put(messagesProxy.topicKey, r.order);
+                    messagesProxy.doKafkaBegin(traceId, authorization, 0, filters);
+                }
+                else
+                {
+                    messages.get(routeOrder).doKafkaFlush(traceId, authorization, budgetId, reserved, qos, filters);
+                }
+            });
+        }
+
+        if (retainAvailable)
+        {
+            final List<Subscription> retainedFilters = new ArrayList<>();
+            filters.forEach(filter ->
+            {
+                final boolean sendRetained = (filter.flags() & SEND_RETAIN_FLAG) != 0;
+                if (sendRetained)
+                {
+                    retainedFilters.add(new Subscription((int) filter.subscriptionId(),
+                        newString16FW(filter.pattern()), filter.qos(), filter.flags()));
+                    final boolean rap = (filter.flags() & RETAIN_AS_PUBLISHED_FLAG) != 0;
+                    retainAsPublished.put((int) filter.subscriptionId(), rap);
+                }
+            });
+
+            retainedSubscriptions.removeIf(rf -> !filters.anyMatch(f -> f.pattern().equals(rf.filter)));
+            if (!retainedFilters.isEmpty())
+            {
+                if (MqttKafkaState.initialOpened(retained.state) && !MqttKafkaState.initialClosed(retained.state))
+                {
+                    retained.doKafkaFlush(traceId, authorization, budgetId, reserved, qos, retainedFilters);
+                }
+                else
+                {
+                    final List<Subscription> newRetainedFilters = new ArrayList<>();
+                    retainedFilters.forEach(subscription ->
+                    {
+                        if (!retainedSubscriptions.contains(subscription))
+                        {
+                            newRetainedFilters.add(subscription);
+                        }
+                    });
+                    retained.doKafkaBegin(traceId, authorization, 0, newRetainedFilters);
+                }
+            }
+        }
     }
@@ -1846,16 +1871,24 @@ private IntArrayList stringToOffsetMetadataList(
         final IntArrayList metadataList = new IntArrayList();
         UnsafeBuffer buffer = new UnsafeBuffer(BitUtil.fromHex(metadata.asString()));
         final MqttSubscribeOffsetMetadataFW offsetMetadata = mqttOffsetMetadataRO.wrap(buffer, 0, buffer.capacity());
-        offsetMetadata.packetIds().forEachRemaining((IntConsumer) metadataList::add);
+        switch (offsetMetadata.kind())
+        {
+        case V1:
+            offsetMetadata.subscribeMetadataV1().packetIds().forEachRemaining((IntConsumer) metadataList::add);
+            break;
+        case V2:
+            offsetMetadata.subscribeMetadataV2().packetIds().forEachRemaining((IntConsumer) metadataList::add);
+            break;
+        }

         return metadataList;
     }

     private String16FW offsetMetadataListToString(
         IntArrayList metadataList)
     {
         mqttOffsetMetadataRW.wrap(offsetBuffer, 0, offsetBuffer.capacity());
-        mqttOffsetMetadataRW.version(OFFSET_METADATA_VERSION);
-        metadataList.forEach(p -> mqttOffsetMetadataRW.appendPacketIds(p.shortValue()));
+        mqttOffsetMetadataRW.subscribeMetadataV2(m -> metadataList.forEach(p -> m.appendPacketIds(p.shortValue())));
         final MqttSubscribeOffsetMetadataFW offsetMetadata = mqttOffsetMetadataRW.build();
         return new String16FW(BitUtil.toHex(offsetMetadata.buffer().byteArray(),
             offsetMetadata.offset(), offsetMetadata.limit()));
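For orientation, here is a self-contained round trip of the hex string these two methods exchange through Kafka offset metadata, under the same simplified [kind:int8][packetId:int16 ...] layout assumed earlier; the generated flyweights define the real wire format. BitUtil is Agrona's org.agrona.BitUtil, the same helper the factory uses.

import java.nio.ByteBuffer;

import org.agrona.BitUtil;

public final class MetadataRoundTrip
{
    public static void main(String[] args)
    {
        // Writer side: kind byte first, then the packetIds, hex-encoded as in
        // offsetMetadataListToString above.
        ByteBuffer encoded = ByteBuffer.allocate(1 + 2 * Short.BYTES);
        encoded.put((byte) 2);           // kind = V2
        encoded.putShort((short) 1);     // packetId 1
        encoded.putShort((short) 2);     // packetId 2
        String hex = BitUtil.toHex(encoded.array());

        // Reader side: branch on the kind byte before interpreting the rest,
        // as stringToOffsetMetadataList now does via offsetMetadata.kind().
        ByteBuffer decoded = ByteBuffer.wrap(BitUtil.fromHex(hex));
        byte kind = decoded.get();
        while (decoded.remaining() >= Short.BYTES)
        {
            System.out.println("kind=" + kind + " packetId=" + decoded.getShort());
        }
    }
}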
@@ -526,6 +526,17 @@ public void shouldReceiveMessageQoS2() throws Exception
         k3po.finish();
     }

+    @Test
+    @Configuration("proxy.yaml")
+    @Configure(name = WILL_AVAILABLE_NAME, value = "false")
+    @Specification({
+        "${mqtt}/subscribe.qos2.version1.offset.metadata/client",
+        "${kafka}/subscribe.qos2.version1.offset.metadata/server"})
+    public void shouldReceiveMessageQoS2WithVersion1OffsetMetadata() throws Exception
+    {
+        k3po.finish();
+    }
+
     @Test
     @Configuration("proxy.yaml")
     @Configure(name = WILL_AVAILABLE_NAME, value = "false")
MqttKafkaFunctions.java
@@ -23,6 +23,9 @@
 import io.aklivity.k3po.runtime.lang.el.spi.FunctionMapperSpi;
 import io.aklivity.zilla.specs.binding.mqtt.kafka.internal.types.MqttPublishOffsetMetadataFW;
 import io.aklivity.zilla.specs.binding.mqtt.kafka.internal.types.MqttSubscribeOffsetMetadataFW;
+import io.aklivity.zilla.specs.binding.mqtt.kafka.internal.types.MqttSubscribeOffsetMetadataV1FW;
+import io.aklivity.zilla.specs.binding.mqtt.kafka.internal.types.MqttSubscribeOffsetMetadataV2FW;
+import io.aklivity.zilla.specs.binding.mqtt.kafka.internal.types.MqttSubscribeOffsetMetadataVersion;

 public final class MqttKafkaFunctions
 {
@@ -40,30 +43,84 @@ public static MqttPublishOffsetMetadataBuilder publishMetadata()

     public static final class MqttSubscribeOffsetMetadataBuilder
     {
-        private final MqttSubscribeOffsetMetadataFW.Builder offsetMetadataRW = new MqttSubscribeOffsetMetadataFW.Builder();
-
-        byte version = 1;
+        private final MqttSubscribeOffsetMetadataFW.Builder offsetMetadataRW =
+            new MqttSubscribeOffsetMetadataFW.Builder();
+
+        private final MqttSubscribeOffsetMetadataFW offsetMetadataRO = new MqttSubscribeOffsetMetadataFW();
+        private final MutableDirectBuffer writeBuffer = new UnsafeBuffer(new byte[1024 * 8]);

         private MqttSubscribeOffsetMetadataBuilder()
         {
-            MutableDirectBuffer writeBuffer = new UnsafeBuffer(new byte[1024 * 8]);
             offsetMetadataRW.wrap(writeBuffer, 0, writeBuffer.capacity());
-            offsetMetadataRW.version(version);
         }

-        public MqttSubscribeOffsetMetadataBuilder metadata(
-            int packetId)
+        public MqttSubscribeOffsetMetadataV1Builder v1()
         {
-            offsetMetadataRW.appendPacketIds((short) packetId);
-            return this;
+            offsetMetadataRW.kind(MqttSubscribeOffsetMetadataVersion.V1);
+            return new MqttSubscribeOffsetMetadataV1Builder();
+        }
+
+        public MqttSubscribeOffsetMetadataV2Builder v2()
+        {
+            offsetMetadataRW.kind(MqttSubscribeOffsetMetadataVersion.V2);
+            return new MqttSubscribeOffsetMetadataV2Builder();
         }

         public String build()
         {
-            final MqttSubscribeOffsetMetadataFW offsetMetadata = offsetMetadataRW.build();
+            final MqttSubscribeOffsetMetadataFW offsetMetadata = offsetMetadataRO;
             return BitUtil.toHex(offsetMetadata.buffer().byteArray(), offsetMetadata.offset(), offsetMetadata.limit());
         }
+
+        public final class MqttSubscribeOffsetMetadataV1Builder
+        {
+            private final MqttSubscribeOffsetMetadataV1FW.Builder offsetMetadataV1RW =
+                new MqttSubscribeOffsetMetadataV1FW.Builder();
+
+            private MqttSubscribeOffsetMetadataV1Builder()
+            {
+                offsetMetadataV1RW.wrap(writeBuffer, 1, writeBuffer.capacity());
+            }
+
+            public MqttSubscribeOffsetMetadataV1Builder metadata(
+                int packetId)
+            {
+                offsetMetadataV1RW.appendPacketIds((short) packetId);
+                return this;
+            }
+
+            public MqttSubscribeOffsetMetadataBuilder build()
+            {
+                final MqttSubscribeOffsetMetadataV1FW offsetMetadataV1 = offsetMetadataV1RW.build();
+                offsetMetadataRO.wrap(writeBuffer, 0, offsetMetadataV1.limit());
+                return MqttSubscribeOffsetMetadataBuilder.this;
+            }
+        }
+
+        public final class MqttSubscribeOffsetMetadataV2Builder
+        {
+            private final MqttSubscribeOffsetMetadataV2FW.Builder offsetMetadataV2RW =
+                new MqttSubscribeOffsetMetadataV2FW.Builder();
+
+            private MqttSubscribeOffsetMetadataV2Builder()
+            {
+                offsetMetadataV2RW.wrap(writeBuffer, 1, writeBuffer.capacity());
+            }
+
+            public MqttSubscribeOffsetMetadataV2Builder metadata(
+                int packetId)
+            {
+                offsetMetadataV2RW.appendPacketIds((short) packetId);
+                return this;
+            }
+
+            public MqttSubscribeOffsetMetadataBuilder build()
+            {
+                final MqttSubscribeOffsetMetadataV2FW offsetMetadataV2 = offsetMetadataV2RW.build();
+                offsetMetadataRO.wrap(writeBuffer, 0, offsetMetadataV2.limit());
+                return MqttSubscribeOffsetMetadataBuilder.this;
+            }
+        }
     }

     public static final class MqttPublishOffsetMetadataBuilder
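A usage sketch of the new nested builders follows. The MqttKafkaFunctions entry point for subscribe metadata is outside this hunk, so subscribeMetadata() below is assumed by analogy with publishMetadata() above. Note the design: v1()/v2() stamp the kind tag at byte 0 via offsetMetadataRW.kind(...), the nested builder wraps the shared writeBuffer at offset 1 to append packetIds, and its build() re-wraps offsetMetadataRO over the whole blob before the outer build() hex-encodes it.

// Assumed entry point: subscribeMetadata(), by analogy with publishMetadata().
String v2 = MqttKafkaFunctions.subscribeMetadata()
    .v2()
        .metadata(1)     // packetId 1
        .metadata(2)     // packetId 2
        .build()         // returns the outer MqttSubscribeOffsetMetadataBuilder
    .build();            // hex string for use in a k3po script

// The V1 shape remains expressible, as exercised by the new
// version1.offset.metadata compatibility test:
String v1 = MqttKafkaFunctions.subscribeMetadata()
    .v1()
        .metadata(1)
        .build()
    .build();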