Skip to content

Commit

Permalink
Refactor dispatch agent (#699)
Browse files Browse the repository at this point in the history
  • Loading branch information
jfallows authored Jan 5, 2024
1 parent 910c19a commit 33da37c
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -73,7 +72,7 @@
import io.aklivity.zilla.runtime.engine.internal.Tuning;
import io.aklivity.zilla.runtime.engine.internal.layouts.BindingsLayout;
import io.aklivity.zilla.runtime.engine.internal.registry.ConfigurationManager;
import io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent;
import io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker;
import io.aklivity.zilla.runtime.engine.internal.registry.FileWatcherTask;
import io.aklivity.zilla.runtime.engine.internal.registry.HttpWatcherTask;
import io.aklivity.zilla.runtime.engine.internal.registry.WatcherTask;
Expand All @@ -100,7 +99,7 @@ public final class Engine implements Collector, AutoCloseable
private final WatcherTask watcherTask;
private final Map<URL, NamespaceConfig> namespaces;
private final URL rootConfigURL;
private final Collection<DispatchAgent> dispatchers;
private final List<EngineWorker> workers;
private final boolean readonly;
private final EngineConfiguration config;
private final Map<String, Binding> bindingsByType;
Expand Down Expand Up @@ -164,16 +163,16 @@ public final class Engine implements Collector, AutoCloseable
}
this.tuning = tuning;

Collection<DispatchAgent> dispatchers = new LinkedHashSet<>();
List<EngineWorker> workers = new ArrayList<>(workerCount);
for (int coreIndex = 0; coreIndex < workerCount; coreIndex++)
{
DispatchAgent agent =
new DispatchAgent(config, tasks, labels, errorHandler, tuning::affinity,
EngineWorker worker =
new EngineWorker(config, tasks, labels, errorHandler, tuning::affinity,
bindings, exporters, guards, vaults, catalogs, metricGroups, validatorFactory,
this, coreIndex, readonly);
dispatchers.add(agent);
workers.add(worker);
}
this.dispatchers = dispatchers;
this.workers = workers;

final Consumer<String> logger = config.verbose() ? System.out::println : m -> {};

Expand Down Expand Up @@ -213,12 +212,12 @@ else if ("http".equals(protocol) || "https".equals(protocol))
}

this.configurationManager = new ConfigurationManager(schemaTypes, guardsByType::get, labels::supplyLabelId, maxWorkers,
tuning, dispatchers, logger, context, config, extensions, this::readURL);
tuning, workers, logger, context, config, extensions, this::readURL);

this.namespaces = new HashMap<>();

List<AgentRunner> runners = new ArrayList<>(dispatchers.size());
dispatchers.forEach(d -> runners.add(d.runner()));
List<AgentRunner> runners = new ArrayList<>(workers.size());
workers.forEach(d -> runners.add(d.runner()));

this.bindings = bindings;
this.tasks = tasks;
Expand Down Expand Up @@ -257,7 +256,7 @@ public void close() throws Exception
{
if (config.drainOnClose())
{
dispatchers.forEach(DispatchAgent::drain);
workers.forEach(EngineWorker::drain);
}

final List<Throwable> errors = new ArrayList<>();
Expand Down Expand Up @@ -422,10 +421,10 @@ private long aggregateCounterValue(
long metricId)
{
long result = 0;
for (DispatchAgent dispatchAgent : dispatchers)
for (EngineWorker worker : workers)
{
LongSupplier counterReader = dispatchAgent.supplyCounter(bindingId, metricId);
result += counterReader.getAsLong();
LongSupplier reader = worker.supplyCounter(bindingId, metricId);
result += reader.getAsLong();
}
return result;
}
Expand All @@ -436,8 +435,8 @@ public LongConsumer counterWriter(
long metricId,
int core)
{
DispatchAgent dispatcher = dispatchers.toArray(DispatchAgent[]::new)[core];
return dispatcher.supplyCounterWriter(bindingId, metricId);
EngineWorker worker = workers.toArray(EngineWorker[]::new)[core];
return worker.supplyCounterWriter(bindingId, metricId);
}

@Override
Expand All @@ -453,10 +452,10 @@ private long aggregateGaugeValue(
long metricId)
{
long result = 0;
for (DispatchAgent dispatchAgent : dispatchers)
for (EngineWorker worker : workers)
{
LongSupplier counterReader = dispatchAgent.supplyGauge(bindingId, metricId);
result += counterReader.getAsLong();
LongSupplier reader = worker.supplyGauge(bindingId, metricId);
result += reader.getAsLong();
}
return result;
}
Expand All @@ -467,8 +466,8 @@ public LongConsumer gaugeWriter(
long metricId,
int core)
{
DispatchAgent dispatcher = dispatchers.toArray(DispatchAgent[]::new)[core];
return dispatcher.supplyGaugeWriter(bindingId, metricId);
EngineWorker worker = workers.get(core);
return worker.supplyGaugeWriter(bindingId, metricId);
}

@Override
Expand Down Expand Up @@ -498,9 +497,9 @@ private long aggregateHistogramBucketValue(
int index)
{
long result = 0L;
for (DispatchAgent dispatchAgent : dispatchers)
for (EngineWorker worker : workers)
{
LongSupplier[] readers = dispatchAgent.supplyHistogram(bindingId, metricId);
LongSupplier[] readers = worker.supplyHistogram(bindingId, metricId);
result += readers[index].getAsLong();
}
return result;
Expand All @@ -512,46 +511,46 @@ public LongConsumer histogramWriter(
long metricId,
int core)
{
DispatchAgent dispatcher = dispatchers.toArray(DispatchAgent[]::new)[core];
return dispatcher.supplyHistogramWriter(bindingId, metricId);
EngineWorker worker = workers.get(core);
return worker.supplyHistogramWriter(bindingId, metricId);
}

@Override
public long[][] counterIds()
{
// the list of counter ids are expected to be identical in all cores
DispatchAgent dispatchAgent = dispatchers.iterator().next();
return dispatchAgent.counterIds();
EngineWorker worker = workers.get(0);
return worker.counterIds();
}

@Override
public long[][] gaugeIds()
{
// the list of gauge ids are expected to be identical in all cores
DispatchAgent dispatchAgent = dispatchers.iterator().next();
return dispatchAgent.gaugeIds();
EngineWorker worker = workers.get(0);
return worker.gaugeIds();
}

@Override
public long[][] histogramIds()
{
// the list of histogram ids are expected to be identical in all cores
DispatchAgent dispatchAgent = dispatchers.iterator().next();
return dispatchAgent.histogramIds();
EngineWorker worker = workers.get(0);
return worker.histogramIds();
}

public String supplyLocalName(
long namespacedId)
{
DispatchAgent dispatchAgent = dispatchers.iterator().next();
return dispatchAgent.supplyLocalName(namespacedId);
EngineWorker worker = workers.get(0);
return worker.supplyLocalName(namespacedId);
}

public int supplyLabelId(
String label)
{
DispatchAgent dispatchAgent = dispatchers.iterator().next();
return dispatchAgent.supplyTypeId(label);
EngineWorker worker = workers.get(0);
return worker.supplyTypeId(label);
}

// visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class ConfigurationManager
private final ToIntFunction<String> supplyId;
private final IntFunction<ToIntFunction<KindConfig>> maxWorkers;
private final Tuning tuning;
private final Collection<DispatchAgent> dispatchers;
private final Collection<EngineWorker> dispatchers;
private final Consumer<String> logger;
private final EngineExtContext context;
private final EngineConfiguration config;
Expand All @@ -75,7 +75,7 @@ public ConfigurationManager(
ToIntFunction<String> supplyId,
IntFunction<ToIntFunction<KindConfig>> maxWorkers,
Tuning tuning,
Collection<DispatchAgent> dispatchers,
Collection<EngineWorker> dispatchers,
Consumer<String> logger,
EngineExtContext context,
EngineConfiguration config,
Expand Down Expand Up @@ -177,14 +177,14 @@ public NamespaceConfig parse(
.filter(g -> g.id == guarded.id)
.findFirst()
.map(g -> guardByType.apply(g.type))
.map(g -> g.verifier(DispatchAgent::indexOfId, guarded))
.map(g -> g.verifier(EngineWorker::indexOfId, guarded))
.orElse(session -> false);

LongFunction<String> identifier = namespace.guards.stream()
.filter(g -> g.id == guarded.id)
.findFirst()
.map(g -> guardByType.apply(g.type))
.map(g -> g.identifier(DispatchAgent::indexOfId, guarded))
.map(g -> g.identifier(EngineWorker::indexOfId, guarded))
.orElse(session -> null);

guarded.identity = identifier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@
import io.aklivity.zilla.runtime.engine.vault.VaultContext;
import io.aklivity.zilla.runtime.engine.vault.VaultHandler;

public class DispatchAgent implements EngineContext, Agent
public class EngineWorker implements EngineContext, Agent
{
private static final int RESERVED_SIZE = 33;

Expand Down Expand Up @@ -217,7 +217,7 @@ public class DispatchAgent implements EngineContext, Agent

private long lastReadStreamId;

public DispatchAgent(
public EngineWorker(
EngineConfiguration config,
ExecutorService executor,
LabelManager labels,
Expand Down

0 comments on commit 33da37c

Please sign in to comment.