Skip to content

Commit

Permalink
Optimize memory allocation for mqtt-kafka offset tracking (#694)
Browse files Browse the repository at this point in the history
  • Loading branch information
bmaidics authored Jan 15, 2024
1 parent e45ca4e commit 6191589
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ public class MqttKafkaSessionFactory implements MqttKafkaStreamFactory
public static final int MQTT_NOT_AUTHORIZED = 0x87;
public static final int MQTT_IMPLEMENTATION_SPECIFIC_ERROR = 0x83;
public static final String MQTT_INVALID_SESSION_TIMEOUT_REASON = "Invalid session expiry interval";
private static final String16FW EMPTY_STRING = new String16FW("");

static
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.IntConsumer;
import java.util.function.LongFunction;
import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;
Expand Down Expand Up @@ -108,6 +109,8 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory
private static final int DATA_FLAG_INIT = 0x02;
private static final int DATA_FLAG_FIN = 0x01;
private static final OctetsFW EMPTY_OCTETS = new OctetsFW().wrap(new UnsafeBuffer(new byte[0]), 0, 0);
private static final String16FW EMPTY_STRING = new String16FW("");
private static final int OFFSET_METADATA_VERSION = 1;

private final OctetsFW emptyRO = new OctetsFW().wrap(new UnsafeBuffer(0L, 0), 0, 0);
private final BeginFW beginRO = new BeginFW();
Expand Down Expand Up @@ -1199,8 +1202,8 @@ else if (state == MqttOffsetStateFlags.INCOMPLETE)
{
p.partitionId(offset.partitionId).partitionOffset(offset.offset + 1);
final IntArrayList incomplete = incompletePacketIds.get(offset.partitionId);
final String partitionMetadata =
incomplete == null || incomplete.isEmpty() ? "" : offSetMetadataListToString(incomplete);
final String16FW partitionMetadata = incomplete == null || incomplete.isEmpty() ?
EMPTY_STRING : offsetMetadataListToString(incomplete);
p.metadata(partitionMetadata);
});
f.correlationId(correlationId);
Expand Down Expand Up @@ -1824,26 +1827,25 @@ public void flushDataIfNecessary(
}
}

//TODO: how to make these more efficient while keeping the internal object easily modifieable (not using FW)?
private IntArrayList stringToOffsetMetadataList(
String16FW metadata)
{
final IntArrayList metadataList = new IntArrayList();
UnsafeBuffer buffer = new UnsafeBuffer(BitUtil.fromHex(metadata.asString()));
final MqttOffsetMetadataFW offsetMetadata = mqttOffsetMetadataRO.wrap(buffer, 0, buffer.capacity());
offsetMetadata.metadata().forEach(m -> metadataList.add(m.packetId()));
offsetMetadata.packetIds().forEachRemaining((IntConsumer) metadataList::add);
return metadataList;
}

private String offSetMetadataListToString(
private String16FW offsetMetadataListToString(
IntArrayList metadataList)
{
mqttOffsetMetadataRW.wrap(offsetBuffer, 0, offsetBuffer.capacity());
metadataList.forEach(m -> mqttOffsetMetadataRW.metadataItem(mi -> mi.packetId(m)));
mqttOffsetMetadataRW.version(OFFSET_METADATA_VERSION);
metadataList.forEach(p -> mqttOffsetMetadataRW.appendPacketIds(p.shortValue()));
final MqttOffsetMetadataFW offsetMetadata = mqttOffsetMetadataRW.build();
final byte[] array = new byte[offsetMetadata.sizeof()];
offsetMetadata.buffer().getBytes(offsetMetadata.offset(), array);
return BitUtil.toHex(array);
return new String16FW(BitUtil.toHex(offsetMetadata.buffer().byteArray(),
offsetMetadata.offset(), offsetMetadata.limit()));
}

final class KafkaRetainedProxy extends KafkaProxy
Expand Down Expand Up @@ -1970,8 +1972,8 @@ protected void doKafkaConsumerFlush(
{
p.partitionId(offset.partitionId).partitionOffset(offset.offset + 1);
final IntArrayList incomplete = incompletePacketIds.get(offset.partitionId);
final String partitionMetadata =
incomplete == null || incomplete.isEmpty() ? "" : offSetMetadataListToString(incomplete);
final String16FW partitionMetadata = incomplete == null || incomplete.isEmpty() ?
EMPTY_STRING : offsetMetadataListToString(incomplete);
p.metadata(partitionMetadata);
});
f.correlationId(correlationId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -864,25 +864,27 @@ public static final class MqttOffsetMetadataBuilder
{
private final MqttOffsetMetadataFW.Builder offsetMetadataRW = new MqttOffsetMetadataFW.Builder();

byte version = 1;


private MqttOffsetMetadataBuilder()
{
MutableDirectBuffer writeBuffer = new UnsafeBuffer(new byte[1024 * 8]);
offsetMetadataRW.wrap(writeBuffer, 0, writeBuffer.capacity());
offsetMetadataRW.version(version);
}

public MqttOffsetMetadataBuilder metadata(
int packetId)
{
offsetMetadataRW.metadataItem(f -> f.packetId(packetId));
offsetMetadataRW.appendPacketIds((short) packetId);
return this;
}

public String build()
{
final MqttOffsetMetadataFW offsetMetadata = offsetMetadataRW.build();
final byte[] array = new byte[offsetMetadata.sizeof()];
offsetMetadata.buffer().getBytes(offsetMetadata.offset(), array);
return BitUtil.toHex(array);
return BitUtil.toHex(offsetMetadata.buffer().byteArray(), offsetMetadata.offset(), offsetMetadata.limit());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,15 +250,11 @@ scope mqtt
INCOMPLETE(1)
}

struct MqttOffsetState
{
uint16 packetId;
}

struct MqttOffsetMetadata
{
uint8 version = 1;
MqttOffsetState[] metadata;
uint8 length;
int16[length] packetIds;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@

import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.function.IntConsumer;

import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
import org.agrona.collections.IntArrayList;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.Test;
import org.kaazing.k3po.lang.el.BytesMatcher;
Expand Down Expand Up @@ -1278,16 +1280,14 @@ public void shouldEncodeMqttOffsetMetadata()
.metadata(2)
.build();

DirectBuffer buffer = new UnsafeBuffer(BitUtil.fromHex(state));
final IntArrayList metadataList = new IntArrayList();
UnsafeBuffer buffer = new UnsafeBuffer(BitUtil.fromHex(state));
MqttOffsetMetadataFW offsetMetadata = new MqttOffsetMetadataFW().wrap(buffer, 0, buffer.capacity());
offsetMetadata.packetIds().forEachRemaining((IntConsumer) metadataList::add);

assertNotNull(offsetMetadata.metadata()
.matchFirst(m ->
1 == m.packetId()));

assertNotNull(offsetMetadata.metadata()
.matchFirst(m ->
2 == m.packetId()));
assertEquals(1, offsetMetadata.version());
assertEquals(1, (int) metadataList.get(0));
assertEquals(2, (int) metadataList.get(1));
}

@Test
Expand Down

0 comments on commit 6191589

Please sign in to comment.