Skip to content

Commit

Permalink
Support for tcp binding to route by port numbers (#299)
Browse files Browse the repository at this point in the history
  • Loading branch information
lukefallows authored Jul 25, 2023
1 parent d407803 commit 3496bde
Show file tree
Hide file tree
Showing 27 changed files with 546 additions and 35 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ buildNumber.properties
*.iml
*.ipr
*.iws
.vscode/
.DS_Store
.zpm/
**/.zilla/engine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ public PollerKey[] attach(
}

public TcpRouteConfig resolve(
InetSocketAddress remote)
InetSocketAddress address)
{
return routes.stream()
.filter(r -> r.matches(remote.getAddress()))
.filter(r -> r.matches(address))
.findFirst()
.orElse(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ public final class TcpConditionConfig extends ConditionConfig
{
public final String cidr;
public final String authority;
public final int[] ports;

public TcpConditionConfig(
String cidr,
String authority)
String authority,
int[] ports)
{
this.cidr = cidr;
this.authority = authority;
this.ports = ports;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,19 @@
*/
package io.aklivity.zilla.runtime.binding.tcp.internal.config;

import static io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpOptionsConfigAdapter.adaptPortsValueFromJson;

import jakarta.json.Json;
import jakarta.json.JsonArray;
import jakarta.json.JsonArrayBuilder;
import jakarta.json.JsonObject;
import jakarta.json.JsonObjectBuilder;
import jakarta.json.JsonValue;
import jakarta.json.bind.adapter.JsonbAdapter;

import org.agrona.collections.IntHashSet;
import org.agrona.collections.MutableInteger;

import io.aklivity.zilla.runtime.binding.tcp.internal.TcpBinding;
import io.aklivity.zilla.runtime.engine.config.ConditionConfig;
import io.aklivity.zilla.runtime.engine.config.ConditionConfigAdapterSpi;
Expand All @@ -28,6 +36,7 @@ public final class TcpConditionConfigAdapter implements ConditionConfigAdapterSp
{
private static final String CIDR_NAME = "cidr";
private static final String AUTHORITY_NAME = "authority";
private static final String PORT_NAME = "port";

@Override
public String type()
Expand All @@ -53,6 +62,24 @@ public JsonObject adaptToJson(
object.add(AUTHORITY_NAME, tcpCondition.authority);
}

if (tcpCondition.ports != null)
{
if (tcpCondition.ports.length == 1)
{
object.add(PORT_NAME, tcpCondition.ports[0]);
}
else
{
JsonArrayBuilder ports = Json.createArrayBuilder();
for (int port : tcpCondition.ports)
{
ports.add(port);
}

object.add(PORT_NAME, ports);
}
}

return object.build();
}

Expand All @@ -62,7 +89,30 @@ public ConditionConfig adaptFromJson(
{
String cidr = object.containsKey(CIDR_NAME) ? object.getString(CIDR_NAME) : null;
String authority = object.containsKey(AUTHORITY_NAME) ? object.getString(AUTHORITY_NAME) : null;
JsonValue portsValue = object.containsKey(PORT_NAME) ? object.get(PORT_NAME) : null;

int[] ports = null;

if (portsValue != null)
{
IntHashSet portsSet = new IntHashSet();
switch (portsValue.getValueType())
{
case ARRAY:
JsonArray portsArray = portsValue.asJsonArray();
portsArray.forEach(value -> adaptPortsValueFromJson(value, portsSet));
break;
default:
adaptPortsValueFromJson(portsValue, portsSet);
break;
}

int[] ports0 = new int[portsSet.size()];
MutableInteger index = new MutableInteger();
portsSet.forEach(i -> ports0[index.value++] = i);
ports = ports0;
}

return new TcpConditionConfig(cidr, authority);
return new TcpConditionConfig(cidr, authority, ports);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,35 @@
package io.aklivity.zilla.runtime.binding.tcp.internal.config;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.agrona.collections.IntHashSet;

import io.aklivity.zilla.runtime.binding.tcp.internal.util.Cidr;

public final class TcpConditionMatcher
{
public final Cidr cidr;
public final Matcher authority;
public final IntHashSet ports;

public TcpConditionMatcher(
TcpConditionConfig condition)
{
this.cidr = condition.cidr != null ? new Cidr(condition.cidr) : null;
this.authority = condition.authority != null ? asMatcher(condition.authority) : null;
this.ports = condition.ports != null ? asIntHashSet(condition.ports) : null;
}

public boolean matches(
InetAddress remote)
InetSocketAddress remote)
{
return matchesCidr(remote) &&
matchesAuthority(remote);
return matchesCidr(remote.getAddress()) &&
matchesAuthority(remote.getAddress()) &&
matchesPort(remote.getPort());
}

private boolean matchesCidr(
Expand All @@ -52,9 +59,23 @@ private boolean matchesAuthority(
return authority == null || authority.reset(remote.getHostName()).matches();
}

private boolean matchesPort(
int port)
{
return ports == null || ports.contains(port);
}

private static Matcher asMatcher(
String wildcard)
{
return Pattern.compile(wildcard.replace(".", "\\.").replace("*", ".*")).matcher("");
}

private static IntHashSet asIntHashSet(
int[] ports)
{
IntHashSet set = new IntHashSet(ports.length);
Arrays.stream(ports).forEach(set::add);
return set;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public OptionsConfig adaptFromJson(
return new TcpOptionsConfig(host, ports, backlog, nodelay, keepalive);
}

private void adaptPortsValueFromJson(
static void adaptPortsValueFromJson(
JsonValue value,
IntHashSet ports)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import static java.util.stream.Collectors.toList;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.function.LongPredicate;
import java.util.function.Predicate;
Expand Down Expand Up @@ -49,13 +49,13 @@ public boolean authorized(
}

public boolean matches(
InetAddress address)
InetSocketAddress address)
{
return when.isEmpty() || when.stream().anyMatch(m -> m.matches(address));
}

public boolean matchesExplicit(
InetAddress address)
InetSocketAddress address)
{
return when.stream().anyMatch(m -> m.matches(address));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.agrona.collections.Long2ObjectHashMap;

Expand Down Expand Up @@ -97,12 +98,16 @@ public InetSocketAddress resolve(
ProxyInfoFW authorityInfo = infos.matchFirst(i -> i.kind() == AUTHORITY);
if (authorityInfo != null && route.matchesExplicit(r -> r.authority != null))
{
final List<InetAddress> authorities = Arrays.asList(resolveHost.apply(authorityInfo.authority().asString()));
for (InetAddress authority : authorities)
final List<InetSocketAddress> authorities = Arrays
.stream(resolveHost.apply(authorityInfo.authority().asString()))
.map(a -> new InetSocketAddress(a, port))
.collect(Collectors.toList());

for (InetSocketAddress authority : authorities)
{
if (route.matchesExplicit(authority))
{
resolved = new InetSocketAddress(authority, port);
resolved = authority;
break;
}
}
Expand All @@ -121,7 +126,10 @@ public InetSocketAddress resolve(

if (resolved == null && options.host != null && !"*".equals(options.host))
{
final List<InetAddress> host = Arrays.asList(resolveHost.apply(options.host));
final List<InetSocketAddress> host = Arrays
.stream(resolveHost.apply(options.host))
.map(a -> new InetSocketAddress(a, port))
.collect(Collectors.toList());

for (TcpRouteConfig route : binding.routes)
{
Expand Down Expand Up @@ -158,7 +166,7 @@ public String toString()
private InetSocketAddress resolve(
ProxyAddressFW address,
long authorization,
Predicate<? super InetAddress> filter)
Predicate<? super InetSocketAddress> filter)
{
InetSocketAddress resolved = null;

Expand Down Expand Up @@ -192,19 +200,19 @@ private InetSocketAddress resolve(

private InetSocketAddress resolveInet(
ProxyAddressInetFW address,
Predicate<? super InetAddress> filter)
Predicate<? super InetSocketAddress> filter)
{
return Arrays
.stream(resolveHost.apply(address.destination().asString()))
.map(a -> new InetSocketAddress(a, address.destinationPort()))
.filter(filter)
.findFirst()
.map(a -> new InetSocketAddress(a, address.destinationPort()))
.orElse(null);
}

private InetSocketAddress resolveInet4(
ProxyAddressInet4FW address,
Predicate<? super InetAddress> filter) throws UnknownHostException
Predicate<? super InetSocketAddress> filter) throws UnknownHostException
{
OctetsFW destination = address.destination();
int destinationPort = address.destinationPort();
Expand All @@ -213,15 +221,14 @@ private InetSocketAddress resolveInet4(
destination.buffer().getBytes(destination.offset(), ipv4);

return Optional
.of(InetAddress.getByAddress(ipv4))
.of(new InetSocketAddress(InetAddress.getByAddress(ipv4), destinationPort))
.filter(filter)
.map(a -> new InetSocketAddress(a, destinationPort))
.orElse(null);
}

private InetSocketAddress resolveInet6(
ProxyAddressInet6FW address,
Predicate<? super InetAddress> filter) throws UnknownHostException
Predicate<? super InetSocketAddress> filter) throws UnknownHostException
{
OctetsFW destination = address.destination();
int destinationPort = address.destinationPort();
Expand All @@ -230,9 +237,8 @@ private InetSocketAddress resolveInet6(
destination.buffer().getBytes(destination.offset(), ipv6);

return Optional
.of(InetAddress.getByAddress(ipv6))
.of(new InetSocketAddress(InetAddress.getByAddress(ipv6), destinationPort))
.filter(filter)
.map(a -> new InetSocketAddress(a, destinationPort))
.orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ public class TcpServerFactory implements TcpStreamFactory

private final ProxyBeginExFW.Builder beginExRW = new ProxyBeginExFW.Builder();

private final EngineContext context;
private final TcpServerRouter router;

private final LongUnaryOperator supplyInitialId;
Expand All @@ -107,7 +106,6 @@ public TcpServerFactory(
EngineContext context,
LongFunction<TcpServerBindingConfig> servers)
{
this.context = context;
this.router = new TcpServerRouter(config, context, this::handleAccept, servers);
this.writeBuffer = context.writeBuffer();
this.writeByteBuffer = ByteBuffer.allocateDirect(writeBuffer.capacity()).order(nativeOrder());
Expand Down Expand Up @@ -180,9 +178,9 @@ private int handleAccept(
channel.setOption(TCP_NODELAY, options.nodelay);
channel.setOption(SO_KEEPALIVE, options.keepalive);

InetSocketAddress remote = (InetSocketAddress) channel.getRemoteAddress();
InetSocketAddress local = (InetSocketAddress) channel.getLocalAddress();

onAccepted(binding, channel, remote);
onAccepted(binding, channel, local);
}
}
catch (Exception ex)
Expand All @@ -196,9 +194,9 @@ private int handleAccept(
private void onAccepted(
TcpBindingConfig binding,
SocketChannel network,
InetSocketAddress remote)
InetSocketAddress local)
{
final TcpRouteConfig route = binding.resolve(remote);
final TcpRouteConfig route = binding.resolve(local);

if (route != null)
{
Expand Down
Loading

0 comments on commit 3496bde

Please sign in to comment.