diff --git a/pkg/tsdb/cloudmonitoring/cloudmonitoring.go b/pkg/tsdb/cloudmonitoring/cloudmonitoring.go index c9a216aafbd..f94d7725f70 100644 --- a/pkg/tsdb/cloudmonitoring/cloudmonitoring.go +++ b/pkg/tsdb/cloudmonitoring/cloudmonitoring.go @@ -502,11 +502,43 @@ func (e *CloudMonitoringExecutor) unmarshalResponse(res *http.Response) (cloudMo return data, nil } +func handleDistributionSeries(series timeSeries, defaultMetricName string, seriesLabels map[string]string, + query *cloudMonitoringQuery, queryRes *tsdb.QueryResult) { + points := make([]tsdb.TimePoint, 0) + for i := len(series.Points) - 1; i >= 0; i-- { + point := series.Points[i] + value := point.Value.DoubleValue + + if series.ValueType == "INT64" { + parsedValue, err := strconv.ParseFloat(point.Value.IntValue, 64) + if err == nil { + value = parsedValue + } + } + + if series.ValueType == "BOOL" { + if point.Value.BoolValue { + value = 1 + } else { + value = 0 + } + } + + points = append(points, tsdb.NewTimePoint(null.FloatFrom(value), float64((point.Interval.EndTime).Unix())*1000)) + } + + metricName := formatLegendKeys(series.Metric.Type, defaultMetricName, seriesLabels, nil, query) + + queryRes.Series = append(queryRes.Series, &tsdb.TimeSeries{ + Name: metricName, + Points: points, + }) +} + func (e *CloudMonitoringExecutor) parseResponse(queryRes *tsdb.QueryResult, data cloudMonitoringResponse, query *cloudMonitoringQuery) error { labels := make(map[string]map[string]bool) for _, series := range data.TimeSeries { - points := make([]tsdb.TimePoint, 0) seriesLabels := make(map[string]string) defaultMetricName := series.Metric.Type labels["resource.type"] = map[string]bool{series.Resource.Type: true} @@ -566,34 +598,7 @@ func (e *CloudMonitoringExecutor) parseResponse(queryRes *tsdb.QueryResult, data // reverse the order to be ascending if series.ValueType != "DISTRIBUTION" { - for i := len(series.Points) - 1; i >= 0; i-- { - point := series.Points[i] - value := point.Value.DoubleValue - - if series.ValueType == "INT64" { - parsedValue, err := strconv.ParseFloat(point.Value.IntValue, 64) - if err == nil { - value = parsedValue - } - } - - if series.ValueType == "BOOL" { - if point.Value.BoolValue { - value = 1 - } else { - value = 0 - } - } - - points = append(points, tsdb.NewTimePoint(null.FloatFrom(value), float64((point.Interval.EndTime).Unix())*1000)) - } - - metricName := formatLegendKeys(series.Metric.Type, defaultMetricName, seriesLabels, nil, query) - - queryRes.Series = append(queryRes.Series, &tsdb.TimeSeries{ - Name: metricName, - Points: points, - }) + handleDistributionSeries(series, defaultMetricName, seriesLabels, query, queryRes) } else { buckets := make(map[int]*tsdb.TimeSeries) diff --git a/pkg/tsdb/cloudmonitoring/types.go b/pkg/tsdb/cloudmonitoring/types.go index 4c6bb2dc145..4d297714bdf 100644 --- a/pkg/tsdb/cloudmonitoring/types.go +++ b/pkg/tsdb/cloudmonitoring/types.go @@ -65,46 +65,48 @@ type ( } cloudMonitoringResponse struct { - TimeSeries []struct { - Metric struct { - Labels map[string]string `json:"labels"` - Type string `json:"type"` - } `json:"metric"` - Resource struct { - Type string `json:"type"` - Labels map[string]string `json:"labels"` - } `json:"resource"` - MetaData map[string]map[string]interface{} `json:"metadata"` - MetricKind string `json:"metricKind"` - ValueType string `json:"valueType"` - Points []struct { - Interval struct { - StartTime time.Time `json:"startTime"` - EndTime time.Time `json:"endTime"` - } `json:"interval"` - Value struct { - DoubleValue float64 `json:"doubleValue"` - StringValue string `json:"stringValue"` - BoolValue bool `json:"boolValue"` - IntValue string `json:"int64Value"` - DistributionValue struct { - Count string `json:"count"` - Mean float64 `json:"mean"` - SumOfSquaredDeviation float64 `json:"sumOfSquaredDeviation"` - Range struct { - Min int `json:"min"` - Max int `json:"max"` - } `json:"range"` - BucketOptions cloudMonitoringBucketOptions `json:"bucketOptions"` - BucketCounts []string `json:"bucketCounts"` - Examplars []struct { - Value float64 `json:"value"` - Timestamp string `json:"timestamp"` - // attachments - } `json:"examplars"` - } `json:"distributionValue"` - } `json:"value"` - } `json:"points"` - } `json:"timeSeries"` + TimeSeries []timeSeries `json:"timeSeries"` } ) + +type timeSeries struct { + Metric struct { + Labels map[string]string `json:"labels"` + Type string `json:"type"` + } `json:"metric"` + Resource struct { + Type string `json:"type"` + Labels map[string]string `json:"labels"` + } `json:"resource"` + MetaData map[string]map[string]interface{} `json:"metadata"` + MetricKind string `json:"metricKind"` + ValueType string `json:"valueType"` + Points []struct { + Interval struct { + StartTime time.Time `json:"startTime"` + EndTime time.Time `json:"endTime"` + } `json:"interval"` + Value struct { + DoubleValue float64 `json:"doubleValue"` + StringValue string `json:"stringValue"` + BoolValue bool `json:"boolValue"` + IntValue string `json:"int64Value"` + DistributionValue struct { + Count string `json:"count"` + Mean float64 `json:"mean"` + SumOfSquaredDeviation float64 `json:"sumOfSquaredDeviation"` + Range struct { + Min int `json:"min"` + Max int `json:"max"` + } `json:"range"` + BucketOptions cloudMonitoringBucketOptions `json:"bucketOptions"` + BucketCounts []string `json:"bucketCounts"` + Examplars []struct { + Value float64 `json:"value"` + Timestamp string `json:"timestamp"` + // attachments + } `json:"examplars"` + } `json:"distributionValue"` + } `json:"value"` + } `json:"points"` +} diff --git a/pkg/tsdb/elasticsearch/time_series_query.go b/pkg/tsdb/elasticsearch/time_series_query.go index 87aea5305be..64f6654b32f 100644 --- a/pkg/tsdb/elasticsearch/time_series_query.go +++ b/pkg/tsdb/elasticsearch/time_series_query.go @@ -24,9 +24,6 @@ var newTimeSeriesQuery = func(client es.Client, tsdbQuery *tsdb.TsdbQuery, inter } func (e *timeSeriesQuery) execute() (*tsdb.Response, error) { - result := &tsdb.Response{} - result.Results = make(map[string]*tsdb.QueryResult) - tsQueryParser := newTimeSeriesQueryParser() queries, err := tsQueryParser.parse(e.tsdbQuery) if err != nil { @@ -37,121 +34,13 @@ func (e *timeSeriesQuery) execute() (*tsdb.Response, error) { from := fmt.Sprintf("%d", e.tsdbQuery.TimeRange.GetFromAsMsEpoch()) to := fmt.Sprintf("%d", e.tsdbQuery.TimeRange.GetToAsMsEpoch()) - + result := &tsdb.Response{ + Results: make(map[string]*tsdb.QueryResult), + } for _, q := range queries { - minInterval, err := e.client.GetMinInterval(q.Interval) - if err != nil { + if err := e.processQuery(q, ms, from, to, result); err != nil { return nil, err } - interval := e.intervalCalculator.Calculate(e.tsdbQuery.TimeRange, minInterval) - - b := ms.Search(interval) - b.Size(0) - filters := b.Query().Bool().Filter() - filters.AddDateRangeFilter(e.client.GetTimeField(), to, from, es.DateFormatEpochMS) - - if q.RawQuery != "" { - filters.AddQueryStringFilter(q.RawQuery, true) - } - - if len(q.BucketAggs) == 0 { - if len(q.Metrics) == 0 || q.Metrics[0].Type != "raw_document" { - result.Results[q.RefID] = &tsdb.QueryResult{ - RefId: q.RefID, - Error: fmt.Errorf("invalid query, missing metrics and aggregations"), - ErrorString: "invalid query, missing metrics and aggregations", - } - continue - } - metric := q.Metrics[0] - b.Size(metric.Settings.Get("size").MustInt(500)) - b.SortDesc("@timestamp", "boolean") - b.AddDocValueField("@timestamp") - continue - } - - aggBuilder := b.Agg() - - // iterate backwards to create aggregations bottom-down - for _, bucketAgg := range q.BucketAggs { - switch bucketAgg.Type { - case dateHistType: - aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, from, to) - case histogramType: - aggBuilder = addHistogramAgg(aggBuilder, bucketAgg) - case filtersType: - aggBuilder = addFiltersAgg(aggBuilder, bucketAgg) - case termsType: - aggBuilder = addTermsAgg(aggBuilder, bucketAgg, q.Metrics) - case geohashGridType: - aggBuilder = addGeoHashGridAgg(aggBuilder, bucketAgg) - } - } - - for _, m := range q.Metrics { - m := m - if m.Type == countType { - continue - } - - if isPipelineAgg(m.Type) { - if isPipelineAggWithMultipleBucketPaths(m.Type) { - if len(m.PipelineVariables) > 0 { - bucketPaths := map[string]interface{}{} - for name, pipelineAgg := range m.PipelineVariables { - if _, err := strconv.Atoi(pipelineAgg); err == nil { - var appliedAgg *MetricAgg - for _, pipelineMetric := range q.Metrics { - if pipelineMetric.ID == pipelineAgg { - appliedAgg = pipelineMetric - break - } - } - if appliedAgg != nil { - if appliedAgg.Type == countType { - bucketPaths[name] = "_count" - } else { - bucketPaths[name] = pipelineAgg - } - } - } - } - - aggBuilder.Pipeline(m.ID, m.Type, bucketPaths, func(a *es.PipelineAggregation) { - a.Settings = m.Settings.MustMap() - }) - } else { - continue - } - } else { - if _, err := strconv.Atoi(m.PipelineAggregate); err == nil { - var appliedAgg *MetricAgg - for _, pipelineMetric := range q.Metrics { - if pipelineMetric.ID == m.PipelineAggregate { - appliedAgg = pipelineMetric - break - } - } - if appliedAgg != nil { - bucketPath := m.PipelineAggregate - if appliedAgg.Type == countType { - bucketPath = "_count" - } - - aggBuilder.Pipeline(m.ID, m.Type, bucketPath, func(a *es.PipelineAggregation) { - a.Settings = m.Settings.MustMap() - }) - } - } else { - continue - } - } - } else { - aggBuilder.Metric(m.ID, m.Type, m.Field, func(a *es.MetricAggregation) { - a.Settings = m.Settings.MustMap() - }) - } - } } req, err := ms.Build() @@ -168,6 +57,125 @@ func (e *timeSeriesQuery) execute() (*tsdb.Response, error) { return rp.getTimeSeries() } +func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilder, from, to string, + result *tsdb.Response) error { + minInterval, err := e.client.GetMinInterval(q.Interval) + if err != nil { + return err + } + interval := e.intervalCalculator.Calculate(e.tsdbQuery.TimeRange, minInterval) + + b := ms.Search(interval) + b.Size(0) + filters := b.Query().Bool().Filter() + filters.AddDateRangeFilter(e.client.GetTimeField(), to, from, es.DateFormatEpochMS) + + if q.RawQuery != "" { + filters.AddQueryStringFilter(q.RawQuery, true) + } + + if len(q.BucketAggs) == 0 { + if len(q.Metrics) == 0 || q.Metrics[0].Type != "raw_document" { + result.Results[q.RefID] = &tsdb.QueryResult{ + RefId: q.RefID, + Error: fmt.Errorf("invalid query, missing metrics and aggregations"), + ErrorString: "invalid query, missing metrics and aggregations", + } + return nil + } + metric := q.Metrics[0] + b.Size(metric.Settings.Get("size").MustInt(500)) + b.SortDesc("@timestamp", "boolean") + b.AddDocValueField("@timestamp") + return nil + } + + aggBuilder := b.Agg() + + // iterate backwards to create aggregations bottom-down + for _, bucketAgg := range q.BucketAggs { + switch bucketAgg.Type { + case dateHistType: + aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, from, to) + case histogramType: + aggBuilder = addHistogramAgg(aggBuilder, bucketAgg) + case filtersType: + aggBuilder = addFiltersAgg(aggBuilder, bucketAgg) + case termsType: + aggBuilder = addTermsAgg(aggBuilder, bucketAgg, q.Metrics) + case geohashGridType: + aggBuilder = addGeoHashGridAgg(aggBuilder, bucketAgg) + } + } + + for _, m := range q.Metrics { + m := m + if m.Type == countType { + continue + } + + if isPipelineAgg(m.Type) { + if isPipelineAggWithMultipleBucketPaths(m.Type) { + if len(m.PipelineVariables) > 0 { + bucketPaths := map[string]interface{}{} + for name, pipelineAgg := range m.PipelineVariables { + if _, err := strconv.Atoi(pipelineAgg); err == nil { + var appliedAgg *MetricAgg + for _, pipelineMetric := range q.Metrics { + if pipelineMetric.ID == pipelineAgg { + appliedAgg = pipelineMetric + break + } + } + if appliedAgg != nil { + if appliedAgg.Type == countType { + bucketPaths[name] = "_count" + } else { + bucketPaths[name] = pipelineAgg + } + } + } + } + + aggBuilder.Pipeline(m.ID, m.Type, bucketPaths, func(a *es.PipelineAggregation) { + a.Settings = m.Settings.MustMap() + }) + } else { + continue + } + } else { + if _, err := strconv.Atoi(m.PipelineAggregate); err == nil { + var appliedAgg *MetricAgg + for _, pipelineMetric := range q.Metrics { + if pipelineMetric.ID == m.PipelineAggregate { + appliedAgg = pipelineMetric + break + } + } + if appliedAgg != nil { + bucketPath := m.PipelineAggregate + if appliedAgg.Type == countType { + bucketPath = "_count" + } + + aggBuilder.Pipeline(m.ID, m.Type, bucketPath, func(a *es.PipelineAggregation) { + a.Settings = m.Settings.MustMap() + }) + } + } else { + continue + } + } + } else { + aggBuilder.Metric(m.ID, m.Type, m.Field, func(a *es.MetricAggregation) { + a.Settings = m.Settings.MustMap() + }) + } + } + + return nil +} + func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo string) es.AggBuilder { aggBuilder.DateHistogram(bucketAgg.ID, bucketAgg.Field, func(a *es.DateHistogramAgg, b es.AggBuilder) { a.Interval = bucketAgg.Settings.Get("interval").MustString("auto") diff --git a/pkg/tsdb/sqleng/sql_engine.go b/pkg/tsdb/sqleng/sql_engine.go index 6271921d2ba..410c90bda75 100644 --- a/pkg/tsdb/sqleng/sql_engine.go +++ b/pkg/tsdb/sqleng/sql_engine.go @@ -276,45 +276,60 @@ func (e *sqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, return nil } -func (e *sqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error { - pointsBySeries := make(map[string]*tsdb.TimeSeries) - seriesByQueryOrder := list.New() - +func newProcessCfg(query *tsdb.Query, tsdbQuery *tsdb.TsdbQuery, rows *core.Rows) (*processCfg, error) { columnNames, err := rows.Columns() if err != nil { - return err + return nil, err + } + columnTypes, err := rows.ColumnTypes() + if err != nil { + return nil, err } - columnTypes, err := rows.ColumnTypes() + fillMissing := query.Model.Get("fill").MustBool(false) + + cfg := &processCfg{ + rowCount: 0, + columnTypes: columnTypes, + columnNames: columnNames, + rows: rows, + timeIndex: -1, + metricIndex: -1, + metricPrefix: false, + fillMissing: fillMissing, + seriesByQueryOrder: list.New(), + pointsBySeries: make(map[string]*tsdb.TimeSeries), + tsdbQuery: tsdbQuery, + } + return cfg, nil +} + +func (e *sqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, + tsdbQuery *tsdb.TsdbQuery) error { + cfg, err := newProcessCfg(query, tsdbQuery, rows) if err != nil { return err } - rowCount := 0 - timeIndex := -1 - metricIndex := -1 - metricPrefix := false - var metricPrefixValue string - // check columns of resultset: a column named time is mandatory // the first text column is treated as metric name unless a column named metric is present - for i, col := range columnNames { + for i, col := range cfg.columnNames { for _, tc := range e.timeColumnNames { if col == tc { - timeIndex = i + cfg.timeIndex = i continue } } switch col { case "metric": - metricIndex = i + cfg.metricIndex = i default: - if metricIndex == -1 { - columnType := columnTypes[i].DatabaseTypeName() + if cfg.metricIndex == -1 { + columnType := cfg.columnTypes[i].DatabaseTypeName() for _, mct := range e.metricColumnTypes { if columnType == mct { - metricIndex = i + cfg.metricIndex = i continue } } @@ -323,154 +338,179 @@ func (e *sqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.R } // use metric column as prefix with multiple value columns - if metricIndex != -1 && len(columnNames) > 3 { - metricPrefix = true + if cfg.metricIndex != -1 && len(cfg.columnNames) > 3 { + cfg.metricPrefix = true } - if timeIndex == -1 { + if cfg.timeIndex == -1 { return fmt.Errorf("Found no column named %s", strings.Join(e.timeColumnNames, " or ")) } - fillMissing := query.Model.Get("fill").MustBool(false) - var fillInterval float64 - fillValue := null.Float{} - fillPrevious := false - - if fillMissing { - fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000 + if cfg.fillMissing { + cfg.fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000 switch query.Model.Get("fillMode").MustString() { case "null": case "previous": - fillPrevious = true + cfg.fillPrevious = true case "value": - fillValue.Float64 = query.Model.Get("fillValue").MustFloat64() - fillValue.Valid = true + cfg.fillValue.Float64 = query.Model.Get("fillValue").MustFloat64() + cfg.fillValue.Valid = true } } for rows.Next() { - var timestamp float64 - var value null.Float - var metric string - - if rowCount > rowLimit { - return fmt.Errorf("query row limit exceeded, limit %d", rowLimit) - } - - values, err := e.queryResultTransformer.TransformQueryResult(columnTypes, rows) - if err != nil { + if err := e.processRow(cfg); err != nil { return err } - - // converts column named time to unix timestamp in milliseconds to make - // native mysql datetime types and epoch dates work in - // annotation and table queries. - ConvertSqlTimeColumnToEpochMs(values, timeIndex) - - switch columnValue := values[timeIndex].(type) { - case int64: - timestamp = float64(columnValue) - case float64: - timestamp = columnValue - default: - return fmt.Errorf("Invalid type for column time, must be of type timestamp or unix timestamp, got: %T %v", columnValue, columnValue) - } - - if metricIndex >= 0 { - if columnValue, ok := values[metricIndex].(string); ok { - if metricPrefix { - metricPrefixValue = columnValue - } else { - metric = columnValue - } - } else { - return fmt.Errorf("Column metric must be of type %s. metric column name: %s type: %s but datatype is %T", strings.Join(e.metricColumnTypes, ", "), columnNames[metricIndex], columnTypes[metricIndex].DatabaseTypeName(), values[metricIndex]) - } - } - - for i, col := range columnNames { - if i == timeIndex || i == metricIndex { - continue - } - - if value, err = ConvertSqlValueColumnToFloat(col, values[i]); err != nil { - return err - } - - if metricIndex == -1 { - metric = col - } else if metricPrefix { - metric = metricPrefixValue + " " + col - } - - series, exist := pointsBySeries[metric] - if !exist { - series = &tsdb.TimeSeries{Name: metric} - pointsBySeries[metric] = series - seriesByQueryOrder.PushBack(metric) - } - - if fillMissing { - var intervalStart float64 - if !exist { - intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6) - } else { - intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval - } - - if fillPrevious { - if len(series.Points) > 0 { - fillValue = series.Points[len(series.Points)-1][0] - } else { - fillValue.Valid = false - } - } - - // align interval start - intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval - - for i := intervalStart; i < timestamp; i += fillInterval { - series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)}) - rowCount++ - } - } - - series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)}) - - if setting.Env == setting.DEV { - e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value) - } - } } - for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() { + for elem := cfg.seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() { key := elem.Value.(string) - result.Series = append(result.Series, pointsBySeries[key]) + result.Series = append(result.Series, cfg.pointsBySeries[key]) + if !cfg.fillMissing { + continue + } - if fillMissing { - series := pointsBySeries[key] - // fill in values from last fetched value till interval end - intervalStart := series.Points[len(series.Points)-1][1].Float64 - intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6) + series := cfg.pointsBySeries[key] + // fill in values from last fetched value till interval end + intervalStart := series.Points[len(series.Points)-1][1].Float64 + intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6) - if fillPrevious { + if cfg.fillPrevious { + if len(series.Points) > 0 { + cfg.fillValue = series.Points[len(series.Points)-1][0] + } else { + cfg.fillValue.Valid = false + } + } + + // align interval start + intervalStart = math.Floor(intervalStart/cfg.fillInterval) * cfg.fillInterval + for i := intervalStart + cfg.fillInterval; i < intervalEnd; i += cfg.fillInterval { + series.Points = append(series.Points, tsdb.TimePoint{cfg.fillValue, null.FloatFrom(i)}) + cfg.rowCount++ + } + } + + result.Meta.Set("rowCount", cfg.rowCount) + return nil +} + +type processCfg struct { + rowCount int + columnTypes []*sql.ColumnType + columnNames []string + rows *core.Rows + timeIndex int + metricIndex int + metricPrefix bool + metricPrefixValue string + fillMissing bool + pointsBySeries map[string]*tsdb.TimeSeries + seriesByQueryOrder *list.List + fillValue null.Float + tsdbQuery *tsdb.TsdbQuery + fillInterval float64 + fillPrevious bool +} + +func (e *sqlQueryEndpoint) processRow(cfg *processCfg) error { + var timestamp float64 + var value null.Float + var metric string + + if cfg.rowCount > rowLimit { + return fmt.Errorf("query row limit exceeded, limit %d", rowLimit) + } + + values, err := e.queryResultTransformer.TransformQueryResult(cfg.columnTypes, cfg.rows) + if err != nil { + return err + } + + // converts column named time to unix timestamp in milliseconds to make + // native mysql datetime types and epoch dates work in + // annotation and table queries. + ConvertSqlTimeColumnToEpochMs(values, cfg.timeIndex) + + switch columnValue := values[cfg.timeIndex].(type) { + case int64: + timestamp = float64(columnValue) + case float64: + timestamp = columnValue + default: + return fmt.Errorf("invalid type for column time, must be of type timestamp or unix timestamp, got: %T %v", + columnValue, columnValue) + } + + if cfg.metricIndex >= 0 { + if columnValue, ok := values[cfg.metricIndex].(string); ok { + if cfg.metricPrefix { + cfg.metricPrefixValue = columnValue + } else { + metric = columnValue + } + } else { + return fmt.Errorf("column metric must be of type %s. metric column name: %s type: %s but datatype is %T", + strings.Join(e.metricColumnTypes, ", "), cfg.columnNames[cfg.metricIndex], + cfg.columnTypes[cfg.metricIndex].DatabaseTypeName(), values[cfg.metricIndex]) + } + } + + for i, col := range cfg.columnNames { + if i == cfg.timeIndex || i == cfg.metricIndex { + continue + } + + if value, err = ConvertSqlValueColumnToFloat(col, values[i]); err != nil { + return err + } + + if cfg.metricIndex == -1 { + metric = col + } else if cfg.metricPrefix { + metric = cfg.metricPrefixValue + " " + col + } + + series, exist := cfg.pointsBySeries[metric] + if !exist { + series = &tsdb.TimeSeries{Name: metric} + cfg.pointsBySeries[metric] = series + cfg.seriesByQueryOrder.PushBack(metric) + } + + if cfg.fillMissing { + var intervalStart float64 + if !exist { + intervalStart = float64(cfg.tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6) + } else { + intervalStart = series.Points[len(series.Points)-1][1].Float64 + cfg.fillInterval + } + + if cfg.fillPrevious { if len(series.Points) > 0 { - fillValue = series.Points[len(series.Points)-1][0] + cfg.fillValue = series.Points[len(series.Points)-1][0] } else { - fillValue.Valid = false + cfg.fillValue.Valid = false } } // align interval start - intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval - for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval { - series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)}) - rowCount++ + intervalStart = math.Floor(intervalStart/cfg.fillInterval) * cfg.fillInterval + + for i := intervalStart; i < timestamp; i += cfg.fillInterval { + series.Points = append(series.Points, tsdb.TimePoint{cfg.fillValue, null.FloatFrom(i)}) + cfg.rowCount++ } } + + series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)}) + + if setting.Env == setting.DEV { + e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value) + } } - result.Meta.Set("rowCount", rowCount) return nil } @@ -526,6 +566,7 @@ func ConvertSqlTimeColumnToEpochMs(values tsdb.RowValues, timeIndex int) { } // ConvertSqlValueColumnToFloat converts timeseries value column to float. +//nolint: gocyclo func ConvertSqlValueColumnToFloat(columnName string, columnValue interface{}) (null.Float, error) { var value null.Float