mirror of
https://github.com/grafana/grafana.git
synced 2025-07-30 19:22:25 +08:00
postgres: simplify proxy code (#80121)
This commit is contained in:
@ -16,6 +16,7 @@ import (
|
|||||||
sdkproxy "github.com/grafana/grafana-plugin-sdk-go/backend/proxy"
|
sdkproxy "github.com/grafana/grafana-plugin-sdk-go/backend/proxy"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
|
"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
|
||||||
|
"github.com/lib/pq"
|
||||||
|
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
||||||
"github.com/grafana/grafana/pkg/setting"
|
"github.com/grafana/grafana/pkg/setting"
|
||||||
@ -99,15 +100,22 @@ func (s *Service) newInstanceSettings(cfg *setting.Cfg) datasource.InstanceFacto
|
|||||||
logger.Debug("GetEngine", "connection", cnnstr)
|
logger.Debug("GetEngine", "connection", cnnstr)
|
||||||
}
|
}
|
||||||
|
|
||||||
driverName := "postgres"
|
connector, err := pq.NewConnector(cnnstr)
|
||||||
// register a proxy driver if the secure socks proxy is enabled
|
if err != nil {
|
||||||
|
logger.Error("postgres connector creation failed", "error", err)
|
||||||
|
return nil, fmt.Errorf("postgres connector creation failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
// use the proxy-dialer if the secure socks proxy is enabled
|
||||||
proxyOpts := proxyutil.GetSQLProxyOptions(cfg.SecureSocksDSProxy, dsInfo)
|
proxyOpts := proxyutil.GetSQLProxyOptions(cfg.SecureSocksDSProxy, dsInfo)
|
||||||
if sdkproxy.New(proxyOpts).SecureSocksProxyEnabled() {
|
if sdkproxy.New(proxyOpts).SecureSocksProxyEnabled() {
|
||||||
driverName, err = createPostgresProxyDriver(cnnstr, proxyOpts)
|
dialer, err := newPostgresProxyDialer(proxyOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("postgres proxy creation failed", "error", err)
|
logger.Error("postgres proxy creation failed", "error", err)
|
||||||
return nil, fmt.Errorf("postgres proxy creation failed")
|
return nil, fmt.Errorf("postgres proxy creation failed")
|
||||||
}
|
}
|
||||||
|
// update the postgres dialer with the proxy dialer
|
||||||
|
connector.Dialer(dialer)
|
||||||
}
|
}
|
||||||
|
|
||||||
config := sqleng.DataPluginConfiguration{
|
config := sqleng.DataPluginConfiguration{
|
||||||
@ -118,10 +126,7 @@ func (s *Service) newInstanceSettings(cfg *setting.Cfg) datasource.InstanceFacto
|
|||||||
|
|
||||||
queryResultTransformer := postgresQueryResultTransformer{}
|
queryResultTransformer := postgresQueryResultTransformer{}
|
||||||
|
|
||||||
db, err := sql.Open(driverName, cnnstr)
|
db := sql.OpenDB(connector)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
db.SetMaxOpenConns(config.DSInfo.JsonData.MaxOpenConns)
|
db.SetMaxOpenConns(config.DSInfo.JsonData.MaxOpenConns)
|
||||||
db.SetMaxIdleConns(config.DSInfo.JsonData.MaxIdleConns)
|
db.SetMaxIdleConns(config.DSInfo.JsonData.MaxIdleConns)
|
||||||
|
@ -2,12 +2,7 @@ package postgres
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/md5"
|
|
||||||
"database/sql"
|
|
||||||
"database/sql/driver"
|
|
||||||
"fmt"
|
|
||||||
"net"
|
"net"
|
||||||
"slices"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
sdkproxy "github.com/grafana/grafana-plugin-sdk-go/backend/proxy"
|
sdkproxy "github.com/grafana/grafana-plugin-sdk-go/backend/proxy"
|
||||||
@ -15,52 +10,20 @@ import (
|
|||||||
"golang.org/x/net/proxy"
|
"golang.org/x/net/proxy"
|
||||||
)
|
)
|
||||||
|
|
||||||
// createPostgresProxyDriver creates and registers a new sql driver that uses a postgres connector and updates the dialer to
|
|
||||||
// route connections through the secure socks proxy
|
|
||||||
func createPostgresProxyDriver(cnnstr string, opts *sdkproxy.Options) (string, error) {
|
|
||||||
// create a unique driver per connection string
|
|
||||||
hash := fmt.Sprintf("%x", md5.Sum([]byte(cnnstr)))
|
|
||||||
driverName := "postgres-proxy-" + hash
|
|
||||||
|
|
||||||
// only register the driver once
|
|
||||||
if !slices.Contains(sql.Drivers(), driverName) {
|
|
||||||
connector, err := pq.NewConnector(cnnstr)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
driver, err := newPostgresProxyDriver(connector, opts)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
sql.Register(driverName, driver)
|
|
||||||
}
|
|
||||||
return driverName, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// postgresProxyDriver is a regular postgres driver with an updated dialer.
|
|
||||||
// This is done because there is no way to save a dialer to the postgres driver in xorm
|
|
||||||
type postgresProxyDriver struct {
|
|
||||||
c *pq.Connector
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ driver.DriverContext = (*postgresProxyDriver)(nil)
|
|
||||||
|
|
||||||
// newPostgresProxyDriver updates the dialer for a postgres connector with a dialer that proxies connections through the secure socks proxy
|
// newPostgresProxyDriver updates the dialer for a postgres connector with a dialer that proxies connections through the secure socks proxy
|
||||||
// and returns a new postgres driver to register
|
// and returns a new postgres driver to register
|
||||||
func newPostgresProxyDriver(connector *pq.Connector, opts *sdkproxy.Options) (*postgresProxyDriver, error) {
|
func newPostgresProxyDialer(opts *sdkproxy.Options) (pq.Dialer, error) {
|
||||||
dialer, err := sdkproxy.New(opts).NewSecureSocksProxyContextDialer()
|
dialer, err := sdkproxy.New(opts).NewSecureSocksProxyContextDialer()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the postgres dialer with the proxy dialer
|
// update the postgres dialer with the proxy dialer
|
||||||
connector.Dialer(&postgresProxyDialer{d: dialer})
|
return &postgresProxyDialer{d: dialer}, nil
|
||||||
|
|
||||||
return &postgresProxyDriver{connector}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ pq.Dialer = (&postgresProxyDialer{})
|
||||||
|
|
||||||
// postgresProxyDialer implements the postgres dialer using a proxy dialer, as their functions differ slightly
|
// postgresProxyDialer implements the postgres dialer using a proxy dialer, as their functions differ slightly
|
||||||
type postgresProxyDialer struct {
|
type postgresProxyDialer struct {
|
||||||
d proxy.Dialer
|
d proxy.Dialer
|
||||||
@ -78,13 +41,3 @@ func (p *postgresProxyDialer) DialTimeout(network, address string, timeout time.
|
|||||||
|
|
||||||
return p.d.(proxy.ContextDialer).DialContext(ctx, network, address)
|
return p.d.(proxy.ContextDialer).DialContext(ctx, network, address)
|
||||||
}
|
}
|
||||||
|
|
||||||
// OpenConnector returns the normal postgres connector that has the updated dialer context
|
|
||||||
func (d *postgresProxyDriver) OpenConnector(name string) (driver.Connector, error) {
|
|
||||||
return d.c, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Open uses the connector with the updated dialer to open a new connection
|
|
||||||
func (d *postgresProxyDriver) Open(dsn string) (driver.Conn, error) {
|
|
||||||
return d.c.Connect(context.Background())
|
|
||||||
}
|
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
package postgres
|
package postgres
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
@ -25,44 +25,18 @@ func TestPostgresProxyDriver(t *testing.T) {
|
|||||||
opts := proxyutil.GetSQLProxyOptions(proxySettings, sqleng.DataSourceInfo{UID: "1", JsonData: sqleng.JsonData{SecureDSProxy: true}})
|
opts := proxyutil.GetSQLProxyOptions(proxySettings, sqleng.DataSourceInfo{UID: "1", JsonData: sqleng.JsonData{SecureDSProxy: true}})
|
||||||
dbURL := "localhost:5432"
|
dbURL := "localhost:5432"
|
||||||
cnnstr := fmt.Sprintf("postgres://auser:password@%s/db?sslmode=disable", dbURL)
|
cnnstr := fmt.Sprintf("postgres://auser:password@%s/db?sslmode=disable", dbURL)
|
||||||
driverName, err := createPostgresProxyDriver(cnnstr, opts)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
t.Run("Driver should not be registered more than once", func(t *testing.T) {
|
|
||||||
testDriver, err := createPostgresProxyDriver(cnnstr, opts)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, driverName, testDriver)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("A new driver should be created for a new connection string", func(t *testing.T) {
|
|
||||||
testDriver, err := createPostgresProxyDriver("server=localhost;user id=sa;password=yourStrong(!)Password;database=db2", opts)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.NotEqual(t, driverName, testDriver)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("Connector should use dialer context that routes through the socks proxy to db", func(t *testing.T) {
|
t.Run("Connector should use dialer context that routes through the socks proxy to db", func(t *testing.T) {
|
||||||
connector, err := pq.NewConnector(cnnstr)
|
connector, err := pq.NewConnector(cnnstr)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
driver, err := newPostgresProxyDriver(connector, opts)
|
dialer, err := newPostgresProxyDialer(opts)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
conn, err := driver.OpenConnector(cnnstr)
|
connector.Dialer(dialer)
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
_, err = conn.Connect(context.Background())
|
db := sql.OpenDB(connector)
|
||||||
require.Contains(t, err.Error(), fmt.Sprintf("socks connect %s %s->%s", "tcp", settings.ProxyAddress, dbURL))
|
err = db.Ping()
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("Connector should use dialer context that routes through the socks proxy to db", func(t *testing.T) {
|
|
||||||
connector, err := pq.NewConnector(cnnstr)
|
|
||||||
require.NoError(t, err)
|
|
||||||
driver, err := newPostgresProxyDriver(connector, opts)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
conn, err := driver.OpenConnector(cnnstr)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
_, err = conn.Connect(context.Background())
|
|
||||||
require.Contains(t, err.Error(), fmt.Sprintf("socks connect %s %s->%s", "tcp", settings.ProxyAddress, dbURL))
|
require.Contains(t, err.Error(), fmt.Sprintf("socks connect %s %s->%s", "tcp", settings.ProxyAddress, dbURL))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user