Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: write retry queue stops retrying #332

Merged
merged 1 commit into from
Jun 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions internal/write/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,17 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
log.Debug("Write proc: taking batch from retry queue")
if !retrying {
b := w.retryQueue.first()

// Discard batches at beginning of retryQueue that have already expired
if time.Now().After(b.Expires) {
log.Warn("Write proc: oldest batch in retry queue expired, discarding")
if !b.Evicted {
w.retryQueue.pop()
}

continue
}

// Can we write? In case of retryable error we must wait a bit
if w.lastWriteAttempt.IsZero() || time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(b.RetryDelay))) {
retrying = true
Expand All @@ -147,12 +158,6 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
}
// write batch
if batchToWrite != nil {
if time.Now().After(batchToWrite.Expires) {
if !batchToWrite.Evicted {
w.retryQueue.pop()
}
return fmt.Errorf("write failed (attempts %d): max retry time exceeded", batchToWrite.RetryAttempts)
}
perror := w.WriteBatch(ctx, batchToWrite)
if perror != nil {
if w.writeOptions.MaxRetries() != 0 && (perror.StatusCode == 0 || perror.StatusCode >= http.StatusTooManyRequests) {
Expand Down
18 changes: 14 additions & 4 deletions internal/write/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
ilog "log"
"runtime"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -313,14 +314,23 @@ func TestMaxRetryTime(t *testing.T) {

// Wait for batch expiration
<-time.After(5 * time.Millisecond)
b := NewBatch("2\n", opts.RetryInterval(), opts.MaxRetryTime())
// The first batch will be retried and checked against maxRetryTime. The new batch will be added to the retry queue

exp := opts.MaxRetryTime()
// sleep takes at least 10ms (sometimes 15ms) on Windows https://github.com/golang/go/issues/44343
if runtime.GOOS == "windows" {
exp = 20
}
// create new batch for sending
b := NewBatch("2\n", opts.RetryInterval(), exp)
// The first batch will be checked against maxRetryTime and will expire. The new batch will fail and will be added to the retry queue
err = srv.HandleWrite(ctx, b)
require.NotNil(t, err)
// Error about batch expiration
assert.Equal(t, "write failed (attempts 1): max retry time exceeded", err.Error())
// The 1st batch expires and writing the 2nd throws an error
assert.Equal(t, "write failed (attempts 1): Unexpected status code 429", err.Error())
assert.Equal(t, 1, srv.retryQueue.list.Len())

// Wait for the minimum retry time
<-time.After(time.Millisecond)
// Clear error and let write pass
hs.SetReplyError(nil)
// A batch from retry queue will be sent first
Expand Down