diff --git a/CHANGELOG.md b/CHANGELOG.md index 42a69dd2..99651270 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## 1.2.0 [in progress] ### Features 1. [#120](https://github.com/influxdata/influxdb-client-go/pull/120) Health check API +1. [#122](https://github.com/influxdata/influxdb-client-go/pull/122) Delete API ### Breaking Change - [#107](https://github.com/influxdata/influxdb-client-go/pull/100) Renamed `InfluxDBClient` interface to `Client`, so the full name `influxdb2.Client` suits better to Go naming conventions diff --git a/api/delete.go b/api/delete.go new file mode 100644 index 00000000..9a1f4f76 --- /dev/null +++ b/api/delete.go @@ -0,0 +1,93 @@ +// Copyright 2020 InfluxData, Inc. All rights reserved. +// Use of this source code is governed by MIT +// license that can be found in the LICENSE file. + +package api + +import ( + "context" + "github.com/influxdata/influxdb-client-go/domain" + "time" +) + +// DeleteApi provides methods for deleting time series data from buckets. +// Deleted series are selected by the time range specified by start and stop arguments and optional predicate string which contains condition for selecting data for deletion, such as: +// tag1="value1" and (tag2="value2" and tag3!="value3"). Empty predicate string means all data from the given time range will be deleted. See https://v2.docs.influxdata.com/v2.0/reference/syntax/delete-predicate/ +// for more info about predicate syntax. +type DeleteApi interface { + // Delete deletes series selected by by the time range specified by start and stop arguments and optional predicate string from the bucket bucket belonging to the organization org. + Delete(ctx context.Context, org *domain.Organization, bucket *domain.Bucket, start, stop time.Time, predicate string) error + // Delete deletes series selected by by the time range specified by start and stop arguments and optional predicate string from the bucket with Id bucketId belonging to the organization with Id orgId. + DeleteWithId(ctx context.Context, orgId, bucketId string, start, stop time.Time, predicate string) error + // Delete deletes series selected by by the time range specified by start and stop arguments and optional predicate string from the bucket with name bucketName belonging to the organization with name orgName. + DeleteWithName(ctx context.Context, orgName, bucketName string, start, stop time.Time, predicate string) error +} + +type deleteApiImpl struct { + apiClient *domain.ClientWithResponses +} + +func NewDeleteApi(apiClient *domain.ClientWithResponses) DeleteApi { + return &deleteApiImpl{ + apiClient: apiClient, + } +} + +func (d *deleteApiImpl) delete(ctx context.Context, params *domain.PostDeleteParams, conditions *domain.DeletePredicateRequest) error { + resp, err := d.apiClient.PostDeleteWithResponse(ctx, params, domain.PostDeleteJSONRequestBody(*conditions)) + if err != nil { + return err + } + if resp.JSON404 != nil { + return domain.DomainErrorToError(resp.JSON404, resp.StatusCode()) + } + if resp.JSON403 != nil { + return domain.DomainErrorToError(resp.JSON403, resp.StatusCode()) + } + if resp.JSON400 != nil { + return domain.DomainErrorToError(resp.JSON400, resp.StatusCode()) + } + if resp.JSONDefault != nil { + return domain.DomainErrorToError(resp.JSONDefault, resp.StatusCode()) + } + return nil +} + +func (d *deleteApiImpl) Delete(ctx context.Context, org *domain.Organization, bucket *domain.Bucket, start, stop time.Time, predicate string) error { + params := &domain.PostDeleteParams{ + OrgID: org.Id, + BucketID: bucket.Id, + } + conditions := &domain.DeletePredicateRequest{ + Predicate: &predicate, + Start: start, + Stop: stop, + } + return d.delete(ctx, params, conditions) +} + +func (d *deleteApiImpl) DeleteWithId(ctx context.Context, orgId, bucketId string, start, stop time.Time, predicate string) error { + params := &domain.PostDeleteParams{ + OrgID: &orgId, + BucketID: &bucketId, + } + conditions := &domain.DeletePredicateRequest{ + Predicate: &predicate, + Start: start, + Stop: stop, + } + return d.delete(ctx, params, conditions) +} + +func (d *deleteApiImpl) DeleteWithName(ctx context.Context, orgName, bucketName string, start, stop time.Time, predicate string) error { + params := &domain.PostDeleteParams{ + Org: &orgName, + Bucket: &bucketName, + } + conditions := &domain.DeletePredicateRequest{ + Predicate: &predicate, + Start: start, + Stop: stop, + } + return d.delete(ctx, params, conditions) +} diff --git a/client.go b/client.go index d990e8f7..c4a745c2 100644 --- a/client.go +++ b/client.go @@ -21,6 +21,21 @@ import ( // WriteApi provides asynchronous, non-blocking, methods for writing time series data. // WriteApiBlocking provides blocking methods for writing time series data. type Client interface { + // Setup sends request to initialise new InfluxDB server with user, org and bucket, and data retention period + // and returns details about newly created entities along with the authorization object. + // Retention period of zero will result to infinite retention. + Setup(ctx context.Context, username, password, org, bucket string, retentionPeriodHours int) (*domain.OnboardingResponse, error) + // Ready checks InfluxDB server is running. It doesn't validate authentication params. + Ready(ctx context.Context) (bool, error) + // Health returns an InfluxDB server health check result. Read the HealthCheck.Status field to get server status. + // Health doesn't validate authentication params. + Health(ctx context.Context) (*domain.HealthCheck, error) + // Close ensures all ongoing asynchronous write clients finish + Close() + // Options returns the options associated with client + Options() *Options + // ServerUrl returns the url of the server url client talks to + ServerUrl() string // WriteApi returns the asynchronous, non-blocking, Write client WriteApi(org, bucket string) WriteApi // WriteApi returns the synchronous, blocking, Write client @@ -33,21 +48,8 @@ type Client interface { OrganizationsApi() api.OrganizationsApi // UsersApi returns Users API client UsersApi() api.UsersApi - // Close ensures all ongoing asynchronous write clients finish - Close() - // Options returns the options associated with client - Options() *Options - // ServerUrl returns the url of the server url client talks to - ServerUrl() string - // Setup sends request to initialise new InfluxDB server with user, org and bucket, and data retention period - // and returns details about newly created entities along with the authorization object. - // Retention period of zero will result to infinite retention. - Setup(ctx context.Context, username, password, org, bucket string, retentionPeriodHours int) (*domain.OnboardingResponse, error) - // Ready checks InfluxDB server is running. It doesn't validate authentication params. - Ready(ctx context.Context) (bool, error) - // Health returns an InfluxDB server health check result. Read the HealthCheck.Status field to get server status. - // Health doesn't validate authentication params. - Health(ctx context.Context) (*domain.HealthCheck, error) + // DeleteApi returns Delete API client + DeleteApi() api.DeleteApi } // clientImpl implements Client interface @@ -61,6 +63,7 @@ type clientImpl struct { authApi api.AuthorizationsApi orgApi api.OrganizationsApi usersApi api.UsersApi + deleteApi api.DeleteApi } // NewClient creates Client for connecting to given serverUrl with provided authentication token, with the default options. @@ -194,3 +197,12 @@ func (c *clientImpl) UsersApi() api.UsersApi { } return c.usersApi } + +func (c *clientImpl) DeleteApi() api.DeleteApi { + c.lock.Lock() + defer c.lock.Unlock() + if c.deleteApi == nil { + c.deleteApi = api.NewDeleteApi(c.apiClient) + } + return c.deleteApi +} diff --git a/client_e2e_test.go b/client_e2e_test.go index 66dafc38..327bfb7c 100644 --- a/client_e2e_test.go +++ b/client_e2e_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "strconv" + "strings" "testing" "time" ) @@ -398,3 +399,69 @@ func TestUsers(t *testing.T) { require.NotNil(t, users) assert.Len(t, *users, 1) } + +func TestDelete(t *testing.T) { + if !e2e { + t.Skip("e2e not enabled. Launch InfluxDB 2 on localhost and run test with -e2e") + } + client := NewClient("http://localhost:9999", authToken) + writeApi := client.WriteApiBlocking("my-org", "my-bucket") + queryApi := client.QueryApi("my-org") + tmStart := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC) + writeF := func(start time.Time, count int64) time.Time { + tm := start + for i, f := int64(0), 0.0; i < count; i++ { + p := NewPoint("test", + map[string]string{"a": strconv.FormatInt(i%2, 10), "b": "static"}, + map[string]interface{}{"f": f, "i": i}, + tm) + err := writeApi.WritePoint(context.Background(), p) + require.Nil(t, err, err) + f += 1.2 + tm = tm.Add(time.Minute) + } + return tm + } + countF := func(start, stop time.Time) int64 { + result, err := queryApi.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: `+start.Format(time.RFC3339)+`, stop:`+stop.Format(time.RFC3339)+`) + |> filter(fn: (r) => r._measurement == "test" and r._field == "f") + |> drop(columns: ["a", "b"]) + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") + |> count(column: "f")`) + + require.Nil(t, err, err) + count := int64(0) + if result.Next() { + require.NotNil(t, result.Record().ValueByKey("f")) + count = result.Record().ValueByKey("f").(int64) + } + return count + } + tmEnd := writeF(tmStart, 100) + assert.Equal(t, int64(100), countF(tmStart, tmEnd)) + deleteApi := client.DeleteApi() + + err := deleteApi.DeleteWithName(context.Background(), "my-org", "my-bucket", tmStart, tmEnd, "") + require.Nil(t, err, err) + assert.Equal(t, int64(0), countF(tmStart, tmEnd)) + + tmEnd = writeF(tmStart, 100) + assert.Equal(t, int64(100), countF(tmStart, tmEnd)) + + err = deleteApi.DeleteWithName(context.Background(), "my-org", "my-bucket", tmStart, tmEnd, "a=1") + require.Nil(t, err, err) + assert.Equal(t, int64(50), countF(tmStart, tmEnd)) + + err = deleteApi.DeleteWithName(context.Background(), "my-org", "my-bucket", tmStart.Add(50*time.Minute), tmEnd, "b=static") + require.Nil(t, err, err) + assert.Equal(t, int64(25), countF(tmStart, tmEnd)) + + err = deleteApi.DeleteWithName(context.Background(), "org", "my-bucket", tmStart.Add(50*time.Minute), tmEnd, "b=static") + require.NotNil(t, err, err) + assert.True(t, strings.Contains(err.Error(), "not found")) + + err = deleteApi.DeleteWithName(context.Background(), "my-org", "bucket", tmStart.Add(50*time.Minute), tmEnd, "b=static") + require.NotNil(t, err, err) + assert.True(t, strings.Contains(err.Error(), "not found")) + +}