mirror of
https://github.com/grafana/grafana.git
synced 2025-07-31 11:12:58 +08:00
dsproxy: implements support for plugin routes with jwt file
Google Cloud service accounts use a JWT token to get an oauth access token. This adds support for that.
This commit is contained in:
149
pkg/api/pluginproxy/access_token_provider.go
Normal file
149
pkg/api/pluginproxy/access_token_provider.go
Normal file
@ -0,0 +1,149 @@
|
||||
package pluginproxy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"golang.org/x/oauth2"
|
||||
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"golang.org/x/oauth2/jwt"
|
||||
)
|
||||
|
||||
var (
|
||||
tokenCache = map[string]*jwtToken{}
|
||||
oauthJwtTokenCache = map[string]*oauth2.Token{}
|
||||
)
|
||||
|
||||
type accessTokenProvider struct {
|
||||
route *plugins.AppPluginRoute
|
||||
datasourceID int64
|
||||
}
|
||||
|
||||
type jwtToken struct {
|
||||
ExpiresOn time.Time `json:"-"`
|
||||
ExpiresOnString string `json:"expires_on"`
|
||||
AccessToken string `json:"access_token"`
|
||||
}
|
||||
|
||||
func newAccessTokenProvider(dsID int64, pluginRoute *plugins.AppPluginRoute) *accessTokenProvider {
|
||||
return &accessTokenProvider{
|
||||
datasourceID: dsID,
|
||||
route: pluginRoute,
|
||||
}
|
||||
}
|
||||
|
||||
func (provider *accessTokenProvider) getAccessToken(data templateData) (string, error) {
|
||||
if cachedToken, found := tokenCache[provider.getAccessTokenCacheKey()]; found {
|
||||
if cachedToken.ExpiresOn.After(time.Now().Add(time.Second * 10)) {
|
||||
logger.Info("Using token from cache")
|
||||
return cachedToken.AccessToken, nil
|
||||
}
|
||||
}
|
||||
|
||||
urlInterpolated, err := interpolateString(provider.route.TokenAuth.Url, data)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
params := make(url.Values)
|
||||
for key, value := range provider.route.TokenAuth.Params {
|
||||
interpolatedParam, err := interpolateString(value, data)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
params.Add(key, interpolatedParam)
|
||||
}
|
||||
|
||||
getTokenReq, _ := http.NewRequest("POST", urlInterpolated, bytes.NewBufferString(params.Encode()))
|
||||
getTokenReq.Header.Add("Content-Type", "application/x-www-form-urlencoded")
|
||||
getTokenReq.Header.Add("Content-Length", strconv.Itoa(len(params.Encode())))
|
||||
|
||||
resp, err := client.Do(getTokenReq)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
||||
var token jwtToken
|
||||
if err := json.NewDecoder(resp.Body).Decode(&token); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
expiresOnEpoch, _ := strconv.ParseInt(token.ExpiresOnString, 10, 64)
|
||||
token.ExpiresOn = time.Unix(expiresOnEpoch, 0)
|
||||
tokenCache[provider.getAccessTokenCacheKey()] = &token
|
||||
|
||||
logger.Info("Got new access token", "ExpiresOn", token.ExpiresOn)
|
||||
|
||||
return token.AccessToken, nil
|
||||
}
|
||||
|
||||
func (provider *accessTokenProvider) getJwtAccessToken(ctx context.Context, data templateData) (string, error) {
|
||||
if cachedToken, found := oauthJwtTokenCache[provider.getAccessTokenCacheKey()]; found {
|
||||
if cachedToken.Expiry.After(time.Now().Add(time.Second * 10)) {
|
||||
logger.Info("Using token from cache")
|
||||
return cachedToken.AccessToken, nil
|
||||
}
|
||||
}
|
||||
|
||||
conf := &jwt.Config{}
|
||||
|
||||
if val, ok := provider.route.JwtTokenAuth.Params["client_email"]; ok {
|
||||
interpolatedVal, err := interpolateString(val, data)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
conf.Email = interpolatedVal
|
||||
}
|
||||
|
||||
if val, ok := provider.route.JwtTokenAuth.Params["private_key"]; ok {
|
||||
interpolatedVal, err := interpolateString(val, data)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
conf.PrivateKey = []byte(interpolatedVal)
|
||||
}
|
||||
|
||||
if val, ok := provider.route.JwtTokenAuth.Params["token_uri"]; ok {
|
||||
interpolatedVal, err := interpolateString(val, data)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
conf.TokenURL = interpolatedVal
|
||||
}
|
||||
|
||||
conf.Scopes = provider.route.JwtTokenAuth.Scopes
|
||||
|
||||
token, err := getTokenSource(conf, ctx)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
oauthJwtTokenCache[provider.getAccessTokenCacheKey()] = token
|
||||
|
||||
return token.AccessToken, nil
|
||||
}
|
||||
|
||||
var getTokenSource = func(conf *jwt.Config, ctx context.Context) (*oauth2.Token, error) {
|
||||
tokenSrc := conf.TokenSource(ctx)
|
||||
token, err := tokenSrc.Token()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
logger.Info("interpolatedVal", "token.AccessToken", token.AccessToken)
|
||||
|
||||
return token, nil
|
||||
}
|
||||
|
||||
func (provider *accessTokenProvider) getAccessTokenCacheKey() string {
|
||||
return fmt.Sprintf("%v_%v_%v", provider.datasourceID, provider.route.Path, provider.route.Method)
|
||||
}
|
91
pkg/api/pluginproxy/access_token_provider_test.go
Normal file
91
pkg/api/pluginproxy/access_token_provider_test.go
Normal file
@ -0,0 +1,91 @@
|
||||
package pluginproxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
"golang.org/x/oauth2"
|
||||
"golang.org/x/oauth2/jwt"
|
||||
)
|
||||
|
||||
func TestAccessToken(t *testing.T) {
|
||||
Convey("Plugin with JWT token auth route", t, func() {
|
||||
pluginRoute := &plugins.AppPluginRoute{
|
||||
Path: "pathwithjwttoken1",
|
||||
Url: "https://api.jwt.io/some/path",
|
||||
Method: "GET",
|
||||
JwtTokenAuth: &plugins.JwtTokenAuth{
|
||||
Url: "https://login.server.com/{{.JsonData.tenantId}}/oauth2/token",
|
||||
Scopes: []string{
|
||||
"https://www.testapi.com/auth/monitoring.read",
|
||||
"https://www.testapi.com/auth/cloudplatformprojects.readonly",
|
||||
},
|
||||
Params: map[string]string{
|
||||
"token_uri": "{{.JsonData.tokenUri}}",
|
||||
"client_email": "{{.JsonData.clientEmail}}",
|
||||
"private_key": "{{.SecureJsonData.privateKey}}",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
templateData := templateData{
|
||||
JsonData: map[string]interface{}{
|
||||
"clientEmail": "test@test.com",
|
||||
"tokenUri": "login.url.com/token",
|
||||
},
|
||||
SecureJsonData: map[string]string{
|
||||
"privateKey": "testkey",
|
||||
},
|
||||
}
|
||||
|
||||
Convey("should fetch token using jwt private key", func() {
|
||||
getTokenSource = func(conf *jwt.Config, ctx context.Context) (*oauth2.Token, error) {
|
||||
return &oauth2.Token{AccessToken: "abc"}, nil
|
||||
}
|
||||
provider := newAccessTokenProvider(1, pluginRoute)
|
||||
token, err := provider.getJwtAccessToken(context.Background(), templateData)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(token, ShouldEqual, "abc")
|
||||
})
|
||||
|
||||
Convey("should set jwt config values", func() {
|
||||
getTokenSource = func(conf *jwt.Config, ctx context.Context) (*oauth2.Token, error) {
|
||||
So(conf.Email, ShouldEqual, "test@test.com")
|
||||
So(conf.PrivateKey, ShouldResemble, []byte("testkey"))
|
||||
So(len(conf.Scopes), ShouldEqual, 2)
|
||||
So(conf.Scopes[0], ShouldEqual, "https://www.testapi.com/auth/monitoring.read")
|
||||
So(conf.Scopes[1], ShouldEqual, "https://www.testapi.com/auth/cloudplatformprojects.readonly")
|
||||
So(conf.TokenURL, ShouldEqual, "login.url.com/token")
|
||||
|
||||
return &oauth2.Token{AccessToken: "abc"}, nil
|
||||
}
|
||||
|
||||
provider := newAccessTokenProvider(1, pluginRoute)
|
||||
_, err := provider.getJwtAccessToken(context.Background(), templateData)
|
||||
So(err, ShouldBeNil)
|
||||
})
|
||||
|
||||
Convey("should use cached token on second call", func() {
|
||||
getTokenSource = func(conf *jwt.Config, ctx context.Context) (*oauth2.Token, error) {
|
||||
return &oauth2.Token{
|
||||
AccessToken: "abc",
|
||||
Expiry: time.Now().Add(1 * time.Minute)}, nil
|
||||
}
|
||||
provider := newAccessTokenProvider(1, pluginRoute)
|
||||
token1, err := provider.getJwtAccessToken(context.Background(), templateData)
|
||||
So(err, ShouldBeNil)
|
||||
So(token1, ShouldEqual, "abc")
|
||||
|
||||
getTokenSource = func(conf *jwt.Config, ctx context.Context) (*oauth2.Token, error) {
|
||||
return &oauth2.Token{AccessToken: "error: cache not used"}, nil
|
||||
}
|
||||
token2, err := provider.getJwtAccessToken(context.Background(), templateData)
|
||||
So(err, ShouldBeNil)
|
||||
So(token2, ShouldEqual, "abc")
|
||||
})
|
||||
})
|
||||
}
|
@ -2,8 +2,6 @@ package pluginproxy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
@ -17,7 +15,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"golang.org/x/oauth2/jwt"
|
||||
|
||||
"github.com/grafana/grafana/pkg/log"
|
||||
m "github.com/grafana/grafana/pkg/models"
|
||||
@ -28,16 +25,9 @@ import (
|
||||
|
||||
var (
|
||||
logger = log.New("data-proxy-log")
|
||||
tokenCache = map[string]*jwtToken{}
|
||||
client = newHTTPClient()
|
||||
)
|
||||
|
||||
type jwtToken struct {
|
||||
ExpiresOn time.Time `json:"-"`
|
||||
ExpiresOnString string `json:"expires_on"`
|
||||
AccessToken string `json:"access_token"`
|
||||
}
|
||||
|
||||
type DataSourceProxy struct {
|
||||
ds *m.DataSource
|
||||
ctx *m.ReqContext
|
||||
@ -342,8 +332,10 @@ func (proxy *DataSourceProxy) applyRoute(req *http.Request) {
|
||||
logger.Error("Failed to render plugin headers", "error", err)
|
||||
}
|
||||
|
||||
tokenProvider := newAccessTokenProvider(proxy.ds.Id, proxy.route)
|
||||
|
||||
if proxy.route.TokenAuth != nil {
|
||||
if token, err := proxy.getAccessToken(data); err != nil {
|
||||
if token, err := tokenProvider.getAccessToken(data); err != nil {
|
||||
logger.Error("Failed to get access token", "error", err)
|
||||
} else {
|
||||
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token))
|
||||
@ -351,95 +343,14 @@ func (proxy *DataSourceProxy) applyRoute(req *http.Request) {
|
||||
}
|
||||
|
||||
if proxy.route.JwtTokenAuth != nil {
|
||||
if token, err := proxy.getJwtAccessToken(data); err != nil {
|
||||
if token, err := tokenProvider.getJwtAccessToken(proxy.ctx.Req.Context(), data); err != nil {
|
||||
logger.Error("Failed to get access token", "error", err)
|
||||
} else {
|
||||
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token))
|
||||
}
|
||||
}
|
||||
|
||||
logger.Info("Requesting", "url", req.URL.String())
|
||||
}
|
||||
|
||||
func (proxy *DataSourceProxy) getAccessToken(data templateData) (string, error) {
|
||||
if cachedToken, found := tokenCache[proxy.getAccessTokenCacheKey()]; found {
|
||||
if cachedToken.ExpiresOn.After(time.Now().Add(time.Second * 10)) {
|
||||
logger.Info("Using token from cache")
|
||||
return cachedToken.AccessToken, nil
|
||||
}
|
||||
}
|
||||
|
||||
urlInterpolated, err := interpolateString(proxy.route.TokenAuth.Url, data)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
params := make(url.Values)
|
||||
for key, value := range proxy.route.TokenAuth.Params {
|
||||
interpolatedParam, err := interpolateString(value, data)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
params.Add(key, interpolatedParam)
|
||||
}
|
||||
|
||||
getTokenReq, _ := http.NewRequest("POST", urlInterpolated, bytes.NewBufferString(params.Encode()))
|
||||
getTokenReq.Header.Add("Content-Type", "application/x-www-form-urlencoded")
|
||||
getTokenReq.Header.Add("Content-Length", strconv.Itoa(len(params.Encode())))
|
||||
|
||||
resp, err := client.Do(getTokenReq)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
||||
var token jwtToken
|
||||
if err := json.NewDecoder(resp.Body).Decode(&token); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
expiresOnEpoch, _ := strconv.ParseInt(token.ExpiresOnString, 10, 64)
|
||||
token.ExpiresOn = time.Unix(expiresOnEpoch, 0)
|
||||
tokenCache[proxy.getAccessTokenCacheKey()] = &token
|
||||
|
||||
logger.Info("Got new access token", "ExpiresOn", token.ExpiresOn)
|
||||
return token.AccessToken, nil
|
||||
}
|
||||
|
||||
func (proxy *DataSourceProxy) getJwtAccessToken(data templateData) (string, error) {
|
||||
conf := new(jwt.Config)
|
||||
|
||||
if val, ok := proxy.route.JwtTokenAuth.Params["client_email"]; ok {
|
||||
interpolatedVal, err := interpolateString(val, data)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
conf.Email = interpolatedVal
|
||||
}
|
||||
|
||||
if val, ok := proxy.route.JwtTokenAuth.Params["private_key"]; ok {
|
||||
interpolatedVal, err := interpolateString(val, data)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
conf.PrivateKey = []byte(interpolatedVal)
|
||||
}
|
||||
conf.Scopes = []string{"https://www.googleapis.com/auth/monitoring.read", "https://www.googleapis.com/auth/cloudplatformprojects.readonly"}
|
||||
conf.TokenURL = "https://oauth2.googleapis.com/token"
|
||||
|
||||
ctx := context.Background()
|
||||
tokenSrc := conf.TokenSource(ctx)
|
||||
token, err := tokenSrc.Token()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
logger.Info("interpolatedVal", "token.AccessToken", token.AccessToken)
|
||||
return token.AccessToken, nil
|
||||
}
|
||||
|
||||
func (proxy *DataSourceProxy) getAccessTokenCacheKey() string {
|
||||
return fmt.Sprintf("%v_%v_%v", proxy.ds.Id, proxy.route.Path, proxy.route.Method)
|
||||
}
|
||||
|
||||
func interpolateString(text string, data templateData) (string, error) {
|
||||
|
@ -37,8 +37,11 @@ type AppPluginRouteHeader struct {
|
||||
Content string `json:"content"`
|
||||
}
|
||||
|
||||
// JwtTokenAuth struct is both for normal Token Auth and JWT Token Auth with
|
||||
// an uploaded JWT file.
|
||||
type JwtTokenAuth struct {
|
||||
Url string `json:"url"`
|
||||
Scopes []string `json:"scopes"`
|
||||
Params map[string]string `json:"params"`
|
||||
}
|
||||
|
||||
|
@ -20,6 +20,11 @@ export default class StackdriverDatasource {
|
||||
title: 'Success',
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
status: 'error',
|
||||
message: 'Returned http status code ' + response.status,
|
||||
};
|
||||
})
|
||||
.catch(error => {
|
||||
let message = 'Stackdriver: ';
|
||||
|
@ -19,6 +19,10 @@
|
||||
"method": "GET",
|
||||
"url": "https://content-monitoring.googleapis.com",
|
||||
"jwtTokenAuth": {
|
||||
"scopes": [
|
||||
"https://www.googleapis.com/auth/monitoring.read",
|
||||
"https://www.googleapis.com/auth/cloudplatformprojects.readonly"
|
||||
],
|
||||
"params": {
|
||||
"token_uri": "{{.JsonData.tokenUri}}",
|
||||
"client_email": "{{.JsonData.clientEmail}}",
|
||||
@ -31,6 +35,9 @@
|
||||
"method": "GET",
|
||||
"url": "https://cloudresourcemanager.googleapis.com",
|
||||
"jwtTokenAuth": {
|
||||
"scopes": [
|
||||
"https://www.googleapis.com/auth/cloudplatformprojects.readonly"
|
||||
],
|
||||
"params": {
|
||||
"token_uri": "{{.JsonData.tokenUri}}",
|
||||
"client_email": "{{.JsonData.clientEmail}}",
|
||||
|
Reference in New Issue
Block a user