From 6e7308d76ad7e58a1e4641e106b0ae83bf41b18c Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Fri, 14 Feb 2025 20:32:29 +0530 Subject: [PATCH 1/2] fix: mqtt-kafka routing fix --- .../internal/stream/AsyncapiProxyFactory.java | 2 +- .../stream/MqttKafkaSessionFactory.java | 26 +++++++++---------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/AsyncapiProxyFactory.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/AsyncapiProxyFactory.java index d8e817dddb..06633bf65c 100644 --- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/AsyncapiProxyFactory.java +++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/AsyncapiProxyFactory.java @@ -192,7 +192,7 @@ public MessageConsumer newStream( } else { - // JRF: can we remove this? current used by will stream which arrives proactively, so no affinity + // JRF: can we remove this? current used by will stream which arrives proactively, so no compositeId Optional routeRef = binding.routes.stream().findFirst(); final long resolvedId = routeRef.map(r -> r.id).orElse(0L); final long resolvedApiId = routeRef.map(r -> composite.resolveApiId(r.with.apiId)).orElse(0L); diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java index 47ffb13180..7471c5783c 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java @@ -475,7 +475,7 @@ private MqttSessionProxy( this.resolvedId = resolvedId; this.initialId = initialId; this.replyId = supplyReplyId.applyAsLong(initialId); - this.session = new KafkaFetchWillSignalStream(originId, resolvedId, this); + this.session = new KafkaFetchWillSignalStream(routedId, resolvedId, this); this.sessionsTopic = sessionsTopic; this.sessionId = new String16FW(sessionIds.get(bindingId)); this.leaderEpochs = new Long2LongHashMap(-2); @@ -570,8 +570,8 @@ private void onMqttBegin( if (!isSetWillFlag(sessionFlags) || isSetCleanStart(sessionFlags)) { - final long routedId = session.routedId; - session = new KafkaSessionSignalStream(originId, routedId, this); + //final long routedId = session.routedId; + session = new KafkaSessionSignalStream(routedId, resolvedId, this); } if (isSetWillFlag(sessionFlags)) { @@ -1145,13 +1145,13 @@ private void openMetaStreams( messagesTopics.forEach(t -> { final KafkaMetaStream meta = - new KafkaMetaStream(originId, resolvedId, this, t, false); + new KafkaMetaStream(routedId, resolvedId, this, t, false); metas.add(meta); meta.doKafkaBegin(traceId, authorization, 0); }); final KafkaMetaStream retainedMeta = - new KafkaMetaStream(originId, resolvedId, this, retainedTopic, true); + new KafkaMetaStream(routedId, resolvedId, this, retainedTopic, true); metas.add(retainedMeta); retainedMeta.doKafkaBegin(traceId, authorization, 0); } @@ -1198,8 +1198,8 @@ private void onSessionBecomesLeader( if (publishQosMax < 2) { - final long routedId = session.routedId; - session = new KafkaSessionStateProxy(originId, routedId, this); + //final long routedId = session.routedId; + session = new KafkaSessionStateProxy(routedId, resolvedId, this); session.doKafkaBeginIfNecessary(traceId, authorization, 0); } else @@ -1266,8 +1266,8 @@ private void onOffsetFetched( if (unfetchedKafkaTopics == 0 && initProducer) { - final long routedId = session.routedId; - producerInit = new KafkaInitProducerStream(originId, routedId, this); + //final long routedId = session.routedId; + producerInit = new KafkaInitProducerStream(routedId, resolvedId, this); producerInit.doKafkaBegin(traceId, authorization, 0); } else if (unfetchedKafkaTopics == 0) @@ -1308,8 +1308,8 @@ private void onProducerInit( long traceId, long authorization) { - final long routedId = session.routedId; - offsetCommit = new KafkaOffsetCommitStream(originId, routedId, this, groupHost, groupPort); + //final long routedId = session.routedId; + offsetCommit = new KafkaOffsetCommitStream(routedId, resolvedId, this, groupHost, groupPort); offsetCommit.doKafkaBegin(traceId, authorization, 0); } @@ -1388,7 +1388,7 @@ private void doFetchOffsetMetadata( final String topic0 = topic.asString(); final KafkaOffsetFetchStream offsetFetch = - new KafkaOffsetFetchStream(originId, resolvedId, this, groupHost, groupPort, topic0, partitions); + new KafkaOffsetFetchStream(routedId, resolvedId, this, groupHost, groupPort, topic0, partitions); offsetFetches.add(offsetFetch); offsetFetch.doKafkaBegin(traceId, authorization, 0); } @@ -1473,7 +1473,7 @@ private void doCreateSessionStream( }).build(); doMqttBegin(traceId, authorization, 0, mqttBeginEx); - session = new KafkaSessionStateProxy(originId, resolvedId, this); + session = new KafkaSessionStateProxy(routedId, resolvedId, this); session.doKafkaBeginIfNecessary(traceId, authorization, 0); } } From 88d1b52de58751f0ccffe7cc9bfe7f3dfa7cc0a5 Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Fri, 14 Feb 2025 23:56:51 +0530 Subject: [PATCH 2/2] remove commented lines --- .../mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java index 7471c5783c..f1cae2f4c3 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java @@ -570,7 +570,6 @@ private void onMqttBegin( if (!isSetWillFlag(sessionFlags) || isSetCleanStart(sessionFlags)) { - //final long routedId = session.routedId; session = new KafkaSessionSignalStream(routedId, resolvedId, this); } if (isSetWillFlag(sessionFlags)) @@ -1198,7 +1197,6 @@ private void onSessionBecomesLeader( if (publishQosMax < 2) { - //final long routedId = session.routedId; session = new KafkaSessionStateProxy(routedId, resolvedId, this); session.doKafkaBeginIfNecessary(traceId, authorization, 0); } @@ -1266,7 +1264,6 @@ private void onOffsetFetched( if (unfetchedKafkaTopics == 0 && initProducer) { - //final long routedId = session.routedId; producerInit = new KafkaInitProducerStream(routedId, resolvedId, this); producerInit.doKafkaBegin(traceId, authorization, 0); } @@ -1308,7 +1305,6 @@ private void onProducerInit( long traceId, long authorization) { - //final long routedId = session.routedId; offsetCommit = new KafkaOffsetCommitStream(routedId, resolvedId, this, groupHost, groupPort); offsetCommit.doKafkaBegin(traceId, authorization, 0); }