Add new Resolver and Balancer APIs (gRFC L9) (#1408)

- Add package balancer and resolver.
- Change ClientConn internals to new APIs and add a wrapper for the v1 balancer.
Menghan Li
2017-08-31 10:59:09 -07:00
committed by GitHub
parent 1ea63c9e71
commit 8233e124e4
13 changed files with 1063 additions and 354 deletions

balancer/balancer.go (new file, 201 lines)

@ -0,0 +1,201 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package balancer defines APIs for load balancing in gRPC.
// All APIs in this package are experimental.
package balancer
import (
"errors"
"net"
"golang.org/x/net/context"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/resolver"
)
var (
// m is a map from name to balancer builder.
m = make(map[string]Builder)
// defaultBuilder is the default balancer to use.
defaultBuilder Builder // TODO(bar) install pickfirst as default.
)
// Register registers the balancer builder to the balancer map.
// b.Name will be used as the name registered with this builder.
func Register(b Builder) {
m[b.Name()] = b
}
// Get returns the balancer builder registered with the given name.
// If no builder is registered with the name, the default pickfirst will
// be used.
func Get(name string) Builder {
if b, ok := m[name]; ok {
return b
}
return defaultBuilder
}
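
For illustration, a builder is typically registered from an init function in the package that defines it, then looked up by name; the fakeBuilder type and the name "fake" below are hypothetical, not part of this commit:

package fakebalancer // hypothetical package, for illustration only

import "google.golang.org/grpc/balancer"

// fakeBuilder is a placeholder balancer.Builder; a fuller sketch of a
// builder and balancer appears after the Balancer interface below.
type fakeBuilder struct{}

func (fakeBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
	panic("not implemented in this sketch")
}

func (fakeBuilder) Name() string { return "fake" }

func init() {
	// Register at init time so the builder is available before any Dial.
	balancer.Register(fakeBuilder{})
}

With this in place, balancer.Get("fake") returns the builder, while Get with an unknown name falls back to the default builder (pickfirst, once it is installed per the TODO above).
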
// SubConn represents a gRPC sub connection.
// Each sub connection contains a list of addresses. gRPC will
// try to connect to them (in sequence), and stop trying the
// remainder once one connection is successful.
//
// The reconnect backoff will be applied on the list, not a single address.
// For example, try_on_all_addresses -> backoff -> try_on_all_addresses.
//
// All SubConns start in IDLE, and will not try to connect. To trigger
// connecting, Balancers must call Connect.
// When the connection encounters an error, it will reconnect immediately.
// When the connection becomes IDLE, it will not reconnect unless Connect is
// called.
type SubConn interface {
// UpdateAddresses updates the addresses used in this SubConn.
// gRPC checks if the currently-connected address is still in the new list.
// If it is in the list, the connection will be kept.
// If it is not in the list, the connection will be gracefully closed, and
// a new connection will be created.
//
// This will trigger a state transition for the SubConn.
UpdateAddresses([]resolver.Address)
// Connect starts the connecting for this SubConn.
Connect()
}
// NewSubConnOptions contains options to create new SubConn.
type NewSubConnOptions struct{}
// ClientConn represents a gRPC ClientConn.
type ClientConn interface {
// NewSubConn is called by the balancer to create a new SubConn.
// It doesn't block and wait for the connection to be established.
// Behaviors of the SubConn can be controlled by options.
NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
// RemoveSubConn removes the SubConn from ClientConn.
// The SubConn will be shutdown.
RemoveSubConn(SubConn)
// UpdateBalancerState is called by the balancer to notify gRPC that some
// internal state of the balancer has changed.
//
// gRPC will update the connectivity state of the ClientConn, and will call Pick
// on the new Picker to pick a new SubConn.
UpdateBalancerState(s connectivity.State, p Picker)
// Target returns the dial target for this ClientConn.
Target() string
}
// BuildOptions contains additional information for Build.
type BuildOptions struct {
// DialCreds is the transport credentials the Balancer implementation can
// use to dial to a remote load balancer server. The Balancer implementation
// can ignore this if it does not need to talk to another party securely.
DialCreds credentials.TransportCredentials
// Dialer is the custom dialer the Balancer implementation can use to dial
// to a remote load balancer server. The Balancer implementation
// can ignore this if it doesn't need to talk to a remote balancer.
Dialer func(context.Context, string) (net.Conn, error)
}
// Builder creates a balancer.
type Builder interface {
// Build creates a new balancer with the ClientConn.
Build(cc ClientConn, opts BuildOptions) Balancer
// Name returns the name of balancers built by this builder.
// It will be used to pick balancers (for example in service config).
Name() string
}
// PickOptions contains additional information for the Pick operation.
type PickOptions struct{}
// DoneInfo contains additional information for the done callback.
type DoneInfo struct {
// Err is the error the RPC finished with. It could be nil.
Err error
}
var (
// ErrNoSubConnAvailable indicates no SubConn is available for pick().
// gRPC will block the RPC until a new picker is available via UpdateBalancerState().
ErrNoSubConnAvailable = errors.New("no SubConn is available")
// ErrTransientFailure indicates all SubConns are in TransientFailure.
// WaitForReady RPCs will block, non-WaitForReady RPCs will fail.
ErrTransientFailure = errors.New("all SubConns are in TransientFailure")
)
// Picker is used by gRPC to pick a SubConn to send an RPC.
// Balancer is expected to generate a new picker from its snapshot every time its
// internal state has changed.
//
// The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState().
type Picker interface {
// Pick returns the SubConn to be used to send the RPC.
// The returned SubConn must be one returned by NewSubConn().
//
// This function is expected to return:
// - a SubConn that is known to be READY;
// - ErrNoSubConnAvailable if no SubConn is available, but progress is being
//   made (for example, some SubConn is in CONNECTING mode);
// - other errors if no active connection attempt is happening (for example,
//   all SubConns are in TRANSIENT_FAILURE mode).
//
// If a SubConn is returned:
// - If it is READY, gRPC will send the RPC on it;
// - If it is not ready, or becomes not ready after it's returned, gRPC will block
//   this call until a new picker is provided, and will call Pick on the new picker.
//
// If the returned error is not nil:
// - If the error is ErrNoSubConnAvailable, gRPC will block until UpdateBalancerState()
//   is called to pick again;
// - If the error is ErrTransientFailure:
//   - If the RPC is wait-for-ready, gRPC will block until UpdateBalancerState()
//     is called to pick again;
//   - Otherwise, the RPC will fail with an Unavailable error.
// - Else (the error is some other non-nil error):
//   - The RPC will fail with an Unavailable error.
//
// The returned done() function will be called once the RPC has finished, with the
// final status of that RPC.
// done may be nil if the balancer doesn't care about the RPC status.
Pick(ctx context.Context, opts PickOptions) (conn SubConn, done func(DoneInfo), err error)
}
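
To make the contract above concrete, here is a minimal sketch of a round-robin Picker over SubConns assumed to be READY; the rrPicker type is illustrative and not part of this commit:

package rrpicker // hypothetical package, for illustration only

import (
	"sync"

	"golang.org/x/net/context"

	"google.golang.org/grpc/balancer"
)

// rrPicker rotates through a fixed snapshot of READY SubConns. The
// balancer is expected to build a fresh rrPicker whenever its internal
// state changes, per the Picker documentation above.
type rrPicker struct {
	mu       sync.Mutex
	subConns []balancer.SubConn // snapshot, all assumed READY
	next     int
}

func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.subConns) == 0 {
		// No READY SubConn: gRPC will block the RPC until a new
		// picker arrives via UpdateBalancerState.
		return nil, nil, balancer.ErrNoSubConnAvailable
	}
	sc := p.subConns[p.next]
	p.next = (p.next + 1) % len(p.subConns)
	done := func(di balancer.DoneInfo) {
		// A real picker could record di.Err here for load reporting.
	}
	return sc, done, nil
}
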
// Balancer takes input from gRPC, manages SubConns, and collects and aggregates
// the connectivity states.
//
// It also generates and updates the Picker used by gRPC to pick SubConns for RPCs.
type Balancer interface {
// HandleSubConnStateChange is called by gRPC when the connectivity state
// of sc has changed.
// Balancer is expected to aggregate the states of all its SubConns and report
// that back to gRPC.
// Balancer should also generate and update Pickers when its internal state
// changes as a result.
HandleSubConnStateChange(sc SubConn, state connectivity.State)
// HandleResolvedAddrs is called by gRPC to send updated resolved addresses to
// the balancer.
// The balancer can create new SubConns or remove SubConns based on the addresses.
// An empty address slice and a non-nil error will be passed if the resolver returns
// a non-nil error to gRPC.
HandleResolvedAddrs([]resolver.Address, error)
// Close closes the balancer.
Close()
}
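
Putting the interfaces together, a minimal pick-first-style balancer might look roughly like the following; every name here (miniBuilder, miniBalancer, onePicker, errPicker) is illustrative and not part of this commit:

package minibalancer // hypothetical package, for illustration only

import (
	"golang.org/x/net/context"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/resolver"
)

type miniBuilder struct{}

func (miniBuilder) Name() string { return "mini" }

func (miniBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
	return &miniBalancer{cc: cc}
}

// miniBalancer keeps a single SubConn over all resolved addresses,
// mirroring the pick-first behavior described for SubConn above.
type miniBalancer struct {
	cc balancer.ClientConn
	sc balancer.SubConn
}

func (b *miniBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
	if err != nil || len(addrs) == 0 {
		return
	}
	if b.sc == nil {
		sc, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{})
		if err != nil {
			return
		}
		b.sc = sc
		sc.Connect() // SubConns start in IDLE; Connect must be called explicitly.
		return
	}
	b.sc.UpdateAddresses(addrs)
}

func (b *miniBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
	if sc != b.sc {
		return
	}
	switch s {
	case connectivity.Ready:
		b.cc.UpdateBalancerState(s, &onePicker{sc: sc})
	case connectivity.TransientFailure:
		b.cc.UpdateBalancerState(s, &errPicker{err: balancer.ErrTransientFailure})
	default:
		b.cc.UpdateBalancerState(s, &errPicker{err: balancer.ErrNoSubConnAvailable})
	}
}

func (b *miniBalancer) Close() {}

// onePicker always returns the single READY SubConn; done is nil
// because this balancer doesn't care about per-RPC status.
type onePicker struct{ sc balancer.SubConn }

func (p *onePicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
	return p.sc, nil, nil
}

// errPicker fails every pick with a fixed error, driving the blocking
// or fail-fast behavior described in the Picker contract above.
type errPicker struct{ err error }

func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
	return nil, nil, p.err
}

func init() { balancer.Register(miniBuilder{}) }
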

balancer_conn_wrappers.go (new file, 123 lines)

@ -0,0 +1,123 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"sync"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
)
// TODO(bar) move ClientConn methods to clientConn file.
func (cc *ClientConn) updatePicker(p balancer.Picker) {
// TODO(bar) add a goroutine and sync it.
// TODO(bar) implement blocking behavior and unblock the previous pick.
cc.pmu.Lock()
cc.picker = p
cc.pmu.Unlock()
}
// ccBalancerWrapper is a wrapper on top of cc for balancers.
// It implements balancer.ClientConn interface.
type ccBalancerWrapper struct {
cc *ClientConn
}
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
grpclog.Infof("ccBalancerWrapper: new subconn: %v", addrs)
ac, err := ccb.cc.newAddrConn(addrs)
if err != nil {
return nil, err
}
acbw := &acBalancerWrapper{ac: ac}
ac.acbw = acbw
return acbw, nil
}
func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
grpclog.Infof("ccBalancerWrapper: removing subconn")
acbw, ok := sc.(*acBalancerWrapper)
if !ok {
return
}
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
}
func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
// TODO(bar) update cc connectivity state.
ccb.cc.updatePicker(p)
}
func (ccb *ccBalancerWrapper) Target() string {
return ccb.cc.target
}
// acBalancerWrapper is a wrapper on top of ac for balancers.
// It implements balancer.SubConn interface.
type acBalancerWrapper struct {
mu sync.Mutex
ac *addrConn
}
func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
grpclog.Infof("acBalancerWrapper: UpdateAddresses called with %v", addrs)
acbw.mu.Lock()
defer acbw.mu.Unlock()
// TODO(bar) update the addresses or tearDown and create a new ac.
if !acbw.ac.tryUpdateAddrs(addrs) {
cc := acbw.ac.cc
acbw.ac.mu.Lock()
// Set old ac.acbw to nil so state updates will be ignored by the balancer.
acbw.ac.acbw = nil
acbw.ac.mu.Unlock()
acState := acbw.ac.getState()
acbw.ac.tearDown(errConnDrain)
if acState == connectivity.Shutdown {
return
}
ac, err := cc.newAddrConn(addrs)
if err != nil {
grpclog.Warningf("acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
return
}
acbw.ac = ac
ac.acbw = acbw
if acState != connectivity.Idle {
ac.connect(false)
}
}
}
func (acbw *acBalancerWrapper) Connect() {
acbw.mu.Lock()
defer acbw.mu.Unlock()
acbw.ac.connect(false)
}
func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
acbw.mu.Lock()
defer acbw.mu.Unlock()
return acbw.ac
}

balancer_v1_wrapper.go (new file, 303 lines)

@ -0,0 +1,303 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"sync"
"golang.org/x/net/context"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
)
type balancerWrapperBuilder struct {
b Balancer // The v1 balancer.
}
func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
bwb.b.Start(cc.Target(), BalancerConfig{
DialCreds: opts.DialCreds,
Dialer: opts.Dialer,
})
_, pickfirst := bwb.b.(*pickFirst)
bw := &balancerWrapper{
balancer: bwb.b,
pickfirst: pickfirst,
cc: cc,
startCh: make(chan struct{}),
conns: make(map[resolver.Address]balancer.SubConn),
connSt: make(map[balancer.SubConn]*scState),
}
cc.UpdateBalancerState(connectivity.Idle, bw)
go bw.lbWatcher()
return bw
}
func (bwb *balancerWrapperBuilder) Name() string {
return "wrapper"
}
type scState struct {
addr Address // The v1 address type.
s connectivity.State
down func(error)
}
type balancerWrapper struct {
balancer Balancer // The v1 balancer.
pickfirst bool
cc balancer.ClientConn
mu sync.Mutex
conns map[resolver.Address]balancer.SubConn
connSt map[balancer.SubConn]*scState
// This channel is closed when handling the first resolver result.
// lbWatcher blocks until this is closed, to avoid a race between:
// - NewSubConn is created and cc wants to notify the balancer of state changes;
// - Build hasn't returned and cc doesn't have access to the balancer.
startCh chan struct{}
}
// lbWatcher watches the Notify channel of the balancer and manages
// connections accordingly.
func (bw *balancerWrapper) lbWatcher() {
<-bw.startCh
grpclog.Infof("balancerWrapper: is pickfirst: %v\n", bw.pickfirst)
notifyCh := bw.balancer.Notify()
if notifyCh == nil {
// There's no resolver in the balancer. Connect directly.
a := resolver.Address{
Addr: bw.cc.Target(),
Type: resolver.Backend,
}
sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
if err != nil {
grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
} else {
bw.mu.Lock()
bw.conns[a] = sc
bw.connSt[sc] = &scState{
addr: Address{Addr: bw.cc.Target()},
s: connectivity.Idle,
}
bw.mu.Unlock()
sc.Connect()
}
return
}
for addrs := range notifyCh {
grpclog.Infof("balancerWrapper: got update addr from Notify: %v\n", addrs)
if bw.pickfirst {
var (
oldA resolver.Address
oldSC balancer.SubConn
)
bw.mu.Lock()
for oldA, oldSC = range bw.conns {
break
}
bw.mu.Unlock()
if len(addrs) <= 0 {
if oldSC != nil {
// Teardown old sc.
bw.mu.Lock()
delete(bw.conns, oldA)
delete(bw.connSt, oldSC)
bw.mu.Unlock()
bw.cc.RemoveSubConn(oldSC)
}
continue
}
var newAddrs []resolver.Address
for _, a := range addrs {
newAddr := resolver.Address{
Addr: a.Addr,
Type: resolver.Backend, // All addresses from the balancer are backends.
ServerName: "", // TODO(bar) support servername.
Metadata: a.Metadata,
}
newAddrs = append(newAddrs, newAddr)
}
if oldSC == nil {
// Create new sc.
sc, err := bw.cc.NewSubConn(newAddrs, balancer.NewSubConnOptions{})
if err != nil {
grpclog.Warningf("Error creating connection to %v. Err: %v", newAddrs, err)
} else {
bw.mu.Lock()
// For pickfirst, there should be only one SubConn, so the
// address key doesn't matter. All state updates (up and down)
// and picking should happen on that single SubConn.
bw.conns[resolver.Address{}] = sc
bw.connSt[sc] = &scState{
addr: addrs[0], // Use the first address.
s: connectivity.Idle,
}
bw.mu.Unlock()
sc.Connect()
}
} else {
oldSC.UpdateAddresses(newAddrs)
bw.mu.Lock()
bw.connSt[oldSC].addr = addrs[0]
bw.mu.Unlock()
}
} else {
var (
add []resolver.Address // Addresses that need new connections.
del []balancer.SubConn // Connections that need to be torn down.
)
resAddrs := make(map[resolver.Address]Address)
for _, a := range addrs {
resAddrs[resolver.Address{
Addr: a.Addr,
Type: resolver.Backend, // All addresses from the balancer are backends.
ServerName: "", // TODO(bar) support servername.
Metadata: a.Metadata,
}] = a
}
bw.mu.Lock()
for a := range resAddrs {
if _, ok := bw.conns[a]; !ok {
add = append(add, a)
}
}
for a, c := range bw.conns {
if _, ok := resAddrs[a]; !ok {
del = append(del, c)
delete(bw.conns, a)
delete(bw.connSt, c)
}
}
bw.mu.Unlock()
for _, a := range add {
sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
if err != nil {
grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
} else {
bw.mu.Lock()
bw.conns[a] = sc
bw.connSt[sc] = &scState{
addr: resAddrs[a],
s: connectivity.Idle,
}
bw.mu.Unlock()
sc.Connect()
}
}
for _, c := range del {
bw.cc.RemoveSubConn(c)
}
}
}
}
func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
grpclog.Infof("balancerWrapper: handle subconn state change: %p, %v", sc, s)
bw.mu.Lock()
defer bw.mu.Unlock()
scSt, ok := bw.connSt[sc]
if !ok {
return
}
if s == connectivity.Idle {
sc.Connect()
}
oldS := scSt.s
scSt.s = s
grpclog.Infof("balancerWrapper: handle state change addr: %v, old state: %v, new state: %v", sc.(*acBalancerWrapper).getAddrConn().addrs, oldS, s)
if oldS != connectivity.Ready && s == connectivity.Ready {
scSt.down = bw.balancer.Up(scSt.addr)
} else if oldS == connectivity.Ready && s != connectivity.Ready {
if scSt.down != nil {
scSt.down(errConnClosing) // TODO(bar) what error to use?
}
}
// The connectivity state is ignored by clientConn now.
// TODO(bar) use the aggregated connectivity state.
bw.cc.UpdateBalancerState(connectivity.Ready, bw)
return
}
func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) {
bw.mu.Lock()
defer bw.mu.Unlock()
select {
case <-bw.startCh:
default:
close(bw.startCh)
}
// There should be a resolver inside the balancer.
// All updates here, if any, are ignored.
return
}
func (bw *balancerWrapper) Close() {
bw.mu.Lock()
defer bw.mu.Unlock()
select {
case <-bw.startCh:
default:
close(bw.startCh)
}
bw.balancer.Close()
return
}
// The picker is the balancerWrapper itself.
// Pick should never return ErrNoSubConnAvailable.
// It either blocks or returns an error, consistent with v1 balancer Get().
func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
failfast := true // Default failfast is true.
if ss, ok := rpcInfoFromContext(ctx); ok {
failfast = ss.failfast
}
a, p, err := bw.balancer.Get(ctx, BalancerGetOptions{BlockingWait: !failfast})
if err != nil {
return nil, nil, err
}
var put func(balancer.DoneInfo)
if p != nil {
put = func(i balancer.DoneInfo) { p() }
}
var sc balancer.SubConn
if bw.pickfirst {
bw.mu.Lock()
// Get the first sc in conns.
for _, sc = range bw.conns {
break
}
bw.mu.Unlock()
} else {
bw.mu.Lock()
sc = bw.conns[resolver.Address{
Addr: a.Addr,
Type: resolver.Backend,
ServerName: "", // TODO(bar) support servername.
Metadata: a.Metadata,
}]
bw.mu.Unlock()
}
return sc, put, nil
}

call.go (23 changed lines)

@ -25,6 +25,7 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
@ -135,7 +136,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
c := defaultCallInfo
c := defaultCallInfo()
mc := cc.GetMethodConfig(method)
if mc.WaitForReady != nil {
c.failFast = !*mc.WaitForReady
@ -149,13 +150,13 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
opts = append(cc.dopts.callOptions, opts...)
for _, o := range opts {
if err := o.before(&c); err != nil {
if err := o.before(c); err != nil {
return toRPCErr(err)
}
}
defer func() {
for _, o := range opts {
o.after(&c)
o.after(c)
}
}()
@ -178,7 +179,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
}()
}
ctx = newContextWithRPCInfo(ctx)
ctx = newContextWithRPCInfo(ctx, c.failFast)
sh := cc.dopts.copts.StatsHandler
if sh != nil {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
@ -208,7 +209,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
stream *transport.Stream
// Record the put handler from Balancer.Get(...). It is called once the
// RPC has completed or failed.
put func()
put func(balancer.DoneInfo)
)
// TODO(zhaoq): Need a formal spec of fail-fast.
callHdr := &transport.CallHdr{
@ -252,7 +253,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
// If error is not connection error, we are sure nothing has been sent.
updateRPCInfoInContext(ctx, rpcInfo{bytesSent: true, bytesReceived: false})
}
put()
put(balancer.DoneInfo{Err: err})
}
if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast {
continue
@ -262,14 +263,14 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if peer, ok := peer.FromContext(stream.Context()); ok {
c.peer = peer
}
err = sendRequest(ctx, cc.dopts, cc.dopts.cp, &c, callHdr, stream, t, args, topts)
err = sendRequest(ctx, cc.dopts, cc.dopts.cp, c, callHdr, stream, t, args, topts)
if err != nil {
if put != nil {
updateRPCInfoInContext(ctx, rpcInfo{
bytesSent: stream.BytesSent(),
bytesReceived: stream.BytesReceived(),
})
put()
put(balancer.DoneInfo{Err: err})
}
// Retry a non-failfast RPC when
// i) there is a connection error; or
@ -279,14 +280,14 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
return toRPCErr(err)
}
err = recvResponse(ctx, cc.dopts, t, &c, stream, reply)
err = recvResponse(ctx, cc.dopts, t, c, stream, reply)
if err != nil {
if put != nil {
updateRPCInfoInContext(ctx, rpcInfo{
bytesSent: stream.BytesSent(),
bytesReceived: stream.BytesReceived(),
})
put()
put(balancer.DoneInfo{Err: err})
}
if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast {
continue
@ -302,7 +303,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
bytesSent: stream.BytesSent(),
bytesReceived: stream.BytesReceived(),
})
put()
put(balancer.DoneInfo{Err: err})
}
return stream.Status().Err()
}

clientconn.go

@ -22,16 +22,19 @@ import (
"errors"
"math"
"net"
"reflect"
"strings"
"sync"
"time"
"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@ -79,13 +82,14 @@ type dialOptions struct {
cp Compressor
dc Decompressor
bs backoffStrategy
balancer Balancer
block bool
insecure bool
timeout time.Duration
scChan <-chan ServiceConfig
copts transport.ConnectOptions
callOptions []CallOption
// This is to support v1 balancer.
balancerBuilder balancer.Builder
}
const (
@ -147,10 +151,14 @@ func WithDecompressor(dc Decompressor) DialOption {
}
}
// WithBalancer returns a DialOption which sets a load balancer.
// WithBalancer returns a DialOption which sets a load balancer with the v1 API.
// Name resolver will be ignored if this DialOption is specified.
// Deprecated: use the new balancer APIs in balancer package instead.
func WithBalancer(b Balancer) DialOption {
return func(o *dialOptions) {
o.balancer = b
o.balancerBuilder = &balancerWrapperBuilder{
b: b,
}
}
}
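
For existing v1-balancer users the call site is unchanged; WithBalancer simply installs balancerWrapperBuilder under the hood. A minimal sketch, where the target and resolver are placeholders:

package main // illustrative only

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/naming"
)

// dialWithV1Balancer shows that the v1 call site is unchanged:
// WithBalancer now wraps the v1 balancer instead of using it directly.
func dialWithV1Balancer(target string, r naming.Resolver) (*grpc.ClientConn, error) {
	return grpc.Dial(target,
		grpc.WithInsecure(),
		grpc.WithBalancer(grpc.RoundRobin(r)), // v1 API, now deprecated
	)
}
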
@ -314,7 +322,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[Address]*addrConn),
conns: make(map[*addrConn]struct{}),
}
cc.csEvltr = &connectivityStateEvaluator{csMgr: cc.csMgr}
cc.ctx, cc.cancel = context.WithCancel(context.Background())
@ -383,49 +391,42 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
} else {
cc.authority = target
}
waitC := make(chan error, 1)
go func() {
defer close(waitC)
if cc.dopts.balancer == nil && cc.sc.LB != nil {
cc.dopts.balancer = cc.sc.LB
// TODO(bar) parse scheme and start resolver.
if cc.dopts.balancerBuilder != nil {
var credsClone credentials.TransportCredentials
if creds != nil {
credsClone = creds.Clone()
}
if cc.dopts.balancer != nil {
var credsClone credentials.TransportCredentials
if creds != nil {
credsClone = creds.Clone()
}
config := BalancerConfig{
DialCreds: credsClone,
Dialer: cc.dopts.copts.Dialer,
}
if err := cc.dopts.balancer.Start(target, config); err != nil {
buildOpts := balancer.BuildOptions{
DialCreds: credsClone,
Dialer: cc.dopts.copts.Dialer,
}
// Build should not take a long time, so it's OK to not have a goroutine for it.
// TODO(bar) init balancer after first resolver result to support service config balancer.
cc.balancer = cc.dopts.balancerBuilder.Build(&ccBalancerWrapper{cc: cc}, buildOpts)
} else {
waitC := make(chan error, 1)
go func() {
defer close(waitC)
// No balancer, or no resolver within the balancer. Connect directly.
ac, err := cc.newAddrConn([]resolver.Address{{Addr: target}})
if err != nil {
waitC <- err
return
}
ch := cc.dopts.balancer.Notify()
if ch != nil {
if cc.dopts.block {
doneChan := make(chan struct{})
go cc.lbWatcher(doneChan)
<-doneChan
} else {
go cc.lbWatcher(nil)
}
if err := ac.connect(cc.dopts.block); err != nil {
waitC <- err
return
}
}
// No balancer, or no resolver within the balancer. Connect directly.
if err := cc.resetAddrConn([]Address{{Addr: target}}, cc.dopts.block, nil); err != nil {
waitC <- err
return
}
}()
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-waitC:
if err != nil {
return nil, err
}()
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-waitC:
if err != nil {
return nil, err
}
}
}
if cc.dopts.scChan != nil && !scSet {
@ -443,6 +444,27 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
go cc.scWatcher()
}
if cc.balancer != nil {
// Unblock balancer initialization with a fake resolver update.
// The balancer wrapper will not read the addresses, so an empty list works.
// TODO(bar) remove this after the real resolver is started.
cc.balancer.HandleResolvedAddrs([]resolver.Address{}, nil)
}
// A blocking dial blocks until the clientConn is ready.
if cc.dopts.block {
for {
s := cc.GetState()
if s == connectivity.Ready {
break
}
if !cc.WaitForStateChange(ctx, s) {
// ctx got timeout or canceled.
return nil, ctx.Err()
}
}
}
return cc, nil
}
@ -548,9 +570,15 @@ type ClientConn struct {
csMgr *connectivityStateManager
csEvltr *connectivityStateEvaluator // This will eventually be part of balancer.
balancer balancer.Balancer
// TODO(bar) move the mutex and picker into a struct that does blocking pick().
pmu sync.Mutex
picker balancer.Picker
mu sync.RWMutex
sc ServiceConfig
conns map[Address]*addrConn
conns map[*addrConn]struct{}
// Keepalive parameter can be updated if a GoAway is received.
mkp keepalive.ClientParameters
}
@ -577,84 +605,6 @@ func (cc *ClientConn) GetState() connectivity.State {
return cc.csMgr.getState()
}
// lbWatcher watches the Notify channel of the balancer in cc and manages
// connections accordingly. If doneChan is not nil, it is closed after the
// first successful connection is made.
func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
defer func() {
// In case channel from cc.dopts.balancer.Notify() gets closed before a
// successful connection gets established, don't forget to notify the
// caller.
if doneChan != nil {
close(doneChan)
}
}()
_, isPickFirst := cc.dopts.balancer.(*pickFirst)
for addrs := range cc.dopts.balancer.Notify() {
if isPickFirst {
if len(addrs) == 0 {
// No address can be connected, should teardown current addrconn if exists
cc.mu.Lock()
if len(cc.conns) != 0 {
cc.pickFirstAddrConnTearDown()
}
cc.mu.Unlock()
} else {
cc.resetAddrConn(addrs, true, nil)
if doneChan != nil {
close(doneChan)
doneChan = nil
}
}
} else {
// Not pickFirst, create a new addrConn for each address.
var (
add []Address // Addresses need to setup connections.
del []*addrConn // Connections need to tear down.
)
cc.mu.Lock()
for _, a := range addrs {
if _, ok := cc.conns[a]; !ok {
add = append(add, a)
}
}
for k, c := range cc.conns {
var keep bool
for _, a := range addrs {
if k == a {
keep = true
break
}
}
if !keep {
del = append(del, c)
delete(cc.conns, k)
}
}
cc.mu.Unlock()
for _, a := range add {
var err error
if doneChan != nil {
err = cc.resetAddrConn([]Address{a}, true, nil)
if err == nil {
close(doneChan)
doneChan = nil
}
} else {
err = cc.resetAddrConn([]Address{a}, false, nil)
}
if err != nil {
grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
}
}
for _, c := range del {
c.tearDown(errConnDrain)
}
}
}
}
func (cc *ClientConn) scWatcher() {
for {
select {
@ -673,72 +623,8 @@ func (cc *ClientConn) scWatcher() {
}
}
// pickFirstUpdateAddresses checks whether the current address is in the updated list, and updates the list if so.
// It is only used when the balancer is pick first.
func (cc *ClientConn) pickFirstUpdateAddresses(addrs []Address) bool {
if len(cc.conns) == 0 {
// No addrconn. Should go resetting addrconn.
return false
}
var currentAc *addrConn
for _, currentAc = range cc.conns {
break
}
var addrInNewSlice bool
for _, addr := range addrs {
if strings.Compare(addr.Addr, currentAc.curAddr.Addr) == 0 {
addrInNewSlice = true
break
}
}
if addrInNewSlice {
cc.conns = make(map[Address]*addrConn)
for _, addr := range addrs {
cc.conns[addr] = currentAc
}
currentAc.addrs = addrs
return true
}
return false
}
// pickFirstAddrConnTearDown() must be called while holding the lock.
func (cc *ClientConn) pickFirstAddrConnTearDown() {
if len(cc.conns) == 0 {
return
}
var currentAc *addrConn
for _, currentAc = range cc.conns {
break
}
for k := range cc.conns {
delete(cc.conns, k)
}
currentAc.tearDown(errConnDrain)
}
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
// If tearDownErr is nil, errConnDrain will be used instead.
//
// We should never need to replace an addrConn with a new one. This function is only used
// as newAddrConn to create new addrConn.
// TODO rename this function and clean up the code.
func (cc *ClientConn) resetAddrConn(addrs []Address, block bool, tearDownErr error) error {
// if current transport in addrs, just change lists to update order and new addresses
// not work for roundrobin
cc.mu.Lock()
if _, isPickFirst := cc.dopts.balancer.(*pickFirst); isPickFirst {
// If Current address in use in the updating list, just update the list.
// Otherwise, teardown current addrconn and create a new one.
if cc.pickFirstUpdateAddresses(addrs) {
cc.mu.Unlock()
return nil
}
cc.pickFirstAddrConnTearDown()
}
cc.mu.Unlock()
// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
ac := &addrConn{
cc: cc,
addrs: addrs,
@ -746,6 +632,42 @@ func (cc *ClientConn) resetAddrConn(addrs []Address, block bool, tearDownErr err
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
ac.csEvltr = cc.csEvltr
// Track ac in cc. This needs to be done before any getTransport(...) is called.
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
return nil, ErrClientConnClosing
}
cc.conns[ac] = struct{}{}
cc.mu.Unlock()
return ac, nil
}
// removeAddrConn removes the addrConn for the given SubConn from the clientConn.
// It also tears down the ac with the given error.
func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
return
}
delete(cc.conns, ac)
cc.mu.Unlock()
ac.tearDown(err)
}
// connect starts creating a transport and also starts the transport monitor
// goroutine for this ac.
// TODO(bar) Move this to the addrConn section.
// This was part of resetAddrConn; it is kept here to keep the diff clean.
func (ac *addrConn) connect(block bool) error {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
}
ac.mu.Unlock()
if EnableTracing {
ac.events = trace.NewEventLog("grpc.ClientConn", ac.addrs[0].Addr)
}
@ -763,37 +685,10 @@ func (cc *ClientConn) resetAddrConn(addrs []Address, block bool, tearDownErr err
}
}
}
// Track ac in cc. This needs to be done before any getTransport(...) is called.
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
return ErrClientConnClosing
}
stale := cc.conns[ac.addrs[0]]
for _, a := range ac.addrs {
cc.conns[a] = ac
}
cc.mu.Unlock()
if stale != nil {
// There is an addrConn alive on ac.addr already. This could be due to
// a buggy Balancer that reports duplicated Addresses.
if tearDownErr == nil {
// tearDownErr is nil if resetAddrConn is called by
// 1) Dial
// 2) lbWatcher
// In both cases, the stale ac should drain, not close.
stale.tearDown(errConnDrain)
} else {
stale.tearDown(tearDownErr)
}
}
if block {
if err := ac.resetTransport(false); err != nil {
if err != errConnClosing {
// Tear down ac and delete it from cc.conns.
cc.mu.Lock()
delete(cc.conns, ac.addrs[0])
cc.mu.Unlock()
ac.tearDown(err)
}
if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
@ -820,6 +715,36 @@ func (cc *ClientConn) resetAddrConn(addrs []Address, block bool, tearDownErr err
return nil
}
// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
//
// It checks whether the currently-connected address of ac is in the new addrs list.
// - If true, it updates ac.addrs and returns true. The ac will keep using
// the existing connection.
// - If false, it does nothing and returns false.
func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
ac.mu.Lock()
defer ac.mu.Unlock()
grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
if ac.state == connectivity.Shutdown {
ac.addrs = addrs
return true
}
var curAddrFound bool
for _, a := range addrs {
if reflect.DeepEqual(ac.curAddr, a) {
curAddrFound = true
break
}
}
grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
if curAddrFound {
ac.addrs = addrs
}
return curAddrFound
}
// GetMethodConfig gets the method config of the input method.
// If there's an exact match for input method (i.e. /service/method), we return
// the corresponding MethodConfig.
@ -839,54 +764,52 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
return m
}
func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
var (
ac *addrConn
ok bool
put func()
put func(balancer.DoneInfo)
)
if cc.dopts.balancer == nil {
if cc.balancer == nil {
// If balancer is nil, there should be only one addrConn available.
cc.mu.RLock()
if cc.conns == nil {
cc.mu.RUnlock()
return nil, nil, toRPCErr(ErrClientConnClosing)
}
for _, ac = range cc.conns {
for ac = range cc.conns {
// Break after the first iteration to get the first addrConn.
ok = true
break
}
cc.mu.RUnlock()
} else {
cc.pmu.Lock()
// TODO(bar) call pick on struct blockPicker instead of the real picker.
p := cc.picker
cc.pmu.Unlock()
var (
addr Address
err error
err error
sc balancer.SubConn
)
addr, put, err = cc.dopts.balancer.Get(ctx, opts)
sc, put, err = p.Pick(ctx, balancer.PickOptions{})
if err != nil {
return nil, nil, toRPCErr(err)
}
cc.mu.RLock()
if cc.conns == nil {
cc.mu.RUnlock()
return nil, nil, toRPCErr(ErrClientConnClosing)
}
ac, ok = cc.conns[addr]
cc.mu.RUnlock()
}
if !ok {
if put != nil {
if acbw, ok := sc.(*acBalancerWrapper); ok {
ac = acbw.getAddrConn()
} else if put != nil {
updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false})
put()
put(balancer.DoneInfo{Err: errors.New("SubConn returned by pick cannot be recognized")})
}
}
if ac == nil {
return nil, nil, errConnClosing
}
t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
t, err := ac.wait(ctx, cc.balancer != nil, !opts.BlockingWait)
if err != nil {
if put != nil {
updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false})
put()
put(balancer.DoneInfo{Err: err})
}
return nil, nil, err
}
@ -906,10 +829,10 @@ func (cc *ClientConn) Close() error {
cc.conns = nil
cc.csMgr.updateState(connectivity.Shutdown)
cc.mu.Unlock()
if cc.dopts.balancer != nil {
cc.dopts.balancer.Close()
if cc.balancer != nil {
cc.balancer.Close()
}
for _, ac := range conns {
for ac := range conns {
ac.tearDown(ErrClientConnClosing)
}
return nil
@ -921,16 +844,16 @@ type addrConn struct {
cancel context.CancelFunc
cc *ClientConn
curAddr Address
addrs []Address
curAddr resolver.Address
addrs []resolver.Address
dopts dialOptions
events trace.EventLog
acbw balancer.SubConn
csEvltr *connectivityStateEvaluator
mu sync.Mutex
state connectivity.State
down func(error) // the handler called when a connection is down.
// ready is closed and becomes nil when a new transport is up or failed
// due to timeout.
ready chan struct{}
@ -974,20 +897,24 @@ func (ac *addrConn) errorf(format string, a ...interface{}) {
// For the old transport:
// - if drain is true, it will be gracefully closed.
// - otherwise, it will be closed.
// TODO(bar) make sure all state transitions are valid.
func (ac *addrConn) resetTransport(drain bool) error {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
}
ac.printf("connecting")
if ac.down != nil {
ac.down(downErrorf(false, true, "%v", errNetworkIO))
ac.down = nil
}
oldState := ac.state
ac.state = connectivity.Connecting
ac.csEvltr.recordTransition(oldState, ac.state)
if ac.cc.balancer != nil {
ac.cc.balancer.HandleSubConnStateChange(ac.acbw, ac.state)
}
// TODO(bar) don't call balancer functions to handle subconn state change if ac.acbw is nil.
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
}
t := ac.transport
ac.transport = nil
ac.mu.Unlock()
@ -998,15 +925,27 @@ func (ac *addrConn) resetTransport(drain bool) error {
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
ac.cc.mu.RUnlock()
for retries := 0; ; retries++ {
ac.mu.Lock()
sleepTime := ac.dopts.bs.backoff(retries)
timeout := minConnectTimeout
ac.mu.Lock()
if timeout < time.Duration(int(sleepTime)/len(ac.addrs)) {
timeout = time.Duration(int(sleepTime) / len(ac.addrs))
}
connectTime := time.Now()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
}
ac.printf("connecting")
oldState := ac.state
ac.state = connectivity.Connecting
ac.csEvltr.recordTransition(oldState, ac.state)
// TODO(bar) remove condition once we always have a balancer.
if ac.cc.balancer != nil {
ac.cc.balancer.HandleSubConnStateChange(ac.acbw, ac.state)
}
// copy ac.addrs in case of race
addrsIter := make([]Address, len(ac.addrs))
addrsIter := make([]resolver.Address, len(ac.addrs))
copy(addrsIter, ac.addrs)
ac.mu.Unlock()
for _, addr := range addrsIter {
@ -1038,14 +977,6 @@ func (ac *addrConn) resetTransport(drain bool) error {
ac.mu.Unlock()
return errConnClosing
}
ac.errorf("transient failure: %v", err)
oldState = ac.state
ac.state = connectivity.TransientFailure
ac.csEvltr.recordTransition(oldState, ac.state)
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
}
ac.mu.Unlock()
continue
}
@ -1060,18 +991,30 @@ func (ac *addrConn) resetTransport(drain bool) error {
oldState = ac.state
ac.state = connectivity.Ready
ac.csEvltr.recordTransition(oldState, ac.state)
if ac.cc.balancer != nil {
ac.cc.balancer.HandleSubConnStateChange(ac.acbw, ac.state)
}
ac.transport = newTransport
ac.curAddr = addr
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
}
if ac.cc.dopts.balancer != nil {
ac.down = ac.cc.dopts.balancer.Up(addr)
}
ac.curAddr = addr
ac.mu.Unlock()
return nil
}
ac.mu.Lock()
oldState = ac.state
ac.state = connectivity.TransientFailure
ac.csEvltr.recordTransition(oldState, ac.state)
if ac.cc.balancer != nil {
ac.cc.balancer.HandleSubConnStateChange(ac.acbw, ac.state)
}
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
}
ac.mu.Unlock()
timer := time.NewTimer(sleepTime - time.Since(connectTime))
select {
case <-timer.C:
@ -1128,26 +1071,8 @@ func (ac *addrConn) transportMonitor() {
return
case <-t.GoAway():
ac.adjustParams(t.GetGoAwayReason())
if err := ac.resetTransport(false); err != nil {
grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
}
return
}
default:
}
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
// ac has been shutdown.
ac.mu.Unlock()
return
}
oldState := ac.state
ac.state = connectivity.TransientFailure
ac.csEvltr.recordTransition(oldState, ac.state)
ac.mu.Unlock()
if err := ac.resetTransport(false); err != nil {
grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
ac.mu.Lock()
@ -1213,12 +1138,8 @@ func (ac *addrConn) tearDown(err error) {
ac.cancel()
ac.mu.Lock()
ac.curAddr = Address{}
ac.curAddr = resolver.Address{}
defer ac.mu.Unlock()
if ac.down != nil {
ac.down(downErrorf(false, false, "%v", err))
ac.down = nil
}
if err == errConnDrain && ac.transport != nil {
// GracefulClose(...) may be executed multiple times when
// i) receiving multiple GoAway frames from the server; or
@ -1233,6 +1154,9 @@ func (ac *addrConn) tearDown(err error) {
ac.state = connectivity.Shutdown
ac.tearDownErr = err
ac.csEvltr.recordTransition(oldState, ac.state)
if ac.cc.balancer != nil {
ac.cc.balancer.HandleSubConnStateChange(ac.acbw, ac.state)
}
if ac.events != nil {
ac.events.Finish()
ac.events = nil
@ -1246,3 +1170,9 @@ func (ac *addrConn) tearDown(err error) {
}
return
}
func (ac *addrConn) getState() connectivity.State {
ac.mu.Lock()
defer ac.mu.Unlock()
return ac.state
}

clientconn_test.go

@ -376,19 +376,3 @@ func TestClientUpdatesParamsAfterGoAway(t *testing.T) {
t.Fatalf("cc.dopts.copts.Keepalive.Time = %v , want 100ms", v)
}
}
func TestClientLBWatcherWithClosedBalancer(t *testing.T) {
b := newBlockingBalancer()
cc := &ClientConn{dopts: dialOptions{balancer: b}}
doneChan := make(chan struct{})
go cc.lbWatcher(doneChan)
// Balancer closes before any successful connections.
b.Close()
select {
case <-doneChan:
case <-time.After(100 * time.Millisecond):
t.Fatal("lbWatcher with closed balancer didn't close doneChan after 100ms")
}
}

grpclb.go

@ -73,7 +73,7 @@ func (x *balanceLoadClientStream) Recv() (*lbmpb.LoadBalanceResponse, error) {
// NewGRPCLBBalancer creates a grpclb load balancer.
func NewGRPCLBBalancer(r naming.Resolver) Balancer {
return &balancer{
return &grpclbBalancer{
r: r,
}
}
@ -96,7 +96,7 @@ type grpclbAddrInfo struct {
dropForLoadBalancing bool
}
type balancer struct {
type grpclbBalancer struct {
r naming.Resolver
target string
mu sync.Mutex
@ -113,7 +113,7 @@ type balancer struct {
clientStats lbmpb.ClientStats
}
func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error {
func (b *grpclbBalancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error {
updates, err := w.Next()
if err != nil {
grpclog.Warningf("grpclb: failed to get next addr update from watcher: %v", err)
@ -187,7 +187,7 @@ func convertDuration(d *lbmpb.Duration) time.Duration {
return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
}
func (b *balancer) processServerList(l *lbmpb.ServerList, seq int) {
func (b *grpclbBalancer) processServerList(l *lbmpb.ServerList, seq int) {
if l == nil {
return
}
@ -230,7 +230,7 @@ func (b *balancer) processServerList(l *lbmpb.ServerList, seq int) {
return
}
func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) {
func (b *grpclbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
@ -259,10 +259,13 @@ func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Dura
}
}
func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) {
ctx, cancel := context.WithCancel(context.Background())
func (b *grpclbBalancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) {
// This RPC should be a failfast RPC. It should only fail if the server is down.
// We cannot make it failfast because the first few RPCs on a clientConn always fail.
// TODO(bar) make this failfast when the failfast behavior in clientConn is fixed.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
stream, err := lbc.BalanceLoad(ctx)
stream, err := lbc.BalanceLoad(ctx, FailFast(false))
if err != nil {
grpclog.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
return
@ -332,7 +335,7 @@ func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry b
return true
}
func (b *balancer) Start(target string, config BalancerConfig) error {
func (b *grpclbBalancer) Start(target string, config BalancerConfig) error {
b.rand = rand.New(rand.NewSource(time.Now().Unix()))
// TODO: Fall back to the basic direct connection if there is no name resolver.
if b.r == nil {
@ -488,7 +491,7 @@ func (b *balancer) Start(target string, config BalancerConfig) error {
return nil
}
func (b *balancer) down(addr Address, err error) {
func (b *grpclbBalancer) down(addr Address, err error) {
b.mu.Lock()
defer b.mu.Unlock()
for _, a := range b.addrs {
@ -499,7 +502,7 @@ func (b *balancer) down(addr Address, err error) {
}
}
func (b *balancer) Up(addr Address) func(error) {
func (b *grpclbBalancer) Up(addr Address) func(error) {
b.mu.Lock()
defer b.mu.Unlock()
if b.done {
@ -527,7 +530,7 @@ func (b *balancer) Up(addr Address) func(error) {
}
}
func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
func (b *grpclbBalancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
var ch chan struct{}
b.mu.Lock()
if b.done {
@ -597,17 +600,10 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre
}
}
if !opts.BlockingWait {
if len(b.addrs) == 0 {
b.clientStats.NumCallsFinished++
b.clientStats.NumCallsFinishedWithClientFailedToSend++
b.mu.Unlock()
err = Errorf(codes.Unavailable, "there is no address available")
return
}
// Returns the next addr on b.addrs for a failfast RPC.
addr = b.addrs[b.next].addr
b.next++
b.clientStats.NumCallsFinished++
b.clientStats.NumCallsFinishedWithClientFailedToSend++
b.mu.Unlock()
err = Errorf(codes.Unavailable, "there is no address available")
return
}
// Wait on b.waitCh for non-failfast RPCs.
@ -684,11 +680,11 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre
}
}
func (b *balancer) Notify() <-chan []Address {
func (b *grpclbBalancer) Notify() <-chan []Address {
return b.addrCh
}
func (b *balancer) Close() error {
func (b *grpclbBalancer) Close() error {
b.mu.Lock()
defer b.mu.Unlock()
if b.done {

grpclb/grpclb_test.go

@ -42,6 +42,8 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/naming"
testpb "google.golang.org/grpc/test/grpc_testing"
_ "google.golang.org/grpc/grpclog/glogger"
)
var (
@ -404,7 +406,7 @@ func TestGRPCLB(t *testing.T) {
t.Fatalf("Failed to dial to the backend %v", err)
}
testC := testpb.NewTestServiceClient(cc)
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
cc.Close()
@ -442,6 +444,15 @@ func TestDropRequest(t *testing.T) {
t.Fatalf("Failed to dial to the backend %v", err)
}
testC := testpb.NewTestServiceClient(cc)
// Wait until the first connection is up.
// The first one has Drop set to true, so the error should contain "drops requests".
for {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
if strings.Contains(err.Error(), "drops requests") {
break
}
}
}
// The 1st, non-fail-fast RPC should succeed. This ensures both server
// connections are made, because the first one has DropForLoadBalancing set to true.
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {

reflection/serverreflection_test.go

@ -203,7 +203,7 @@ func TestReflectionEnd2end(t *testing.T) {
defer conn.Close()
c := rpb.NewServerReflectionClient(conn)
stream, err := c.ServerReflectionInfo(context.Background())
stream, err := c.ServerReflectionInfo(context.Background(), grpc.FailFast(false))
if err != nil {
t.Fatalf("cannot get ServerReflectionInfo: %v", err)
}

resolver/resolver.go (new file, 128 lines)

@ -0,0 +1,128 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package resolver defines APIs for name resolution in gRPC.
// All APIs in this package are experimental.
package resolver
var (
// m is a map from scheme to resolver builder.
m = make(map[string]Builder)
// defaultScheme is the default scheme to use.
defaultScheme string
)
// TODO(bar) install dns resolver in init(){}.
// Register registers the resolver builder to the resolver map.
// b.Scheme will be used as the scheme registered with this builder.
func Register(b Builder) {
m[b.Scheme()] = b
}
// Get returns the resolver builder registered with the given scheme.
// If no builder is registered with the scheme, the default scheme will
// be used.
// If the default scheme is not modified, "dns" will be the default
// scheme, and the preinstalled dns resolver will be used.
// If the default scheme is modified, and a resolver is registered with
// the scheme, that resolver will be returned.
// If the default scheme is modified, and no resolver is registered with
// the scheme, nil will be returned.
func Get(scheme string) Builder {
if b, ok := m[scheme]; ok {
return b
}
if b, ok := m[defaultScheme]; ok {
return b
}
return nil
}
// SetDefaultScheme sets the default scheme that will be used.
// If it is never called, the default scheme is "dns".
func SetDefaultScheme(scheme string) {
defaultScheme = scheme
}
// AddressType indicates the address type returned by name resolution.
type AddressType uint8
const (
// Backend indicates the address is for a backend server.
Backend AddressType = iota
// GRPCLB indicates the address is for a grpclb load balancer.
GRPCLB
)
// Address represents a server the client connects to.
// This is the EXPERIMENTAL API and may be changed or extended in the future.
type Address struct {
// Addr is the server address on which a connection will be established.
Addr string
// Type is the type of this address.
Type AddressType
// ServerName is the name of this address.
// It's the name of the grpc load balancer, which will be used for authentication.
ServerName string
// Metadata is the information associated with Addr, which may be used
// to make load balancing decision.
Metadata interface{}
}
// BuildOption includes additional information for the builder to create
// the resolver.
type BuildOption struct {
}
// ClientConn contains the callbacks for resolver to notify any updates
// to the gRPC ClientConn.
type ClientConn interface {
// NewAddress is called by the resolver to notify the ClientConn of a new
// list of resolved addresses.
// The address list should be the complete list of resolved addresses.
NewAddress(addresses []Address)
// NewServiceConfig is called by the resolver to notify the ClientConn of a new
// service config. The service config should be provided as a JSON string.
NewServiceConfig(serviceConfig string)
}
// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
// Build creates a new resolver for the given target.
//
// gRPC dial calls Build synchronously, and fails if the returned error is
// not nil.
Build(target string, cc ClientConn, opts BuildOption) (Resolver, error)
// Scheme returns the scheme supported by this resolver.
// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
Scheme() string
}
// ResolveNowOption includes additional information for ResolveNow.
type ResolveNowOption struct{}
// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
// ResolveNow will be called by gRPC to try to resolve the target name again.
// It's just a hint; the resolver can ignore it if it's not necessary.
ResolveNow(ResolveNowOption)
// Close closes the resolver.
Close()
}
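
As a sketch of how these pieces fit together, a resolver that reports a fixed address list once at build time could look like this; the scheme "static" and all type names are illustrative, not part of this commit:

package staticresolver // hypothetical package, for illustration only

import "google.golang.org/grpc/resolver"

// staticBuilder builds resolvers that report a fixed address list.
type staticBuilder struct {
	addrs []resolver.Address
}

func (b *staticBuilder) Scheme() string { return "static" }

func (b *staticBuilder) Build(target string, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
	// Report the fixed list immediately; a real resolver would watch for
	// changes and call cc.NewAddress again on every update.
	cc.NewAddress(b.addrs)
	return &staticResolver{}, nil
}

type staticResolver struct{}

// ResolveNow is only a hint; a static list has nothing to re-resolve.
func (*staticResolver) ResolveNow(resolver.ResolveNowOption) {}

func (*staticResolver) Close() {}

func init() {
	resolver.Register(&staticBuilder{
		addrs: []resolver.Address{{Addr: "127.0.0.1:50051"}}, // placeholder backend
	})
}
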

rpc_util.go

@ -132,7 +132,9 @@ type callInfo struct {
creds credentials.PerRPCCredentials
}
var defaultCallInfo = callInfo{failFast: true}
func defaultCallInfo() *callInfo {
return &callInfo{failFast: true}
}
// CallOption configures a Call before it starts or extracts information from
// a Call after it completes.
@ -384,14 +386,15 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{
}
type rpcInfo struct {
failfast bool
bytesSent bool
bytesReceived bool
}
type rpcInfoContextKey struct{}
func newContextWithRPCInfo(ctx context.Context) context.Context {
return context.WithValue(ctx, rpcInfoContextKey{}, &rpcInfo{})
func newContextWithRPCInfo(ctx context.Context, failfast bool) context.Context {
return context.WithValue(ctx, rpcInfoContextKey{}, &rpcInfo{failfast: failfast})
}
func rpcInfoFromContext(ctx context.Context) (s *rpcInfo, ok bool) {
@ -401,7 +404,8 @@ func rpcInfoFromContext(ctx context.Context) (s *rpcInfo, ok bool) {
func updateRPCInfoInContext(ctx context.Context, s rpcInfo) {
if ss, ok := rpcInfoFromContext(ctx); ok {
*ss = s
ss.bytesReceived = s.bytesReceived
ss.bytesSent = s.bytesSent
}
return
}

stream.go

@ -27,6 +27,7 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@ -106,10 +107,10 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
var (
t transport.ClientTransport
s *transport.Stream
put func()
put func(balancer.DoneInfo)
cancel context.CancelFunc
)
c := defaultCallInfo
c := defaultCallInfo()
mc := cc.GetMethodConfig(method)
if mc.WaitForReady != nil {
c.failFast = !*mc.WaitForReady
@ -126,7 +127,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
opts = append(cc.dopts.callOptions, opts...)
for _, o := range opts {
if err := o.before(&c); err != nil {
if err := o.before(c); err != nil {
return nil, toRPCErr(err)
}
}
@ -167,7 +168,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
}()
}
ctx = newContextWithRPCInfo(ctx)
ctx = newContextWithRPCInfo(ctx, c.failFast)
sh := cc.dopts.copts.StatsHandler
if sh != nil {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
@ -217,7 +218,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
updateRPCInfoInContext(ctx, rpcInfo{bytesSent: true, bytesReceived: false})
}
if put != nil {
put()
put(balancer.DoneInfo{Err: err})
put = nil
}
if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast {
@ -283,7 +284,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
// clientStream implements a client side Stream.
type clientStream struct {
opts []CallOption
c callInfo
c *callInfo
t transport.ClientTransport
s *transport.Stream
p *parser
@ -297,7 +298,7 @@ type clientStream struct {
tracing bool // set to EnableTracing when the clientStream is created.
mu sync.Mutex
put func()
put func(balancer.DoneInfo)
closed bool
finished bool
// trInfo.tr is set when the clientStream is created (if EnableTracing is true),
@ -494,14 +495,14 @@ func (cs *clientStream) finish(err error) {
}
}()
for _, o := range cs.opts {
o.after(&cs.c)
o.after(cs.c)
}
if cs.put != nil {
updateRPCInfoInContext(cs.s.Context(), rpcInfo{
bytesSent: cs.s.BytesSent(),
bytesReceived: cs.s.BytesReceived(),
})
cs.put()
cs.put(balancer.DoneInfo{Err: err})
cs.put = nil
}
if cs.statsHandler != nil {

test/end2end_test.go

@ -451,6 +451,10 @@ type test struct {
clientInitialConnWindowSize int32
perRPCCreds credentials.PerRPCCredentials
// All test dialing is blocking by default. Set this to true if dial
// should be non-blocking.
nonBlockingDial bool
// srv and srvAddr are set once startServer is called.
srv *grpc.Server
srvAddr string
@ -643,6 +647,10 @@ func (te *test) clientConn() *grpc.ClientConn {
if te.customCodec != nil {
opts = append(opts, grpc.WithCodec(te.customCodec))
}
if !te.nonBlockingDial && te.srvAddr != "" {
// Only do a blocking dial if server is up.
opts = append(opts, grpc.WithBlock())
}
var err error
te.cc, err = grpc.Dial(te.srvAddr, opts...)
if err != nil {
@ -4040,11 +4048,20 @@ func TestFailFastRPCErrorOnBadCertificates(t *testing.T) {
te.startServer(&testServer{security: te.e.security})
defer te.tearDown()
te.nonBlockingDial = true // Connection will never be successful because server is not set up correctly.
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); !strings.Contains(err.Error(), clientAlwaysFailCredErrorMsg) {
te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want err.Error() contains %q", err, clientAlwaysFailCredErrorMsg)
var err error
for i := 0; i < 1000; i++ {
// This loop runs for at most 1 second.
// The first several RPCs will fail with Unavailable because the connection hasn't started.
// Once the first connection has failed with the creds error, subsequent RPCs should also fail with the expected error.
if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); strings.Contains(err.Error(), clientAlwaysFailCredErrorMsg) {
return
}
time.Sleep(time.Millisecond)
}
te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want err.Error() contains %q", err, clientAlwaysFailCredErrorMsg)
}
func TestFailFastRPCWithNoBalancerErrorOnBadCertificates(t *testing.T) {
@ -4052,11 +4069,20 @@ func TestFailFastRPCWithNoBalancerErrorOnBadCertificates(t *testing.T) {
te.startServer(&testServer{security: te.e.security})
defer te.tearDown()
te.nonBlockingDial = true
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); !strings.Contains(err.Error(), clientAlwaysFailCredErrorMsg) {
te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want err.Error() contains %q", err, clientAlwaysFailCredErrorMsg)
var err error
for i := 0; i < 1000; i++ {
// This loop runs for at most 1 second.
// The first several RPCs will fail with Unavailable because the connection hasn't started.
// Once the first connection has failed with the creds error, subsequent RPCs should also fail with the expected error.
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); strings.Contains(err.Error(), clientAlwaysFailCredErrorMsg) {
return
}
time.Sleep(time.Millisecond)
}
te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want err.Error() contains %q", err, clientAlwaysFailCredErrorMsg)
}
func TestNonFailFastRPCWithNoBalancerErrorOnBadCertificates(t *testing.T) {
@ -4064,6 +4090,7 @@ func TestNonFailFastRPCWithNoBalancerErrorOnBadCertificates(t *testing.T) {
te.startServer(&testServer{security: te.e.security})
defer te.tearDown()
te.nonBlockingDial = true
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); !strings.Contains(err.Error(), clientAlwaysFailCredErrorMsg) {