Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added Delete API for deleting data from bucket #122

Merged
merged 1 commit into from
May 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## 1.2.0 [in progress]
### Features
1. [#120](https://github.com/influxdata/influxdb-client-go/pull/120) Health check API
1. [#122](https://github.com/influxdata/influxdb-client-go/pull/122) Delete API

### Breaking Change
- [#107](https://github.com/influxdata/influxdb-client-go/pull/100) Renamed `InfluxDBClient` interface to `Client`, so the full name `influxdb2.Client` suits better to Go naming conventions
Expand Down
93 changes: 93 additions & 0 deletions api/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2020 InfluxData, Inc. All rights reserved.
// Use of this source code is governed by MIT
// license that can be found in the LICENSE file.

package api

import (
"context"
"github.com/influxdata/influxdb-client-go/domain"
"time"
)

// DeleteApi provides methods for deleting time series data from buckets.
// Deleted series are selected by the time range specified by start and stop arguments and optional predicate string which contains condition for selecting data for deletion, such as:
// tag1="value1" and (tag2="value2" and tag3!="value3"). Empty predicate string means all data from the given time range will be deleted. See https://v2.docs.influxdata.com/v2.0/reference/syntax/delete-predicate/
// for more info about predicate syntax.
type DeleteApi interface {
// Delete deletes series selected by by the time range specified by start and stop arguments and optional predicate string from the bucket bucket belonging to the organization org.
Delete(ctx context.Context, org *domain.Organization, bucket *domain.Bucket, start, stop time.Time, predicate string) error
// Delete deletes series selected by by the time range specified by start and stop arguments and optional predicate string from the bucket with Id bucketId belonging to the organization with Id orgId.
DeleteWithId(ctx context.Context, orgId, bucketId string, start, stop time.Time, predicate string) error
// Delete deletes series selected by by the time range specified by start and stop arguments and optional predicate string from the bucket with name bucketName belonging to the organization with name orgName.
DeleteWithName(ctx context.Context, orgName, bucketName string, start, stop time.Time, predicate string) error
}

type deleteApiImpl struct {
apiClient *domain.ClientWithResponses
}

func NewDeleteApi(apiClient *domain.ClientWithResponses) DeleteApi {
return &deleteApiImpl{
apiClient: apiClient,
}
}

func (d *deleteApiImpl) delete(ctx context.Context, params *domain.PostDeleteParams, conditions *domain.DeletePredicateRequest) error {
resp, err := d.apiClient.PostDeleteWithResponse(ctx, params, domain.PostDeleteJSONRequestBody(*conditions))
if err != nil {
return err
}
if resp.JSON404 != nil {
return domain.DomainErrorToError(resp.JSON404, resp.StatusCode())
}
if resp.JSON403 != nil {
return domain.DomainErrorToError(resp.JSON403, resp.StatusCode())
}
if resp.JSON400 != nil {
return domain.DomainErrorToError(resp.JSON400, resp.StatusCode())
}
if resp.JSONDefault != nil {
return domain.DomainErrorToError(resp.JSONDefault, resp.StatusCode())
}
return nil
}

func (d *deleteApiImpl) Delete(ctx context.Context, org *domain.Organization, bucket *domain.Bucket, start, stop time.Time, predicate string) error {
params := &domain.PostDeleteParams{
OrgID: org.Id,
BucketID: bucket.Id,
}
conditions := &domain.DeletePredicateRequest{
Predicate: &predicate,
Start: start,
Stop: stop,
}
return d.delete(ctx, params, conditions)
}

func (d *deleteApiImpl) DeleteWithId(ctx context.Context, orgId, bucketId string, start, stop time.Time, predicate string) error {
params := &domain.PostDeleteParams{
OrgID: &orgId,
BucketID: &bucketId,
}
conditions := &domain.DeletePredicateRequest{
Predicate: &predicate,
Start: start,
Stop: stop,
}
return d.delete(ctx, params, conditions)
}

func (d *deleteApiImpl) DeleteWithName(ctx context.Context, orgName, bucketName string, start, stop time.Time, predicate string) error {
params := &domain.PostDeleteParams{
Org: &orgName,
Bucket: &bucketName,
}
conditions := &domain.DeletePredicateRequest{
Predicate: &predicate,
Start: start,
Stop: stop,
}
return d.delete(ctx, params, conditions)
}
42 changes: 27 additions & 15 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,21 @@ import (
// WriteApi provides asynchronous, non-blocking, methods for writing time series data.
// WriteApiBlocking provides blocking methods for writing time series data.
type Client interface {
// Setup sends request to initialise new InfluxDB server with user, org and bucket, and data retention period
// and returns details about newly created entities along with the authorization object.
// Retention period of zero will result to infinite retention.
Setup(ctx context.Context, username, password, org, bucket string, retentionPeriodHours int) (*domain.OnboardingResponse, error)
// Ready checks InfluxDB server is running. It doesn't validate authentication params.
Ready(ctx context.Context) (bool, error)
// Health returns an InfluxDB server health check result. Read the HealthCheck.Status field to get server status.
// Health doesn't validate authentication params.
Health(ctx context.Context) (*domain.HealthCheck, error)
// Close ensures all ongoing asynchronous write clients finish
Close()
// Options returns the options associated with client
Options() *Options
// ServerUrl returns the url of the server url client talks to
ServerUrl() string
// WriteApi returns the asynchronous, non-blocking, Write client
WriteApi(org, bucket string) WriteApi
// WriteApi returns the synchronous, blocking, Write client
Expand All @@ -33,21 +48,8 @@ type Client interface {
OrganizationsApi() api.OrganizationsApi
// UsersApi returns Users API client
UsersApi() api.UsersApi
// Close ensures all ongoing asynchronous write clients finish
Close()
// Options returns the options associated with client
Options() *Options
// ServerUrl returns the url of the server url client talks to
ServerUrl() string
// Setup sends request to initialise new InfluxDB server with user, org and bucket, and data retention period
// and returns details about newly created entities along with the authorization object.
// Retention period of zero will result to infinite retention.
Setup(ctx context.Context, username, password, org, bucket string, retentionPeriodHours int) (*domain.OnboardingResponse, error)
// Ready checks InfluxDB server is running. It doesn't validate authentication params.
Ready(ctx context.Context) (bool, error)
// Health returns an InfluxDB server health check result. Read the HealthCheck.Status field to get server status.
// Health doesn't validate authentication params.
Health(ctx context.Context) (*domain.HealthCheck, error)
// DeleteApi returns Delete API client
DeleteApi() api.DeleteApi
}

// clientImpl implements Client interface
Expand All @@ -61,6 +63,7 @@ type clientImpl struct {
authApi api.AuthorizationsApi
orgApi api.OrganizationsApi
usersApi api.UsersApi
deleteApi api.DeleteApi
}

// NewClient creates Client for connecting to given serverUrl with provided authentication token, with the default options.
Expand Down Expand Up @@ -194,3 +197,12 @@ func (c *clientImpl) UsersApi() api.UsersApi {
}
return c.usersApi
}

func (c *clientImpl) DeleteApi() api.DeleteApi {
c.lock.Lock()
defer c.lock.Unlock()
if c.deleteApi == nil {
c.deleteApi = api.NewDeleteApi(c.apiClient)
}
return c.deleteApi
}
67 changes: 67 additions & 0 deletions client_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"strconv"
"strings"
"testing"
"time"
)
Expand Down Expand Up @@ -398,3 +399,69 @@ func TestUsers(t *testing.T) {
require.NotNil(t, users)
assert.Len(t, *users, 1)
}

func TestDelete(t *testing.T) {
if !e2e {
t.Skip("e2e not enabled. Launch InfluxDB 2 on localhost and run test with -e2e")
}
client := NewClient("http://localhost:9999", authToken)
writeApi := client.WriteApiBlocking("my-org", "my-bucket")
queryApi := client.QueryApi("my-org")
tmStart := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
writeF := func(start time.Time, count int64) time.Time {
tm := start
for i, f := int64(0), 0.0; i < count; i++ {
p := NewPoint("test",
map[string]string{"a": strconv.FormatInt(i%2, 10), "b": "static"},
map[string]interface{}{"f": f, "i": i},
tm)
err := writeApi.WritePoint(context.Background(), p)
require.Nil(t, err, err)
f += 1.2
tm = tm.Add(time.Minute)
}
return tm
}
countF := func(start, stop time.Time) int64 {
result, err := queryApi.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: `+start.Format(time.RFC3339)+`, stop:`+stop.Format(time.RFC3339)+`)
|> filter(fn: (r) => r._measurement == "test" and r._field == "f")
|> drop(columns: ["a", "b"])
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> count(column: "f")`)

require.Nil(t, err, err)
count := int64(0)
if result.Next() {
require.NotNil(t, result.Record().ValueByKey("f"))
count = result.Record().ValueByKey("f").(int64)
}
return count
}
tmEnd := writeF(tmStart, 100)
assert.Equal(t, int64(100), countF(tmStart, tmEnd))
deleteApi := client.DeleteApi()

err := deleteApi.DeleteWithName(context.Background(), "my-org", "my-bucket", tmStart, tmEnd, "")
require.Nil(t, err, err)
assert.Equal(t, int64(0), countF(tmStart, tmEnd))

tmEnd = writeF(tmStart, 100)
assert.Equal(t, int64(100), countF(tmStart, tmEnd))

err = deleteApi.DeleteWithName(context.Background(), "my-org", "my-bucket", tmStart, tmEnd, "a=1")
require.Nil(t, err, err)
assert.Equal(t, int64(50), countF(tmStart, tmEnd))

err = deleteApi.DeleteWithName(context.Background(), "my-org", "my-bucket", tmStart.Add(50*time.Minute), tmEnd, "b=static")
require.Nil(t, err, err)
assert.Equal(t, int64(25), countF(tmStart, tmEnd))

err = deleteApi.DeleteWithName(context.Background(), "org", "my-bucket", tmStart.Add(50*time.Minute), tmEnd, "b=static")
require.NotNil(t, err, err)
assert.True(t, strings.Contains(err.Error(), "not found"))

err = deleteApi.DeleteWithName(context.Background(), "my-org", "bucket", tmStart.Add(50*time.Minute), tmEnd, "b=static")
require.NotNil(t, err, err)
assert.True(t, strings.Contains(err.Error(), "not found"))

}