diff --git a/jetstream/jetstream_test.go b/jetstream/jetstream_test.go index baa4a8fb4..62af4ef02 100644 --- a/jetstream/jetstream_test.go +++ b/jetstream/jetstream_test.go @@ -300,7 +300,8 @@ func TestPullConsumer_checkPending(t *testing.T) { name: "pending msgs below threshold, send pull request", givenSub: &pullSubscription{ pending: pendingMsgs{ - msgCount: 4, + msgCount: 4, + byteCount: 400, // byte count should be ignored }, consumeOpts: &consumeOpts{ ThresholdMessages: 5, diff --git a/jetstream/pull.go b/jetstream/pull.go index fb7c0527b..50d1fbb43 100644 --- a/jetstream/pull.go +++ b/jetstream/pull.go @@ -420,13 +420,15 @@ func (s *pullSubscription) checkPending() { if (s.pending.msgCount < s.consumeOpts.ThresholdMessages || (s.pending.byteCount < s.consumeOpts.ThresholdBytes && s.consumeOpts.MaxBytes != 0)) && atomic.LoadUint32(&s.fetchInProgress) == 0 { - var batchSize int + + var batchSize, maxBytes int if s.consumeOpts.MaxBytes == 0 { // if using messages, calculate appropriate batch size batchSize = s.consumeOpts.MaxMessages - s.pending.msgCount } else { // if using bytes, use the max value batchSize = s.consumeOpts.MaxMessages + maxBytes = s.consumeOpts.MaxBytes - s.pending.byteCount } if s.consumeOpts.StopAfter > 0 { batchSize = min(batchSize, s.consumeOpts.StopAfter-s.delivered-s.pending.msgCount) @@ -435,7 +437,7 @@ func (s *pullSubscription) checkPending() { s.fetchNext <- &pullRequest{ Expires: s.consumeOpts.Expires, Batch: batchSize, - MaxBytes: s.consumeOpts.MaxBytes - s.pending.byteCount, + MaxBytes: maxBytes, Heartbeat: s.consumeOpts.Heartbeat, }