diff --git a/internal/write/service.go b/internal/write/service.go index 4f884e6b..3cc4e958 100644 --- a/internal/write/service.go +++ b/internal/write/service.go @@ -62,8 +62,8 @@ type Service struct { writeOptions *write.Options retryExponentialBase uint errorCb BatchErrorCallback - RetryDelay uint - RetryAttempts uint + retryDelay uint + retryAttempts uint } // NewService creates new write service @@ -89,8 +89,8 @@ func NewService(org string, bucket string, httpService http2.Service, options *w writeOptions: options, retryQueue: newQueue(int(retryBufferLimit)), retryExponentialBase: 2, - RetryDelay: 0, - RetryAttempts: 0, + retryDelay: 0, + retryAttempts: 0, } } @@ -137,7 +137,7 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error { } // Can we write? In case of retryable error we must wait a bit - if w.lastWriteAttempt.IsZero() || time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(w.RetryDelay))) { + if w.lastWriteAttempt.IsZero() || time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(w.retryDelay))) { retrying = true } else { log.Warn("Write proc: cannot write yet, storing batch to queue") @@ -164,9 +164,9 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error { if w.writeOptions.MaxRetries() != 0 && (perror.StatusCode == 0 || perror.StatusCode >= http.StatusTooManyRequests) { log.Errorf("Write error: %s, batch kept for retrying\n", perror.Error()) if perror.RetryAfter > 0 { - w.RetryDelay = perror.RetryAfter * 1000 + w.retryDelay = perror.RetryAfter * 1000 } else { - w.RetryDelay = w.computeRetryDelay(w.RetryAttempts) + w.retryDelay = w.computeRetryDelay(w.retryAttempts) } if w.errorCb != nil && !w.errorCb(batchToWrite, *perror) { log.Warn("Callback rejected batch, discarding") @@ -187,16 +187,16 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error { } } batchToWrite.RetryAttempts++ - w.RetryAttempts++ - log.Debugf("Write proc: next wait for write is %dms\n", w.RetryDelay) + w.retryAttempts++ + log.Debugf("Write proc: next wait for write is %dms\n", w.retryDelay) } else { log.Errorf("Write error: %s\n", perror.Error()) } return fmt.Errorf("write failed (attempts %d): %w", batchToWrite.RetryAttempts, perror) } - w.RetryDelay = w.writeOptions.RetryInterval() - w.RetryDelay = 0 + w.retryDelay = w.writeOptions.RetryInterval() + w.retryDelay = 0 if retrying && !batchToWrite.Evicted { w.retryQueue.pop() } diff --git a/internal/write/service_test.go b/internal/write/service_test.go index 2bc5ce23..8d98330a 100644 --- a/internal/write/service_test.go +++ b/internal/write/service_test.go @@ -87,37 +87,37 @@ func TestRetryStrategy(t *testing.T) { b1 := NewBatch("1\n", opts.MaxRetryTime()) err := srv.HandleWrite(ctx, b1) assert.NotNil(t, err) - assert.EqualValues(t, 1, srv.RetryDelay) + assert.EqualValues(t, 1, srv.retryDelay) assert.Equal(t, 1, srv.retryQueue.list.Len()) //wait retry delay + little more - <-time.After(time.Millisecond*time.Duration(srv.RetryDelay) + time.Microsecond*5) + <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5) // First batch will be tried to write again and this one will added to retry queue b2 := NewBatch("2\n", opts.MaxRetryTime()) err = srv.HandleWrite(ctx, b2) assert.NotNil(t, err) - assertBetween(t, srv.RetryDelay, 2, 4) + assertBetween(t, srv.retryDelay, 2, 4) assert.Equal(t, 2, srv.retryQueue.list.Len()) //wait retry delay + little more - <-time.After(time.Millisecond*time.Duration(srv.RetryDelay) + time.Microsecond*5) + <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5) // First batch will be tried to write again and this one will added to retry queue b3 := NewBatch("3\n", opts.MaxRetryTime()) err = srv.HandleWrite(ctx, b3) assert.NotNil(t, err) - assertBetween(t, srv.RetryDelay, 4, 8) + assertBetween(t, srv.retryDelay, 4, 8) assert.Equal(t, 3, srv.retryQueue.list.Len()) //wait retry delay + little more - <-time.After(time.Millisecond*time.Duration(srv.RetryDelay) + time.Microsecond*5) + <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5) // First batch will be tried to write again and this one will added to retry queue b4 := NewBatch("4\n", opts.MaxRetryTime()) err = srv.HandleWrite(ctx, b4) assert.NotNil(t, err) - assertBetween(t, srv.RetryDelay, 8, 16) + assertBetween(t, srv.retryDelay, 8, 16) assert.Equal(t, 4, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond*time.Duration(srv.RetryDelay) + time.Microsecond*5) + <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5) // Clear error and let write pass hs.SetReplyError(nil) // Batches from retry queue will be sent first @@ -148,23 +148,23 @@ func TestBufferOverwrite(t *testing.T) { b1 := NewBatch("1\n", opts.MaxRetryTime()) err := srv.HandleWrite(ctx, b1) assert.NotNil(t, err) - assert.Equal(t, uint(1), srv.RetryDelay) + assert.Equal(t, uint(1), srv.retryDelay) assert.Equal(t, 1, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(srv.RetryDelay)) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) b2 := NewBatch("2\n", opts.MaxRetryTime()) // First batch will be tried to write again and this one will added to retry queue err = srv.HandleWrite(ctx, b2) assert.NotNil(t, err) - assertBetween(t, srv.RetryDelay, 2, 4) + assertBetween(t, srv.retryDelay, 2, 4) assert.Equal(t, 2, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(srv.RetryDelay)) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) b3 := NewBatch("3\n", opts.MaxRetryTime()) // First batch will be tried to write again and this one will added to retry queue err = srv.HandleWrite(ctx, b3) assert.NotNil(t, err) - assertBetween(t, srv.RetryDelay, 4, 8) + assertBetween(t, srv.retryDelay, 4, 8) assert.Equal(t, 3, srv.retryQueue.list.Len()) // Write early and overwrite @@ -172,15 +172,15 @@ func TestBufferOverwrite(t *testing.T) { // No write will occur, because retry delay has not passed yet // However new bach will be added to retry queue. Retry queue has limit 3, // so first batch will be discarded - priorRetryDelay := srv.RetryDelay + priorRetryDelay := srv.retryDelay err = srv.HandleWrite(ctx, b4) assert.NoError(t, err) - assert.Equal(t, priorRetryDelay, srv.RetryDelay) // Accumulated retry delay should be retained despite batch discard + assert.Equal(t, priorRetryDelay, srv.retryDelay) // Accumulated retry delay should be retained despite batch discard assert.Equal(t, 3, srv.retryQueue.list.Len()) // Overwrite // TODO check time.Duration(srv.RetryDelay)) - <-time.After(time.Millisecond * time.Duration(srv.RetryDelay) / 2) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay) / 2) b5 := NewBatch("5\n", opts.MaxRetryTime()) // Second batch will be tried to write again // However, write will fail and as new batch is added to retry queue @@ -190,7 +190,7 @@ func TestBufferOverwrite(t *testing.T) { //TODO assertBetween(t, srv.RetryDelay, 2, 4) assert.Equal(t, 3, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(srv.RetryDelay)) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) // Clear error and let write pass hs.SetReplyError(nil) // Batches from retry queue will be sent first @@ -219,33 +219,33 @@ func TestMaxRetryInterval(t *testing.T) { b1 := NewBatch("1\n", opts.MaxRetryTime()) err := srv.HandleWrite(ctx, b1) assert.NotNil(t, err) - assert.Equal(t, uint(1), srv.RetryDelay) + assert.Equal(t, uint(1), srv.retryDelay) assert.Equal(t, 1, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(srv.RetryDelay)) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) b2 := NewBatch("2\n", opts.MaxRetryTime()) // First batch will be tried to write again and this one will added to retry queue err = srv.HandleWrite(ctx, b2) assert.NotNil(t, err) - assertBetween(t, srv.RetryDelay, 2, 4) + assertBetween(t, srv.retryDelay, 2, 4) assert.Equal(t, 2, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(srv.RetryDelay)) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) b3 := NewBatch("3\n", opts.MaxRetryTime()) // First batch will be tried to write again and this one will added to retry queue err = srv.HandleWrite(ctx, b3) assert.NotNil(t, err) // New computed delay of first batch should be 4-8, is limited to 4 - assert.EqualValues(t, 4, srv.RetryDelay) + assert.EqualValues(t, 4, srv.retryDelay) assert.Equal(t, 3, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(srv.RetryDelay)) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) b4 := NewBatch("4\n", opts.MaxRetryTime()) // First batch will be tried to write again and this one will added to retry queue err = srv.HandleWrite(ctx, b4) assert.NotNil(t, err) // New computed delay of first batch should be 8-116, is limited to 4 - assert.EqualValues(t, 4, srv.RetryDelay) + assert.EqualValues(t, 4, srv.retryDelay) assert.Equal(t, 4, srv.retryQueue.list.Len()) } @@ -270,17 +270,17 @@ func TestMaxRetries(t *testing.T) { b1 := NewBatch("1\n", opts.MaxRetryTime()) err := srv.HandleWrite(ctx, b1) assert.NotNil(t, err) - assert.EqualValues(t, 1, srv.RetryDelay) + assert.EqualValues(t, 1, srv.retryDelay) assert.Equal(t, 1, srv.retryQueue.list.Len()) // Write so many batches as it is maxRetries (5) // First batch will be written and it will reach max retry limit for i, e := uint(1), uint(2); i <= opts.MaxRetries(); i++ { //wait retry delay + little more - <-time.After(time.Millisecond*time.Duration(srv.RetryDelay) + time.Microsecond*5) + <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5) b := NewBatch(fmt.Sprintf("%d\n", i+1), opts.MaxRetryTime()) err = srv.HandleWrite(ctx, b) assert.NotNil(t, err) - assertBetween(t, srv.RetryDelay, e, e*2) + assertBetween(t, srv.retryDelay, e, e*2) exp := min(i+1, opts.MaxRetries()) assert.EqualValues(t, exp, srv.retryQueue.list.Len()) e *= 2 @@ -288,7 +288,7 @@ func TestMaxRetries(t *testing.T) { //Test if was removed from retry queue assert.True(t, b1.Evicted) - <-time.After(time.Millisecond*time.Duration(srv.RetryDelay) + time.Microsecond*5) + <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5) // Clear error and let write pass hs.SetReplyError(nil) // Batches from retry queue will be sent first @@ -316,7 +316,7 @@ func TestMaxRetryTime(t *testing.T) { b1 := NewBatch("1\n", opts.MaxRetryTime()) err := srv.HandleWrite(ctx, b1) assert.NotNil(t, err) - assert.EqualValues(t, 1, srv.RetryDelay) + assert.EqualValues(t, 1, srv.retryDelay) assert.Equal(t, 1, srv.retryQueue.list.Len()) // Wait for batch expiration @@ -337,7 +337,7 @@ func TestMaxRetryTime(t *testing.T) { assert.Equal(t, 1, srv.retryQueue.list.Len()) //wait until remaining accumulated retryDelay has passed, because there hasn't been a successful write yet - <-time.After(time.Until(srv.lastWriteAttempt.Add(time.Millisecond * time.Duration(srv.RetryDelay)))) + <-time.After(time.Until(srv.lastWriteAttempt.Add(time.Millisecond * time.Duration(srv.retryDelay)))) // Clear error and let write pass hs.SetReplyError(nil) // A batch from retry queue will be sent first @@ -366,28 +366,28 @@ func TestRetryOnConnectionError(t *testing.T) { b1 := NewBatch("1\n", opts.MaxRetryTime()) err := srv.HandleWrite(ctx, b1) assert.NotNil(t, err) - assert.EqualValues(t, 1, srv.RetryDelay) + assert.EqualValues(t, 1, srv.retryDelay) assert.Equal(t, 1, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(srv.RetryDelay)) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) b2 := NewBatch("2\n", opts.MaxRetryTime()) // First batch will be tried to write again and this one will added to retry queue err = srv.HandleWrite(ctx, b2) assert.NotNil(t, err) - assertBetween(t, srv.RetryDelay, 2, 4) + assertBetween(t, srv.retryDelay, 2, 4) assert.Equal(t, 2, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(srv.RetryDelay)) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) b3 := NewBatch("3\n", opts.MaxRetryTime()) // First batch will be tried to write again and this one will added to retry queue err = srv.HandleWrite(ctx, b3) assert.NotNil(t, err) - assertBetween(t, srv.RetryDelay, 4, 8) + assertBetween(t, srv.retryDelay, 4, 8) assert.Equal(t, 3, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(srv.RetryDelay)) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) // Clear error and let write pass hs.SetReplyError(nil) // Batches from retry queue will be sent first @@ -484,13 +484,13 @@ func TestErrorCallback(t *testing.T) { assert.NotNil(t, err) assert.Equal(t, 1, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(srv.RetryDelay)) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) b := NewBatch("2\n", opts.MaxRetryTime()) err = srv.HandleWrite(ctx, b) assert.NotNil(t, err) assert.Equal(t, 2, srv.retryQueue.list.Len()) - <-time.After(time.Millisecond * time.Duration(srv.RetryDelay)) + <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) b = NewBatch("3\n", opts.MaxRetryTime()) err = srv.HandleWrite(ctx, b) assert.NotNil(t, err) @@ -523,30 +523,30 @@ func TestRetryIntervalAccumulation(t *testing.T) { hs.SetReplyError(&http.Error{StatusCode: 429}) lastInterval := uint(0) - assert.Equal(t, uint(0), srv.RetryDelay) // Should initialize to zero + assert.Equal(t, uint(0), srv.retryDelay) // Should initialize to zero i := 1 for ; i <= 45; i++ { b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime()) err := srv.HandleWrite(ctx, b) assert.Equal(t, minInt(i, 5), srv.retryQueue.list.Len()) - assert.GreaterOrEqual(t, srv.RetryDelay, lastInterval) // Should not decrease while writes failing - assert.LessOrEqual(t, srv.RetryDelay, opts.MaxRetryInterval()) // Should not grow larger than max + assert.GreaterOrEqual(t, srv.retryDelay, lastInterval) // Should not decrease while writes failing + assert.LessOrEqual(t, srv.retryDelay, opts.MaxRetryInterval()) // Should not grow larger than max if err != nil { if lastInterval == opts.MaxRetryInterval() { // Write attempt failed, and interval was already at max, so should stay there - assert.Equal(t, srv.RetryDelay, opts.MaxRetryInterval()) - log.Log.Infof("Retry interval capped at %d ms", srv.RetryDelay) + assert.Equal(t, srv.retryDelay, opts.MaxRetryInterval()) + log.Log.Infof("Retry interval capped at %d ms", srv.retryDelay) } else { // A write attempt was made and failed, so retry interval should have increased - assert.Greater(t, srv.RetryDelay, lastInterval) - log.Log.Infof("Retry interval increased to %d ms", srv.RetryDelay) + assert.Greater(t, srv.retryDelay, lastInterval) + log.Log.Infof("Retry interval increased to %d ms", srv.retryDelay) } } else { // Write attempt was not made, so retry interval should remain the same - assert.Equal(t, srv.RetryDelay, lastInterval) - log.Log.Infof("Retry interval still at %d ms", srv.RetryDelay) + assert.Equal(t, srv.retryDelay, lastInterval) + log.Log.Infof("Retry interval still at %d ms", srv.retryDelay) } - lastInterval = srv.RetryDelay + lastInterval = srv.retryDelay <-time.After(writeInterval) } @@ -555,15 +555,15 @@ func TestRetryIntervalAccumulation(t *testing.T) { hs.SetReplyError(nil) // Wait until write queue is ready to retry; in meantime, keep writing and confirming queue state - retryTimeout := srv.lastWriteAttempt.Add(time.Millisecond * time.Duration(srv.RetryDelay)) + retryTimeout := srv.lastWriteAttempt.Add(time.Millisecond * time.Duration(srv.retryDelay)) log.Log.Infof("Continuing to write for %d ms until flushing write attempt", time.Until(retryTimeout).Milliseconds()) for ; time.Until(retryTimeout) >= 0; i++ { b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime()) err := srv.HandleWrite(ctx, b) assert.Nil(t, err) // There should be no write attempt assert.Equal(t, minInt(i, 5), srv.retryQueue.list.Len()) - assert.Equal(t, srv.RetryDelay, opts.MaxRetryInterval()) // Should remain the same - log.Log.Infof("Retry interval still at %d ms", srv.RetryDelay) + assert.Equal(t, srv.retryDelay, opts.MaxRetryInterval()) // Should remain the same + log.Log.Infof("Retry interval still at %d ms", srv.retryDelay) <-time.After(writeInterval) } @@ -572,7 +572,7 @@ func TestRetryIntervalAccumulation(t *testing.T) { err := srv.HandleWrite(ctx, b) assert.Nil(t, err) assert.Equal(t, 0, srv.retryQueue.list.Len()) - assert.Equal(t, srv.RetryDelay, uint(0)) // Should reset to zero + assert.Equal(t, srv.retryDelay, uint(0)) // Should reset to zero // Ensure proper batches got written to server require.Len(t, hs.Lines(), 5)