Parse ServiceConfig JSON string (#1515)

This commit is contained in:
lyuxuan
2017-10-19 12:09:19 -07:00
committed by GitHub
parent a353537ff5
commit 6f3b6ff46b
6 changed files with 1228 additions and 213 deletions

View File

@ -191,6 +191,8 @@ func WithBalancerBuilder(b balancer.Builder) DialOption {
} }
// WithServiceConfig returns a DialOption which has a channel to read the service configuration. // WithServiceConfig returns a DialOption which has a channel to read the service configuration.
// DEPRECATED: service config should be received through name resolver, as specified here.
// https://github.com/grpc/grpc/blob/master/doc/service_config.md
func WithServiceConfig(c <-chan ServiceConfig) DialOption { func WithServiceConfig(c <-chan ServiceConfig) DialOption {
return func(o *dialOptions) { return func(o *dialOptions) {
o.scChan = c o.scChan = c
@ -550,6 +552,7 @@ type ClientConn struct {
mu sync.RWMutex mu sync.RWMutex
sc ServiceConfig sc ServiceConfig
scRaw string
conns map[*addrConn]struct{} conns map[*addrConn]struct{}
// Keepalive parameter can be updated if a GoAway is received. // Keepalive parameter can be updated if a GoAway is received.
mkp keepalive.ClientParameters mkp keepalive.ClientParameters
@ -591,6 +594,7 @@ func (cc *ClientConn) scWatcher() {
// TODO: load balance policy runtime change is ignored. // TODO: load balance policy runtime change is ignored.
// We may revist this decision in the future. // We may revist this decision in the future.
cc.sc = sc cc.sc = sc
cc.scRaw = ""
cc.mu.Unlock() cc.mu.Unlock()
case <-cc.ctx.Done(): case <-cc.ctx.Done():
return return
@ -800,6 +804,20 @@ func (cc *ClientConn) getTransport(ctx context.Context, failfast bool) (transpor
return t, done, nil return t, done, nil
} }
// handleServiceConfig parses the service config string in JSON format to Go native
// struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
func (cc *ClientConn) handleServiceConfig(js string) error {
sc, err := parseServiceConfig(js)
if err != nil {
return err
}
cc.mu.Lock()
cc.scRaw = js
cc.sc = sc
cc.mu.Unlock()
return nil
}
// Close tears down the ClientConn and all underlying connections. // Close tears down the ClientConn and all underlying connections.
func (cc *ClientConn) Close() error { func (cc *ClientConn) Close() error {
cc.cancel() cc.cancel()

View File

@ -109,6 +109,7 @@ func (ccr *ccResolverWrapper) watcher() {
default: default:
} }
grpclog.Infof("ccResolverWrapper: got new service config: %v", sc) grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
ccr.cc.handleServiceConfig(sc)
case <-ccr.done: case <-ccr.done:
return return
} }

View File

@ -492,65 +492,6 @@ func Errorf(c codes.Code, format string, a ...interface{}) error {
return status.Errorf(c, format, a...) return status.Errorf(c, format, a...)
} }
// MethodConfig defines the configuration recommended by the service providers for a
// particular method.
// This is EXPERIMENTAL and subject to change.
type MethodConfig struct {
// WaitForReady indicates whether RPCs sent to this method should wait until
// the connection is ready by default (!failfast). The value specified via the
// gRPC client API will override the value set here.
WaitForReady *bool
// Timeout is the default timeout for RPCs sent to this method. The actual
// deadline used will be the minimum of the value specified here and the value
// set by the application via the gRPC client API. If either one is not set,
// then the other will be used. If neither is set, then the RPC has no deadline.
Timeout *time.Duration
// MaxReqSize is the maximum allowed payload size for an individual request in a
// stream (client->server) in bytes. The size which is measured is the serialized
// payload after per-message compression (but before stream compression) in bytes.
// The actual value used is the minimum of the value specified here and the value set
// by the application via the gRPC client API. If either one is not set, then the other
// will be used. If neither is set, then the built-in default is used.
MaxReqSize *int
// MaxRespSize is the maximum allowed payload size for an individual response in a
// stream (server->client) in bytes.
MaxRespSize *int
}
// ServiceConfig is provided by the service provider and contains parameters for how
// clients that connect to the service should behave.
// This is EXPERIMENTAL and subject to change.
type ServiceConfig struct {
// LB is the load balancer the service providers recommends. The balancer specified
// via grpc.WithBalancer will override this.
LB Balancer
// Methods contains a map for the methods in this service.
// If there is an exact match for a method (i.e. /service/method) in the map, use the corresponding MethodConfig.
// If there's no exact match, look for the default config for the service (/service/) and use the corresponding MethodConfig if it exists.
// Otherwise, the method has no MethodConfig to use.
Methods map[string]MethodConfig
}
func min(a, b *int) *int {
if *a < *b {
return a
}
return b
}
func getMaxSize(mcMax, doptMax *int, defaultVal int) *int {
if mcMax == nil && doptMax == nil {
return &defaultVal
}
if mcMax != nil && doptMax != nil {
return min(mcMax, doptMax)
}
if mcMax != nil {
return mcMax
}
return doptMax
}
// SupportPackageIsVersion3 is referenced from generated protocol buffer files. // SupportPackageIsVersion3 is referenced from generated protocol buffer files.
// The latest support package version is 4. // The latest support package version is 4.
// SupportPackageIsVersion3 is kept for compatibility. It will be removed in the // SupportPackageIsVersion3 is kept for compatibility. It will be removed in the

185
service_config.go Normal file
View File

@ -0,0 +1,185 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"encoding/json"
"time"
"google.golang.org/grpc/grpclog"
)
// MethodConfig defines the configuration recommended by the service providers for a
// particular method.
// DEPRECATED: Users should not use this struct. Service config should be received
// through name resolver, as specified here
// https://github.com/grpc/grpc/blob/master/doc/service_config.md
type MethodConfig struct {
// WaitForReady indicates whether RPCs sent to this method should wait until
// the connection is ready by default (!failfast). The value specified via the
// gRPC client API will override the value set here.
WaitForReady *bool
// Timeout is the default timeout for RPCs sent to this method. The actual
// deadline used will be the minimum of the value specified here and the value
// set by the application via the gRPC client API. If either one is not set,
// then the other will be used. If neither is set, then the RPC has no deadline.
Timeout *time.Duration
// MaxReqSize is the maximum allowed payload size for an individual request in a
// stream (client->server) in bytes. The size which is measured is the serialized
// payload after per-message compression (but before stream compression) in bytes.
// The actual value used is the minimum of the value specified here and the value set
// by the application via the gRPC client API. If either one is not set, then the other
// will be used. If neither is set, then the built-in default is used.
MaxReqSize *int
// MaxRespSize is the maximum allowed payload size for an individual response in a
// stream (server->client) in bytes.
MaxRespSize *int
}
// ServiceConfig is provided by the service provider and contains parameters for how
// clients that connect to the service should behave.
// DEPRECATED: Users should not use this struct. Service config should be received
// through name resolver, as specified here
// https://github.com/grpc/grpc/blob/master/doc/service_config.md
type ServiceConfig struct {
// LB is the load balancer the service providers recommends. The balancer specified
// via grpc.WithBalancer will override this.
LB *string
// Methods contains a map for the methods in this service.
// If there is an exact match for a method (i.e. /service/method) in the map, use the corresponding MethodConfig.
// If there's no exact match, look for the default config for the service (/service/) and use the corresponding MethodConfig if it exists.
// Otherwise, the method has no MethodConfig to use.
Methods map[string]MethodConfig
}
func parseTimeout(t *string) (*time.Duration, error) {
if t == nil {
return nil, nil
}
d, err := time.ParseDuration(*t)
return &d, err
}
type jsonName struct {
Service *string
Method *string
}
func (j jsonName) generatePath() (string, bool) {
if j.Service == nil {
return "", false
}
res := "/" + *j.Service + "/"
if j.Method != nil {
res += *j.Method
}
return res, true
}
// TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
type jsonMC struct {
Name *[]jsonName
WaitForReady *bool
Timeout *string
MaxRequestMessageBytes *int
MaxResponseMessageBytes *int
}
// TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
type jsonSC struct {
LoadBalancingPolicy *string
MethodConfig *[]jsonMC
}
func parseServiceConfig(js string) (ServiceConfig, error) {
var rsc jsonSC
err := json.Unmarshal([]byte(js), &rsc)
if err != nil {
grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
return ServiceConfig{}, err
}
sc := ServiceConfig{
LB: rsc.LoadBalancingPolicy,
Methods: make(map[string]MethodConfig),
}
if rsc.MethodConfig == nil {
return sc, nil
}
for _, m := range *rsc.MethodConfig {
if m.Name == nil {
continue
}
d, err := parseTimeout(m.Timeout)
if err != nil {
grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
return ServiceConfig{}, err
}
mc := MethodConfig{
WaitForReady: m.WaitForReady,
Timeout: d,
MaxReqSize: m.MaxRequestMessageBytes,
MaxRespSize: m.MaxResponseMessageBytes,
}
for _, n := range *m.Name {
if path, valid := n.generatePath(); valid {
sc.Methods[path] = mc
}
}
}
return sc, nil
}
func min(a, b *int) *int {
if *a < *b {
return a
}
return b
}
func getMaxSize(mcMax, doptMax *int, defaultVal int) *int {
if mcMax == nil && doptMax == nil {
return &defaultVal
}
if mcMax != nil && doptMax != nil {
return min(mcMax, doptMax)
}
if mcMax != nil {
return mcMax
}
return doptMax
}
func newBool(b bool) *bool {
return &b
}
func newInt(b int) *int {
return &b
}
func newDuration(b time.Duration) *time.Duration {
return &b
}
func newString(b string) *string {
return &b
}

323
service_config_test.go Normal file
View File

@ -0,0 +1,323 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"reflect"
"testing"
"time"
)
func TestParseLoadBalancer(t *testing.T) {
testcases := []struct {
scjs string
wantSC ServiceConfig
wantErr bool
}{
{
`{
"loadBalancingPolicy": "round_robin",
"methodConfig": [
{
"name": [
{
"service": "foo",
"method": "Bar"
}
],
"waitForReady": true
}
]
}`,
ServiceConfig{
LB: newString("round_robin"),
Methods: map[string]MethodConfig{
"/foo/Bar": {
WaitForReady: newBool(true),
},
},
},
false,
},
{
`{
"loadBalancingPolicy": 1,
"methodConfig": [
{
"name": [
{
"service": "foo",
"method": "Bar"
}
],
"waitForReady": false
}
]
}`,
ServiceConfig{},
true,
},
}
for _, c := range testcases {
sc, err := parseServiceConfig(c.scjs)
if c.wantErr != (err != nil) || !reflect.DeepEqual(sc, c.wantSC) {
t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr)
}
}
}
func TestPraseWaitForReady(t *testing.T) {
testcases := []struct {
scjs string
wantSC ServiceConfig
wantErr bool
}{
{
`{
"methodConfig": [
{
"name": [
{
"service": "foo",
"method": "Bar"
}
],
"waitForReady": true
}
]
}`,
ServiceConfig{
Methods: map[string]MethodConfig{
"/foo/Bar": {
WaitForReady: newBool(true),
},
},
},
false,
},
{
`{
"methodConfig": [
{
"name": [
{
"service": "foo",
"method": "Bar"
}
],
"waitForReady": false
}
]
}`,
ServiceConfig{
Methods: map[string]MethodConfig{
"/foo/Bar": {
WaitForReady: newBool(false),
},
},
},
false,
},
{
`{
"methodConfig": [
{
"name": [
{
"service": "foo",
"method": "Bar"
}
],
"waitForReady": fall
},
{
"name": [
{
"service": "foo",
"method": "Bar"
}
],
"waitForReady": true
}
]
}`,
ServiceConfig{},
true,
},
}
for _, c := range testcases {
sc, err := parseServiceConfig(c.scjs)
if c.wantErr != (err != nil) || !reflect.DeepEqual(sc, c.wantSC) {
t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr)
}
}
}
func TestPraseTimeOut(t *testing.T) {
testcases := []struct {
scjs string
wantSC ServiceConfig
wantErr bool
}{
{
`{
"methodConfig": [
{
"name": [
{
"service": "foo",
"method": "Bar"
}
],
"timeout": "1s"
}
]
}`,
ServiceConfig{
Methods: map[string]MethodConfig{
"/foo/Bar": {
Timeout: newDuration(time.Second),
},
},
},
false,
},
{
`{
"methodConfig": [
{
"name": [
{
"service": "foo",
"method": "Bar"
}
],
"timeout": "3c"
}
]
}`,
ServiceConfig{},
true,
},
{
`{
"methodConfig": [
{
"name": [
{
"service": "foo",
"method": "Bar"
}
],
"timeout": "3c"
},
{
"name": [
{
"service": "foo",
"method": "Bar"
}
],
"timeout": "1s"
}
]
}`,
ServiceConfig{},
true,
},
}
for _, c := range testcases {
sc, err := parseServiceConfig(c.scjs)
if c.wantErr != (err != nil) || !reflect.DeepEqual(sc, c.wantSC) {
t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr)
}
}
}
func TestPraseMsgSize(t *testing.T) {
testcases := []struct {
scjs string
wantSC ServiceConfig
wantErr bool
}{
{
`{
"methodConfig": [
{
"name": [
{
"service": "foo",
"method": "Bar"
}
],
"maxRequestMessageBytes": 1024,
"maxResponseMessageBytes": 2048
}
]
}`,
ServiceConfig{
Methods: map[string]MethodConfig{
"/foo/Bar": {
MaxReqSize: newInt(1024),
MaxRespSize: newInt(2048),
},
},
},
false,
},
{
`{
"methodConfig": [
{
"name": [
{
"service": "foo",
"method": "Bar"
}
],
"maxRequestMessageBytes": "1024",
"maxResponseMessageBytes": "2048"
},
{
"name": [
{
"service": "foo",
"method": "Bar"
}
],
"maxRequestMessageBytes": 1024,
"maxResponseMessageBytes": 2048
}
]
}`,
ServiceConfig{},
true,
},
}
for _, c := range testcases {
sc, err := parseServiceConfig(c.scjs)
if c.wantErr != (err != nil) || !reflect.DeepEqual(sc, c.wantSC) {
t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr)
}
}
}

File diff suppressed because it is too large Load Diff