diff --git a/api/write.go b/api/write.go index 419af937..fabb4e5d 100644 --- a/api/write.go +++ b/api/write.go @@ -167,7 +167,7 @@ x: func (w *WriteAPIImpl) flushBuffer() { if len(w.writeBuffer) > 0 { log.Info("sending batch") - batch := iwrite.NewBatch(buffer(w.writeBuffer), w.writeOptions.RetryInterval(), w.writeOptions.MaxRetryTime()) + batch := iwrite.NewBatch(buffer(w.writeBuffer), w.writeOptions.MaxRetryTime()) w.writeCh <- batch w.writeBuffer = w.writeBuffer[:0] } diff --git a/api/writeAPIBlocking.go b/api/writeAPIBlocking.go index e726e3f6..8569c53c 100644 --- a/api/writeAPIBlocking.go +++ b/api/writeAPIBlocking.go @@ -73,7 +73,7 @@ func NewWriteAPIBlocking(org string, bucket string, service http2.Service, write } func (w *writeAPIBlocking) write(ctx context.Context, line string) error { - err := w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.RetryInterval(), w.writeOptions.MaxRetryTime())) + err := w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.MaxRetryTime())) if err != nil { return err } diff --git a/api/write_test.go b/api/write_test.go index 9f6c07af..f55aaa6d 100644 --- a/api/write_test.go +++ b/api/write_test.go @@ -174,7 +174,7 @@ func TestWriteErrorCallback(t *testing.T) { return retryAttempts < 2 }) points := test.GenPoints(10) - // first two batches will be discarded by callback after 3 write attempts for each + // first batch will be discarded by callback after 3 write attempts, second batch should survive with only one failed attempt for i, j := 0, 0; i < 6; i++ { writeAPI.WritePoint(points[i]) writeAPI.waitForFlushing() @@ -194,7 +194,7 @@ func TestWriteErrorCallback(t *testing.T) { writeAPI.WritePoint(points[i]) } writeAPI.waitForFlushing() - assert.Len(t, service.Lines(), 8) + assert.Len(t, service.Lines(), 9) writeAPI.Close() } diff --git a/internal/write/queue_test.go b/internal/write/queue_test.go index 01c0c9e8..33f7b8cc 100644 --- a/internal/write/queue_test.go +++ b/internal/write/queue_test.go @@ -15,7 +15,7 @@ func TestQueue(t *testing.T) { assert.True(t, que.isEmpty()) assert.Nil(t, que.first()) assert.Nil(t, que.pop()) - b := &Batch{Batch: "batch", RetryDelay: 3, RetryAttempts: 3} + b := &Batch{Batch: "batch", RetryAttempts: 3} que.push(b) assert.False(t, que.isEmpty()) b2 := que.pop() diff --git a/internal/write/service.go b/internal/write/service.go index 9e54bd33..3cc4e958 100644 --- a/internal/write/service.go +++ b/internal/write/service.go @@ -30,8 +30,6 @@ import ( type Batch struct { // lines to send Batch string - // current retry delay - RetryDelay uint // retry attempts so far RetryAttempts uint // true if it was removed from queue @@ -41,11 +39,10 @@ type Batch struct { } // NewBatch creates new batch -func NewBatch(data string, retryDelay uint, expireDelayMs uint) *Batch { +func NewBatch(data string, expireDelayMs uint) *Batch { return &Batch{ - Batch: data, - RetryDelay: retryDelay, - Expires: time.Now().Add(time.Duration(expireDelayMs) * time.Millisecond), + Batch: data, + Expires: time.Now().Add(time.Duration(expireDelayMs) * time.Millisecond), } } @@ -65,6 +62,8 @@ type Service struct { writeOptions *write.Options retryExponentialBase uint errorCb BatchErrorCallback + retryDelay uint + retryAttempts uint } // NewService creates new write service @@ -90,6 +89,8 @@ func NewService(org string, bucket string, httpService http2.Service, options *w writeOptions: options, retryQueue: newQueue(int(retryBufferLimit)), retryExponentialBase: 2, + retryDelay: 0, + retryAttempts: 0, } } @@ -136,7 +137,7 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error { } // Can we write? In case of retryable error we must wait a bit - if w.lastWriteAttempt.IsZero() || time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(b.RetryDelay))) { + if w.lastWriteAttempt.IsZero() || time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(w.retryDelay))) { retrying = true } else { log.Warn("Write proc: cannot write yet, storing batch to queue") @@ -161,11 +162,11 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error { perror := w.WriteBatch(ctx, batchToWrite) if perror != nil { if w.writeOptions.MaxRetries() != 0 && (perror.StatusCode == 0 || perror.StatusCode >= http.StatusTooManyRequests) { - log.Errorf("Write error: %s\nBatch kept for retrying\n", perror.Error()) + log.Errorf("Write error: %s, batch kept for retrying\n", perror.Error()) if perror.RetryAfter > 0 { - batchToWrite.RetryDelay = perror.RetryAfter * 1000 + w.retryDelay = perror.RetryAfter * 1000 } else { - batchToWrite.RetryDelay = w.computeRetryDelay(batchToWrite.RetryAttempts) + w.retryDelay = w.computeRetryDelay(w.retryAttempts) } if w.errorCb != nil && !w.errorCb(batchToWrite, *perror) { log.Warn("Callback rejected batch, discarding") @@ -186,12 +187,16 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error { } } batchToWrite.RetryAttempts++ - log.Debugf("Write proc: next wait for write is %dms\n", batchToWrite.RetryDelay) + w.retryAttempts++ + log.Debugf("Write proc: next wait for write is %dms\n", w.retryDelay) } else { log.Errorf("Write error: %s\n", perror.Error()) } return fmt.Errorf("write failed (attempts %d): %w", batchToWrite.RetryAttempts, perror) } + + w.retryDelay = w.writeOptions.RetryInterval() + w.retryDelay = 0 if retrying && !batchToWrite.Evicted { w.retryQueue.pop() } @@ -351,10 +356,3 @@ func precisionToString(precision time.Duration) string { } return prec } - -func min(a, b uint) uint { - if a > b { - return b - } - return a -} diff --git a/internal/write/service_test.go b/internal/write/service_test.go index ed4c1f73..8d98330a 100644 --- a/internal/write/service_test.go +++ b/internal/write/service_test.go @@ -84,44 +84,44 @@ func TestRetryStrategy(t *testing.T) { StatusCode: 429, }) // This batch will fail and it be added to retry queue - b1 := NewBatch("1\n", opts.RetryInterval(), opts.MaxRetryTime()) + b1 := NewBatch("1\n", opts.MaxRetryTime()) err := srv.HandleWrite(ctx, b1) assert.NotNil(t, err) - assert.EqualValues(t, 1, b1.RetryDelay) + assert.EqualValues(t, 1, srv.retryDelay) assert.Equal(t, 1, srv.retryQueue.list.Len()) //wait retry delay + little more - <-time.After(time.Millisecond*time.Duration(b1.RetryDelay) + time.Microsecond*5) + <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5) // First batch will be tried to write again and this one will added to retry queue - b2 := NewBatch("2\n", opts.RetryInterval(), opts.MaxRetryTime()) + b2 := NewBatch("2\n", opts.MaxRetryTime()) err = srv.HandleWrite(ctx, b2) assert.NotNil(t, err) - assertBetween(t, b1.RetryDelay, 2, 4) + assertBetween(t, srv.retryDelay, 2, 4) assert.Equal(t, 2, srv.retryQueue.list.Len()) //wait retry delay + little more - <-time.After(time.Millisecond*time.Duration(b1.RetryDelay) + time.Microsecond*5) + <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5) // First batch will be tried to write again and this one will added to retry queue - b3 := NewBatch("3\n", opts.RetryInterval(), opts.MaxRetryTime()) + b3 := NewBatch("3\n", opts.MaxRetryTime()) err = srv.HandleWrite(ctx, b3) assert.NotNil(t, err) - assertBetween(t, b1.RetryDelay, 4, 8) + assertBetween(t, srv.retryDelay, 4, 8) assert.Equal(t, 3, srv.retryQueue.list.Len()) //wait retry delay + little more - <-time.After(time.Millisecond*time.Duration(b1.RetryDelay) + time.Microsecond*5) + <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5) // First batch will be tried to write again and this one will added to retry queue - b4 := NewBatch("4\n", opts.RetryInterval(), opts.MaxRetryTime()) + b4 := NewBatch("4\n", opts.MaxRetryTime()) err = srv.HandleWrite(ctx, b4) assert.NotNil(t, err) - assertBetween(t, b1.RetryDelay, 8, 16) + assertBetween(t, srv.retryDelay, 8, 16) assert.Equal(t, 4, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond*time.Duration(b1.RetryDelay) + time.Microsecond*5) + <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5) // Clear error and let write pass hs.SetReplyError(nil) // Batches from retry queue will be sent first - err = srv.HandleWrite(ctx, NewBatch("5\n", opts.RetryInterval(), opts.MaxRetryTime())) + err = srv.HandleWrite(ctx, NewBatch("5\n", opts.MaxRetryTime())) assert.Nil(t, err) assert.Equal(t, 0, srv.retryQueue.list.Len()) require.Len(t, hs.Lines(), 5) @@ -145,56 +145,56 @@ func TestBufferOverwrite(t *testing.T) { StatusCode: 429, }) // This batch will fail and it be added to retry queue - b1 := NewBatch("1\n", opts.RetryInterval(), opts.MaxRetryTime()) + b1 := NewBatch("1\n", opts.MaxRetryTime()) err := srv.HandleWrite(ctx, b1) assert.NotNil(t, err) - assert.Equal(t, uint(1), b1.RetryDelay) + assert.Equal(t, uint(1), srv.retryDelay) assert.Equal(t, 1, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(b1.RetryDelay)) - b2 := NewBatch("2\n", opts.RetryInterval(), opts.MaxRetryTime()) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) + b2 := NewBatch("2\n", opts.MaxRetryTime()) // First batch will be tried to write again and this one will added to retry queue err = srv.HandleWrite(ctx, b2) assert.NotNil(t, err) - assertBetween(t, b1.RetryDelay, 2, 4) + assertBetween(t, srv.retryDelay, 2, 4) assert.Equal(t, 2, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(b1.RetryDelay)) - b3 := NewBatch("3\n", opts.RetryInterval(), opts.MaxRetryTime()) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) + b3 := NewBatch("3\n", opts.MaxRetryTime()) // First batch will be tried to write again and this one will added to retry queue err = srv.HandleWrite(ctx, b3) assert.NotNil(t, err) - assertBetween(t, b1.RetryDelay, 4, 8) + assertBetween(t, srv.retryDelay, 4, 8) assert.Equal(t, 3, srv.retryQueue.list.Len()) // Write early and overwrite - b4 := NewBatch("4\n", opts.RetryInterval(), opts.MaxRetryTime()) + b4 := NewBatch("4\n", opts.MaxRetryTime()) // No write will occur, because retry delay has not passed yet // However new bach will be added to retry queue. Retry queue has limit 3, // so first batch will be discarded + priorRetryDelay := srv.retryDelay err = srv.HandleWrite(ctx, b4) assert.NoError(t, err) - assert.Equal(t, uint(1), b2.RetryDelay) + assert.Equal(t, priorRetryDelay, srv.retryDelay) // Accumulated retry delay should be retained despite batch discard assert.Equal(t, 3, srv.retryQueue.list.Len()) // Overwrite - // TODO check time.Duration(b2.RetryDelay)) - <-time.After(time.Millisecond * time.Duration(b1.RetryDelay) / 2) - b5 := NewBatch("5\n", opts.RetryInterval(), opts.MaxRetryTime()) + // TODO check time.Duration(srv.RetryDelay)) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay) / 2) + b5 := NewBatch("5\n", opts.MaxRetryTime()) // Second batch will be tried to write again // However, write will fail and as new batch is added to retry queue // the second batch will be discarded err = srv.HandleWrite(ctx, b5) - assert.Error(t, err) - assert.Equal(t, uint(1), b2.RetryDelay) - //TODO assertBetween(t, b2.retryDelay, 2, 4) + assert.Nil(t, err) // No error should be returned, because no write was attempted (still waiting for retryDelay to expire) + //TODO assertBetween(t, srv.RetryDelay, 2, 4) assert.Equal(t, 3, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(b1.RetryDelay)) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) // Clear error and let write pass hs.SetReplyError(nil) // Batches from retry queue will be sent first - err = srv.HandleWrite(ctx, NewBatch("6\n", opts.RetryInterval(), opts.MaxRetryTime())) + err = srv.HandleWrite(ctx, NewBatch("6\n", opts.MaxRetryTime())) assert.Nil(t, err) assert.Equal(t, 0, srv.retryQueue.list.Len()) require.Len(t, hs.Lines(), 4) @@ -216,39 +216,46 @@ func TestMaxRetryInterval(t *testing.T) { StatusCode: 503, }) // This batch will fail and it be added to retry queue - b1 := NewBatch("1\n", opts.RetryInterval(), opts.MaxRetryTime()) + b1 := NewBatch("1\n", opts.MaxRetryTime()) err := srv.HandleWrite(ctx, b1) assert.NotNil(t, err) - assert.Equal(t, uint(1), b1.RetryDelay) + assert.Equal(t, uint(1), srv.retryDelay) assert.Equal(t, 1, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(b1.RetryDelay)) - b2 := NewBatch("2\n", opts.RetryInterval(), opts.MaxRetryTime()) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) + b2 := NewBatch("2\n", opts.MaxRetryTime()) // First batch will be tried to write again and this one will added to retry queue err = srv.HandleWrite(ctx, b2) assert.NotNil(t, err) - assertBetween(t, b1.RetryDelay, 2, 4) + assertBetween(t, srv.retryDelay, 2, 4) assert.Equal(t, 2, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(b1.RetryDelay)) - b3 := NewBatch("3\n", opts.RetryInterval(), opts.MaxRetryTime()) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) + b3 := NewBatch("3\n", opts.MaxRetryTime()) // First batch will be tried to write again and this one will added to retry queue err = srv.HandleWrite(ctx, b3) assert.NotNil(t, err) // New computed delay of first batch should be 4-8, is limited to 4 - assert.EqualValues(t, 4, b1.RetryDelay) + assert.EqualValues(t, 4, srv.retryDelay) assert.Equal(t, 3, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(b1.RetryDelay)) - b4 := NewBatch("4\n", opts.RetryInterval(), opts.MaxRetryTime()) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) + b4 := NewBatch("4\n", opts.MaxRetryTime()) // First batch will be tried to write again and this one will added to retry queue err = srv.HandleWrite(ctx, b4) assert.NotNil(t, err) // New computed delay of first batch should be 8-116, is limited to 4 - assert.EqualValues(t, 4, b1.RetryDelay) + assert.EqualValues(t, 4, srv.retryDelay) assert.Equal(t, 4, srv.retryQueue.list.Len()) } +func min(a, b uint) uint { + if a > b { + return b + } + return a +} + func TestMaxRetries(t *testing.T) { log.Log.SetLogLevel(log.DebugLevel) hs := test.NewTestService(t, "http://localhost:8086") @@ -260,20 +267,20 @@ func TestMaxRetries(t *testing.T) { StatusCode: 429, }) // This batch will fail and it be added to retry queue - b1 := NewBatch("1\n", opts.RetryInterval(), opts.MaxRetryTime()) + b1 := NewBatch("1\n", opts.MaxRetryTime()) err := srv.HandleWrite(ctx, b1) assert.NotNil(t, err) - assert.EqualValues(t, 1, b1.RetryDelay) + assert.EqualValues(t, 1, srv.retryDelay) assert.Equal(t, 1, srv.retryQueue.list.Len()) // Write so many batches as it is maxRetries (5) // First batch will be written and it will reach max retry limit for i, e := uint(1), uint(2); i <= opts.MaxRetries(); i++ { //wait retry delay + little more - <-time.After(time.Millisecond*time.Duration(b1.RetryDelay) + time.Microsecond*5) - b := NewBatch(fmt.Sprintf("%d\n", i+1), opts.RetryInterval(), opts.MaxRetryTime()) + <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5) + b := NewBatch(fmt.Sprintf("%d\n", i+1), opts.MaxRetryTime()) err = srv.HandleWrite(ctx, b) assert.NotNil(t, err) - assertBetween(t, b1.RetryDelay, e, e*2) + assertBetween(t, srv.retryDelay, e, e*2) exp := min(i+1, opts.MaxRetries()) assert.EqualValues(t, exp, srv.retryQueue.list.Len()) e *= 2 @@ -281,11 +288,11 @@ func TestMaxRetries(t *testing.T) { //Test if was removed from retry queue assert.True(t, b1.Evicted) - <-time.After(time.Millisecond*time.Duration(b1.RetryDelay) + time.Microsecond*5) + <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5) // Clear error and let write pass hs.SetReplyError(nil) // Batches from retry queue will be sent first - err = srv.HandleWrite(ctx, NewBatch(fmt.Sprintf("%d\n", opts.MaxRetries()+2), opts.RetryInterval(), opts.MaxRetryTime())) + err = srv.HandleWrite(ctx, NewBatch(fmt.Sprintf("%d\n", opts.MaxRetries()+2), opts.MaxRetryTime())) assert.Nil(t, err) assert.Equal(t, 0, srv.retryQueue.list.Len()) require.Len(t, hs.Lines(), int(opts.MaxRetries()+1)) @@ -306,10 +313,10 @@ func TestMaxRetryTime(t *testing.T) { StatusCode: 429, }) // This batch will fail and it be added to retry queue and it will expire 5ms after - b1 := NewBatch("1\n", opts.RetryInterval(), opts.MaxRetryTime()) + b1 := NewBatch("1\n", opts.MaxRetryTime()) err := srv.HandleWrite(ctx, b1) assert.NotNil(t, err) - assert.EqualValues(t, 1, b1.RetryDelay) + assert.EqualValues(t, 1, srv.retryDelay) assert.Equal(t, 1, srv.retryQueue.list.Len()) // Wait for batch expiration @@ -321,7 +328,7 @@ func TestMaxRetryTime(t *testing.T) { exp = 20 } // create new batch for sending - b := NewBatch("2\n", opts.RetryInterval(), exp) + b := NewBatch("2\n", exp) // First batch will be checked against maxRetryTime and it will expire. New batch will fail and it will added to retry queue err = srv.HandleWrite(ctx, b) require.NotNil(t, err) @@ -329,12 +336,12 @@ func TestMaxRetryTime(t *testing.T) { assert.Equal(t, "write failed (attempts 1): Unexpected status code 429", err.Error()) assert.Equal(t, 1, srv.retryQueue.list.Len()) - //wait minimum retry time - <-time.After(time.Millisecond) + //wait until remaining accumulated retryDelay has passed, because there hasn't been a successful write yet + <-time.After(time.Until(srv.lastWriteAttempt.Add(time.Millisecond * time.Duration(srv.retryDelay)))) // Clear error and let write pass hs.SetReplyError(nil) // A batch from retry queue will be sent first - err = srv.HandleWrite(ctx, NewBatch("3\n", opts.RetryInterval(), opts.MaxRetryTime())) + err = srv.HandleWrite(ctx, NewBatch("3\n", opts.MaxRetryTime())) assert.Nil(t, err) assert.Equal(t, 0, srv.retryQueue.list.Len()) require.Len(t, hs.Lines(), 2) @@ -356,35 +363,35 @@ func TestRetryOnConnectionError(t *testing.T) { }) // This batch will fail and it be added to retry queue - b1 := NewBatch("1\n", opts.RetryInterval(), opts.MaxRetryTime()) + b1 := NewBatch("1\n", opts.MaxRetryTime()) err := srv.HandleWrite(ctx, b1) assert.NotNil(t, err) - assert.EqualValues(t, 1, b1.RetryDelay) + assert.EqualValues(t, 1, srv.retryDelay) assert.Equal(t, 1, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(b1.RetryDelay)) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) - b2 := NewBatch("2\n", opts.RetryInterval(), opts.MaxRetryTime()) + b2 := NewBatch("2\n", opts.MaxRetryTime()) // First batch will be tried to write again and this one will added to retry queue err = srv.HandleWrite(ctx, b2) assert.NotNil(t, err) - assertBetween(t, b1.RetryDelay, 2, 4) + assertBetween(t, srv.retryDelay, 2, 4) assert.Equal(t, 2, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(b1.RetryDelay)) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) - b3 := NewBatch("3\n", opts.RetryInterval(), opts.MaxRetryTime()) + b3 := NewBatch("3\n", opts.MaxRetryTime()) // First batch will be tried to write again and this one will added to retry queue err = srv.HandleWrite(ctx, b3) assert.NotNil(t, err) - assertBetween(t, b1.RetryDelay, 4, 8) + assertBetween(t, srv.retryDelay, 4, 8) assert.Equal(t, 3, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(b1.RetryDelay)) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) // Clear error and let write pass hs.SetReplyError(nil) // Batches from retry queue will be sent first - err = srv.HandleWrite(ctx, NewBatch("4\n", opts.RetryInterval(), opts.MaxRetryTime())) + err = srv.HandleWrite(ctx, NewBatch("4\n", opts.MaxRetryTime())) assert.Nil(t, err) assert.Equal(t, 0, srv.retryQueue.list.Len()) require.Len(t, hs.Lines(), 4) @@ -406,7 +413,7 @@ func TestNoRetryIfMaxRetriesIsZero(t *testing.T) { Err: errors.New("connection refused"), }) - b1 := NewBatch("1\n", opts.RetryInterval(), opts.MaxRetryTime()) + b1 := NewBatch("1\n", opts.MaxRetryTime()) err := srv.HandleWrite(ctx, b1) assert.NotNil(t, err) assert.Equal(t, 0, srv.retryQueue.list.Len()) @@ -423,7 +430,7 @@ func TestWriteContextCancel(t *testing.T) { wg.Add(1) go func() { <-time.After(10 * time.Millisecond) - err = srv.HandleWrite(ctx, NewBatch(strings.Join(lines, "\n"), opts.RetryInterval(), opts.MaxRetryTime())) + err = srv.HandleWrite(ctx, NewBatch(strings.Join(lines, "\n"), opts.MaxRetryTime())) wg.Done() }() cancel() @@ -472,21 +479,109 @@ func TestErrorCallback(t *testing.T) { srv.SetBatchErrorCallback(func(batch *Batch, error2 http.Error) bool { return batch.RetryAttempts < 2 }) - b1 := NewBatch("1\n", opts.RetryInterval(), opts.MaxRetryTime()) + b1 := NewBatch("1\n", opts.MaxRetryTime()) err := srv.HandleWrite(ctx, b1) assert.NotNil(t, err) assert.Equal(t, 1, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(b1.RetryDelay)) - b := NewBatch("2\n", opts.RetryInterval(), opts.MaxRetryTime()) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) + b := NewBatch("2\n", opts.MaxRetryTime()) err = srv.HandleWrite(ctx, b) assert.NotNil(t, err) assert.Equal(t, 2, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(b1.RetryDelay)) - b = NewBatch("3\n", opts.RetryInterval(), opts.MaxRetryTime()) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) + b = NewBatch("3\n", opts.MaxRetryTime()) err = srv.HandleWrite(ctx, b) assert.NotNil(t, err) assert.Equal(t, 2, srv.retryQueue.list.Len()) } + +func minInt(a, b int) int { + if a > b { + return b + } + return a +} + +func TestRetryIntervalAccumulation(t *testing.T) { + // log.Log.SetLogLevel(log.DebugLevel) + log.Log.SetLogLevel(log.InfoLevel) + + // Setup test service with scenario's configuration + hs := test.NewTestService(t, "http://localhost:8086") + opts := write.DefaultOptions(). + SetRetryInterval(20). + SetMaxRetryInterval(300). + SetMaxRetryTime(100) + ctx := context.Background() + srv := NewService("my-org", "my-bucket", hs, opts) + writeInterval := time.Duration(opts.RetryInterval()) * time.Millisecond + + // Set permanent reply error to force writes fail and retry + hs.SetReplyError(&http.Error{StatusCode: 429}) + + lastInterval := uint(0) + assert.Equal(t, uint(0), srv.retryDelay) // Should initialize to zero + i := 1 + for ; i <= 45; i++ { + b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime()) + err := srv.HandleWrite(ctx, b) + assert.Equal(t, minInt(i, 5), srv.retryQueue.list.Len()) + assert.GreaterOrEqual(t, srv.retryDelay, lastInterval) // Should not decrease while writes failing + assert.LessOrEqual(t, srv.retryDelay, opts.MaxRetryInterval()) // Should not grow larger than max + if err != nil { + if lastInterval == opts.MaxRetryInterval() { + // Write attempt failed, and interval was already at max, so should stay there + assert.Equal(t, srv.retryDelay, opts.MaxRetryInterval()) + log.Log.Infof("Retry interval capped at %d ms", srv.retryDelay) + } else { + // A write attempt was made and failed, so retry interval should have increased + assert.Greater(t, srv.retryDelay, lastInterval) + log.Log.Infof("Retry interval increased to %d ms", srv.retryDelay) + } + } else { + // Write attempt was not made, so retry interval should remain the same + assert.Equal(t, srv.retryDelay, lastInterval) + log.Log.Infof("Retry interval still at %d ms", srv.retryDelay) + } + lastInterval = srv.retryDelay + + <-time.After(writeInterval) + } + + // Clear error and let write pass + hs.SetReplyError(nil) + + // Wait until write queue is ready to retry; in meantime, keep writing and confirming queue state + retryTimeout := srv.lastWriteAttempt.Add(time.Millisecond * time.Duration(srv.retryDelay)) + log.Log.Infof("Continuing to write for %d ms until flushing write attempt", time.Until(retryTimeout).Milliseconds()) + for ; time.Until(retryTimeout) >= 0; i++ { + b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime()) + err := srv.HandleWrite(ctx, b) + assert.Nil(t, err) // There should be no write attempt + assert.Equal(t, minInt(i, 5), srv.retryQueue.list.Len()) + assert.Equal(t, srv.retryDelay, opts.MaxRetryInterval()) // Should remain the same + log.Log.Infof("Retry interval still at %d ms", srv.retryDelay) + <-time.After(writeInterval) + } + + // Retry interval should now have expired, so this write attempt should succeed and cause retry queue to flush + b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime()) + err := srv.HandleWrite(ctx, b) + assert.Nil(t, err) + assert.Equal(t, 0, srv.retryQueue.list.Len()) + assert.Equal(t, srv.retryDelay, uint(0)) // Should reset to zero + + // Ensure proper batches got written to server + require.Len(t, hs.Lines(), 5) + assert.Equal(t, fmt.Sprintf("%d", i-4), hs.Lines()[0]) + assert.Equal(t, fmt.Sprintf("%d", i-3), hs.Lines()[1]) + assert.Equal(t, fmt.Sprintf("%d", i-2), hs.Lines()[2]) + assert.Equal(t, fmt.Sprintf("%d", i-1), hs.Lines()[3]) + assert.Equal(t, fmt.Sprintf("%d", i-0), hs.Lines()[4]) + + // Debug line to capture output of successful test + // assert.True(t, false) +}