diff --git a/internal/write/service.go b/internal/write/service.go index 2d6985fa..9e54bd33 100644 --- a/internal/write/service.go +++ b/internal/write/service.go @@ -124,6 +124,17 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error { log.Debug("Write proc: taking batch from retry queue") if !retrying { b := w.retryQueue.first() + + // Discard batches at beginning of retryQueue that have already expired + if time.Now().After(b.Expires) { + log.Warn("Write proc: oldest batch in retry queue expired, discarding") + if !b.Evicted { + w.retryQueue.pop() + } + + continue + } + // Can we write? In case of retryable error we must wait a bit if w.lastWriteAttempt.IsZero() || time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(b.RetryDelay))) { retrying = true @@ -147,12 +158,6 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error { } // write batch if batchToWrite != nil { - if time.Now().After(batchToWrite.Expires) { - if !batchToWrite.Evicted { - w.retryQueue.pop() - } - return fmt.Errorf("write failed (attempts %d): max retry time exceeded", batchToWrite.RetryAttempts) - } perror := w.WriteBatch(ctx, batchToWrite) if perror != nil { if w.writeOptions.MaxRetries() != 0 && (perror.StatusCode == 0 || perror.StatusCode >= http.StatusTooManyRequests) { diff --git a/internal/write/service_test.go b/internal/write/service_test.go index a0525712..ed4c1f73 100644 --- a/internal/write/service_test.go +++ b/internal/write/service_test.go @@ -9,6 +9,7 @@ import ( "errors" "fmt" ilog "log" + "runtime" "strings" "sync" "testing" @@ -313,14 +314,23 @@ func TestMaxRetryTime(t *testing.T) { // Wait for batch expiration <-time.After(5 * time.Millisecond) - b := NewBatch("2\n", opts.RetryInterval(), opts.MaxRetryTime()) - // First batch will be tried to write again and it will be checked agains maxRetryTime. New batch will added to retry queue + + exp := opts.MaxRetryTime() + // sleep takes at least more than 10ms (sometimes 15ms) on Windows https://github.com/golang/go/issues/44343 + if runtime.GOOS == "windows" { + exp = 20 + } + // create new batch for sending + b := NewBatch("2\n", opts.RetryInterval(), exp) + // First batch will be checked against maxRetryTime and it will expire. New batch will fail and it will added to retry queue err = srv.HandleWrite(ctx, b) require.NotNil(t, err) - // Error about batch expiration - assert.Equal(t, "write failed (attempts 1): max retry time exceeded", err.Error()) + // 1st Batch expires and writing 2nd trows error + assert.Equal(t, "write failed (attempts 1): Unexpected status code 429", err.Error()) assert.Equal(t, 1, srv.retryQueue.list.Len()) + //wait minimum retry time + <-time.After(time.Millisecond) // Clear error and let write pass hs.SetReplyError(nil) // A batch from retry queue will be sent first