From d99575f1fda74f03a2cfb1e94b8a5260f89d9e37 Mon Sep 17 00:00:00 2001 From: vlastahajek Date: Tue, 19 Apr 2022 15:50:27 +0200 Subject: [PATCH] fix: Non-empty error channel will not block writes --- CHANGELOG.md | 5 ++++- api/write.go | 53 +++++++++++++++++++++++++++-------------------- api/write_test.go | 28 +++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0fddafcd..ba81fab0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,10 @@ -## 2.9.0 unreleased +## Unreleased ### Features - [#323](https://github.com/influxdata/influxdb-client-go/pull/323) Added `TasksAPI.CreateTaskByFlux` to allow full control of task script. +### Bug fixes +- [#324](https://github.com/influxdata/influxdb-client-go/issues/324) Non-empty error channel will not block writes + ## 2.8.2 [2022-04-19] ### Bug fixes - [#319](https://github.com/influxdata/influxdb-client-go/pull/319) Synchronize `WriteAPIImpl.Close` to prevent panic when closing client by multiple go-routines. diff --git a/api/write.go b/api/write.go index f9fd4aae..419af937 100644 --- a/api/write.go +++ b/api/write.go @@ -8,6 +8,7 @@ import ( "context" "strings" "sync" + "sync/atomic" "time" http2 "github.com/influxdata/influxdb-client-go/v2/api/http" @@ -50,17 +51,18 @@ type WriteAPIImpl struct { service *iwrite.Service writeBuffer []string - errCh chan error - writeCh chan *iwrite.Batch - bufferCh chan string - writeStop chan struct{} - bufferStop chan struct{} - bufferFlush chan struct{} - doneCh chan struct{} - bufferInfoCh chan writeBuffInfoReq - writeInfoCh chan writeBuffInfoReq - writeOptions *write.Options - closingMu *sync.Mutex + errCh chan error + writeCh chan *iwrite.Batch + bufferCh chan string + writeStop chan struct{} + bufferStop chan struct{} + bufferFlush chan struct{} + doneCh chan struct{} + bufferInfoCh chan writeBuffInfoReq + writeInfoCh chan writeBuffInfoReq + writeOptions *write.Options + closingMu *sync.Mutex + isErrChReader int32 } type writeBuffInfoReq struct { @@ -71,6 +73,7 @@ type writeBuffInfoReq struct { func NewWriteAPI(org string, bucket string, service http2.Service, writeOptions *write.Options) *WriteAPIImpl { w := &WriteAPIImpl{ service: iwrite.NewService(org, bucket, service, writeOptions), + errCh: make(chan error, 1), writeBuffer: make([]string, 0, writeOptions.BatchSize()+1), writeCh: make(chan *iwrite.Batch), bufferCh: make(chan string), @@ -100,11 +103,9 @@ func (w *WriteAPIImpl) SetWriteFailedCallback(cb WriteFailedCallback) { // Errors returns a channel for reading errors which occurs during async writes. // Must be called before performing any writes for errors to be collected. -// The chan is unbuffered and must be drained or the writer will block. +// New error is skipped when channel is not read. func (w *WriteAPIImpl) Errors() <-chan error { - if w.errCh == nil { - w.errCh = make(chan error) - } + w.setErrChanRead() return w.errCh } @@ -171,6 +172,13 @@ func (w *WriteAPIImpl) flushBuffer() { w.writeBuffer = w.writeBuffer[:0] } } +func (w *WriteAPIImpl) isErrChanRead() bool { + return atomic.LoadInt32(&w.isErrChReader) > 0 +} + +func (w *WriteAPIImpl) setErrChanRead() { + atomic.StoreInt32(&w.isErrChReader, 1) +} func (w *WriteAPIImpl) writeProc() { log.Info("Write proc started") @@ -179,8 +187,12 @@ x: select { case batch := <-w.writeCh: err := w.service.HandleWrite(context.Background(), batch) - if err != nil && w.errCh != nil { - w.errCh <- err + if err != nil && w.isErrChanRead() { + select { + case w.errCh <- err: + default: + log.Warn("Cannot write error to error channel, it is not read") + } } case <-w.writeStop: log.Info("Write proc: received stop") @@ -219,11 +231,8 @@ func (w *WriteAPIImpl) Close() { close(w.bufferInfoCh) w.writeCh = nil - // close errors if open - if w.errCh != nil { - close(w.errCh) - w.errCh = nil - } + close(w.errCh) + w.errCh = nil } } diff --git a/api/write_test.go b/api/write_test.go index 3de04b5c..9f6c07af 100644 --- a/api/write_test.go +++ b/api/write_test.go @@ -198,3 +198,31 @@ func TestWriteErrorCallback(t *testing.T) { writeAPI.Close() } + +func TestClosing(t *testing.T) { + service := test.NewTestService(t, "http://localhost:8888") + log.Log.SetLogLevel(log.DebugLevel) + writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetRetryInterval(10000)) + points := test.GenPoints(15) + for i := 0; i < 5; i++ { + writeAPI.WritePoint(points[i]) + } + writeAPI.Close() + require.Len(t, service.Lines(), 5) + + writeAPI = NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetRetryInterval(10000)) + service.Close() + service.SetReplyError(&http.Error{ + StatusCode: 425, + }) + _ = writeAPI.Errors() + for i := 0; i < 15; i++ { + writeAPI.WritePoint(points[i]) + } + start := time.Now() + writeAPI.Close() + diff := time.Since(start) + fmt.Println("Diff", diff) + assert.Len(t, service.Lines(), 0) + +}