From 080d2a2054e64b7e1427c1aa91d44bf08a4d9cd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torkel=20=C3=96degaard?= Date: Thu, 30 Mar 2017 13:46:46 +0200 Subject: [PATCH] mysql: reading arbitrary SQL data in go is really strange, data is only in strings? --- pkg/services/sqlstore/migrations/stats_mig.go | 35 +++ pkg/tsdb/mysql/mysql.go | 223 +++++++++++------- .../plugins/datasource/mysql/datasource.ts | 28 ++- public/app/plugins/datasource/mysql/module.ts | 3 +- .../mysql/partials/query.editor.html | 6 +- .../app/plugins/datasource/mysql/plugin.json | 11 + public/img/mysql_logo.svg | 27 +++ 7 files changed, 238 insertions(+), 95 deletions(-) create mode 100644 pkg/services/sqlstore/migrations/stats_mig.go create mode 100644 public/img/mysql_logo.svg diff --git a/pkg/services/sqlstore/migrations/stats_mig.go b/pkg/services/sqlstore/migrations/stats_mig.go new file mode 100644 index 00000000000..8b0deed89b8 --- /dev/null +++ b/pkg/services/sqlstore/migrations/stats_mig.go @@ -0,0 +1,35 @@ +package migrations + +import . "github.com/grafana/grafana/pkg/services/sqlstore/migrator" + +func addStatsMigrations(mg *Migrator) { + statTable := Table{ + Name: "stat", + Columns: []*Column{ + {Name: "id", Type: DB_Int, IsPrimaryKey: true, IsAutoIncrement: true}, + {Name: "metric", Type: DB_Varchar, Length: 20, Nullable: false}, + {Name: "type", Type: DB_Int, Nullable: false}, + }, + Indices: []*Index{ + {Cols: []string{"metric"}, Type: UniqueIndex}, + }, + } + + // create table + mg.AddMigration("create stat table", NewAddTableMigration(statTable)) + + // create indices + mg.AddMigration("add index stat.metric", NewAddIndexMigration(statTable, statTable.Indices[0])) + + statValue := Table{ + Name: "stat_value", + Columns: []*Column{ + {Name: "id", Type: DB_Int, IsPrimaryKey: true, IsAutoIncrement: true}, + {Name: "value", Type: DB_Double, Nullable: false}, + {Name: "time", Type: DB_DateTime, Nullable: false}, + }, + } + + // create table + mg.AddMigration("create stat_value table", NewAddTableMigration(statValue)) +} diff --git a/pkg/tsdb/mysql/mysql.go b/pkg/tsdb/mysql/mysql.go index 73fdc75a995..912150a5d14 100644 --- a/pkg/tsdb/mysql/mysql.go +++ b/pkg/tsdb/mysql/mysql.go @@ -2,6 +2,7 @@ package mysql import ( "context" + "database/sql" "fmt" "sync" @@ -12,9 +13,9 @@ import ( ) type MysqlExecutor struct { - *models.DataSource - engine *xorm.Engine - log log.Logger + datasource *models.DataSource + engine *xorm.Engine + log log.Logger } type engineCacheType struct { @@ -28,116 +29,174 @@ var engineCache = engineCacheType{ versions: make(map[int64]int), } +func init() { + tsdb.RegisterExecutor("mysql", NewMysqlExecutor) +} + func NewMysqlExecutor(datasource *models.DataSource) (tsdb.Executor, error) { - engine, err := getEngineFor(datasource) + executor := &MysqlExecutor{ + datasource: datasource, + log: log.New("tsdb.mysql"), + } + + err := executor.initEngine() if err != nil { return nil, err } - return &MysqlExecutor{ - log: log.New("tsdb.mysql"), - engine: engine, - }, nil + return executor, nil } -func getEngineFor(ds *models.DataSource) (*xorm.Engine, error) { +func (e *MysqlExecutor) initEngine() error { engineCache.Lock() defer engineCache.Unlock() - if engine, present := engineCache.cache[ds.Id]; present { - if version, _ := engineCache.versions[ds.Id]; version == ds.Version { - return engine, nil + if engine, present := engineCache.cache[e.datasource.Id]; present { + if version, _ := engineCache.versions[e.datasource.Id]; version == e.datasource.Version { + e.engine = engine + return nil } } - cnnstr := fmt.Sprintf("%s:%s@%s(%s)/%s?charset=utf8mb4", ds.User, ds.Password, "tcp", ds.Url, ds.Database) + cnnstr := fmt.Sprintf("%s:%s@%s(%s)/%s?charset=utf8mb4", e.datasource.User, e.datasource.Password, "tcp", e.datasource.Url, e.datasource.Database) + e.log.Debug("getEngine", "connection", cnnstr) + engine, err := xorm.NewEngine("mysql", cnnstr) engine.SetMaxConns(10) engine.SetMaxIdleConns(10) if err != nil { - return nil, err + return err } - engineCache.cache[ds.Id] = engine - return engine, nil -} - -func init() { - tsdb.RegisterExecutor("graphite", NewMysqlExecutor) + engineCache.cache[e.datasource.Id] = engine + e.engine = engine + return nil } func (e *MysqlExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult { result := &tsdb.BatchResult{} - session := engine.NewSession() + session := e.engine.NewSession() defer session.Close() db := session.DB() - result, err := getData(db, &req) - if err != nil { - return + // queries := strings.Split(req.Query, ";") + // + // data := dataStruct{} + // data.Results = make([]resultsStruct, 1) + // data.Results[0].Series = make([]seriesStruct, 0) + + for _, query := range queries { + rawSql := query.Model.Get("rawSql").MustString() + if rawSql == "" { + continue + } + + rows, err := db.Query(rawSql) + if err != nil { + result.Error = err + return result + } + defer rows.Close() + + columnNames, err := rows.Columns() + if err != nil { + result.Error = err + return result + } + + rc := NewStringStringScan(columnNames) + for rows.Next() { + err := rc.Update(rows.Rows) + if err != nil { + e.log.Error("Mysql response parsing", "error", err) + result.Error = err + return result + } + + rowValues := rc.Get() + e.log.Info("Rows", "row", rowValues) + } + + // for rows.Next() { + // columnValues := make([]interface{}, len(columnNames)) + // + // err = rows.ScanSlice(&columnValues) + // if err != nil { + // result.Error = err + // return result + // } + // + // // bytes -> string + // for i := range columnValues { + // rowType := reflect.TypeOf(columnValues[i]) + // e.log.Info("row", "type", rowType) + // + // rawValue := reflect.Indirect(reflect.ValueOf(columnValues[i])) + // + // // if rawValue is null then ignore + // if rawValue.Interface() == nil { + // continue + // } + // + // rawValueType := reflect.TypeOf(rawValue.Interface()) + // vv := reflect.ValueOf(rawValue.Interface()) + // e.log.Info("column type", "name", columnNames[i], "type", rawValueType, "vv", vv) + // } + // } } + + return result +} + +type stringStringScan struct { + // cp are the column pointers + cp []interface{} + // row contains the final result + row []string + colCount int + colNames []string +} + +func NewStringStringScan(columnNames []string) *stringStringScan { + lenCN := len(columnNames) + s := &stringStringScan{ + cp: make([]interface{}, lenCN), + row: make([]string, lenCN*2), + colCount: lenCN, + colNames: columnNames, + } + j := 0 + for i := 0; i < lenCN; i++ { + s.cp[i] = new(sql.RawBytes) + s.row[j] = s.colNames[i] + j = j + 2 + } + return s +} + +func (s *stringStringScan) Update(rows *sql.Rows) error { + if err := rows.Scan(s.cp...); err != nil { + return err + } + j := 0 + for i := 0; i < s.colCount; i++ { + if rb, ok := s.cp[i].(*sql.RawBytes); ok { + s.row[j+1] = string(*rb) + *rb = nil // reset pointer to discard current value to avoid a bug + } else { + return fmt.Errorf("Cannot convert index %d column %s to type *sql.RawBytes", i, s.colNames[i]) + } + j = j + 2 + } + return nil +} + +func (s *stringStringScan) Get() []string { + return s.row } -// func getData(db *core.DB, req *sqlDataRequest) (interface{}, error) { -// queries := strings.Split(req.Query, ";") -// -// data := dataStruct{} -// data.Results = make([]resultsStruct, 1) -// data.Results[0].Series = make([]seriesStruct, 0) -// -// for i := range queries { -// if queries[i] == "" { -// continue -// } -// -// rows, err := db.Query(queries[i]) -// if err != nil { -// return nil, err -// } -// defer rows.Close() -// -// name := fmt.Sprintf("table_%d", i+1) -// series, err := arrangeResult(rows, name) -// if err != nil { -// return nil, err -// } -// data.Results[0].Series = append(data.Results[0].Series, series.(seriesStruct)) -// } -// -// return data, nil -// } -// -// func arrangeResult(rows *core.Rows, name string) (interface{}, error) { -// columnNames, err := rows.Columns() -// -// series := seriesStruct{} -// series.Columns = columnNames -// series.Name = name -// -// for rows.Next() { -// columnValues := make([]interface{}, len(columnNames)) -// -// err = rows.ScanSlice(&columnValues) -// if err != nil { -// return nil, err -// } -// -// // bytes -> string -// for i := range columnValues { -// switch columnValues[i].(type) { -// case []byte: -// columnValues[i] = fmt.Sprintf("%s", columnValues[i]) -// } -// } -// -// series.Values = append(series.Values, columnValues) -// } -// -// return series, err -// } -// // type sqlDataRequest struct { // Query string `json:"query"` // Body []byte `json:"-"` diff --git a/public/app/plugins/datasource/mysql/datasource.ts b/public/app/plugins/datasource/mysql/datasource.ts index 33c42cee8ab..7b22e150b54 100644 --- a/public/app/plugins/datasource/mysql/datasource.ts +++ b/public/app/plugins/datasource/mysql/datasource.ts @@ -7,24 +7,34 @@ export class MysqlDatasource { name: any; /** @ngInject */ - constructor(instanceSettings, private backendSrv) { + constructor(instanceSettings, private backendSrv, private $q) { this.name = instanceSettings.name; this.id = instanceSettings.id; } query(options) { + var queries = _.filter(options.targets, item => { + return item.hide !== true; + }).map(item => { + return { + refId: item.refId, + intervalMs: options.intervalMs, + maxDataPoints: options.maxDataPoints, + datasourceId: this.id, + rawSql: item.rawSql, + }; + }); + + if (queries.length === 0) { + return this.$q.when({data: []}); + } + return this.backendSrv.post('/api/tsdb/query', { from: options.range.from.valueOf().toString(), to: options.range.to.valueOf().toString(), - queries: [ - { - "refId": "A", - "intervalMs": options.intervalMs, - "maxDataPoints": options.maxDataPoints, - "datasourceId": this.id, - } - ] + queries: queries, }).then(res => { + console.log('mysql response', res); var data = []; if (res.results) { diff --git a/public/app/plugins/datasource/mysql/module.ts b/public/app/plugins/datasource/mysql/module.ts index af7f525a69d..7d9a65b1609 100644 --- a/public/app/plugins/datasource/mysql/module.ts +++ b/public/app/plugins/datasource/mysql/module.ts @@ -8,7 +8,7 @@ class MysqlQueryCtrl extends QueryCtrl { static templateUrl = 'partials/query.editor.html'; } -class InfluxConfigCtrl { +class MysqlConfigCtrl { static templateUrl = 'partials/config.html'; } @@ -16,5 +16,6 @@ export { MysqlDatasource, MysqlDatasource as Datasource, MysqlQueryCtrl as QueryCtrl, + MysqlConfigCtrl as ConfigCtrl, }; diff --git a/public/app/plugins/datasource/mysql/partials/query.editor.html b/public/app/plugins/datasource/mysql/partials/query.editor.html index 880402573d5..0e8a76bd21c 100644 --- a/public/app/plugins/datasource/mysql/partials/query.editor.html +++ b/public/app/plugins/datasource/mysql/partials/query.editor.html @@ -1,7 +1,7 @@ -
-
- +
+
+
diff --git a/public/app/plugins/datasource/mysql/plugin.json b/public/app/plugins/datasource/mysql/plugin.json index 51e8988c7fd..662cde1c3e4 100644 --- a/public/app/plugins/datasource/mysql/plugin.json +++ b/public/app/plugins/datasource/mysql/plugin.json @@ -3,6 +3,17 @@ "name": "MySQL", "id": "mysql", + "info": { + "author": { + "name": "Grafana Project", + "url": "https://grafana.com" + }, + "logos": { + "small": "img/mysql_logo.svg", + "large": "img/mysql_logo.svg" + } + }, + "annotations": true, "metrics": true } diff --git a/public/img/mysql_logo.svg b/public/img/mysql_logo.svg new file mode 100644 index 00000000000..6d1d5c81e85 --- /dev/null +++ b/public/img/mysql_logo.svg @@ -0,0 +1,27 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + +