From b4ebfb874aeb544691a8391a8865ac356ad2120e Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Thu, 11 Jan 2024 09:11:49 +0100 Subject: [PATCH] [FIXED] Invalid value of max_bytes in pull request Signed-off-by: Piotr Piotrowski --- jetstream/pull.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/jetstream/pull.go b/jetstream/pull.go index fb7c0527b..50d1fbb43 100644 --- a/jetstream/pull.go +++ b/jetstream/pull.go @@ -420,13 +420,15 @@ func (s *pullSubscription) checkPending() { if (s.pending.msgCount < s.consumeOpts.ThresholdMessages || (s.pending.byteCount < s.consumeOpts.ThresholdBytes && s.consumeOpts.MaxBytes != 0)) && atomic.LoadUint32(&s.fetchInProgress) == 0 { - var batchSize int + + var batchSize, maxBytes int if s.consumeOpts.MaxBytes == 0 { // if using messages, calculate appropriate batch size batchSize = s.consumeOpts.MaxMessages - s.pending.msgCount } else { // if using bytes, use the max value batchSize = s.consumeOpts.MaxMessages + maxBytes = s.consumeOpts.MaxBytes - s.pending.byteCount } if s.consumeOpts.StopAfter > 0 { batchSize = min(batchSize, s.consumeOpts.StopAfter-s.delivered-s.pending.msgCount) @@ -435,7 +437,7 @@ func (s *pullSubscription) checkPending() { s.fetchNext <- &pullRequest{ Expires: s.consumeOpts.Expires, Batch: batchSize, - MaxBytes: s.consumeOpts.MaxBytes - s.pending.byteCount, + MaxBytes: maxBytes, Heartbeat: s.consumeOpts.Heartbeat, }