Postgres: Switch the datasource plugin from lib/pq to pgx (#103961)

* Create libpqToPGX feature toggle

* Refactor PostgreSQL datasource to support PGX with feature toggle

- Updated `ProvideService` to accept feature toggles for enabling PGX.
- Modified integration tests to use the new PGX connection method.
- Introduced new functions for handling PGX connections and queries.
- Enhanced TLS configuration handling for PostgreSQL connections.
- Updated existing tests to ensure compatibility with PGX and new connection methods.

* Update PostgreSQL datasource to enhance connection pooling and error handling

- Increased `MaxOpenConns` to 10 in integration tests for improved connection management.
- Refactored connection handling in `newPostgresPGX` to return a connection pool instead of a single connection.
- Updated health check error handling to utilize context and feature toggles for better error reporting.
- Adjusted `DisposePGX` method to close the connection pool properly.
- Enhanced query execution to acquire connections from the pool, ensuring efficient resource usage.

* Cleanup

* Revert postgres_test unnecessary changes

* Rename feature toggle from `libpqToPGX` to `postgresDSUsePGX`

* Add null check to dispose method

* Fix lint issues

* Refactor connection string generation

* Address comment in health check file

* Rename p to pool

* Refactor executeQueryPGX and split into multiple functions

* Fix lint issues

* The returning error message from PGX is enough no need to separate the error code.

* Move TLS handling to newPostgresPGX function

* Disable ssl for integration tests

* Use MaxIdleConns option

* Remove old feature toggle

* Rename`generateConnectionConfigPGX` to `generateConnectionStringPGX`

* Add back part of the error messages

* Don't show max idle connections option when PGX enabled

* Address comments from Sriram

* Add back Sriram's changes

* PostgreSQL: Rework tls manager to use temporary files instead (#105330)

* Rework tls manager to use temporary files instead

* Lint and test fixes

* Update pkg/tsdb/grafana-postgresql-datasource/postgres.go

Co-authored-by: Ivana Huckova <30407135+ivanahuckova@users.noreply.github.com>

* Update betterer

---------

Co-authored-by: Ivana Huckova <30407135+ivanahuckova@users.noreply.github.com>

---------

Co-authored-by: Ivana Huckova <30407135+ivanahuckova@users.noreply.github.com>
This commit is contained in:
Zoltán Bedi
2025-05-26 08:54:18 +02:00
committed by GitHub
parent 745eda2848
commit 1e383b0c1e
25 changed files with 1233 additions and 778 deletions

View File

@ -14,7 +14,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb/grafana-postgresql-datasource/sqleng"
_ "github.com/lib/pq"
@ -25,8 +24,6 @@ func TestIntegrationGenerateConnectionString(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
cfg := setting.NewCfg()
cfg.DataPath = t.TempDir()
testCases := []struct {
desc string
@ -147,8 +144,7 @@ func TestIntegrationGenerateConnectionString(t *testing.T) {
for _, tt := range testCases {
t.Run(tt.desc, func(t *testing.T) {
svc := Service{
tlsManager: &tlsTestManager{settings: tt.tlsSettings},
logger: backend.NewLoggerWith("logger", "tsdb.postgres"),
logger: backend.NewLoggerWith("logger", "tsdb.postgres"),
}
ds := sqleng.DataSourceInfo{
@ -159,7 +155,7 @@ func TestIntegrationGenerateConnectionString(t *testing.T) {
UID: tt.uid,
}
connStr, err := svc.generateConnectionString(ds)
connStr, err := svc.generateConnectionString(ds, tt.tlsSettings, false)
if tt.expErr == "" {
require.NoError(t, err, tt.desc)
@ -201,10 +197,11 @@ func TestIntegrationPostgres(t *testing.T) {
}
jsonData := sqleng.JsonData{
MaxOpenConns: 0,
MaxOpenConns: 10,
MaxIdleConns: 2,
ConnMaxLifetime: 14400,
Timescaledb: false,
Mode: "disable",
ConfigurationMethod: "file-path",
}
@ -217,7 +214,7 @@ func TestIntegrationPostgres(t *testing.T) {
cnnstr := postgresTestDBConnString()
db, exe, err := newPostgres(context.Background(), "error", 10000, dsInfo, cnnstr, logger, backend.DataSourceInstanceSettings{})
p, exe, err := newPostgresPGX(context.Background(), "error", 10000, dsInfo, cnnstr, logger, backend.DataSourceInstanceSettings{})
require.NoError(t, err)
@ -250,7 +247,7 @@ func TestIntegrationPostgres(t *testing.T) {
c16_smallint smallint
);
`
_, err := db.Exec(sql)
_, err := p.Exec(context.Background(), sql)
require.NoError(t, err)
sql = `
@ -263,7 +260,7 @@ func TestIntegrationPostgres(t *testing.T) {
null
);
`
_, err = db.Exec(sql)
_, err = p.Exec(context.Background(), sql)
require.NoError(t, err)
t.Run("When doing a table query should map Postgres column types to Go types", func(t *testing.T) {
@ -278,7 +275,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -306,9 +303,9 @@ func TestIntegrationPostgres(t *testing.T) {
require.True(t, ok)
_, ok = frames[0].Fields[12].At(0).(*time.Time)
require.True(t, ok)
_, ok = frames[0].Fields[13].At(0).(*time.Time)
_, ok = frames[0].Fields[13].At(0).(*string)
require.True(t, ok)
_, ok = frames[0].Fields[14].At(0).(*time.Time)
_, ok = frames[0].Fields[14].At(0).(*string)
require.True(t, ok)
_, ok = frames[0].Fields[15].At(0).(*time.Time)
require.True(t, ok)
@ -326,7 +323,7 @@ func TestIntegrationPostgres(t *testing.T) {
)
`
_, err := db.Exec(sql)
_, err := p.Exec(context.Background(), sql)
require.NoError(t, err)
type metric struct {
@ -353,7 +350,7 @@ func TestIntegrationPostgres(t *testing.T) {
}
for _, m := range series {
_, err := db.Exec(`INSERT INTO metric ("time", value) VALUES ($1, $2)`, m.Time.UTC(), m.Value)
_, err := p.Exec(context.Background(), `INSERT INTO metric ("time", value) VALUES ($1, $2)`, m.Time.UTC(), m.Value)
require.NoError(t, err)
}
@ -370,7 +367,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -426,7 +423,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
frames := queryResult.Frames
@ -454,7 +451,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -508,7 +505,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -534,7 +531,7 @@ func TestIntegrationPostgres(t *testing.T) {
}
for _, m := range series {
_, err := db.Exec(`INSERT INTO metric ("time", value) VALUES ($1, $2)`, m.Time.UTC(), m.Value)
_, err := p.Exec(context.Background(), `INSERT INTO metric ("time", value) VALUES ($1, $2)`, m.Time.UTC(), m.Value)
require.NoError(t, err)
}
@ -555,7 +552,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -590,7 +587,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -618,10 +615,10 @@ func TestIntegrationPostgres(t *testing.T) {
ValueTwo int64
}
_, err := db.Exec("DROP TABLE IF EXISTS metric_values")
_, err := p.Exec(context.Background(), "DROP TABLE IF EXISTS metric_values")
require.NoError(t, err)
_, err = db.Exec(`CREATE TABLE metric_values (
_, err = p.Exec(context.Background(), `CREATE TABLE metric_values (
"time" TIMESTAMP NULL,
"timeInt64" BIGINT NOT NULL, "timeInt64Nullable" BIGINT NULL,
"timeFloat64" DOUBLE PRECISION NOT NULL, "timeFloat64Nullable" DOUBLE PRECISION NULL,
@ -674,7 +671,7 @@ func TestIntegrationPostgres(t *testing.T) {
// _, err = session.InsertMulti(series)
for _, m := range series {
_, err := db.Exec(`INSERT INTO "metric_values" (
_, err := p.Exec(context.Background(), `INSERT INTO "metric_values" (
time,
"timeInt64", "timeInt64Nullable",
"timeFloat64", "timeFloat64Nullable",
@ -707,7 +704,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -731,7 +728,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -755,7 +752,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -779,7 +776,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -803,7 +800,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -827,7 +824,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -851,7 +848,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -876,7 +873,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -900,7 +897,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -925,7 +922,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -957,7 +954,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -992,7 +989,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -1011,9 +1008,9 @@ func TestIntegrationPostgres(t *testing.T) {
Tags string
}
_, err := db.Exec("DROP TABLE IF EXISTS event")
_, err := p.Exec(context.Background(), "DROP TABLE IF EXISTS event")
require.NoError(t, err)
_, err = db.Exec(`CREATE TABLE event (time_sec BIGINT NULL, description VARCHAR(255) NULL, tags VARCHAR(255) NULL)`)
_, err = p.Exec(context.Background(), `CREATE TABLE event (time_sec BIGINT NULL, description VARCHAR(255) NULL, tags VARCHAR(255) NULL)`)
require.NoError(t, err)
events := []*event{}
@ -1031,7 +1028,7 @@ func TestIntegrationPostgres(t *testing.T) {
}
for _, e := range events {
_, err := db.Exec("INSERT INTO event (time_sec, description, tags) VALUES ($1, $2, $3)", e.TimeSec, e.Description, e.Tags)
_, err := p.Exec(context.Background(), "INSERT INTO event (time_sec, description, tags) VALUES ($1, $2, $3)", e.TimeSec, e.Description, e.Tags)
require.NoError(t, err)
}
@ -1052,7 +1049,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["Deploys"]
@ -1079,7 +1076,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["Tickets"]
@ -1102,7 +1099,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -1127,7 +1124,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -1152,7 +1149,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -1178,7 +1175,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -1204,7 +1201,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -1230,7 +1227,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -1256,7 +1253,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -1270,8 +1267,20 @@ func TestIntegrationPostgres(t *testing.T) {
})
t.Run("When row limit set to 1", func(t *testing.T) {
dsInfo := sqleng.DataSourceInfo{}
_, handler, err := newPostgres(context.Background(), "error", 1, dsInfo, cnnstr, logger, backend.DataSourceInstanceSettings{})
jsonData := sqleng.JsonData{
MaxOpenConns: 10,
MaxIdleConns: 2,
ConnMaxLifetime: 14400,
Timescaledb: false,
Mode: "disable",
ConfigurationMethod: "file-path",
}
dsInfo := sqleng.DataSourceInfo{
JsonData: jsonData,
DecryptedSecureJSONData: map[string]string{},
}
_, handler, err := newPostgresPGX(context.Background(), "error", 1, dsInfo, cnnstr, logger, backend.DataSourceInstanceSettings{})
require.NoError(t, err)
@ -1292,7 +1301,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := handler.QueryData(context.Background(), query)
resp, err := handler.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -1322,7 +1331,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := handler.QueryData(context.Background(), query)
resp, err := handler.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
require.NoError(t, queryResult.Error)
@ -1338,9 +1347,9 @@ func TestIntegrationPostgres(t *testing.T) {
})
t.Run("Given an empty table", func(t *testing.T) {
_, err := db.Exec("DROP TABLE IF EXISTS empty_obj")
_, err := p.Exec(context.Background(), "DROP TABLE IF EXISTS empty_obj")
require.NoError(t, err)
_, err = db.Exec("CREATE TABLE empty_obj (empty_key VARCHAR(255) NULL, empty_val BIGINT NULL)")
_, err = p.Exec(context.Background(), "CREATE TABLE empty_obj (empty_key VARCHAR(255) NULL, empty_val BIGINT NULL)")
require.NoError(t, err)
t.Run("When no rows are returned, should return an empty frame", func(t *testing.T) {
@ -1360,7 +1369,7 @@ func TestIntegrationPostgres(t *testing.T) {
},
}
resp, err := exe.QueryData(context.Background(), query)
resp, err := exe.QueryDataPGX(context.Background(), query)
require.NoError(t, err)
queryResult := resp.Responses["A"]
@ -1386,14 +1395,6 @@ func genTimeRangeByInterval(from time.Time, duration time.Duration, interval tim
return timeRange
}
type tlsTestManager struct {
settings tlsSettings
}
func (m *tlsTestManager) getTLSSettings(dsInfo sqleng.DataSourceInfo) (tlsSettings, error) {
return m.settings, nil
}
func isTestDbPostgres() bool {
if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
return db == "postgres"