Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minimize performance overhead for metric collection #217

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
"scheme":
{
"title": "Scheme",
"type": "string"
"enum":
[
"http"
]
},
"port":
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1128,8 +1128,6 @@ private void handleReadInitial(
final MessageConsumer handler = dispatcher.get(instanceId);
if (handler != null)
{
supplyOriginMetricRecorder.apply(originId).accept(msgTypeId, buffer, index, length);
supplyRoutedMetricRecorder.apply(routedId).accept(msgTypeId, buffer, index, length);
switch (msgTypeId)
{
case BeginFW.TYPE_ID:
Expand Down Expand Up @@ -1450,15 +1448,17 @@ private MessageConsumer handleBeginInitial(
final BindingHandler streamFactory = binding != null ? binding.streamFactory() : null;
if (streamFactory != null)
{
final MessageConsumer replyTo = supplyReplyTo(initialId);
MessageConsumer metricHandler = supplyOriginMetricRecorder.apply(originId)
.andThen(supplyRoutedMetricRecorder.apply(routedId));
final MessageConsumer replyTo = metricHandler.andThen(supplyReplyTo(initialId));
newStream = streamFactory.newStream(msgTypeId, buffer, index, length, replyTo);
if (newStream != null)
{
newStream = metricHandler.andThen(newStream);

final long replyId = supplyReplyId(initialId);
streams[streamIndex(initialId)].put(instanceId(initialId), newStream);
throttles[throttleIndex(replyId)].put(instanceId(replyId), newStream);
supplyOriginMetricRecorder.apply(originId).accept(msgTypeId, buffer, index, length);
supplyRoutedMetricRecorder.apply(routedId).accept(msgTypeId, buffer, index, length);
streamSets.computeIfAbsent(routedId, k -> new LongHashSet())
.add(initialId);
}
Expand Down Expand Up @@ -1635,8 +1635,7 @@ private MessageConsumer supplyRoutedMetricRecorder(
private Target newTarget(
int index)
{
return new Target(config, index, writeBuffer, correlations, streams, streamSets, throttles, supplyLoadEntry,
supplyOriginMetricRecorder, supplyRoutedMetricRecorder);
return new Target(config, index, writeBuffer, correlations, streams, streamSets, throttles, supplyLoadEntry);
}

private DefaultBudgetDebitor newBudgetDebitor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ public final class Target implements AutoCloseable
private final Int2ObjectHashMap<MessageConsumer>[] throttles;
private final MessageConsumer writeHandler;
private final LongFunction<LoadEntry> supplyLoadEntry;
private final LongFunction<MessageConsumer> supplyOriginMetricRecorder;
private final LongFunction<MessageConsumer> supplyRoutedMetricRecorder;

private MessagePredicate streamsBuffer;

Expand All @@ -77,9 +75,7 @@ public Target(
Int2ObjectHashMap<MessageConsumer>[] streams,
Long2ObjectHashMap<LongHashSet> streamSets,
Int2ObjectHashMap<MessageConsumer>[] throttles,
LongFunction<LoadEntry> supplyLoadEntry,
LongFunction<MessageConsumer> supplyOriginMetricRecorder,
LongFunction<MessageConsumer> supplyRoutedMetricRecorder)
LongFunction<LoadEntry> supplyLoadEntry)
{
this.timestamps = config.timestamps();
this.localIndex = index;
Expand All @@ -97,8 +93,6 @@ public Target(

this.writeBuffer = writeBuffer;
this.supplyLoadEntry = supplyLoadEntry;
this.supplyOriginMetricRecorder = supplyOriginMetricRecorder;
this.supplyRoutedMetricRecorder = supplyRoutedMetricRecorder;
this.correlations = correlations;
this.streams = streams;
this.streamSets = streamSets;
Expand Down Expand Up @@ -234,8 +228,6 @@ private boolean handleWriteInitial(
}
else
{
supplyOriginMetricRecorder.apply(originId).accept(msgTypeId, buffer, index, length);
supplyRoutedMetricRecorder.apply(routedId).accept(msgTypeId, buffer, index, length);
switch (msgTypeId)
{
case WindowFW.TYPE_ID:
Expand Down Expand Up @@ -279,8 +271,6 @@ private boolean handleWriteReply(

if ((msgTypeId & 0x4000_0000) == 0)
{
supplyOriginMetricRecorder.apply(originId).accept(msgTypeId, buffer, index, length);
supplyRoutedMetricRecorder.apply(routedId).accept(msgTypeId, buffer, index, length);
switch (msgTypeId)
{
case BeginFW.TYPE_ID:
Expand Down