diff --git a/pkg/tsdb/postgres/postgres.go b/pkg/tsdb/postgres/postgres.go index 3cf4d25f1f7..d7bc42b3a11 100644 --- a/pkg/tsdb/postgres/postgres.go +++ b/pkg/tsdb/postgres/postgres.go @@ -4,6 +4,7 @@ import ( "container/list" "context" "fmt" + "math" "net/url" "strconv" "time" @@ -198,6 +199,18 @@ func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *co return fmt.Errorf("Found no column named time") } + fillMissing := query.Model.Get("fill").MustBool(false) + var fillInterval float64 + fillValue := null.Float{} + if fillMissing { + fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000 + if query.Model.Get("fillNULL").MustBool(false) == false { + fillValue.Float64 = query.Model.Get("fillValue").MustFloat64() + fillValue.Valid = true + } + + } + for rows.Next() { var timestamp float64 var value null.Float @@ -249,7 +262,34 @@ func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *co if metricIndex == -1 { metric = col } - e.appendTimePoint(pointsBySeries, seriesByQueryOrder, metric, timestamp, value) + + series, exist := pointsBySeries[metric] + if exist == false { + series = &tsdb.TimeSeries{Name: metric} + pointsBySeries[metric] = series + seriesByQueryOrder.PushBack(metric) + } + + if fillMissing { + var intervalStart float64 + if exist == false { + intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6) + } else { + intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval + } + + // align interval start + intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval + + for i := intervalStart; i < timestamp; i += fillInterval { + series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)}) + rowCount++ + } + } + + series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)}) + + e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value) rowCount++ } @@ -258,20 +298,22 @@ func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *co for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() { key := elem.Value.(string) result.Series = append(result.Series, pointsBySeries[key]) + + if fillMissing { + series := pointsBySeries[key] + // fill in values from last fetched value till interval end + intervalStart := series.Points[len(series.Points)-1][1].Float64 + intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6) + + // align interval start + intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval + for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval { + series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)}) + rowCount++ + } + } } result.Meta.Set("rowCount", rowCount) return nil } - -func (e PostgresQueryEndpoint) appendTimePoint(pointsBySeries map[string]*tsdb.TimeSeries, seriesByQueryOrder *list.List, metric string, timestamp float64, value null.Float) { - if series, exist := pointsBySeries[metric]; exist { - series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)}) - } else { - series := &tsdb.TimeSeries{Name: metric} - series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)}) - pointsBySeries[metric] = series - seriesByQueryOrder.PushBack(metric) - } - e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value) -}