
test: add write queue retry delay accumulation test
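The new test drives the write service against a server that permanently replies 429 and verifies that RetryDelay grows monotonically across failed writes, never exceeds MaxRetryInterval, and resets to zero once a write succeeds and the retry queue flushes. With SetRetryInterval(20) and SetMaxRetryInterval(300), and assuming the default exponential base of 2, the delay should step roughly through 20, 40, 80, 160, and then cap at 300 ms.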
seth-hunter committed Jun 20, 2022
1 parent 42d6592 commit 29369c7
Showing 1 changed file with 88 additions and 0 deletions.
88 changes: 88 additions & 0 deletions internal/write/service_test.go
@@ -497,3 +497,91 @@ func TestErrorCallback(t *testing.T) {
    assert.Equal(t, 2, srv.retryQueue.list.Len())

}

// minInt returns the smaller of a and b.
func minInt(a, b int) int {
    if a > b {
        return b
    }
    return a
}

func TestRetryIntervalAccumulation(t *testing.T) {
    // log.Log.SetLogLevel(log.DebugLevel)
    log.Log.SetLogLevel(log.InfoLevel)

    // Set up the test service with the scenario's configuration
    hs := test.NewTestService(t, "http://localhost:8086")
    opts := write.DefaultOptions().
        SetRetryInterval(20).
        SetMaxRetryInterval(300).
        SetMaxRetryTime(100)
    ctx := context.Background()
    srv := NewService("my-org", "my-bucket", hs, opts)
    writeInterval := time.Duration(opts.RetryInterval()) * time.Millisecond
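    // writeInterval paces the loops below at the base retry interval (20 ms),
    // so each iteration gives the service at most one retry attempt to observe.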

    // Set a permanent reply error to force writes to fail and retry
    hs.SetReplyError(&http.Error{StatusCode: 429})

    lastInterval := uint(0)
    assert.Equal(t, uint(0), srv.RetryDelay) // Should initialize to zero
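    // Issue failing writes: the retry queue should cap at 5 batches while the
    // retry delay grows monotonically up to MaxRetryInterval.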
    i := 1
    for ; i <= 45; i++ {
        b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime())
        err := srv.HandleWrite(ctx, b)
        assert.Equal(t, minInt(i, 5), srv.retryQueue.list.Len())
        assert.GreaterOrEqual(t, srv.RetryDelay, lastInterval)         // Should not decrease while writes are failing
        assert.LessOrEqual(t, srv.RetryDelay, opts.MaxRetryInterval()) // Should not grow larger than max
        if err != nil {
            if lastInterval == opts.MaxRetryInterval() {
                // Write attempt failed, and the interval was already at max, so it should stay there
                assert.Equal(t, srv.RetryDelay, opts.MaxRetryInterval())
                log.Log.Infof("Retry interval capped at %d ms", srv.RetryDelay)
            } else {
                // A write attempt was made and failed, so the retry interval should have increased
                assert.Greater(t, srv.RetryDelay, lastInterval)
                log.Log.Infof("Retry interval increased to %d ms", srv.RetryDelay)
            }
        } else {
            // No write attempt was made, so the retry interval should remain the same
            assert.Equal(t, srv.RetryDelay, lastInterval)
            log.Log.Infof("Retry interval still at %d ms", srv.RetryDelay)
        }
        lastInterval = srv.RetryDelay

        <-time.After(writeInterval)
    }

    // Clear the error and let writes pass
    hs.SetReplyError(nil)

    // Wait until the write queue is ready to retry; in the meantime, keep writing and confirming queue state
    retryTimeout := srv.lastWriteAttempt.Add(time.Millisecond * time.Duration(srv.RetryDelay))
    log.Log.Infof("Continuing to write for %d ms until flushing write attempt", time.Until(retryTimeout).Milliseconds())
    for ; time.Until(retryTimeout) >= 0; i++ {
        b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime())
        err := srv.HandleWrite(ctx, b)
        assert.Nil(t, err) // There should be no write attempt
        assert.Equal(t, minInt(i, 5), srv.retryQueue.list.Len())
        assert.Equal(t, srv.RetryDelay, opts.MaxRetryInterval()) // Should remain at max
        log.Log.Infof("Retry interval still at %d ms", srv.RetryDelay)
        <-time.After(writeInterval)
    }

    // The retry interval should now have expired, so this write attempt should succeed and flush the retry queue
    b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime())
    err := srv.HandleWrite(ctx, b)
    assert.Nil(t, err)
    assert.Equal(t, 0, srv.retryQueue.list.Len())
    assert.Equal(t, srv.RetryDelay, uint(0)) // Should reset to zero

    // Ensure the proper batches got written to the server
    require.Len(t, hs.Lines(), 5)
    assert.Equal(t, fmt.Sprintf("%d", i-4), hs.Lines()[0])
    assert.Equal(t, fmt.Sprintf("%d", i-3), hs.Lines()[1])
    assert.Equal(t, fmt.Sprintf("%d", i-2), hs.Lines()[2])
    assert.Equal(t, fmt.Sprintf("%d", i-1), hs.Lines()[3])
    assert.Equal(t, fmt.Sprintf("%d", i-0), hs.Lines()[4])

    // Debug line to capture output of a successful test
    // assert.True(t, false)
}
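
For reference, below is a minimal sketch of the backoff rule the assertions above imply. It is an illustration only, not the service's actual implementation; it assumes the default exponential base of 2 and ignores any jitter the client may apply:

// nextRetryDelay illustrates the expected accumulation: the delay starts at
// the configured base interval, roughly doubles on each failed attempt, and
// is capped at the maximum; a successful write resets it to zero elsewhere.
func nextRetryDelay(current, base, max uint) uint {
    if current == 0 {
        return base // first failure: start at RetryInterval
    }
    if current*2 >= max {
        return max // never exceed MaxRetryInterval
    }
    return current * 2
}

With base 20 and max 300 this yields 20, 40, 80, 160, 300, 300, ..., matching the monotonic, capped growth asserted in the first loop.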
