Skip to content

Commit

Permalink
Create an appropriate buffer with the size that accommodates signal f…
Browse files Browse the repository at this point in the history
…rame payload (#537)
  • Loading branch information
akrambek authored Oct 26, 2023
1 parent f1985a3 commit 99419ab
Showing 1 changed file with 8 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ public DispatchAgent(
this.timerWheel = new DeadlineTimerWheel(MILLISECONDS, currentTimeMillis(), 512, 1024);
this.tasksByTimerId = new Long2ObjectHashMap<>();
this.futuresById = new Long2ObjectHashMap<>();
this.signaler = new ElektronSignaler(executor);
this.signaler = new ElektronSignaler(executor, Math.max(config.bufferSlotCapacity(), 512));

this.poller = new Poller();

Expand Down Expand Up @@ -1672,9 +1672,10 @@ public Affinity resolveAffinity(
return affinity;
}

private static SignalFW.Builder newSignalRW()
private static SignalFW.Builder newSignalRW(
int capacity)
{
MutableDirectBuffer buffer = new UnsafeBuffer(new byte[512]);
MutableDirectBuffer buffer = new UnsafeBuffer(new byte[capacity]);
return new SignalFW.Builder().wrap(buffer, 0, buffer.capacity());
}

Expand All @@ -1691,16 +1692,18 @@ private Int2ObjectHashMap<MessageConsumer>[] initDispatcher()

private final class ElektronSignaler implements Signaler
{
private final ThreadLocal<SignalFW.Builder> signalRW = withInitial(DispatchAgent::newSignalRW);
private final ThreadLocal<SignalFW.Builder> signalRW;

private final ExecutorService executorService;

private long nextFutureId;

private ElektronSignaler(
ExecutorService executorService)
ExecutorService executorService,
int slotCapacity)
{
this.executorService = executorService;
signalRW = withInitial(() -> newSignalRW(slotCapacity));
}

public void executeTaskAt(
Expand Down

0 comments on commit 99419ab

Please sign in to comment.