Skip to content

Commit

Permalink
Support ability to connect to specific kafka cluster node hostname (#633
Browse files Browse the repository at this point in the history
)
  • Loading branch information
akrambek authored Dec 12, 2023
1 parent 0a98e52 commit ec1db3b
Show file tree
Hide file tree
Showing 138 changed files with 594 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.aklivity.zilla.runtime.binding.kafka.internal.stream;

import static io.aklivity.zilla.runtime.binding.kafka.internal.types.ProxyAddressProtocol.STREAM;
import static io.aklivity.zilla.runtime.engine.budget.BudgetCreditor.NO_BUDGET_ID;
import static io.aklivity.zilla.runtime.engine.buffer.BufferPool.NO_SLOT;
import static io.aklivity.zilla.runtime.engine.concurrent.Signaler.NO_CANCEL_ID;
Expand Down Expand Up @@ -79,7 +78,7 @@ public final class KafkaClientConnectionPool extends KafkaClientSaslHandshaker
private static final int SIGNAL_STREAM_WINDOW = 0x80000006;
private static final int SIGNAL_CONNECTION_CLEANUP = 0x80000007;
private static final int SIGNAL_NEXT_REQUEST = 0x80000008;
private static final String CLUSTER = "";
private static final StringBuilder CLUSTER = new StringBuilder("");

