Skip to content

Commit

Permalink
Merge pull request #324 from bonitoo-io/fix/write_flush_closing
Browse files Browse the repository at this point in the history
fix: Non-empty error channel will not block writes
  • Loading branch information
vlastahajek authored May 18, 2022
2 parents 6591ddc + d99575f commit 841a340
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 23 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
## 2.9.0 unreleased
## Unreleased
### Features
- [#323](https://github.com/influxdata/influxdb-client-go/pull/323) Added `TasksAPI.CreateTaskByFlux` to allow full control of task script.

### Bug fixes
- [#324](https://github.com/influxdata/influxdb-client-go/issues/324) Non-empty error channel will not block writes

## 2.8.2 [2022-04-19]
### Bug fixes
- [#319](https://github.com/influxdata/influxdb-client-go/pull/319) Synchronize `WriteAPIImpl.Close` to prevent panic when closing client by multiple go-routines.
Expand Down
53 changes: 31 additions & 22 deletions api/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"strings"
"sync"
"sync/atomic"
"time"

http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
Expand Down Expand Up @@ -50,17 +51,18 @@ type WriteAPIImpl struct {
service *iwrite.Service
writeBuffer []string

errCh chan error
writeCh chan *iwrite.Batch
bufferCh chan string
writeStop chan struct{}
bufferStop chan struct{}
bufferFlush chan struct{}
doneCh chan struct{}
bufferInfoCh chan writeBuffInfoReq
writeInfoCh chan writeBuffInfoReq
writeOptions *write.Options
closingMu *sync.Mutex
errCh chan error
writeCh chan *iwrite.Batch
bufferCh chan string
writeStop chan struct{}
bufferStop chan struct{}
bufferFlush chan struct{}
doneCh chan struct{}
bufferInfoCh chan writeBuffInfoReq
writeInfoCh chan writeBuffInfoReq
writeOptions *write.Options
closingMu *sync.Mutex
isErrChReader int32
}

type writeBuffInfoReq struct {
Expand All @@ -71,6 +73,7 @@ type writeBuffInfoReq struct {
func NewWriteAPI(org string, bucket string, service http2.Service, writeOptions *write.Options) *WriteAPIImpl {
w := &WriteAPIImpl{
service: iwrite.NewService(org, bucket, service, writeOptions),
errCh: make(chan error, 1),
writeBuffer: make([]string, 0, writeOptions.BatchSize()+1),
writeCh: make(chan *iwrite.Batch),
bufferCh: make(chan string),
Expand Down Expand Up @@ -100,11 +103,9 @@ func (w *WriteAPIImpl) SetWriteFailedCallback(cb WriteFailedCallback) {

// Errors returns a channel for reading errors which occurs during async writes.
// Must be called before performing any writes for errors to be collected.
// The chan is unbuffered and must be drained or the writer will block.
// New error is skipped when channel is not read.
func (w *WriteAPIImpl) Errors() <-chan error {
if w.errCh == nil {
w.errCh = make(chan error)
}
w.setErrChanRead()
return w.errCh
}

Expand Down Expand Up @@ -171,6 +172,13 @@ func (w *WriteAPIImpl) flushBuffer() {
w.writeBuffer = w.writeBuffer[:0]
}
}
func (w *WriteAPIImpl) isErrChanRead() bool {
return atomic.LoadInt32(&w.isErrChReader) > 0
}

func (w *WriteAPIImpl) setErrChanRead() {
atomic.StoreInt32(&w.isErrChReader, 1)
}

func (w *WriteAPIImpl) writeProc() {
log.Info("Write proc started")
Expand All @@ -179,8 +187,12 @@ x:
select {
case batch := <-w.writeCh:
err := w.service.HandleWrite(context.Background(), batch)
if err != nil && w.errCh != nil {
w.errCh <- err
if err != nil && w.isErrChanRead() {
select {
case w.errCh <- err:
default:
log.Warn("Cannot write error to error channel, it is not read")
}
}
case <-w.writeStop:
log.Info("Write proc: received stop")
Expand Down Expand Up @@ -219,11 +231,8 @@ func (w *WriteAPIImpl) Close() {
close(w.bufferInfoCh)
w.writeCh = nil

// close errors if open
if w.errCh != nil {
close(w.errCh)
w.errCh = nil
}
close(w.errCh)
w.errCh = nil
}
}

Expand Down
28 changes: 28 additions & 0 deletions api/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,31 @@ func TestWriteErrorCallback(t *testing.T) {

writeAPI.Close()
}

func TestClosing(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
log.Log.SetLogLevel(log.DebugLevel)
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetRetryInterval(10000))
points := test.GenPoints(15)
for i := 0; i < 5; i++ {
writeAPI.WritePoint(points[i])
}
writeAPI.Close()
require.Len(t, service.Lines(), 5)

writeAPI = NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetRetryInterval(10000))
service.Close()
service.SetReplyError(&http.Error{
StatusCode: 425,
})
_ = writeAPI.Errors()
for i := 0; i < 15; i++ {
writeAPI.WritePoint(points[i])
}
start := time.Now()
writeAPI.Close()
diff := time.Since(start)
fmt.Println("Diff", diff)
assert.Len(t, service.Lines(), 0)

}

0 comments on commit 841a340

Please sign in to comment.