diff --git a/internal/write/service_test.go b/internal/write/service_test.go index ce062095..2bc5ce23 100644 --- a/internal/write/service_test.go +++ b/internal/write/service_test.go @@ -497,3 +497,91 @@ func TestErrorCallback(t *testing.T) { assert.Equal(t, 2, srv.retryQueue.list.Len()) } + +func minInt(a, b int) int { + if a > b { + return b + } + return a +} + +func TestRetryIntervalAccumulation(t *testing.T) { + // log.Log.SetLogLevel(log.DebugLevel) + log.Log.SetLogLevel(log.InfoLevel) + + // Setup test service with scenario's configuration + hs := test.NewTestService(t, "http://localhost:8086") + opts := write.DefaultOptions(). + SetRetryInterval(20). + SetMaxRetryInterval(300). + SetMaxRetryTime(100) + ctx := context.Background() + srv := NewService("my-org", "my-bucket", hs, opts) + writeInterval := time.Duration(opts.RetryInterval()) * time.Millisecond + + // Set permanent reply error to force writes fail and retry + hs.SetReplyError(&http.Error{StatusCode: 429}) + + lastInterval := uint(0) + assert.Equal(t, uint(0), srv.RetryDelay) // Should initialize to zero + i := 1 + for ; i <= 45; i++ { + b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime()) + err := srv.HandleWrite(ctx, b) + assert.Equal(t, minInt(i, 5), srv.retryQueue.list.Len()) + assert.GreaterOrEqual(t, srv.RetryDelay, lastInterval) // Should not decrease while writes failing + assert.LessOrEqual(t, srv.RetryDelay, opts.MaxRetryInterval()) // Should not grow larger than max + if err != nil { + if lastInterval == opts.MaxRetryInterval() { + // Write attempt failed, and interval was already at max, so should stay there + assert.Equal(t, srv.RetryDelay, opts.MaxRetryInterval()) + log.Log.Infof("Retry interval capped at %d ms", srv.RetryDelay) + } else { + // A write attempt was made and failed, so retry interval should have increased + assert.Greater(t, srv.RetryDelay, lastInterval) + log.Log.Infof("Retry interval increased to %d ms", srv.RetryDelay) + } + } else { + // Write attempt was not made, so retry interval should remain the same + assert.Equal(t, srv.RetryDelay, lastInterval) + log.Log.Infof("Retry interval still at %d ms", srv.RetryDelay) + } + lastInterval = srv.RetryDelay + + <-time.After(writeInterval) + } + + // Clear error and let write pass + hs.SetReplyError(nil) + + // Wait until write queue is ready to retry; in meantime, keep writing and confirming queue state + retryTimeout := srv.lastWriteAttempt.Add(time.Millisecond * time.Duration(srv.RetryDelay)) + log.Log.Infof("Continuing to write for %d ms until flushing write attempt", time.Until(retryTimeout).Milliseconds()) + for ; time.Until(retryTimeout) >= 0; i++ { + b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime()) + err := srv.HandleWrite(ctx, b) + assert.Nil(t, err) // There should be no write attempt + assert.Equal(t, minInt(i, 5), srv.retryQueue.list.Len()) + assert.Equal(t, srv.RetryDelay, opts.MaxRetryInterval()) // Should remain the same + log.Log.Infof("Retry interval still at %d ms", srv.RetryDelay) + <-time.After(writeInterval) + } + + // Retry interval should now have expired, so this write attempt should succeed and cause retry queue to flush + b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime()) + err := srv.HandleWrite(ctx, b) + assert.Nil(t, err) + assert.Equal(t, 0, srv.retryQueue.list.Len()) + assert.Equal(t, srv.RetryDelay, uint(0)) // Should reset to zero + + // Ensure proper batches got written to server + require.Len(t, hs.Lines(), 5) + assert.Equal(t, fmt.Sprintf("%d", i-4), hs.Lines()[0]) + assert.Equal(t, fmt.Sprintf("%d", i-3), hs.Lines()[1]) + assert.Equal(t, fmt.Sprintf("%d", i-2), hs.Lines()[2]) + assert.Equal(t, fmt.Sprintf("%d", i-1), hs.Lines()[3]) + assert.Equal(t, fmt.Sprintf("%d", i-0), hs.Lines()[4]) + + // Debug line to capture output of successful test + // assert.True(t, false) +}