Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize memory allocation for mqtt-kafka offset tracking #694

Merged
merged 3 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ public class MqttKafkaSessionFactory implements MqttKafkaStreamFactory
public static final int MQTT_NOT_AUTHORIZED = 0x87;
public static final int MQTT_IMPLEMENTATION_SPECIFIC_ERROR = 0x83;
public static final String MQTT_INVALID_SESSION_TIMEOUT_REASON = "Invalid session expiry interval";
private static final String16FW EMPTY_STRING = new String16FW("");

static
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.IntConsumer;
import java.util.function.LongFunction;
import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;
Expand Down Expand Up @@ -108,6 +109,8 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory
private static final int DATA_FLAG_INIT = 0x02;
private static final int DATA_FLAG_FIN = 0x01;
private static final OctetsFW EMPTY_OCTETS = new OctetsFW().wrap(new UnsafeBuffer(new byte[0]), 0, 0);
private static final String16FW EMPTY_STRING = new String16FW("");
private static final int OFFSET_METADATA_VERSION = 1;

private final OctetsFW emptyRO = new OctetsFW().wrap(new UnsafeBuffer(0L, 0), 0, 0);
private final BeginFW beginRO = new BeginFW();
Expand Down Expand Up @@ -1200,8 +1203,8 @@ else if (state == MqttOffsetStateFlags.INCOMPLETE)
{
p.partitionId(offset.partitionId).partitionOffset(offset.offset + 1);
final IntArrayList incomplete = incompletePacketIds.get(offset.partitionId);
final String partitionMetadata =
incomplete == null || incomplete.isEmpty() ? "" : offSetMetadataListToString(incomplete);
final String16FW partitionMetadata = incomplete == null || incomplete.isEmpty() ?
EMPTY_STRING : offsetMetadataListToString(incomplete);
p.metadata(partitionMetadata);
});
f.correlationId(correlationId);
Expand Down Expand Up @@ -1826,26 +1829,25 @@ public void flushDataIfNecessary(
}
}

//TODO: how to make these more efficient while keeping the internal object easily modifieable (not using FW)?
private IntArrayList stringToOffsetMetadataList(
String16FW metadata)
{
final IntArrayList metadataList = new IntArrayList();
UnsafeBuffer buffer = new UnsafeBuffer(BitUtil.fromHex(metadata.asString()));
final MqttOffsetMetadataFW offsetMetadata = mqttOffsetMetadataRO.wrap(buffer, 0, buffer.capacity());
offsetMetadata.metadata().forEach(m -> metadataList.add(m.packetId()));
offsetMetadata.packetIds().forEachRemaining((IntConsumer) metadataList::add);
return metadataList;
}

private String offSetMetadataListToString(
private String16FW offsetMetadataListToString(
IntArrayList metadataList)
{
mqttOffsetMetadataRW.wrap(offsetBuffer, 0, offsetBuffer.capacity());
metadataList.forEach(m -> mqttOffsetMetadataRW.metadataItem(mi -> mi.packetId(m)));
mqttOffsetMetadataRW.version(OFFSET_METADATA_VERSION);
metadataList.forEach(p -> mqttOffsetMetadataRW.appendPacketIds(p.shortValue()));
final MqttOffsetMetadataFW offsetMetadata = mqttOffsetMetadataRW.build();
final byte[] array = new byte[offsetMetadata.sizeof()];
offsetMetadata.buffer().getBytes(offsetMetadata.offset(), array);
return BitUtil.toHex(array);
return new String16FW(BitUtil.toHex(offsetMetadata.buffer().byteArray(),
offsetMetadata.offset(), offsetMetadata.limit()));
}

final class KafkaRetainedProxy extends KafkaProxy
Expand Down Expand Up @@ -1972,8 +1974,8 @@ protected void doKafkaConsumerFlush(
{
p.partitionId(offset.partitionId).partitionOffset(offset.offset + 1);
final IntArrayList incomplete = incompletePacketIds.get(offset.partitionId);
final String partitionMetadata =
incomplete == null || incomplete.isEmpty() ? "" : offSetMetadataListToString(incomplete);
final String16FW partitionMetadata = incomplete == null || incomplete.isEmpty() ?
EMPTY_STRING : offsetMetadataListToString(incomplete);
p.metadata(partitionMetadata);
});
f.correlationId(correlationId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -850,25 +850,27 @@ public static final class MqttOffsetMetadataBuilder
{
private final MqttOffsetMetadataFW.Builder offsetMetadataRW = new MqttOffsetMetadataFW.Builder();

byte version = 1;


private MqttOffsetMetadataBuilder()
{
MutableDirectBuffer writeBuffer = new UnsafeBuffer(new byte[1024 * 8]);
offsetMetadataRW.wrap(writeBuffer, 0, writeBuffer.capacity());
offsetMetadataRW.version(version);
}

public MqttOffsetMetadataBuilder metadata(
int packetId)
{
offsetMetadataRW.metadataItem(f -> f.packetId(packetId));
offsetMetadataRW.appendPacketIds((short) packetId);
return this;
}

public String build()
{
final MqttOffsetMetadataFW offsetMetadata = offsetMetadataRW.build();
final byte[] array = new byte[offsetMetadata.sizeof()];
offsetMetadata.buffer().getBytes(offsetMetadata.offset(), array);
return BitUtil.toHex(array);
return BitUtil.toHex(offsetMetadata.buffer().byteArray(), offsetMetadata.offset(), offsetMetadata.limit());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,15 +249,11 @@ scope mqtt
INCOMPLETE(1)
}

struct MqttOffsetState
{
uint16 packetId;
}

struct MqttOffsetMetadata
{
uint8 version = 1;
MqttOffsetState[] metadata;
uint8 length;
int16[length] packetIds;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@

import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.function.IntConsumer;

import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
import org.agrona.collections.IntArrayList;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.Test;
import org.kaazing.k3po.lang.el.BytesMatcher;
Expand Down Expand Up @@ -1272,16 +1274,14 @@ public void shouldEncodeMqttOffsetMetadata()
.metadata(2)
.build();

DirectBuffer buffer = new UnsafeBuffer(BitUtil.fromHex(state));
final IntArrayList metadataList = new IntArrayList();
UnsafeBuffer buffer = new UnsafeBuffer(BitUtil.fromHex(state));
MqttOffsetMetadataFW offsetMetadata = new MqttOffsetMetadataFW().wrap(buffer, 0, buffer.capacity());
offsetMetadata.packetIds().forEachRemaining((IntConsumer) metadataList::add);

assertNotNull(offsetMetadata.metadata()
.matchFirst(m ->
1 == m.packetId()));

assertNotNull(offsetMetadata.metadata()
.matchFirst(m ->
2 == m.packetId()));
assertEquals(1, offsetMetadata.version());
assertEquals(1, (int) metadataList.get(0));
assertEquals(2, (int) metadataList.get(1));
}

@Test
Expand Down