mirror of
https://github.com/grafana/grafana.git
synced 2025-07-31 10:32:42 +08:00
888 lines
29 KiB
Go
888 lines
29 KiB
Go
package loganalytics
|
|
|
|
import (
|
|
"bytes"
|
|
"compress/gzip"
|
|
"context"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"path"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
|
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/trace"
|
|
|
|
"github.com/grafana/grafana/pkg/tsdb/azuremonitor/kinds/dataquery"
|
|
"github.com/grafana/grafana/pkg/tsdb/azuremonitor/macros"
|
|
"github.com/grafana/grafana/pkg/tsdb/azuremonitor/types"
|
|
"github.com/grafana/grafana/pkg/tsdb/azuremonitor/utils"
|
|
)
|
|
|
|
// Returns tables with the `HasData` field set to true
|
|
func filterTablesWithData(tables []types.MetadataTable) []types.MetadataTable {
|
|
filtered := []types.MetadataTable{}
|
|
for _, table := range tables {
|
|
if table.HasData {
|
|
filtered = append(filtered, table)
|
|
}
|
|
}
|
|
|
|
return filtered
|
|
}
|
|
|
|
func writeErrorResponse(rw http.ResponseWriter, statusCode int, message string) error {
|
|
rw.Header().Set("Content-Type", "application/json")
|
|
rw.WriteHeader(statusCode)
|
|
|
|
// Log the raw error message
|
|
backend.Logger.Error(message)
|
|
|
|
// Set error response to initial error message
|
|
errorBody := map[string]string{"error": message}
|
|
|
|
// Attempt to locate JSON portion in error message
|
|
re := regexp.MustCompile(`\{.*\}`)
|
|
jsonPart := re.FindString(message)
|
|
if jsonPart != "" {
|
|
var jsonData map[string]interface{}
|
|
if unmarshalErr := json.Unmarshal([]byte(jsonPart), &jsonData); unmarshalErr != nil {
|
|
errorBody["error"] = fmt.Sprintf("Invalid JSON format in error message. Raw error: %s", message)
|
|
backend.Logger.Error("failed to unmarshal JSON error message", "error", unmarshalErr)
|
|
} else {
|
|
// Extract relevant fields for a formatted error message
|
|
errorType, ok := jsonData["error"].(string)
|
|
if ok {
|
|
errorDescription, ok := jsonData["error_description"].(string)
|
|
if !ok {
|
|
backend.Logger.Error("unable to convert error_description to string", "rawError", jsonData["error_description"])
|
|
// Attempt to just format the error as a string
|
|
errorDescription = fmt.Sprintf("%v", jsonData["error_description"])
|
|
}
|
|
if errorType == "" {
|
|
errorType = "UnknownError"
|
|
}
|
|
|
|
errorBody["error"] = fmt.Sprintf("%s: %s", errorType, errorDescription)
|
|
} else {
|
|
nestedError, ok := jsonData["error"].(map[string]interface{})
|
|
|
|
if !ok {
|
|
errorBody["error"] = fmt.Sprintf("Invalid JSON format in error message. Raw error: %s", message)
|
|
backend.Logger.Error("failed to unmarshal JSON error message", "error", unmarshalErr)
|
|
}
|
|
|
|
errorType := nestedError["code"].(string)
|
|
errorDescription, ok := nestedError["message"].(string)
|
|
if !ok {
|
|
backend.Logger.Error("unable to convert error_description to string", "rawError", jsonData["error_description"])
|
|
// Attempt to just format the error as a string
|
|
errorDescription = fmt.Sprintf("%v", nestedError["message"])
|
|
}
|
|
|
|
if errorType == "" {
|
|
errorType = "UnknownError"
|
|
}
|
|
errorBody["error"] = fmt.Sprintf("%s: %s", errorType, errorDescription)
|
|
}
|
|
}
|
|
}
|
|
|
|
jsonRes, _ := json.Marshal(errorBody)
|
|
_, err := rw.Write(jsonRes)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to write HTTP response: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (e *AzureLogAnalyticsDatasource) ResourceRequest(rw http.ResponseWriter, req *http.Request, cli *http.Client) (http.ResponseWriter, error) {
|
|
if req.URL.Path == "/usage/basiclogs" {
|
|
newUrl := &url.URL{
|
|
Scheme: req.URL.Scheme,
|
|
Host: req.URL.Host,
|
|
Path: "/v1/query",
|
|
}
|
|
return e.GetBasicLogsUsage(req.Context(), newUrl.String(), cli, rw, req.Body)
|
|
} else if strings.Contains(req.URL.Path, "/metadata") {
|
|
isAppInsights := strings.Contains(strings.ToLower(req.URL.Path), "microsoft.insights/components")
|
|
// Add necessary headers
|
|
if isAppInsights {
|
|
// metadata-format-v4 is not supported for AppInsights resources
|
|
req.Header.Set("Prefer", "metadata-format-v3,exclude-resourcetypes,exclude-customfunctions")
|
|
} else {
|
|
req.Header.Set("Prefer", "metadata-format-v4,exclude-resourcetypes,exclude-customfunctions")
|
|
}
|
|
queryParams := req.URL.Query()
|
|
// Add necessary query params
|
|
queryParams.Add("select", "categories,solutions,tables,workspaces")
|
|
req.URL.RawQuery = queryParams.Encode()
|
|
resp, err := cli.Do(req)
|
|
if err != nil {
|
|
return nil, writeErrorResponse(rw, resp.StatusCode, fmt.Sprintf("failed to fetch metadata: %s", err))
|
|
}
|
|
|
|
defer func() {
|
|
if err := resp.Body.Close(); err != nil {
|
|
e.Logger.Warn("Failed to close response body for metadata request", "err", err)
|
|
}
|
|
}()
|
|
|
|
encoding := resp.Header.Get("Content-Encoding")
|
|
body, err := decode(encoding, resp.Body)
|
|
if err != nil {
|
|
return nil, writeErrorResponse(rw, resp.StatusCode, fmt.Sprintf("failed to read metadata response: %s", err))
|
|
}
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return nil, writeErrorResponse(rw, resp.StatusCode, fmt.Sprintf("metadata API error: %s", string(body)))
|
|
}
|
|
|
|
var metadata types.AzureLogAnalyticsMetadata
|
|
err = json.Unmarshal(body, &metadata)
|
|
if err != nil {
|
|
return nil, writeErrorResponse(rw, http.StatusInternalServerError, fmt.Sprintf("failed to unmarshal metadata response: %s", err))
|
|
}
|
|
|
|
// AppInsights metadata requests do not return the HasData field
|
|
// So we return all tables
|
|
if !isAppInsights {
|
|
metadata.Tables = filterTablesWithData(metadata.Tables)
|
|
}
|
|
|
|
responseBody, err := json.Marshal(metadata)
|
|
if err != nil {
|
|
return nil, writeErrorResponse(rw, http.StatusInternalServerError, fmt.Sprintf("failed to marshal metadata response: %s", err))
|
|
}
|
|
|
|
rw.Header().Set("Content-Type", "application/json")
|
|
rw.WriteHeader(http.StatusOK)
|
|
_, err = rw.Write(responseBody)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to write metadata response: %w", err)
|
|
}
|
|
|
|
return rw, nil
|
|
}
|
|
|
|
// Default behavior for other requests
|
|
return e.Proxy.Do(rw, req, cli)
|
|
}
|
|
|
|
// builds and executes a new query request that will get the data ingeted for the given table in the basic logs query
|
|
func (e *AzureLogAnalyticsDatasource) GetBasicLogsUsage(ctx context.Context, url string, client *http.Client, rw http.ResponseWriter, reqBody io.ReadCloser) (http.ResponseWriter, error) {
|
|
// read the full body
|
|
originalPayload, readErr := io.ReadAll(reqBody)
|
|
if readErr != nil {
|
|
return rw, fmt.Errorf("failed to read request body %w", readErr)
|
|
}
|
|
var payload BasicLogsUsagePayload
|
|
jsonErr := json.Unmarshal(originalPayload, &payload)
|
|
if jsonErr != nil {
|
|
return rw, fmt.Errorf("error decoding basic logs table usage payload: %w", jsonErr)
|
|
}
|
|
table := payload.Table
|
|
|
|
from, fromErr := ConvertTime(payload.From)
|
|
if fromErr != nil {
|
|
return rw, fmt.Errorf("failed to convert from time: %w", fromErr)
|
|
}
|
|
|
|
to, toErr := ConvertTime(payload.To)
|
|
if toErr != nil {
|
|
return rw, fmt.Errorf("failed to convert to time: %w", toErr)
|
|
}
|
|
|
|
// basic logs queries only show data for last 8 days or less
|
|
// data volume query should also only calculate volume for last 8 days if time range exceeds that.
|
|
diff := to.Sub(from).Hours()
|
|
if diff > float64(MaxHoursBasicLogs) {
|
|
from = to.Add(-time.Duration(MaxHoursBasicLogs) * time.Hour)
|
|
}
|
|
|
|
dataVolumeQueryRaw := GetDataVolumeRawQuery(table)
|
|
dataVolumeQuery := &AzureLogAnalyticsQuery{
|
|
Query: dataVolumeQueryRaw,
|
|
DashboardTime: true, // necessary to ensure TimeRange property is used since query will not have an in-query time filter
|
|
TimeRange: backend.TimeRange{
|
|
From: from,
|
|
To: to,
|
|
},
|
|
TimeColumn: "TimeGenerated",
|
|
Resources: []string{payload.Resource},
|
|
QueryType: dataquery.AzureQueryTypeLogAnalytics,
|
|
URL: getApiURL(payload.Resource, false, false),
|
|
}
|
|
|
|
req, err := e.createRequest(ctx, url, dataVolumeQuery)
|
|
if err != nil {
|
|
return rw, err
|
|
}
|
|
|
|
_, span := tracing.DefaultTracer().Start(ctx, "azure basic logs usage query", trace.WithAttributes(
|
|
attribute.String("target", dataVolumeQuery.Query),
|
|
attribute.String("table", table),
|
|
attribute.Int64("from", dataVolumeQuery.TimeRange.From.UnixNano()/int64(time.Millisecond)),
|
|
attribute.Int64("until", dataVolumeQuery.TimeRange.To.UnixNano()/int64(time.Millisecond)),
|
|
))
|
|
defer span.End()
|
|
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return rw, err
|
|
}
|
|
|
|
defer func() {
|
|
if err := resp.Body.Close(); err != nil {
|
|
e.Logger.Warn("Failed to close response body for data volume request", "err", err)
|
|
}
|
|
}()
|
|
|
|
logResponse, err := e.unmarshalResponse(resp)
|
|
if err != nil {
|
|
return rw, err
|
|
}
|
|
|
|
t, err := logResponse.GetPrimaryResultTable()
|
|
if err != nil {
|
|
return rw, err
|
|
}
|
|
|
|
num := t.Rows[0][0].(json.Number)
|
|
value, err := num.Float64()
|
|
if err != nil {
|
|
return rw, err
|
|
}
|
|
_, err = fmt.Fprintf(rw, "%f", value)
|
|
if err != nil {
|
|
return rw, err
|
|
}
|
|
|
|
return rw, err
|
|
}
|
|
|
|
// executeTimeSeriesQuery does the following:
|
|
// 1. build the AzureMonitor url and querystring for each query
|
|
// 2. executes each query by calling the Azure Monitor API
|
|
// 3. parses the responses for each query into data frames
|
|
func (e *AzureLogAnalyticsDatasource) ExecuteTimeSeriesQuery(ctx context.Context, originalQueries []backend.DataQuery, dsInfo types.DatasourceInfo, client *http.Client, url string, fromAlert bool) (*backend.QueryDataResponse, error) {
|
|
result := backend.NewQueryDataResponse()
|
|
|
|
for _, query := range originalQueries {
|
|
logsQuery, err := e.buildQuery(ctx, query, dsInfo, fromAlert)
|
|
if err != nil {
|
|
result.Responses[query.RefID] = backend.ErrorResponseWithErrorSource(err)
|
|
continue
|
|
}
|
|
res, err := e.executeQuery(ctx, logsQuery, dsInfo, client, url)
|
|
if err != nil {
|
|
result.Responses[query.RefID] = backend.ErrorResponseWithErrorSource(err)
|
|
continue
|
|
}
|
|
result.Responses[query.RefID] = *res
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func buildLogAnalyticsQuery(query backend.DataQuery, dsInfo types.DatasourceInfo, appInsightsRegExp *regexp.Regexp, fromAlert bool) (*AzureLogAnalyticsQuery, error) {
|
|
queryJSONModel := types.LogJSONQuery{}
|
|
err := json.Unmarshal(query.JSON, &queryJSONModel)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to decode the Azure Log Analytics query object from JSON: %w", err)
|
|
}
|
|
|
|
var queryString string
|
|
appInsightsQuery := false
|
|
dashboardTime := false
|
|
timeColumn := ""
|
|
azureLogAnalyticsTarget := queryJSONModel.AzureLogAnalytics
|
|
basicLogsQuery := false
|
|
basicLogsEnabled := false
|
|
|
|
resultFormat := ParseResultFormat(azureLogAnalyticsTarget.ResultFormat, dataquery.AzureQueryTypeLogAnalytics)
|
|
|
|
basicLogsQueryFlag := false
|
|
if azureLogAnalyticsTarget.BasicLogsQuery != nil {
|
|
basicLogsQueryFlag = *azureLogAnalyticsTarget.BasicLogsQuery
|
|
}
|
|
|
|
resources, resourceOrWorkspace := retrieveResources(azureLogAnalyticsTarget)
|
|
appInsightsQuery = appInsightsRegExp.Match([]byte(resourceOrWorkspace))
|
|
|
|
if value, ok := dsInfo.JSONData["basicLogsEnabled"].(bool); ok {
|
|
basicLogsEnabled = value
|
|
}
|
|
|
|
if basicLogsQueryFlag {
|
|
if meetsBasicLogsCriteria, meetsBasicLogsCriteriaErr := meetsBasicLogsCriteria(resources, fromAlert, basicLogsEnabled); meetsBasicLogsCriteriaErr != nil {
|
|
return nil, meetsBasicLogsCriteriaErr
|
|
} else {
|
|
basicLogsQuery = meetsBasicLogsCriteria
|
|
}
|
|
}
|
|
|
|
if azureLogAnalyticsTarget.Query != nil {
|
|
queryString = *azureLogAnalyticsTarget.Query
|
|
}
|
|
|
|
if azureLogAnalyticsTarget.DashboardTime != nil {
|
|
dashboardTime = *azureLogAnalyticsTarget.DashboardTime
|
|
if dashboardTime {
|
|
if azureLogAnalyticsTarget.TimeColumn != nil {
|
|
timeColumn = *azureLogAnalyticsTarget.TimeColumn
|
|
} else {
|
|
// Final fallback to TimeGenerated if no column is provided
|
|
timeColumn = "TimeGenerated"
|
|
}
|
|
}
|
|
}
|
|
|
|
apiURL := getApiURL(resourceOrWorkspace, appInsightsQuery, basicLogsQuery)
|
|
|
|
rawQuery, err := macros.KqlInterpolate(query, dsInfo, queryString, "TimeGenerated")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &AzureLogAnalyticsQuery{
|
|
RefID: query.RefID,
|
|
ResultFormat: resultFormat,
|
|
URL: apiURL,
|
|
JSON: query.JSON,
|
|
TimeRange: query.TimeRange,
|
|
Query: rawQuery,
|
|
Resources: resources,
|
|
QueryType: dataquery.AzureQueryType(query.QueryType),
|
|
AppInsightsQuery: appInsightsQuery,
|
|
DashboardTime: dashboardTime,
|
|
TimeColumn: timeColumn,
|
|
BasicLogs: basicLogsQuery,
|
|
}, nil
|
|
}
|
|
|
|
func (e *AzureLogAnalyticsDatasource) buildQuery(ctx context.Context, query backend.DataQuery, dsInfo types.DatasourceInfo, fromAlert bool) (*AzureLogAnalyticsQuery, error) {
|
|
var azureLogAnalyticsQuery *AzureLogAnalyticsQuery
|
|
appInsightsRegExp, err := regexp.Compile("(?i)providers/microsoft.insights/components")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to compile Application Insights regex")
|
|
}
|
|
|
|
if query.QueryType == string(dataquery.AzureQueryTypeLogAnalytics) {
|
|
azureLogAnalyticsQuery, err = buildLogAnalyticsQuery(query, dsInfo, appInsightsRegExp, fromAlert)
|
|
if err != nil {
|
|
errorMessage := fmt.Errorf("failed to build azure log analytics query: %w", err)
|
|
return nil, utils.ApplySourceFromError(errorMessage, err)
|
|
}
|
|
}
|
|
|
|
if query.QueryType == string(dataquery.AzureQueryTypeAzureTraces) || query.QueryType == string(dataquery.AzureQueryTypeTraceExemplar) {
|
|
if query.QueryType == string(dataquery.AzureQueryTypeTraceExemplar) {
|
|
cfg := backend.GrafanaConfigFromContext(ctx)
|
|
hasPromExemplarsToggle := cfg.FeatureToggles().IsEnabled("azureMonitorPrometheusExemplars")
|
|
if !hasPromExemplarsToggle {
|
|
return nil, backend.DownstreamError(fmt.Errorf("query type unsupported as azureMonitorPrometheusExemplars feature toggle is not enabled"))
|
|
}
|
|
}
|
|
azureAppInsightsQuery, err := buildAppInsightsQuery(ctx, query, dsInfo, appInsightsRegExp, e.Logger)
|
|
if err != nil {
|
|
errorMessage := fmt.Errorf("failed to build azure application insights query: %w", err)
|
|
return nil, utils.ApplySourceFromError(errorMessage, err)
|
|
}
|
|
azureLogAnalyticsQuery = azureAppInsightsQuery
|
|
}
|
|
|
|
return azureLogAnalyticsQuery, nil
|
|
}
|
|
|
|
func (e *AzureLogAnalyticsDatasource) executeQuery(ctx context.Context, query *AzureLogAnalyticsQuery, dsInfo types.DatasourceInfo, client *http.Client, url string) (*backend.DataResponse, error) {
|
|
// If azureLogAnalyticsSameAs is defined and set to false, return an error
|
|
if sameAs, ok := dsInfo.JSONData["azureLogAnalyticsSameAs"]; ok {
|
|
sameAsValue, ok := sameAs.(bool)
|
|
if !ok {
|
|
stringVal, ok := sameAs.(string)
|
|
if !ok {
|
|
return nil, backend.DownstreamError(fmt.Errorf("unknown value for Log Analytics credentials. Go to the data source configuration to update Azure Monitor credentials"))
|
|
}
|
|
|
|
var err error
|
|
sameAsValue, err = strconv.ParseBool(stringVal)
|
|
if err != nil {
|
|
return nil, backend.DownstreamError(fmt.Errorf("unknown value for Log Analytics credentials. Go to the data source configuration to update Azure Monitor credentials"))
|
|
}
|
|
}
|
|
if !sameAsValue {
|
|
return nil, backend.DownstreamError(fmt.Errorf("credentials for Log Analytics are no longer supported. Go to the data source configuration to update Azure Monitor credentials"))
|
|
}
|
|
}
|
|
|
|
queryJSONModel := dataquery.AzureMonitorQuery{}
|
|
err := json.Unmarshal(query.JSON, &queryJSONModel)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if query.QueryType == dataquery.AzureQueryTypeAzureTraces {
|
|
if query.ResultFormat == dataquery.ResultFormatTrace && query.Query == "" {
|
|
return nil, backend.DownstreamError(fmt.Errorf("cannot visualise trace events using the trace visualiser"))
|
|
}
|
|
}
|
|
|
|
req, err := e.createRequest(ctx, url, query)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
_, span := tracing.DefaultTracer().Start(ctx, "azure log analytics query", trace.WithAttributes(
|
|
attribute.String("target", query.Query),
|
|
attribute.Bool("basic_logs", query.BasicLogs),
|
|
attribute.Int64("from", query.TimeRange.From.UnixNano()/int64(time.Millisecond)),
|
|
attribute.Int64("until", query.TimeRange.To.UnixNano()/int64(time.Millisecond)),
|
|
attribute.Int64("datasource_id", dsInfo.DatasourceID),
|
|
attribute.Int64("org_id", dsInfo.OrgID),
|
|
))
|
|
defer span.End()
|
|
|
|
res, err := client.Do(req)
|
|
if err != nil {
|
|
return nil, backend.DownstreamError(err)
|
|
}
|
|
|
|
defer func() {
|
|
if err := res.Body.Close(); err != nil {
|
|
e.Logger.Warn("Failed to close response body", "err", err)
|
|
}
|
|
}()
|
|
|
|
logResponse, err := e.unmarshalResponse(res)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
t, err := logResponse.GetPrimaryResultTable()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
logLimitDisabled := backend.GrafanaConfigFromContext(ctx).FeatureToggles().IsEnabled("azureMonitorDisableLogLimit")
|
|
|
|
frame, err := ResponseTableToFrame(t, query.RefID, query.Query, query.QueryType, query.ResultFormat, logLimitDisabled)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
frame = appendErrorNotice(frame, logResponse.Error)
|
|
if frame == nil {
|
|
return &backend.DataResponse{}, nil
|
|
}
|
|
|
|
// Ensure Meta.Custom is initialized
|
|
if frame.Meta.Custom == nil {
|
|
frame.Meta.Custom = &LogAnalyticsMeta{
|
|
ColumnTypes: make([]string, 0),
|
|
}
|
|
}
|
|
|
|
queryUrl, err := getQueryUrl(query.Query, query.Resources, dsInfo.Routes["Azure Portal"].URL, query.TimeRange)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Set the preferred visualization
|
|
switch query.ResultFormat {
|
|
case dataquery.ResultFormatTrace:
|
|
if query.QueryType == dataquery.AzureQueryTypeAzureTraces || query.QueryType == dataquery.AzureQueryTypeTraceExemplar {
|
|
frame.Meta.PreferredVisualization = data.VisTypeTrace
|
|
}
|
|
case dataquery.ResultFormatTable:
|
|
frame.Meta.PreferredVisualization = data.VisTypeTable
|
|
case dataquery.ResultFormatLogs:
|
|
frame.Meta.PreferredVisualization = data.VisTypeLogs
|
|
if logMeta, ok := frame.Meta.Custom.(*LogAnalyticsMeta); ok {
|
|
frame.Meta.Custom = &LogAnalyticsMeta{
|
|
ColumnTypes: logMeta.ColumnTypes,
|
|
AzurePortalLink: queryUrl,
|
|
}
|
|
} else {
|
|
frame.Meta.Custom = &LogAnalyticsMeta{
|
|
AzurePortalLink: queryUrl,
|
|
}
|
|
}
|
|
case dataquery.ResultFormatTimeSeries:
|
|
tsSchema := frame.TimeSeriesSchema()
|
|
if tsSchema.Type == data.TimeSeriesTypeLong {
|
|
wideFrame, err := data.LongToWide(frame, nil)
|
|
if err == nil {
|
|
frame = wideFrame
|
|
} else {
|
|
frame.AppendNotices(data.Notice{
|
|
Severity: data.NoticeSeverityWarning,
|
|
Text: "could not convert frame to time series, returning raw table: " + err.Error(),
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
// Use the parent span query for the parent span data link
|
|
err = addDataLinksToFields(query, dsInfo.Routes["Azure Portal"].URL, frame, dsInfo, queryUrl)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dataResponse := backend.DataResponse{Frames: data.Frames{frame}}
|
|
return &dataResponse, nil
|
|
}
|
|
|
|
func addDataLinksToFields(query *AzureLogAnalyticsQuery, azurePortalBaseUrl string, frame *data.Frame, dsInfo types.DatasourceInfo, queryUrl string) error {
|
|
if query.QueryType == dataquery.AzureQueryTypeAzureTraces {
|
|
err := addTraceDataLinksToFields(query, azurePortalBaseUrl, frame, dsInfo)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
if query.ResultFormat == dataquery.ResultFormatLogs {
|
|
return nil
|
|
}
|
|
|
|
AddConfigLinks(*frame, queryUrl, nil)
|
|
|
|
return nil
|
|
}
|
|
|
|
func addTraceDataLinksToFields(query *AzureLogAnalyticsQuery, azurePortalBaseUrl string, frame *data.Frame, dsInfo types.DatasourceInfo) error {
|
|
tracesUrl, err := getTracesQueryUrl(azurePortalBaseUrl)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
queryJSONModel := dataquery.AzureMonitorQuery{}
|
|
err = json.Unmarshal(query.JSON, &queryJSONModel)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
traceIdVariable := "${__data.fields.traceID}"
|
|
resultFormat := dataquery.ResultFormatTrace
|
|
queryJSONModel.AzureTraces.ResultFormat = &resultFormat
|
|
queryJSONModel.AzureTraces.Query = &query.TraceExploreQuery
|
|
if queryJSONModel.AzureTraces.OperationId == nil || *queryJSONModel.AzureTraces.OperationId == "" {
|
|
queryJSONModel.AzureTraces.OperationId = &traceIdVariable
|
|
}
|
|
|
|
logsQueryType := string(dataquery.AzureQueryTypeLogAnalytics)
|
|
logsJSONModel := dataquery.AzureMonitorQuery{
|
|
QueryType: &logsQueryType,
|
|
AzureLogAnalytics: &dataquery.AzureLogsQuery{
|
|
Query: &query.TraceLogsExploreQuery,
|
|
Resources: []string{queryJSONModel.AzureTraces.Resources[0]},
|
|
},
|
|
}
|
|
|
|
if query.ResultFormat == dataquery.ResultFormatTable {
|
|
AddCustomDataLink(*frame, data.DataLink{
|
|
Title: "Explore Trace: ${__data.fields.traceID}",
|
|
URL: "",
|
|
Internal: &data.InternalDataLink{
|
|
DatasourceUID: dsInfo.DatasourceUID,
|
|
DatasourceName: dsInfo.DatasourceName,
|
|
Query: queryJSONModel,
|
|
},
|
|
}, MultiField)
|
|
|
|
queryJSONModel.AzureTraces.Query = &query.TraceParentExploreQuery
|
|
AddCustomDataLink(*frame, data.DataLink{
|
|
Title: "Explore Parent Span: ${__data.fields.parentSpanID}",
|
|
URL: "",
|
|
Internal: &data.InternalDataLink{
|
|
DatasourceUID: dsInfo.DatasourceUID,
|
|
DatasourceName: dsInfo.DatasourceName,
|
|
Query: queryJSONModel,
|
|
},
|
|
}, MultiField)
|
|
|
|
linkTitle := "Explore Trace in Azure Portal"
|
|
AddConfigLinks(*frame, tracesUrl, &linkTitle)
|
|
}
|
|
|
|
AddCustomDataLink(*frame, data.DataLink{
|
|
Title: "Explore Trace Logs",
|
|
URL: "",
|
|
Internal: &data.InternalDataLink{
|
|
DatasourceUID: dsInfo.DatasourceUID,
|
|
DatasourceName: dsInfo.DatasourceName,
|
|
Query: logsJSONModel,
|
|
},
|
|
}, SingleField)
|
|
|
|
return nil
|
|
}
|
|
|
|
func appendErrorNotice(frame *data.Frame, err *AzureLogAnalyticsAPIError) *data.Frame {
|
|
if err == nil {
|
|
return frame
|
|
}
|
|
if frame == nil {
|
|
frame = &data.Frame{}
|
|
}
|
|
frame.AppendNotices(apiErrorToNotice(err))
|
|
return frame
|
|
}
|
|
|
|
func (e *AzureLogAnalyticsDatasource) createRequest(ctx context.Context, queryURL string, query *AzureLogAnalyticsQuery) (*http.Request, error) {
|
|
body := map[string]interface{}{
|
|
"query": query.Query,
|
|
}
|
|
|
|
if query.DashboardTime {
|
|
from := query.TimeRange.From.Format(time.RFC3339)
|
|
to := query.TimeRange.To.Format(time.RFC3339)
|
|
timespan := fmt.Sprintf("%s/%s", from, to)
|
|
body["timespan"] = timespan
|
|
body["query_datetimescope_from"] = from
|
|
body["query_datetimescope_to"] = to
|
|
body["query_datetimescope_column"] = query.TimeColumn
|
|
}
|
|
|
|
if len(query.Resources) > 1 && query.QueryType == dataquery.AzureQueryTypeLogAnalytics && !query.AppInsightsQuery {
|
|
str := strings.ToLower(query.Resources[0])
|
|
|
|
if strings.Contains(str, "microsoft.operationalinsights/workspaces") {
|
|
body["workspaces"] = query.Resources
|
|
} else {
|
|
body["resources"] = query.Resources
|
|
}
|
|
}
|
|
|
|
if query.AppInsightsQuery {
|
|
// If the query type is traces then we only need the first resource as the rest are specified in the query
|
|
if query.QueryType == dataquery.AzureQueryTypeAzureTraces {
|
|
body["applications"] = []string{query.Resources[0]}
|
|
} else {
|
|
body["applications"] = query.Resources
|
|
}
|
|
}
|
|
|
|
jsonValue, err := json.Marshal(body)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("%v: %w", "failed to create request", err)
|
|
}
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, queryURL, bytes.NewBuffer(jsonValue))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("%v: %w", "failed to create request", err)
|
|
}
|
|
|
|
req.URL.Path = "/"
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.URL.Path = path.Join(req.URL.Path, query.URL)
|
|
|
|
return req, nil
|
|
}
|
|
|
|
type (
	// AzureLogAnalyticsURLResource is one entry of AzureLogAnalyticsURLResources;
	// it serialises as {"resourceId": "..."}.
	AzureLogAnalyticsURLResource struct {
		ResourceID string `json:"resourceId"`
	}

	// AzureLogAnalyticsURLResources is the list of resources that getQueryUrl
	// marshals to JSON and embeds in the Azure Portal link; it serialises as
	// {"resources": [...]}.
	AzureLogAnalyticsURLResources struct {
		Resources []AzureLogAnalyticsURLResource `json:"resources"`
	}
)
|
|
|
|
func getQueryUrl(query string, resources []string, azurePortalUrl string, timeRange backend.TimeRange) (string, error) {
|
|
encodedQuery, err := encodeQuery(query)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to encode the query: %s", err)
|
|
}
|
|
|
|
portalUrl := azurePortalUrl + "/#blade/Microsoft_OperationsManagementSuite_Workspace/AnalyticsBlade/initiator/AnalyticsShareLinkToQuery/isQueryEditorVisible/true/scope/"
|
|
resourcesJson := AzureLogAnalyticsURLResources{
|
|
Resources: make([]AzureLogAnalyticsURLResource, 0),
|
|
}
|
|
for _, resource := range resources {
|
|
resourcesJson.Resources = append(resourcesJson.Resources, AzureLogAnalyticsURLResource{
|
|
ResourceID: resource,
|
|
})
|
|
}
|
|
resourcesMarshalled, err := json.Marshal(resourcesJson)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to marshal log analytics resources: %s", err)
|
|
}
|
|
from := timeRange.From.Format(time.RFC3339)
|
|
to := timeRange.To.Format(time.RFC3339)
|
|
timespan := url.QueryEscape(fmt.Sprintf("%s/%s", from, to))
|
|
portalUrl += url.QueryEscape(string(resourcesMarshalled))
|
|
portalUrl += "/query/" + url.PathEscape(encodedQuery) + "/isQueryBase64Compressed/true/timespan/" + timespan
|
|
return portalUrl, nil
|
|
}
|
|
|
|
// getTracesQueryUrl returns the Azure Portal link template for a trace,
// rooted at azurePortalUrl.
//
// The resource, event ID, timestamp and event table are not filled in here:
// they are left as ${__data.fields.*} data link variables for the frontend to
// substitute. The returned error is always nil.
func getTracesQueryUrl(azurePortalUrl string) (string, error) {
	const (
		bladePath     = "/#view/AppInsightsExtension/DetailsV2Blade/ComponentId~/"
		componentID   = "%7B%22ResourceId%22:%22${__data.fields.resource:percentencode}%22%7D"
		dataModelPath = "/DataModel~/"

		// We're making use of data link variables to select the necessary fields in the frontend
		eventIDField    = "%22eventId%22%3A%22${__data.fields.itemId}%22%2C"
		timestampField  = "%22timestamp%22%3A%22${__data.fields.startTime}%22%2C"
		eventTableField = "%22eventTable%22%3A%22${__data.fields.itemType}%22"
	)

	dataModel := fmt.Sprintf("%%7B%s%s%s%%7D", eventIDField, timestampField, eventTableField)

	return azurePortalUrl + bladePath + componentID + dataModelPath + dataModel, nil
}
|
|
|
|
func getCorrelationWorkspaces(ctx context.Context, baseResource string, resourcesMap map[string]bool, dsInfo types.DatasourceInfo, operationId string) (map[string]bool, error) {
|
|
azMonService := dsInfo.Services["Azure Monitor"]
|
|
correlationUrl := azMonService.URL + fmt.Sprintf("%s/providers/microsoft.insights/transactions/%s", baseResource, operationId)
|
|
|
|
callCorrelationAPI := func(url string) (AzureCorrelationAPIResponse, error) {
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer([]byte{}))
|
|
if err != nil {
|
|
return AzureCorrelationAPIResponse{}, fmt.Errorf("%v: %w", "failed to create request", err)
|
|
}
|
|
req.URL.Path = url
|
|
req.Header.Set("Content-Type", "application/json")
|
|
values := req.URL.Query()
|
|
values.Add("api-version", "2019-10-17-preview")
|
|
req.URL.RawQuery = values.Encode()
|
|
req.Method = "GET"
|
|
|
|
_, span := tracing.DefaultTracer().Start(ctx, "azure traces correlation request", trace.WithAttributes(
|
|
attribute.String("target", req.URL.String()),
|
|
attribute.Int64("datasource_id", dsInfo.DatasourceID),
|
|
attribute.Int64("org_id", dsInfo.OrgID),
|
|
))
|
|
defer span.End()
|
|
|
|
res, err := azMonService.HTTPClient.Do(req)
|
|
if err != nil {
|
|
return AzureCorrelationAPIResponse{}, backend.DownstreamError(err)
|
|
}
|
|
body, err := io.ReadAll(res.Body)
|
|
if err != nil {
|
|
return AzureCorrelationAPIResponse{}, backend.DownstreamError(err)
|
|
}
|
|
|
|
defer func() {
|
|
if err := res.Body.Close(); err != nil {
|
|
azMonService.Logger.Warn("Failed to close response body", "err", err)
|
|
}
|
|
}()
|
|
|
|
if res.StatusCode/100 != 2 {
|
|
if res.StatusCode == 404 {
|
|
return AzureCorrelationAPIResponse{}, backend.DownstreamError(fmt.Errorf("requested trace not found by Application Insights indexing. Select the relevant Application Insights resource to search for the Operation ID directly"))
|
|
}
|
|
return AzureCorrelationAPIResponse{}, utils.CreateResponseErrorFromStatusCode(res.StatusCode, res.Status, body)
|
|
}
|
|
var data AzureCorrelationAPIResponse
|
|
d := json.NewDecoder(bytes.NewReader(body))
|
|
d.UseNumber()
|
|
err = d.Decode(&data)
|
|
if err != nil {
|
|
return AzureCorrelationAPIResponse{}, err
|
|
}
|
|
|
|
for _, resource := range data.Properties.Resources {
|
|
lowerCaseResource := strings.ToLower(resource)
|
|
if _, ok := resourcesMap[lowerCaseResource]; !ok {
|
|
resourcesMap[lowerCaseResource] = true
|
|
}
|
|
}
|
|
return data, nil
|
|
}
|
|
|
|
var nextLink *string
|
|
var correlationResponse AzureCorrelationAPIResponse
|
|
|
|
correlationResponse, err := callCorrelationAPI(correlationUrl)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
nextLink = correlationResponse.Properties.NextLink
|
|
|
|
for nextLink != nil {
|
|
correlationResponse, err := callCorrelationAPI(correlationUrl)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
nextLink = correlationResponse.Properties.NextLink
|
|
}
|
|
|
|
// Remove the base element as that's where the query is run anyway
|
|
delete(resourcesMap, strings.ToLower(baseResource))
|
|
return resourcesMap, nil
|
|
}
|
|
|
|
// GetPrimaryResultTable returns the first table in the response named "PrimaryResult", or an
|
|
// error if there is no table by that name.
|
|
func (ar *AzureLogAnalyticsResponse) GetPrimaryResultTable() (*types.AzureResponseTable, error) {
|
|
for _, t := range ar.Tables {
|
|
if t.Name == "PrimaryResult" {
|
|
return &t, nil
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("no data as PrimaryResult table is missing from the response")
|
|
}
|
|
|
|
func (e *AzureLogAnalyticsDatasource) unmarshalResponse(res *http.Response) (AzureLogAnalyticsResponse, error) {
|
|
body, err := io.ReadAll(res.Body)
|
|
if err != nil {
|
|
return AzureLogAnalyticsResponse{}, err
|
|
}
|
|
defer func() {
|
|
if err := res.Body.Close(); err != nil {
|
|
e.Logger.Warn("Failed to close response body", "err", err)
|
|
}
|
|
}()
|
|
|
|
if res.StatusCode/100 != 2 {
|
|
return AzureLogAnalyticsResponse{}, utils.CreateResponseErrorFromStatusCode(res.StatusCode, res.Status, body)
|
|
}
|
|
|
|
var data AzureLogAnalyticsResponse
|
|
d := json.NewDecoder(bytes.NewReader(body))
|
|
d.UseNumber()
|
|
err = d.Decode(&data)
|
|
if err != nil {
|
|
return AzureLogAnalyticsResponse{}, err
|
|
}
|
|
|
|
return data, nil
|
|
}
|
|
|
|
// LogAnalyticsMeta is the value stored in the Custom property of a frame's
// Meta for Log Analytics results.
type LogAnalyticsMeta struct {
	// ColumnTypes is serialised as "azureColumnTypes".
	ColumnTypes []string `json:"azureColumnTypes"`

	// AzurePortalLink is serialised as "azurePortalLink" and omitted when empty.
	AzurePortalLink string `json:"azurePortalLink,omitempty"`
}
|
|
|
|
// encodeQuery gzip-compresses rawQuery and returns the compressed bytes in
// standard base64, so the query can be embedded in a link. An error from
// writing to or closing the gzip stream is returned as is.
func encodeQuery(rawQuery string) (string, error) {
	compressed := new(bytes.Buffer)
	zw := gzip.NewWriter(compressed)

	_, writeErr := zw.Write([]byte(rawQuery))
	if writeErr != nil {
		return "", writeErr
	}

	// Close flushes the remaining compressed data into the buffer.
	closeErr := zw.Close()
	if closeErr != nil {
		return "", closeErr
	}

	encoded := base64.StdEncoding.EncodeToString(compressed.Bytes())
	return encoded, nil
}
|