Skip to content

Commit

Permalink
fix: Non-empty error channel will not block writes
Browse files Browse the repository at this point in the history
  • Loading branch information
vlastahajek committed May 17, 2022
1 parent 0ac36c7 commit ad6374b
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 4 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
## 2.9.0 unreleased
## Unreleased
### Features
- [#323](https://github.com/influxdata/influxdb-client-go/pull/323) Added `TasksAPI.CreateTaskByFlux` to allow full control of task script.

### Bug fixes
- [#324](https://github.com/influxdata/influxdb-client-go/issues/324) Non-empty error channel will not block writes

## 2.8.2 [2022-04-19]
### Bug fixes
- [#319](https://github.com/influxdata/influxdb-client-go/pull/319) Synchronize `WriteAPIImpl.Close` to prevent panic when closing client by multiple go-routines.
Expand Down
18 changes: 15 additions & 3 deletions api/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type WriteAPIImpl struct {
writeInfoCh chan writeBuffInfoReq
writeOptions *write.Options
closingMu *sync.Mutex
errMu *sync.Mutex
}

type writeBuffInfoReq struct {
Expand All @@ -82,6 +83,7 @@ func NewWriteAPI(org string, bucket string, service http2.Service, writeOptions
writeInfoCh: make(chan writeBuffInfoReq),
writeOptions: writeOptions,
closingMu: &sync.Mutex{},
errMu: &sync.Mutex{},
}

go w.bufferProc()
Expand All @@ -100,10 +102,10 @@ func (w *WriteAPIImpl) SetWriteFailedCallback(cb WriteFailedCallback) {

// Errors returns a channel for reading errors which occurs during async writes.
// Must be called before performing any writes for errors to be collected.
// The chan is unbuffered and must be drained or the writer will block.
// New error is skipped when channel is not read.
func (w *WriteAPIImpl) Errors() <-chan error {
if w.errCh == nil {
w.errCh = make(chan error)
w.errCh = make(chan error, 1)
}
return w.errCh
}
Expand Down Expand Up @@ -180,7 +182,15 @@ x:
case batch := <-w.writeCh:
err := w.service.HandleWrite(context.Background(), batch)
if err != nil && w.errCh != nil {
w.errCh <- err
go func(err error) {
w.errMu.Lock()
defer w.errMu.Unlock()
select {
case w.errCh <- err:
default:
log.Warn("Cannot write error to error channel, it is not read")
}
}(err)
}
case <-w.writeStop:
log.Info("Write proc: received stop")
Expand Down Expand Up @@ -221,8 +231,10 @@ func (w *WriteAPIImpl) Close() {

// close errors if open
if w.errCh != nil {
w.errMu.Lock()
close(w.errCh)
w.errCh = nil
w.errMu.Unlock()
}
}
}
Expand Down
28 changes: 28 additions & 0 deletions api/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,31 @@ func TestWriteErrorCallback(t *testing.T) {

writeAPI.Close()
}

func TestClosing(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
log.Log.SetLogLevel(log.DebugLevel)
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetRetryInterval(10000))
points := test.GenPoints(15)
for i := 0; i < 5; i++ {
writeAPI.WritePoint(points[i])
}
writeAPI.Close()
require.Len(t, service.Lines(), 5)

writeAPI = NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetRetryInterval(10000))
service.Close()
service.SetReplyError(&http.Error{
StatusCode: 425,
})
_ = writeAPI.Errors()
for i := 0; i < 15; i++ {
writeAPI.WritePoint(points[i])
}
start := time.Now()
writeAPI.Close()
diff := time.Since(start)
fmt.Println("Diff", diff)
assert.Len(t, service.Lines(), 0)

}

0 comments on commit ad6374b

Please sign in to comment.