From ad6374bbdb23d1282e425cbf564557ffbd8845dc Mon Sep 17 00:00:00 2001 From: vlastahajek Date: Tue, 19 Apr 2022 15:50:27 +0200 Subject: [PATCH] fix: Non-empty error channel will not block writes --- CHANGELOG.md | 5 ++++- api/write.go | 18 +++++++++++++++--- api/write_test.go | 28 ++++++++++++++++++++++++++++ 3 files changed, 47 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0fddafcd..ba81fab0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,10 @@ -## 2.9.0 unreleased +## Unreleased ### Features - [#323](https://github.com/influxdata/influxdb-client-go/pull/323) Added `TasksAPI.CreateTaskByFlux` to allow full control of task script. +### Bug fixes +- [#324](https://github.com/influxdata/influxdb-client-go/issues/324) Non-empty error channel will not block writes + ## 2.8.2 [2022-04-19] ### Bug fixes - [#319](https://github.com/influxdata/influxdb-client-go/pull/319) Synchronize `WriteAPIImpl.Close` to prevent panic when closing client by multiple go-routines. diff --git a/api/write.go b/api/write.go index f9fd4aae..487c304a 100644 --- a/api/write.go +++ b/api/write.go @@ -61,6 +61,7 @@ type WriteAPIImpl struct { writeInfoCh chan writeBuffInfoReq writeOptions *write.Options closingMu *sync.Mutex + errMu *sync.Mutex } type writeBuffInfoReq struct { @@ -82,6 +83,7 @@ func NewWriteAPI(org string, bucket string, service http2.Service, writeOptions writeInfoCh: make(chan writeBuffInfoReq), writeOptions: writeOptions, closingMu: &sync.Mutex{}, + errMu: &sync.Mutex{}, } go w.bufferProc() @@ -100,10 +102,10 @@ func (w *WriteAPIImpl) SetWriteFailedCallback(cb WriteFailedCallback) { // Errors returns a channel for reading errors which occurs during async writes. // Must be called before performing any writes for errors to be collected. -// The chan is unbuffered and must be drained or the writer will block. +// New error is skipped when channel is not read. func (w *WriteAPIImpl) Errors() <-chan error { if w.errCh == nil { - w.errCh = make(chan error) + w.errCh = make(chan error, 1) } return w.errCh } @@ -180,7 +182,15 @@ x: case batch := <-w.writeCh: err := w.service.HandleWrite(context.Background(), batch) if err != nil && w.errCh != nil { - w.errCh <- err + go func(err error) { + w.errMu.Lock() + defer w.errMu.Unlock() + select { + case w.errCh <- err: + default: + log.Warn("Cannot write error to error channel, it is not read") + } + }(err) } case <-w.writeStop: log.Info("Write proc: received stop") @@ -221,8 +231,10 @@ func (w *WriteAPIImpl) Close() { // close errors if open if w.errCh != nil { + w.errMu.Lock() close(w.errCh) w.errCh = nil + w.errMu.Unlock() } } } diff --git a/api/write_test.go b/api/write_test.go index 3de04b5c..9f6c07af 100644 --- a/api/write_test.go +++ b/api/write_test.go @@ -198,3 +198,31 @@ func TestWriteErrorCallback(t *testing.T) { writeAPI.Close() } + +func TestClosing(t *testing.T) { + service := test.NewTestService(t, "http://localhost:8888") + log.Log.SetLogLevel(log.DebugLevel) + writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetRetryInterval(10000)) + points := test.GenPoints(15) + for i := 0; i < 5; i++ { + writeAPI.WritePoint(points[i]) + } + writeAPI.Close() + require.Len(t, service.Lines(), 5) + + writeAPI = NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetRetryInterval(10000)) + service.Close() + service.SetReplyError(&http.Error{ + StatusCode: 425, + }) + _ = writeAPI.Errors() + for i := 0; i < 15; i++ { + writeAPI.WritePoint(points[i]) + } + start := time.Now() + writeAPI.Close() + diff := time.Since(start) + fmt.Println("Diff", diff) + assert.Len(t, service.Lines(), 0) + +}