diff --git a/CHANGELOG.md b/CHANGELOG.md index 77132f4a..2b7e91e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Bug fixes 1. [#132](https://github.com/influxdata/influxdb-client-go/pull/132) Properly handle errors instead of panics +1. [#134](https://github.com/influxdata/influxdb-client-go/pull/134) FluxQueryResult: support reordering of annotations ## 1.2.0 [2020-05-15] ### Breaking Changes diff --git a/api/query.go b/api/query.go index 79e1a274..0b844094 100644 --- a/api/query.go +++ b/api/query.go @@ -179,7 +179,7 @@ type QueryTableResult struct { } // TablePosition returns actual flux table position in the result, or -1 if no table was found yet -// Each new table is introduced by the #dataType annotation in csv +// Each new table is introduced by an annotation in csv func (q *QueryTableResult) TablePosition() int { if q.table != nil { return q.table.Position() @@ -208,6 +208,7 @@ type parsingState int const ( parsingStateNormal parsingState = iota + parsingStateAnnotation parsingStateNameRow parsingStateError ) @@ -233,6 +234,7 @@ func (q *QueryTableResult) Next() bool { }() parsingState := parsingStateNormal q.tableChanged = false + dataTypeAnnotationFound := false readRow: row, q.err = q.csvReader.Read() if q.err == io.EOF { @@ -246,23 +248,36 @@ readRow: if len(row) <= 1 { goto readRow } - + if len(row[0]) > 0 && row[0][0] == '#' { + if parsingState == parsingStateNormal { + q.table = query.NewFluxTableMetadata(q.tablePosition) + q.tablePosition++ + q.tableChanged = true + for i := range row[1:] { + q.table.AddColumn(query.NewFluxColumn(i)) + } + parsingState = parsingStateAnnotation + } + } + if q.table == nil { + q.err = errors.New("parsing error, annotations not found") + return false + } + if len(row)-1 != len(q.table.Columns()) { + q.err = fmt.Errorf("parsing error, row has different number of columns than the table: %d vs %d", len(row)-1, len(q.table.Columns())) + return false + } switch row[0] { case "": - if parsingState == parsingStateError { - var message string - if len(row) > 1 && len(row[1]) > 0 { - message = row[1] - } else { - message = "unknown query error" - } - reference := "" - if len(row) > 2 && len(row[2]) > 0 { - reference = fmt.Sprintf(",%s", row[2]) + switch parsingState { + case parsingStateAnnotation: + if !dataTypeAnnotationFound { + q.err = errors.New("parsing error, datatype annotation not found") + return false } - q.err = fmt.Errorf("%s%s", message, reference) - return false - } else if parsingState == parsingStateNameRow { + parsingState = parsingStateNameRow + fallthrough + case parsingStateNameRow: if row[1] == "error" { parsingState = parsingStateError } else { @@ -274,13 +289,18 @@ readRow: parsingState = parsingStateNormal } goto readRow - } - if q.table == nil { - q.err = errors.New("parsing error, datatype annotation not found") - return false - } - if len(row)-1 != len(q.table.Columns()) { - q.err = fmt.Errorf("parsing error, row has different number of columns than table: %d vs %d", len(row)-1, len(q.table.Columns())) + case parsingStateError: + var message string + if len(row) > 1 && len(row[1]) > 0 { + message = row[1] + } else { + message = "unknown query error" + } + reference := "" + if len(row) > 2 && len(row[2]) > 0 { + reference = fmt.Sprintf(",%s", row[2]) + } + q.err = fmt.Errorf("%s%s", message, reference) return false } values := make(map[string]interface{}) @@ -294,18 +314,14 @@ readRow: } q.record = query.NewFluxRecord(q.table.Position(), values) case "#datatype": - q.table = query.NewFluxTableMetadata(q.tablePosition) - q.tablePosition++ - q.tableChanged = true + dataTypeAnnotationFound = true for i, d := range row[1:] { - q.table.AddColumn(query.NewFluxColumn(i, d)) + if q.table.Column(i) != nil { + q.table.Column(i).SetDataType(d) + } } goto readRow case "#group": - if q.table == nil { - q.err = errors.New("parsing error, datatype annotation not found") - return false - } for i, g := range row[1:] { if q.table.Column(i) != nil { q.table.Column(i).SetGroup(g == "true") @@ -313,17 +329,11 @@ readRow: } goto readRow case "#default": - if q.table == nil { - q.err = errors.New("parsing error, datatype annotation not found") - return false - } for i, c := range row[1:] { if q.table.Column(i) != nil { q.table.Column(i).SetDefaultValue(c) } } - // there comes column names after defaults - parsingState = parsingStateNameRow goto readRow } // don't close query diff --git a/api/query/table.go b/api/query/table.go index e09757d1..c1f891fa 100644 --- a/api/query/table.go +++ b/api/query/table.go @@ -80,9 +80,9 @@ func (f *FluxTableMetadata) String() string { return buffer.String() } -// newFluxColumn creates FluxColumn for position and data type -func NewFluxColumn(index int, dataType string) *FluxColumn { - return &FluxColumn{index: index, dataType: dataType} +// newFluxColumn creates FluxColumn for position +func NewFluxColumn(index int) *FluxColumn { + return &FluxColumn{index: index} } // newFluxColumn creates FluxColumn diff --git a/api/query_test.go b/api/query_test.go index 662e7672..de8803e4 100644 --- a/api/query_test.go +++ b/api/query_test.go @@ -585,6 +585,183 @@ func TestInvalidDataType(t *testing.T) { assert.Equal(t, "deviceId has unknown data type int", queryResult.Err().Error()) } +func TestReorderedAnnotations(t *testing.T) { + expectedTable := query.NewFluxTableMetadataFull(0, + []*query.FluxColumn{ + query.NewFluxColumnFull("string", "_result", "result", false, 0), + query.NewFluxColumnFull("long", "", "table", false, 1), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_start", true, 2), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_stop", true, 3), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_time", false, 4), + query.NewFluxColumnFull("double", "", "_value", false, 5), + query.NewFluxColumnFull("string", "", "_field", true, 6), + query.NewFluxColumnFull("string", "", "_measurement", true, 7), + query.NewFluxColumnFull("string", "", "a", true, 8), + query.NewFluxColumnFull("string", "", "b", true, 9), + }, + ) + expectedRecord1 := query.NewFluxRecord(0, + map[string]interface{}{ + "result": "_result", + "table": int64(0), + "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), + "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"), + "_time": mustParseTime("2020-02-18T10:34:08.135814545Z"), + "_value": 1.4, + "_field": "f", + "_measurement": "test", + "a": "1", + "b": "adsfasdf", + }, + ) + + expectedRecord2 := query.NewFluxRecord(0, + map[string]interface{}{ + "result": "_result", + "table": int64(0), + "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), + "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"), + "_time": mustParseTime("2020-02-18T22:08:44.850214724Z"), + "_value": 6.6, + "_field": "f", + "_measurement": "test", + "a": "1", + "b": "adsfasdf", + }, + ) + + csvTable1 := `#group,false,false,true,true,false,false,true,true,true,true +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string +#default,_result,,,,,,,,, +,result,table,_start,_stop,_time,_value,_field,_measurement,a,b +,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf +,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,1,adsfasdf + +` + reader := strings.NewReader(csvTable1) + csvReader := csv.NewReader(reader) + csvReader.FieldsPerRecord = -1 + queryResult := &QueryTableResult{Closer: ioutil.NopCloser(reader), csvReader: csvReader} + require.True(t, queryResult.Next(), queryResult.Err()) + require.Nil(t, queryResult.Err()) + + require.Equal(t, queryResult.table, expectedTable) + assert.True(t, queryResult.tableChanged) + require.NotNil(t, queryResult.Record()) + require.Equal(t, queryResult.Record(), expectedRecord1) + + require.True(t, queryResult.Next(), queryResult.Err()) + require.Nil(t, queryResult.Err()) + assert.False(t, queryResult.tableChanged) + require.NotNil(t, queryResult.Record()) + require.Equal(t, queryResult.Record(), expectedRecord2) + + require.False(t, queryResult.Next()) + require.Nil(t, queryResult.Err()) + + csvTable2 := `#default,_result,,,,,,,,, +#group,false,false,true,true,false,false,true,true,true,true +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string +,result,table,_start,_stop,_time,_value,_field,_measurement,a,b +,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf +,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,1,adsfasdf + +` + reader = strings.NewReader(csvTable2) + csvReader = csv.NewReader(reader) + csvReader.FieldsPerRecord = -1 + queryResult = &QueryTableResult{Closer: ioutil.NopCloser(reader), csvReader: csvReader} + require.True(t, queryResult.Next(), queryResult.Err()) + require.Nil(t, queryResult.Err()) + + require.Equal(t, queryResult.table, expectedTable) + assert.True(t, queryResult.tableChanged) + require.NotNil(t, queryResult.Record()) + require.Equal(t, queryResult.Record(), expectedRecord1) + + require.True(t, queryResult.Next(), queryResult.Err()) + require.Nil(t, queryResult.Err()) + assert.False(t, queryResult.tableChanged) + require.NotNil(t, queryResult.Record()) + require.Equal(t, queryResult.Record(), expectedRecord2) + + require.False(t, queryResult.Next()) + require.Nil(t, queryResult.Err()) +} + +func TestDatatypeOnlyAnnotation(t *testing.T) { + expectedTable := query.NewFluxTableMetadataFull(0, + []*query.FluxColumn{ + query.NewFluxColumnFull("string", "", "result", false, 0), + query.NewFluxColumnFull("long", "", "table", false, 1), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_start", false, 2), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_stop", false, 3), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_time", false, 4), + query.NewFluxColumnFull("double", "", "_value", false, 5), + query.NewFluxColumnFull("string", "", "_field", false, 6), + query.NewFluxColumnFull("string", "", "_measurement", false, 7), + query.NewFluxColumnFull("string", "", "a", false, 8), + query.NewFluxColumnFull("string", "", "b", false, 9), + }, + ) + expectedRecord1 := query.NewFluxRecord(0, + map[string]interface{}{ + "result": nil, + "table": int64(0), + "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), + "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"), + "_time": mustParseTime("2020-02-18T10:34:08.135814545Z"), + "_value": 1.4, + "_field": "f", + "_measurement": "test", + "a": "1", + "b": "adsfasdf", + }, + ) + + expectedRecord2 := query.NewFluxRecord(0, + map[string]interface{}{ + "result": nil, + "table": int64(0), + "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), + "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"), + "_time": mustParseTime("2020-02-18T22:08:44.850214724Z"), + "_value": 6.6, + "_field": "f", + "_measurement": "test", + "a": "1", + "b": "adsfasdf", + }, + ) + + csvTable1 := `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string +,result,table,_start,_stop,_time,_value,_field,_measurement,a,b +,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf +,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,1,adsfasdf + +` + reader := strings.NewReader(csvTable1) + csvReader := csv.NewReader(reader) + csvReader.FieldsPerRecord = -1 + queryResult := &QueryTableResult{Closer: ioutil.NopCloser(reader), csvReader: csvReader} + require.True(t, queryResult.Next(), queryResult.Err()) + require.Nil(t, queryResult.Err()) + + require.Equal(t, queryResult.table, expectedTable) + assert.True(t, queryResult.tableChanged) + require.NotNil(t, queryResult.Record()) + require.Equal(t, queryResult.Record(), expectedRecord1) + + require.True(t, queryResult.Next(), queryResult.Err()) + require.Nil(t, queryResult.Err()) + assert.False(t, queryResult.tableChanged) + require.NotNil(t, queryResult.Record()) + require.Equal(t, queryResult.Record(), expectedRecord2) + + require.False(t, queryResult.Next()) + require.Nil(t, queryResult.Err()) +} + func TestMissingDatatypeAnnotation(t *testing.T) { csvTable1 := ` #group,false,false,true,true,false,true,true,false,false,false @@ -617,26 +794,28 @@ func TestMissingDatatypeAnnotation(t *testing.T) { require.False(t, queryResult.Next()) require.NotNil(t, queryResult.Err()) assert.Equal(t, "parsing error, datatype annotation not found", queryResult.Err().Error()) +} +func TestMissingAnnotations(t *testing.T) { csvTable3 := ` ,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start ,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z ,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:39:36.330153686Z,1467463,BME280,1h20m30.13245s,eHh4eHhjY2NjY2NkZGRkZA==,2020-04-28T00:00:00Z -` - reader = strings.NewReader(csvTable3) - csvReader = csv.NewReader(reader) +` + reader := strings.NewReader(csvTable3) + csvReader := csv.NewReader(reader) csvReader.FieldsPerRecord = -1 - queryResult = &QueryTableResult{Closer: ioutil.NopCloser(reader), csvReader: csvReader} + queryResult := &QueryTableResult{Closer: ioutil.NopCloser(reader), csvReader: csvReader} require.False(t, queryResult.Next()) require.NotNil(t, queryResult.Err()) - assert.Equal(t, "parsing error, datatype annotation not found", queryResult.Err().Error()) + assert.Equal(t, "parsing error, annotations not found", queryResult.Err().Error()) } func TestDifferentNumberOfColumns(t *testing.T) { csvTable := `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,int,string,duration,base64Binary,dateTime:RFC3339 #group,false,false,true,true,false,true,true,false,false, -#default,_result,,,,,,, +#default,_result,,,,,,,,, ,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start ,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z,2345234 ` @@ -647,7 +826,37 @@ func TestDifferentNumberOfColumns(t *testing.T) { queryResult := &QueryTableResult{Closer: ioutil.NopCloser(reader), csvReader: csvReader} require.False(t, queryResult.Next()) require.NotNil(t, queryResult.Err()) - assert.Equal(t, "parsing error, row has different number of columns than table: 11 vs 10", queryResult.Err().Error()) + assert.Equal(t, "parsing error, row has different number of columns than the table: 11 vs 10", queryResult.Err().Error()) + + csvTable2 := `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,int,string,duration,base64Binary,dateTime:RFC3339 +#group,false,false,true,true,false,true,true,false,false, +#default,_result,,,,,,, +,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start +,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z,2345234 +` + + reader = strings.NewReader(csvTable2) + csvReader = csv.NewReader(reader) + csvReader.FieldsPerRecord = -1 + queryResult = &QueryTableResult{Closer: ioutil.NopCloser(reader), csvReader: csvReader} + require.False(t, queryResult.Next()) + require.NotNil(t, queryResult.Err()) + assert.Equal(t, "parsing error, row has different number of columns than the table: 8 vs 10", queryResult.Err().Error()) + + csvTable3 := `#default,_result,,,,,,, +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,int,string,duration,base64Binary,dateTime:RFC3339 +#group,false,false,true,true,false,true,true,false,false, +,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start +,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z,2345234 +` + + reader = strings.NewReader(csvTable3) + csvReader = csv.NewReader(reader) + csvReader.FieldsPerRecord = -1 + queryResult = &QueryTableResult{Closer: ioutil.NopCloser(reader), csvReader: csvReader} + require.False(t, queryResult.Next()) + require.NotNil(t, queryResult.Err()) + assert.Equal(t, "parsing error, row has different number of columns than the table: 10 vs 8", queryResult.Err().Error()) } func TestEmptyValue(t *testing.T) { diff --git a/client_e2e_test.go b/client_e2e_test.go index 4030da02..0af507f9 100644 --- a/client_e2e_test.go +++ b/client_e2e_test.go @@ -78,23 +78,26 @@ func TestWrite(t *testing.T) { fmt.Println("Write error: ", err.Error()) } }() + timestamp := time.Now() for i, f := 0, 3.3; i < 10; i++ { - writeApi.WriteRecord(fmt.Sprintf("test,a=%d,b=local f=%.2f,i=%di", i%2, f, i)) + writeApi.WriteRecord(fmt.Sprintf("test,a=%d,b=local f=%.2f,i=%di %d", i%2, f, i, timestamp.UnixNano())) //writeApi.Flush() f += 3.3 + timestamp = timestamp.Add(time.Nanosecond) } for i, f := int64(10), 33.0; i < 20; i++ { p := influxdb2.NewPoint("test", map[string]string{"a": strconv.FormatInt(i%2, 10), "b": "static"}, map[string]interface{}{"f": f, "i": i}, - time.Now()) + timestamp) writeApi.WritePoint(p) f += 3.3 + timestamp = timestamp.Add(time.Nanosecond) } err := client.WriteApiBlocking("my-org", "my-bucket").WritePoint(context.Background(), influxdb2.NewPointWithMeasurement("test"). - AddTag("a", "3").AddField("i", 20)) + AddTag("a", "3").AddField("i", 20).AddField("f", 4.4)) assert.Nil(t, err) client.Close() @@ -106,7 +109,7 @@ func TestQueryRaw(t *testing.T) { client := influxdb2.NewClient("http://localhost:9999", authToken) queryApi := client.QueryApi("my-org") - res, err := queryApi.QueryRaw(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "test")`, influxdb2.DefaultDialect()) + res, err := queryApi.QueryRaw(context.Background(), `from(bucket:"my-bucket")|> range(start: -24h) |> filter(fn: (r) => r._measurement == "test")`, influxdb2.DefaultDialect()) if err != nil { t.Error(err) } else { @@ -124,7 +127,9 @@ func TestQuery(t *testing.T) { if err != nil { t.Error(err) } else { + rows := 0 for result.Next() { + rows++ if result.TableChanged() { fmt.Printf("table: %s\n", result.TableMetadata().String()) } @@ -133,7 +138,9 @@ func TestQuery(t *testing.T) { if result.Err() != nil { t.Error(result.Err()) } + assert.Equal(t, 42, rows) } + } func TestAuthorizationsApi(t *testing.T) {