private final BeginFW beginRO = new BeginFW();
private final DataFW dataRO = new DataFW();
Expand Down Expand Up @@ -174,17 +173,29 @@ private MessageConsumer newStream(
final ProxyBeginExFW proxyBeginEx = extension.get(proxyBeginExRO::tryWrap);

MessageConsumer newStream = null;
String address = CLUSTER;
CLUSTER.setLength(0);

if (proxyBeginEx != null)
{
final ProxyAddressInetFW inet = proxyBeginEx.address().inet();
String host = inet.destination().asString();
int port = inet.destinationPort();
address = String.format("%s:%d", host, port);

CLUSTER.append(host);
CLUSTER.append(":");
CLUSTER.append(port);

if (proxyBeginEx.infos() != null)
{
proxyBeginEx.infos().forEach(i ->
{
CLUSTER.append(":");
CLUSTER.append(i.authority().asString());
});
}
}

final KafkaClientConnection connection = connectionPool.computeIfAbsent(address, s ->
final KafkaClientConnection connection = connectionPool.computeIfAbsent(CLUSTER.toString(), s ->
newConnection(originId, routedId, authorization));
newStream = connection.newStream(msgTypeId, buffer, index, length, sender);

Expand Down Expand Up @@ -243,7 +254,7 @@ private MessageConsumer newNetworkStream(
long traceId,
long authorization,
long affinity,
Consumer<OctetsFW.Builder> extension)
Flyweight extension)
{
final BeginFW begin = beginRW.wrap(writeBuffer, 0, writeBuffer.capacity())
.originId(originId)
Expand All @@ -255,7 +266,7 @@ private MessageConsumer newNetworkStream(
.traceId(traceId)
.authorization(authorization)
.affinity(affinity)
.extension(extension)
.extension(extension.buffer(), extension.offset(), extension.sizeof())
.build();

final MessageConsumer receiver =
Expand Down Expand Up @@ -744,19 +755,8 @@ private void onStreamBeginInit(

final long traceId = begin.traceId();
final OctetsFW extension = begin.extension();
final ProxyBeginExFW proxyBeginEx = extension.get(proxyBeginExRO::tryWrap);

String host = null;
int port = 0;

if (proxyBeginEx != null)
{
final ProxyAddressInetFW inet = proxyBeginEx.address().inet();
host = inet.destination().asString();
port = inet.destinationPort();
}

connection.doConnectionBegin(traceId, host, port);
connection.doConnectionBegin(traceId, extension);
}


Expand Down Expand Up @@ -1241,8 +1241,7 @@ private KafkaClientConnection(

private void doConnectionBegin(
long traceId,
String host,
int port)
OctetsFW extension)
{
if (KafkaState.closed(state))
{
Expand All @@ -1267,23 +1266,8 @@ private void doConnectionBegin(
this.initialId = supplyInitialId.applyAsLong(routedId);
this.replyId = supplyReplyId.applyAsLong(initialId);

Consumer<OctetsFW.Builder> extension = EMPTY_EXTENSION;

state = KafkaState.openingInitial(state);

if (host != null)
{
extension = e -> e.set((b, o, l) -> proxyBeginExRW.wrap(b, o, l)
.typeId(proxyTypeId)
.address(a -> a.inet(i -> i.protocol(p -> p.set(STREAM))
.source("0.0.0.0")
.destination(host)
.sourcePort(0)
.destinationPort(port)))
.build()
.sizeof());
}

this.receiver = newNetworkStream(this::onConnectionMessage,
originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, authorization, 0L, extension);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public KafkaClientFactory(
config, context, bindings::get, accountant::supplyDebitor);

final KafkaClientGroupFactory clientGroupFactory = new KafkaClientGroupFactory(
config, context, bindings::get, accountant::supplyDebitor, signaler, streamFactory, resolveSasl);
config, context, bindings::get, accountant::supplyDebitor, signaler, streamFactory, resolveSasl, supplyClientRoute);

final KafkaClientFetchFactory clientFetchFactory = new KafkaClientFetchFactory(
config, context, bindings::get, accountant::supplyDebitor, supplyClientRoute);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2436,6 +2436,7 @@ else if (nextOffset == OFFSET_LIVE || nextOffset == OFFSET_HISTORICAL)
.destination(broker.host)
.sourcePort(0)
.destinationPort(broker.port)))
.infos(i -> i.item(ii -> ii.authority(broker.host)))
.build()
.sizeof());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ public final class KafkaClientGroupFactory extends KafkaClientSaslHandshaker imp
private final Signaler signaler;
private final BindingHandler streamFactory;
private final UnaryOperator<KafkaSaslConfig> resolveSasl;
private final LongFunction<KafkaClientRoute> supplyClientRoute;
private final LongFunction<KafkaBindingConfig> supplyBinding;
private final Supplier<String> supplyInstanceId;
private final LongFunction<BudgetDebitor> supplyDebitor;
Expand All @@ -298,7 +299,8 @@ public KafkaClientGroupFactory(
LongFunction<BudgetDebitor> supplyDebitor,
Signaler signaler,
BindingHandler streamFactory,
UnaryOperator<KafkaSaslConfig> resolveSasl)
UnaryOperator<KafkaSaslConfig> resolveSasl,
LongFunction<KafkaClientRoute> supplyClientRoute)
{
super(config, context);
this.rebalanceTimeout = config.clientGroupRebalanceTimeout();
Expand All @@ -315,6 +317,7 @@ public KafkaClientGroupFactory(
this.signaler = signaler;
this.streamFactory = streamFactory;
this.resolveSasl = resolveSasl;
this.supplyClientRoute = supplyClientRoute;
this.instanceIds = new Long2ObjectHashMap<>();
this.groupStreams = new Object2ObjectHashMap<>();
this.configs = new LinkedHashMap<>();
Expand Down Expand Up @@ -2702,8 +2705,26 @@ private void doNetworkBegin(

state = KafkaState.openingInitial(state);

Consumer<OctetsFW.Builder> extension = EMPTY_EXTENSION;

final KafkaClientRoute clientRoute = supplyClientRoute.apply(routedId);
final KafkaBrokerInfo broker = clientRoute.brokers.get(Long.parseLong(delegate.nodeId));
if (broker != null)
{
extension = e -> e.set((b, o, l) -> proxyBeginExRW.wrap(b, o, l)
.typeId(proxyTypeId)
.address(a -> a.inet(i -> i.protocol(p -> p.set(STREAM))
.source("0.0.0.0")
.destination(broker.host)
.sourcePort(0)
.destinationPort(broker.port)))
.infos(i -> i.item(ii -> ii.authority(broker.host)))
.build()
.sizeof());
}

network = newStream(this::onNetwork, originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, authorization, affinity, EMPTY_EXTENSION);
traceId, authorization, affinity, extension);
}

@Override
Expand Down Expand Up @@ -3470,6 +3491,7 @@ private void doNetworkBegin(
.destination(delegate.host)
.sourcePort(0)
.destinationPort(delegate.port)))
.infos(i -> i.item(ii -> ii.authority(delegate.host)))
.build()
.sizeof());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1477,6 +1477,7 @@ private void doNetworkBegin(
.destination(broker.host)
.sourcePort(0)
.destinationPort(broker.port)))
.infos(i -> i.item(ii -> ii.authority(broker.host)))
.build()
.sizeof());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public SSLEngine newClientEngine(
engine.setUseClientMode(true);

List<String> sni = options.sni;
if (sni == null && beginEx != null)
if (beginEx != null)
{
ProxyInfoFW info = beginEx.infos().matchFirst(a -> a.kind() == AUTHORITY);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ write zilla:begin.ext ${proxy:beginEx()
.sourcePort(0)
.destinationPort(9092)
.build()
.info()
.authority("broker1.example.com")
.build()
.build()}

connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ write 97 # size

accepted

read zilla:begin.ext ${proxy:beginEx()
read zilla:begin.ext ${proxy:matchBeginEx()
.typeId(zilla:id("proxy"))
.addressInet()
.protocol("stream")
Expand All @@ -98,6 +98,9 @@ read zilla:begin.ext ${proxy:beginEx()
.sourcePort(0)
.destinationPort(9092)
.build()
.info()
.authority("broker1.example.com")
.build()
.build()}

connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ write zilla:begin.ext ${proxy:beginEx()
.sourcePort(0)
.destinationPort(9092)
.build()
.info()
.authority("broker1.example.com")
.build()
.build()}

connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ write 97 # size

accepted

read zilla:begin.ext ${proxy:beginEx()
read zilla:begin.ext ${proxy:matchBeginEx()
.typeId(zilla:id("proxy"))
.addressInet()
.protocol("stream")
Expand All @@ -112,6 +112,9 @@ read zilla:begin.ext ${proxy:beginEx()
.sourcePort(0)
.destinationPort(9092)
.build()
.info()
.authority("broker1.example.com")
.build()
.build()}

connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ write zilla:begin.ext ${proxy:beginEx()
.sourcePort(0)
.destinationPort(9092)
.build()
.info()
.authority("broker1.example.com")
.build()
.build()}

connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ write 97 # size

accepted

read zilla:begin.ext ${proxy:beginEx()
read zilla:begin.ext ${proxy:matchBeginEx()
.typeId(zilla:id("proxy"))
.addressInet()
.protocol("stream")
Expand All @@ -70,6 +70,9 @@ read zilla:begin.ext ${proxy:beginEx()
.sourcePort(0)
.destinationPort(9092)
.build()
.info()
.authority("broker1.example.com")
.build()
.build()}

connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ write zilla:begin.ext ${proxy:beginEx()
.sourcePort(0)
.destinationPort(9092)
.build()
.info()
.authority("broker1.example.com")
.build()
.build()}

connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ write 97 # size

accepted

read zilla:begin.ext ${proxy:beginEx()
read zilla:begin.ext ${proxy:matchBeginEx()
.typeId(zilla:id("proxy"))
.addressInet()
.protocol("stream")
Expand All @@ -70,6 +70,9 @@ read zilla:begin.ext ${proxy:beginEx()
.sourcePort(0)
.destinationPort(9092)
.build()
.info()
.authority("broker1.example.com")
.build()
.build()}

connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ write zilla:begin.ext ${proxy:beginEx()
.sourcePort(0)
.destinationPort(9092)
.build()
.info()
.authority("broker1.example.com")
.build()
.build()}

connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ write 97 # size

accepted

read zilla:begin.ext ${proxy:beginEx()
read zilla:begin.ext ${proxy:matchBeginEx()
.typeId(zilla:id("proxy"))
.addressInet()
.protocol("stream")
Expand All @@ -70,6 +70,9 @@ read zilla:begin.ext ${proxy:beginEx()
.sourcePort(0)
.destinationPort(9092)
.build()
.info()
.authority("broker1.example.com")
.build()
.build()}

connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ write zilla:begin.ext ${proxy:beginEx()
.sourcePort(0)
.destinationPort(9092)
.build()
.info()
.authority("broker1.example.com")
.build()
.build()}

connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ write 97 # size

accepted

read zilla:begin.ext ${proxy:beginEx()
read zilla:begin.ext ${proxy:matchBeginEx()
.typeId(zilla:id("proxy"))
.addressInet()
.protocol("stream")
Expand All @@ -70,6 +70,9 @@ read zilla:begin.ext ${proxy:beginEx()
.sourcePort(0)
.destinationPort(9092)
.build()
.info()
.authority("broker1.example.com")
.build()
.build()}

connected
Expand Down
Loading

0 comments on commit ec1db3b

Please sign in to comment.