mirror of
https://github.com/grafana/alloy.git
synced 2025-11-05 13:28:02 +08:00
database_observability: Pass queries for explain plans more permissively. (#4706)
* Pass queries for explain plans more permissively. In fact, pass everything, besides what is on the denylist which (hopefully) covers all of the possible write reserved words * Changelog updates * Great opportunity to exclude our own explain queries * Okay, maybe use a lexer instead * Add a couple of overlooked locking commands for reserved word deny list * Use the correct DBMS type for each reserved words check * Vestigial over verbose debug output * Initialize the reserved words var in the desired format from the start. * Linting, but also that was foolish and shouldn't have required a linter to catch it * Remove safe reserved words from denylist
This commit is contained in:
@@ -20,10 +20,14 @@ Main (unreleased)
|
||||
### Features
|
||||
|
||||
- (_Experimental_) Additions to experimental `database_observability.mysql` component:
|
||||
- `explain_plans` collector now changes schema before returning the connection to the pool (@cristiangreco)
|
||||
- `explain_plans`
|
||||
- collector now changes schema before returning the connection to the pool (@cristiangreco)
|
||||
- collector now passes queries more permissively, expressly to allow queries beginning in `with` (@rgeyer)
|
||||
|
||||
- (_Experimental_) Additions to experimental `database_observability.postgres` component:
|
||||
- `explain_plans` added the explain plan collector (@rgeyer)
|
||||
- `explain_plans`
|
||||
- added the explain plan collector (@rgeyer)
|
||||
- collector now passes queries more permissively, expressly to allow queries beginning in `with` (@rgeyer)
|
||||
- add `user` field to wait events within `query_samples` collector (@gaantunes)
|
||||
- rework the query samples collector to buffer per-query execution state across scrapes and emit finalized entries (@gaantunes)
|
||||
|
||||
|
||||
@@ -35,6 +35,44 @@ const (
|
||||
ExplainPlanJoinAlgorithmNestedLoop ExplainPlanJoinAlgorithm = "nested_loop"
|
||||
)
|
||||
|
||||
// ExplainReservedWordDenyList contains SQL reserved words that indicate write operations
|
||||
// to the database. These are primarily DML (Data Manipulation Language) and DDL
|
||||
// (Data Definition Language) commands that modify database state.
|
||||
// This was extracted from the MySQL and PostgreSQL documentation by Claude Sonnet 4 on Oct 28, 2025
|
||||
// and audited by @rgeyer and others in the dbo11y team.
|
||||
var ExplainReservedWordDenyList = map[string]bool{
|
||||
// Data Manipulation Language (DML) - Write operations
|
||||
"INSERT": true, "UPDATE": true, "DELETE": true, "REPLACE": true, "MERGE": true, "UPSERT": true,
|
||||
"FOR UPDATE": true,
|
||||
|
||||
// Data Definition Language (DDL) - Schema modifications
|
||||
"CREATE": true, "ALTER": true, "DROP": true, "RENAME": true, "TRUNCATE": true,
|
||||
|
||||
// Transaction control that can commit writes
|
||||
"COMMIT": true, "ROLLBACK": true, "SAVEPOINT": true,
|
||||
|
||||
// Database/Schema management
|
||||
"USE": true, "DATABASE": true, "SCHEMA": true,
|
||||
|
||||
// Index operations
|
||||
"REINDEX": true, "ANALYZE": true, "OPTIMIZE": true,
|
||||
"REINDEX TABLE": true, "ANALYZE TABLE": true, "OPTIMIZE TABLE": true,
|
||||
// User/Permission management
|
||||
"GRANT": true, "REVOKE": true, "CREATE USER": true, "DROP USER": true, "ALTER USER": true,
|
||||
|
||||
// MySQL specific write operations
|
||||
"LOAD": true, "DELAYED": true, "IGNORE": true, "ON DUPLICATE KEY": true,
|
||||
"LOW_PRIORITY": true, "HIGH_PRIORITY": true, "QUICK": true,
|
||||
"LOCK IN SHARE MODE": true,
|
||||
|
||||
// PostgreSQL specific write operations
|
||||
"COPY": true, "VACUUM": true, "CLUSTER": true, "LISTEN": true, "NOTIFY": true, "DISCARD": true,
|
||||
"PREPARE": true, "EXECUTE": true, "DEALLOCATE": true, "RESET": true, "SET": true,
|
||||
|
||||
// dbo11 specific operations we'd like to exclude
|
||||
"EXPLAIN": true,
|
||||
}
|
||||
|
||||
type ExplainPlanOutput struct {
|
||||
Metadata ExplainPlanMetadataInfo `json:"metadata"`
|
||||
Plan ExplainPlanNode `json:"plan"`
|
||||
|
||||
63
internal/component/database_observability/lexer.go
Normal file
63
internal/component/database_observability/lexer.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package database_observability
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/DataDog/go-sqllexer"
|
||||
)
|
||||
|
||||
// ExtractTableNames extracts the table names from a SQL query
|
||||
func ExtractTableNames(sql string) ([]string, error) {
|
||||
normalizer := sqllexer.NewNormalizer(
|
||||
sqllexer.WithCollectTables(true),
|
||||
)
|
||||
_, metadata, err := normalizer.Normalize(sql, sqllexer.WithDBMS(sqllexer.DBMSPostgres))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to normalize SQL: %w", err)
|
||||
}
|
||||
|
||||
// Return all table names, including those that end with "..." for truncated queries, as we can't know if the table name was truncated or not
|
||||
return metadata.Tables, nil
|
||||
}
|
||||
|
||||
// RedactSql obfuscates a SQL query by replacing literals with ? placeholders
|
||||
func RedactSql(sql string) string {
|
||||
obfuscatedSql := sqllexer.NewObfuscator().Obfuscate(sql)
|
||||
return obfuscatedSql
|
||||
}
|
||||
|
||||
// ContainsReservedKeywords checks if the SQL query contains any reserved keywords
|
||||
// that indicate write operations, excluding those in string literals or comments
|
||||
func ContainsReservedKeywords(query string, reservedWords map[string]bool, dbms sqllexer.DBMSType) bool {
|
||||
// Use the lexer to tokenize the query
|
||||
lexer := sqllexer.New(query, sqllexer.WithDBMS(dbms))
|
||||
|
||||
// Scan all tokens
|
||||
for {
|
||||
token := lexer.Scan()
|
||||
if token.Type == sqllexer.EOF {
|
||||
break
|
||||
}
|
||||
if token.Type == sqllexer.ERROR {
|
||||
// If lexing fails, fall back to simple string search for safety
|
||||
uppercaseQuery := strings.ToUpper(query)
|
||||
for word := range reservedWords {
|
||||
if strings.Contains(uppercaseQuery, word) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Check commands, keywords, and identifiers (since some reserved words might be classified as identifiers)
|
||||
// but exclude string literals, comments, and other non-SQL-keyword tokens
|
||||
if token.Type == sqllexer.COMMAND || token.Type == sqllexer.KEYWORD || token.Type == sqllexer.IDENT {
|
||||
if _, ok := reservedWords[strings.ToUpper(token.Value)]; ok {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
@@ -1,7 +1,9 @@
|
||||
package collector
|
||||
package database_observability
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/DataDog/go-sqllexer"
|
||||
)
|
||||
|
||||
func TestPgSqlParser_Redact(t *testing.T) {
|
||||
@@ -174,7 +176,7 @@ func TestPgSqlParser_Redact(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := redact(tt.sql)
|
||||
got := RedactSql(tt.sql)
|
||||
if got != tt.want {
|
||||
t.Errorf("\nRedact()\nGOT:\n%s\nWANT:\n%s", got, tt.want)
|
||||
}
|
||||
@@ -276,7 +278,7 @@ func TestPgSqlParser_ExtractTableNames(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := extractTableNames(tt.sql)
|
||||
got, err := ExtractTableNames(tt.sql)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("ExtractTableNames() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
@@ -309,3 +311,177 @@ func TestPgSqlParser_ExtractTableNames(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestContainsReservedKeywords(t *testing.T) {
|
||||
reservedWords := map[string]bool{"INSERT": true, "UPDATE": true, "DELETE": true, "CREATE": true, "DROP": true}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
query string
|
||||
expected bool
|
||||
}{
|
||||
{
|
||||
name: "actual INSERT statement",
|
||||
query: "INSERT INTO users (name) VALUES ('John')",
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "SELECT with INSERT in string literal",
|
||||
query: "SELECT 'INSERT INTO table' FROM users",
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "SELECT with insert in column name",
|
||||
query: "SELECT insert_date FROM users",
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "SELECT with INSERT in comment",
|
||||
query: "SELECT * FROM users -- INSERT comment",
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "SELECT with INSERT in block comment",
|
||||
query: "SELECT * FROM users /* INSERT block comment */",
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "CREATE TABLE statement",
|
||||
query: "CREATE TABLE users (id INT, name VARCHAR(50))",
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "SELECT with CREATE in quoted identifier",
|
||||
query: `SELECT "create_date" FROM users`,
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "UPDATE statement",
|
||||
query: "UPDATE users SET name = 'John' WHERE id = 1",
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "SELECT with update in string",
|
||||
query: "SELECT * FROM users WHERE status = 'update_pending'",
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "DELETE statement",
|
||||
query: "DELETE FROM users WHERE id = 1",
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "SELECT with delete in table name",
|
||||
query: "SELECT * FROM delete_log",
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "plain SELECT statement",
|
||||
query: "SELECT * FROM users WHERE name = 'John'",
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "complex SELECT with joins",
|
||||
query: "SELECT u.name, p.title FROM users u JOIN posts p ON u.id = p.user_id",
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "SELECT with reserved word in WHERE clause string",
|
||||
query: "SELECT * FROM users WHERE description LIKE '%CREATE%'",
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "DROP statement",
|
||||
query: "DROP TABLE users",
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "SELECT with drop in column alias",
|
||||
query: "SELECT name AS drop_reason FROM users",
|
||||
expected: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result := ContainsReservedKeywords(tt.query, reservedWords, sqllexer.DBMSMySQL)
|
||||
if result != tt.expected {
|
||||
t.Errorf("Expected %v, got %v for query: %s", tt.expected, result, tt.query)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestContainsReservedKeywords_WithActualDenyList(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
query string
|
||||
expected bool
|
||||
}{
|
||||
{
|
||||
name: "EXPLAIN statement should be detected",
|
||||
query: "EXPLAIN SELECT * FROM users",
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "SELECT with explain in string should not be detected",
|
||||
query: "SELECT 'explain this' FROM users",
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "PREPARE statement should be detected",
|
||||
query: "PREPARE stmt AS SELECT * FROM users WHERE id = $1",
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "SELECT with prepare in column name should not be detected",
|
||||
query: "SELECT prepare_date FROM users",
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "SET statement should be detected",
|
||||
query: "SET search_path TO public",
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "SELECT with set in string should not be detected",
|
||||
query: "SELECT 'set this value' FROM users",
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "SELECT with reserved word in quoted identifier",
|
||||
query: `SELECT "insert" FROM users`,
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "SELECT with reserved word in table alias",
|
||||
query: "SELECT * FROM users AS insert_table",
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "SELECT with legacy LOCK IN SHARE MODE",
|
||||
query: "SELECT name FROM users LOCK IN SHARE MODE",
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "SELECT with FOR UPDATE",
|
||||
query: "SELECT name FROM users FOR UPDATE",
|
||||
expected: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run("MySQL: "+tt.name, func(t *testing.T) {
|
||||
result := ContainsReservedKeywords(tt.query, ExplainReservedWordDenyList, sqllexer.DBMSMySQL)
|
||||
if result != tt.expected {
|
||||
t.Errorf("Expected %v, got %v for query: %s", tt.expected, result, tt.query)
|
||||
}
|
||||
})
|
||||
t.Run("PostgreSQL: "+tt.name, func(t *testing.T) {
|
||||
result := ContainsReservedKeywords(tt.query, ExplainReservedWordDenyList, sqllexer.DBMSPostgres)
|
||||
if result != tt.expected {
|
||||
t.Errorf("Expected %v, got %v for query: %s", tt.expected, result, tt.query)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/DataDog/go-sqllexer"
|
||||
"github.com/go-kit/log"
|
||||
"go.uber.org/atomic"
|
||||
|
||||
@@ -568,7 +569,10 @@ func (c *ExplainPlans) fetchExplainPlans(ctx context.Context) error {
|
||||
continue
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(strings.ToLower(qi.queryText), "select") {
|
||||
containsReservedWord := database_observability.ContainsReservedKeywords(qi.queryText, database_observability.ExplainReservedWordDenyList, sqllexer.DBMSMySQL)
|
||||
|
||||
if containsReservedWord {
|
||||
level.Debug(logger).Log("msg", "skipping query containing reserved word")
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@@ -1666,6 +1666,43 @@ func TestExplainPlans(t *testing.T) {
|
||||
)
|
||||
})
|
||||
|
||||
t.Run("passes queries beginning in with", func(t *testing.T) {
|
||||
lokiClient.Clear()
|
||||
logBuffer.Reset()
|
||||
mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
|
||||
"schema_name",
|
||||
"digest",
|
||||
"query_sample_text",
|
||||
"last_seen",
|
||||
}).AddRow(
|
||||
"some_schema",
|
||||
"some_digest",
|
||||
"with cte as (select * from some_table where id = 1) select * from cte",
|
||||
lastSeen,
|
||||
))
|
||||
|
||||
mock.ExpectExec("USE `some_schema`").WithoutArgs().WillReturnResult(sqlmock.NewResult(0, 0))
|
||||
|
||||
mock.ExpectQuery(selectExplainPlanPrefix + "with cte as (select * from some_table where id = 1) select * from cte").WillReturnRows(sqlmock.NewRows([]string{
|
||||
"json",
|
||||
}).AddRow(
|
||||
[]byte(`{"query_block": {"select_id": 1}}`),
|
||||
))
|
||||
|
||||
err = c.fetchExplainPlans(t.Context())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NotContains(t, logBuffer.String(), "error")
|
||||
|
||||
require.Eventually(
|
||||
t,
|
||||
func() bool { return len(lokiClient.Received()) == 1 },
|
||||
5*time.Second,
|
||||
10*time.Millisecond,
|
||||
"did not receive the explain plan output log message within the timeout",
|
||||
)
|
||||
})
|
||||
|
||||
err = mock.ExpectationsWereMet()
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/DataDog/go-sqllexer"
|
||||
"github.com/blang/semver/v4"
|
||||
"github.com/go-kit/log"
|
||||
"go.uber.org/atomic"
|
||||
@@ -125,7 +126,7 @@ func (p *PlanNode) ToExplainPlanOutputNode() (database_observability.ExplainPlan
|
||||
}
|
||||
|
||||
if !strings.EqualFold(p.Filter, "") {
|
||||
redacted := redact(p.Filter)
|
||||
redacted := database_observability.RedactSql(p.Filter)
|
||||
output.Details.Condition = &redacted
|
||||
}
|
||||
|
||||
@@ -410,7 +411,10 @@ func (c *ExplainPlan) fetchExplainPlans(ctx context.Context) error {
|
||||
continue
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(strings.ToLower(qi.queryText), "select") {
|
||||
containsReservedWord := database_observability.ContainsReservedKeywords(qi.queryText, database_observability.ExplainReservedWordDenyList, sqllexer.DBMSPostgres)
|
||||
|
||||
if containsReservedWord {
|
||||
level.Debug(logger).Log("msg", "skipping query containing reserved word")
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -440,7 +444,7 @@ func (c *ExplainPlan) fetchExplainPlans(ctx context.Context) error {
|
||||
continue
|
||||
}
|
||||
|
||||
redactedByteExplainPlanJSON := redact(string(byteExplainPlanJSON))
|
||||
redactedByteExplainPlanJSON := database_observability.RedactSql(string(byteExplainPlanJSON))
|
||||
|
||||
level.Debug(logger).Log("msg", "db native explain plan", "db_native_explain_plan", base64.StdEncoding.EncodeToString([]byte(redactedByteExplainPlanJSON)))
|
||||
|
||||
@@ -492,8 +496,9 @@ func (c *ExplainPlan) fetchExplainPlanJSON(ctx context.Context, qi queryInfo) ([
|
||||
defer conn.Close()
|
||||
|
||||
preparedStatementName := strings.ReplaceAll(fmt.Sprintf("explain_plan_%s", qi.queryId), "-", "_")
|
||||
logger := log.With(c.logger, "query_id", qi.queryId, "datname", qi.datname, "preparedStatementName", preparedStatementName)
|
||||
if _, err := conn.ExecContext(ctx, fmt.Sprintf("PREPARE %s AS %s", preparedStatementName, qi.queryText)); err != nil {
|
||||
preparedStatementText := fmt.Sprintf("PREPARE %s AS %s", preparedStatementName, qi.queryText)
|
||||
logger := log.With(c.logger, "query_id", qi.queryId, "datname", qi.datname, "preparedStatementName", preparedStatementName, "preparedStatementText", preparedStatementText)
|
||||
if _, err := conn.ExecContext(ctx, preparedStatementText); err != nil {
|
||||
return nil, fmt.Errorf("failed to prepare explain plan: %w", err)
|
||||
}
|
||||
|
||||
|
||||
@@ -2814,6 +2814,10 @@ func TestExplainPlanFetchExplainPlans(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("passes queries beginning in select", func(t *testing.T) {
|
||||
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
lokiClient.Clear()
|
||||
logBuffer.Reset()
|
||||
dbConnFactory := &mockDbConnectionFactory{
|
||||
@@ -2876,6 +2880,73 @@ func TestExplainPlanFetchExplainPlans(t *testing.T) {
|
||||
require.NotContains(t, logBuffer.String(), "error")
|
||||
})
|
||||
|
||||
t.Run("passes queries beginning in with", func(t *testing.T) {
|
||||
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
lokiClient.Clear()
|
||||
logBuffer.Reset()
|
||||
dbConnFactory := &mockDbConnectionFactory{
|
||||
db: db,
|
||||
Mock: &mock,
|
||||
InstantiationCount: 0,
|
||||
}
|
||||
explainPlan = &ExplainPlan{
|
||||
dbConnection: db,
|
||||
dbDSN: "postgres://user:pass@host:1234/database",
|
||||
dbConnectionFactory: dbConnFactory.NewDBConnection,
|
||||
dbVersion: post17ver,
|
||||
queryCache: map[string]*queryInfo{
|
||||
"testdb123456": {
|
||||
datname: "testdb",
|
||||
queryId: "123456",
|
||||
queryText: "with cte as (select * from some_table where id = $1) select * from cte",
|
||||
calls: int64(10),
|
||||
callsReset: time.Now(),
|
||||
},
|
||||
},
|
||||
queryDenylist: map[string]*queryInfo{},
|
||||
finishedQueryCache: map[string]*queryInfo{},
|
||||
excludeSchemas: []string{},
|
||||
perScrapeRatio: 1.0,
|
||||
logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)),
|
||||
currentBatchSize: 1,
|
||||
entryHandler: lokiClient,
|
||||
}
|
||||
|
||||
archive, err := txtar.ParseFile("./testdata/explain_plan/complex_aggregation_with_case.txtar")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, len(archive.Files))
|
||||
jsonFile := archive.Files[0]
|
||||
require.Equal(t, "complex_aggregation_with_case.json", jsonFile.Name)
|
||||
jsonData := jsonFile.Data
|
||||
|
||||
mock.ExpectExec("PREPARE explain_plan_123456 AS with cte as (select * from some_table where id = $1) select * from cte").WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec("SET search_path TO testdb, public").WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec("SET plan_cache_mode = force_generic_plan").WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectQuery("EXPLAIN (FORMAT JSON) EXECUTE explain_plan_123456(null)").WillReturnRows(sqlmock.NewRows([]string{"json"}).AddRow(jsonData))
|
||||
mock.ExpectExec("DEALLOCATE explain_plan_123456").WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
err = explainPlan.fetchExplainPlans(t.Context())
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.NoError(t, mock.ExpectationsWereMet())
|
||||
assert.Equal(t, 1, dbConnFactory.InstantiationCount)
|
||||
|
||||
require.Eventually(
|
||||
t,
|
||||
func() bool {
|
||||
return len(lokiClient.Received()) == 1
|
||||
},
|
||||
5*time.Second,
|
||||
10*time.Millisecond,
|
||||
"did not receive the explain plan output log message within the timeout",
|
||||
)
|
||||
|
||||
require.NotContains(t, logBuffer.String(), "error")
|
||||
})
|
||||
|
||||
err = mock.ExpectationsWereMet()
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/DataDog/go-sqllexer"
|
||||
)
|
||||
|
||||
// extractTableNames extracts the table names from a SQL query
|
||||
func extractTableNames(sql string) ([]string, error) {
|
||||
normalizer := sqllexer.NewNormalizer(
|
||||
sqllexer.WithCollectTables(true),
|
||||
)
|
||||
_, metadata, err := normalizer.Normalize(sql, sqllexer.WithDBMS(sqllexer.DBMSPostgres))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to normalize SQL: %w", err)
|
||||
}
|
||||
|
||||
// Return all table names, including those that end with "..." for truncated queries, as we can't know if the table name was truncated or not
|
||||
return metadata.Tables, nil
|
||||
}
|
||||
|
||||
// redact obfuscates a SQL query by replacing literals with ? placeholders
|
||||
func redact(sql string) string {
|
||||
obfuscatedSql := sqllexer.NewObfuscator().Obfuscate(sql)
|
||||
return obfuscatedSql
|
||||
}
|
||||
@@ -166,7 +166,7 @@ func (c QueryDetails) fetchAndAssociate(ctx context.Context) error {
|
||||
|
||||
func (c QueryDetails) tryTokenizeTableNames(sqlText string) ([]string, error) {
|
||||
sqlText = strings.TrimSuffix(sqlText, "...")
|
||||
tables, err := extractTableNames(sqlText)
|
||||
tables, err := database_observability.ExtractTableNames(sqlText)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to extract table names: %w", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user