From 8233e124e4634a8313f2c9a1ea7d4db33546b11b Mon Sep 17 00:00:00 2001
From: Menghan Li
Date: Thu, 31 Aug 2017 10:59:09 -0700
Subject: [PATCH] Add new Resolver and Balancer APIs (gRFC L9) (#1408)

- Add packages balancer and resolver.
- Change ClientConn internals to the new APIs and add a wrapper for the v1
  balancer.
---
 balancer/balancer.go                | 201 +++++++++++
 balancer_conn_wrappers.go           | 123 +++++++
 balancer_v1_wrapper.go              | 303 +++++++++++++++++
 call.go                             |  23 +-
 clientconn.go                       | 498 ++++++++++++----------------
 clientconn_test.go                  |  16 -
 grpclb.go                           |  44 ++-
 grpclb/grpclb_test.go               |  13 +-
 reflection/serverreflection_test.go |   2 +-
 resolver/resolver.go                | 128 +++++++
 rpc_util.go                         |  12 +-
 stream.go                           |  19 +-
 test/end2end_test.go                |  35 +-
 13 files changed, 1063 insertions(+), 354 deletions(-)
 create mode 100644 balancer/balancer.go
 create mode 100644 balancer_conn_wrappers.go
 create mode 100644 balancer_v1_wrapper.go
 create mode 100644 resolver/resolver.go

diff --git a/balancer/balancer.go b/balancer/balancer.go
new file mode 100644
index 00000000..6d83a104
--- /dev/null
+++ b/balancer/balancer.go
@@ -0,0 +1,201 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package balancer defines APIs for load balancing in gRPC.
+// All APIs in this package are experimental.
+package balancer
+
+import (
+	"errors"
+	"net"
+
+	"golang.org/x/net/context"
+	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/credentials"
+	"google.golang.org/grpc/resolver"
+)
+
+var (
+	// m is a map from name to balancer builder.
+	m = make(map[string]Builder)
+	// defaultBuilder is the default balancer to use.
+	defaultBuilder Builder // TODO(bar) install pickfirst as default.
+)
+
+// Register registers the balancer builder to the balancer map.
+// b.Name will be used as the name registered with this builder.
+func Register(b Builder) {
+	m[b.Name()] = b
+}
+
+// Get returns the balancer builder registered with the given name.
+// If no builder is registered with the name, the default pickfirst
+// builder will be used.
+func Get(name string) Builder {
+	if b, ok := m[name]; ok {
+		return b
+	}
+	return defaultBuilder
+}
+
+// SubConn represents a gRPC sub connection.
+// Each sub connection contains a list of addresses. gRPC will
+// try to connect to them (in sequence), and stop trying the
+// remainder once one connection is successful.
+//
+// The reconnect backoff will be applied to the list, not to a single address.
+// For example, try_on_all_addresses -> backoff -> try_on_all_addresses.
+//
+// All SubConns start in IDLE, and will not try to connect. To trigger
+// connecting, Balancers must call Connect.
+// When the connection encounters an error, it will reconnect immediately.
+// When the connection becomes IDLE, it will not reconnect unless Connect is
+// called.
+type SubConn interface {
+	// UpdateAddresses updates the addresses used in this SubConn.
+	// gRPC checks if the currently-connected address is still in the new list.
+	// If it's in the list, the connection will be kept.
+	// If it's not in the list, the connection will be gracefully closed, and
+	// a new connection will be created.
+	//
+	// This will trigger a state transition for the SubConn.
+	UpdateAddresses([]resolver.Address)
+	// Connect starts connecting for this SubConn.
+	Connect()
+}
+
+// NewSubConnOptions contains options to create a new SubConn.
+type NewSubConnOptions struct{}
+
+// ClientConn represents a gRPC ClientConn.
+type ClientConn interface {
+	// NewSubConn is called by balancer to create a new SubConn.
+	// It doesn't block or wait for the connections to be established.
+	// Behaviors of the SubConn can be controlled by options.
+	NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
+	// RemoveSubConn removes the SubConn from ClientConn.
+	// The SubConn will be shut down.
+	RemoveSubConn(SubConn)
+
+	// UpdateBalancerState is called by balancer to notify gRPC that some internal
+	// state in balancer has changed.
+	//
+	// gRPC will update the connectivity state of the ClientConn, and will call Pick
+	// on the new picker to pick a new SubConn.
+	UpdateBalancerState(s connectivity.State, p Picker)
+
+	// Target returns the dial target for this ClientConn.
+	Target() string
+}
+
+// BuildOptions contains additional information for Build.
+type BuildOptions struct {
+	// DialCreds is the transport credential the Balancer implementation can
+	// use to dial to a remote load balancer server. The Balancer implementation
+	// can ignore this if it does not need to talk to another party securely.
+	DialCreds credentials.TransportCredentials
+	// Dialer is the custom dialer the Balancer implementation can use to dial
+	// to a remote load balancer server. The Balancer implementation
+	// can ignore this if it doesn't need to talk to a remote balancer.
+	Dialer func(context.Context, string) (net.Conn, error)
+}
+
+// Builder creates a balancer.
+type Builder interface {
+	// Build creates a new balancer with the ClientConn.
+	Build(cc ClientConn, opts BuildOptions) Balancer
+	// Name returns the name of balancers built by this builder.
+	// It will be used to pick balancers (for example in service config).
+	Name() string
+}
+
+// PickOptions contains additional information for the Pick operation.
+type PickOptions struct{}
+
+// DoneInfo contains additional information for done.
+type DoneInfo struct {
+	// Err is the error the RPC finished with. It could be nil.
+	Err error
+}
+
+var (
+	// ErrNoSubConnAvailable indicates no SubConn is available for pick().
+	// gRPC will block the RPC until a new picker is available via UpdateBalancerState().
+	ErrNoSubConnAvailable = errors.New("no SubConn is available")
+	// ErrTransientFailure indicates all SubConns are in TransientFailure.
+	// WaitForReady RPCs will block, non-WaitForReady RPCs will fail.
+	ErrTransientFailure = errors.New("all SubConns are in TransientFailure")
+)
+
+// Picker is used by gRPC to pick a SubConn to send an RPC.
+// Balancer is expected to generate a new picker from its snapshot every time its
+// internal state changes.
+//
+// The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState().
+type Picker interface {
+	// Pick returns the SubConn to be used to send the RPC.
+	// The returned SubConn must be one returned by NewSubConn().
+	//
+	// This function is expected to return:
+	// - a SubConn that is known to be READY;
+	// - ErrNoSubConnAvailable if no SubConn is available, but progress is being
+	//   made (for example, some SubConn is in CONNECTING mode);
+	// - other errors if no active connecting is happening (for example, all SubConns
+	//   are in TRANSIENT_FAILURE mode).
+	//
+	// If a SubConn is returned:
+	// - If it is READY, gRPC will send the RPC on it;
+	// - If it is not ready, or becomes not ready after it's returned, gRPC will block
+	//   this call until a new picker is available and will call Pick on the new picker.
+	//
+	// If the returned error is not nil:
+	// - If the error is ErrNoSubConnAvailable, gRPC will block until UpdateBalancerState()
+	//   is called with a new picker;
+	// - If the error is ErrTransientFailure:
+	//   - If the RPC is wait-for-ready, gRPC will block until UpdateBalancerState()
+	//     is called to pick again;
+	//   - Otherwise, the RPC will fail with an Unavailable error.
+	// - Else (error is some other non-nil error):
+	//   - The RPC will fail with an Unavailable error.
+	//
+	// The returned done() function will be called once the RPC has finished, with the
+	// final status of that RPC.
+	// done may be nil if the balancer doesn't care about the RPC status.
+	Pick(ctx context.Context, opts PickOptions) (conn SubConn, done func(DoneInfo), err error)
+}
+
+// Balancer takes input from gRPC, manages SubConns, and collects and aggregates
+// the connectivity states.
+//
+// It also generates and updates the Picker used by gRPC to pick SubConns for RPCs.
+type Balancer interface {
+	// HandleSubConnStateChange is called by gRPC when the connectivity state
+	// of sc has changed.
+	// Balancer is expected to aggregate the states of all its SubConns and
+	// report that back to gRPC.
+	// Balancer should also generate and update the Picker when its internal
+	// state changes because of the new SubConn state.
+	HandleSubConnStateChange(sc SubConn, state connectivity.State)
+	// HandleResolvedAddrs is called by gRPC to send updated resolved addresses to
+	// balancers.
+	// Balancer can create new SubConns or remove existing SubConns based on the
+	// addresses.
+	// An empty address slice and a non-nil error will be passed if the resolver
+	// returns a non-nil error to gRPC.
+	HandleResolvedAddrs([]resolver.Address, error)
+	// Close closes the balancer.
+	Close()
+}
diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go
new file mode 100644
index 00000000..404377db
--- /dev/null
+++ b/balancer_conn_wrappers.go
@@ -0,0 +1,123 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpc
+
+import (
+	"sync"
+
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/grpclog"
+	"google.golang.org/grpc/resolver"
+)
+
+// TODO(bar) move ClientConn methods to clientConn file.
+
+func (cc *ClientConn) updatePicker(p balancer.Picker) {
+	// TODO(bar) add a goroutine and sync it.
+	// TODO(bar) implement blocking behavior and unblock the previous pick.
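+	// Note: getTransport snapshots cc.picker under the same pmu before
+	// calling Pick, so swapping the picker here is race-free.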
+ cc.pmu.Lock() + cc.picker = p + cc.pmu.Unlock() +} + +// ccBalancerWrapper is a wrapper on top of cc for balancers. +// It implements balancer.ClientConn interface. +type ccBalancerWrapper struct { + cc *ClientConn +} + +func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + grpclog.Infof("ccBalancerWrapper: new subconn: %v", addrs) + ac, err := ccb.cc.newAddrConn(addrs) + if err != nil { + return nil, err + } + acbw := &acBalancerWrapper{ac: ac} + ac.acbw = acbw + return acbw, nil +} + +func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { + grpclog.Infof("ccBalancerWrapper: removing subconn") + acbw, ok := sc.(*acBalancerWrapper) + if !ok { + return + } + ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) +} + +func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) { + // TODO(bar) update cc connectivity state. + ccb.cc.updatePicker(p) +} + +func (ccb *ccBalancerWrapper) Target() string { + return ccb.cc.target +} + +// acBalancerWrapper is a wrapper on top of ac for balancers. +// It implements balancer.SubConn interface. +type acBalancerWrapper struct { + mu sync.Mutex + ac *addrConn +} + +func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { + grpclog.Infof("acBalancerWrapper: UpdateAddresses called with %v", addrs) + acbw.mu.Lock() + defer acbw.mu.Unlock() + // TODO(bar) update the addresses or tearDown and create a new ac. + if !acbw.ac.tryUpdateAddrs(addrs) { + cc := acbw.ac.cc + acbw.ac.mu.Lock() + // Set old ac.acbw to nil so the states update will be ignored by balancer. + acbw.ac.acbw = nil + acbw.ac.mu.Unlock() + acState := acbw.ac.getState() + acbw.ac.tearDown(errConnDrain) + + if acState == connectivity.Shutdown { + return + } + + ac, err := cc.newAddrConn(addrs) + if err != nil { + grpclog.Warningf("acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err) + return + } + acbw.ac = ac + ac.acbw = acbw + if acState != connectivity.Idle { + ac.connect(false) + } + } +} + +func (acbw *acBalancerWrapper) Connect() { + acbw.mu.Lock() + defer acbw.mu.Unlock() + acbw.ac.connect(false) +} + +func (acbw *acBalancerWrapper) getAddrConn() *addrConn { + acbw.mu.Lock() + defer acbw.mu.Unlock() + return acbw.ac +} diff --git a/balancer_v1_wrapper.go b/balancer_v1_wrapper.go new file mode 100644 index 00000000..bac7afa2 --- /dev/null +++ b/balancer_v1_wrapper.go @@ -0,0 +1,303 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc + +import ( + "sync" + + "golang.org/x/net/context" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/resolver" +) + +type balancerWrapperBuilder struct { + b Balancer // The v1 balancer. 
+}
+
+func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
+	bwb.b.Start(cc.Target(), BalancerConfig{
+		DialCreds: opts.DialCreds,
+		Dialer:    opts.Dialer,
+	})
+	_, pickfirst := bwb.b.(*pickFirst)
+	bw := &balancerWrapper{
+		balancer:  bwb.b,
+		pickfirst: pickfirst,
+		cc:        cc,
+		startCh:   make(chan struct{}),
+		conns:     make(map[resolver.Address]balancer.SubConn),
+		connSt:    make(map[balancer.SubConn]*scState),
+	}
+	cc.UpdateBalancerState(connectivity.Idle, bw)
+	go bw.lbWatcher()
+	return bw
+}
+
+func (bwb *balancerWrapperBuilder) Name() string {
+	return "wrapper"
+}
+
+type scState struct {
+	addr Address // The v1 address type.
+	s    connectivity.State
+	down func(error)
+}
+
+type balancerWrapper struct {
+	balancer  Balancer // The v1 balancer.
+	pickfirst bool
+
+	cc balancer.ClientConn
+
+	mu     sync.Mutex
+	conns  map[resolver.Address]balancer.SubConn
+	connSt map[balancer.SubConn]*scState
+	// This channel is closed when handling the first resolver result.
+	// lbWatcher blocks until this is closed, to avoid a race between
+	// - NewSubConn is called and cc wants to notify the balancer of state changes;
+	// - Build hasn't returned and cc doesn't have access to the balancer yet.
+	startCh chan struct{}
+}
+
+// lbWatcher watches the Notify channel of the balancer and manages
+// connections accordingly.
+func (bw *balancerWrapper) lbWatcher() {
+	<-bw.startCh
+	grpclog.Infof("balancerWrapper: is pickfirst: %v\n", bw.pickfirst)
+	notifyCh := bw.balancer.Notify()
+	if notifyCh == nil {
+		// There's no resolver in the balancer. Connect directly.
+		a := resolver.Address{
+			Addr: bw.cc.Target(),
+			Type: resolver.Backend,
+		}
+		sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
+		if err != nil {
+			grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
+		} else {
+			bw.mu.Lock()
+			bw.conns[a] = sc
+			bw.connSt[sc] = &scState{
+				addr: Address{Addr: bw.cc.Target()},
+				s:    connectivity.Idle,
+			}
+			bw.mu.Unlock()
+			sc.Connect()
+		}
+		return
+	}
+
+	for addrs := range notifyCh {
+		grpclog.Infof("balancerWrapper: got update addr from Notify: %v\n", addrs)
+		if bw.pickfirst {
+			var (
+				oldA  resolver.Address
+				oldSC balancer.SubConn
+			)
+			bw.mu.Lock()
+			for oldA, oldSC = range bw.conns {
+				break
+			}
+			bw.mu.Unlock()
+			if len(addrs) <= 0 {
+				if oldSC != nil {
+					// Tear down the old sc.
+					bw.mu.Lock()
+					delete(bw.conns, oldA)
+					delete(bw.connSt, oldSC)
+					bw.mu.Unlock()
+					bw.cc.RemoveSubConn(oldSC)
+				}
+				continue
+			}
+
+			var newAddrs []resolver.Address
+			for _, a := range addrs {
+				newAddr := resolver.Address{
+					Addr:       a.Addr,
+					Type:       resolver.Backend, // Addresses from the balancer are all backends.
+					ServerName: "",               // TODO(bar) support servername.
+					Metadata:   a.Metadata,
+				}
+				newAddrs = append(newAddrs, newAddr)
+			}
+			if oldSC == nil {
+				// Create a new sc.
+				sc, err := bw.cc.NewSubConn(newAddrs, balancer.NewSubConnOptions{})
+				if err != nil {
+					grpclog.Warningf("Error creating connection to %v. Err: %v", newAddrs, err)
+				} else {
+					bw.mu.Lock()
+					// For pickfirst, there should be only one SubConn, so the
+					// address doesn't matter. All state updates (up and down)
+					// and picking happen on that single SubConn.
+					bw.conns[resolver.Address{}] = sc
+					bw.connSt[sc] = &scState{
+						addr: addrs[0], // Use the first address.
+ s: connectivity.Idle, + } + bw.mu.Unlock() + sc.Connect() + } + } else { + oldSC.UpdateAddresses(newAddrs) + bw.mu.Lock() + bw.connSt[oldSC].addr = addrs[0] + bw.mu.Unlock() + } + } else { + var ( + add []resolver.Address // Addresses need to setup connections. + del []balancer.SubConn // Connections need to tear down. + ) + resAddrs := make(map[resolver.Address]Address) + for _, a := range addrs { + resAddrs[resolver.Address{ + Addr: a.Addr, + Type: resolver.Backend, // All addresses from balancer are all backends. + ServerName: "", // TODO(bar) support servername. + Metadata: a.Metadata, + }] = a + } + bw.mu.Lock() + for a := range resAddrs { + if _, ok := bw.conns[a]; !ok { + add = append(add, a) + } + } + for a, c := range bw.conns { + if _, ok := resAddrs[a]; !ok { + del = append(del, c) + delete(bw.conns, a) + delete(bw.connSt, c) + } + } + bw.mu.Unlock() + for _, a := range add { + sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{}) + if err != nil { + grpclog.Warningf("Error creating connection to %v. Err: %v", a, err) + } else { + bw.mu.Lock() + bw.conns[a] = sc + bw.connSt[sc] = &scState{ + addr: resAddrs[a], + s: connectivity.Idle, + } + bw.mu.Unlock() + sc.Connect() + } + } + for _, c := range del { + bw.cc.RemoveSubConn(c) + } + } + } +} + +func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { + grpclog.Infof("balancerWrapper: handle subconn state change: %p, %v", sc, s) + bw.mu.Lock() + defer bw.mu.Unlock() + scSt, ok := bw.connSt[sc] + if !ok { + return + } + if s == connectivity.Idle { + sc.Connect() + } + oldS := scSt.s + scSt.s = s + grpclog.Infof("balancerWrapper: handle state change addr: %v, old state: %v, new state: %v", sc.(*acBalancerWrapper).getAddrConn().addrs, oldS, s) + if oldS != connectivity.Ready && s == connectivity.Ready { + scSt.down = bw.balancer.Up(scSt.addr) + } else if oldS == connectivity.Ready && s != connectivity.Ready { + if scSt.down != nil { + scSt.down(errConnClosing) // TODO(bar) what error to use? + } + } + // The connectivity state is ignored by clientConn now. + // TODO(bar) use the aggregated connectivity state. + bw.cc.UpdateBalancerState(connectivity.Ready, bw) + return +} + +func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) { + bw.mu.Lock() + defer bw.mu.Unlock() + select { + case <-bw.startCh: + default: + close(bw.startCh) + } + // There should be a resolver inside the balancer. + // All updates here, if any, are ignored. + return +} + +func (bw *balancerWrapper) Close() { + bw.mu.Lock() + defer bw.mu.Unlock() + select { + case <-bw.startCh: + default: + close(bw.startCh) + } + bw.balancer.Close() + return +} + +// The picker is the balancerWrapper itself. +// Pick should never return ErrNoSubConnAvailable. +// It either blocks or returns error, consistent with v1 balancer Get(). +func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { + failfast := true // Default failfast is true. + if ss, ok := rpcInfoFromContext(ctx); ok { + failfast = ss.failfast + } + a, p, err := bw.balancer.Get(ctx, BalancerGetOptions{BlockingWait: !failfast}) + if err != nil { + return nil, nil, err + } + var put func(balancer.DoneInfo) + if p != nil { + put = func(i balancer.DoneInfo) { p() } + } + var sc balancer.SubConn + if bw.pickfirst { + bw.mu.Lock() + // Get the first sc in conns. 
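+		// (In pickfirst mode the map holds at most one entry, keyed by the
+		// zero-value resolver.Address, so one range iteration yields it.)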
+ for _, sc = range bw.conns { + break + } + bw.mu.Unlock() + } else { + bw.mu.Lock() + sc = bw.conns[resolver.Address{ + Addr: a.Addr, + Type: resolver.Backend, + ServerName: "", // TODO(bar) support servername. + Metadata: a.Metadata, + }] + bw.mu.Unlock() + } + return sc, put, nil +} diff --git a/call.go b/call.go index 438758fc..1c7d1c13 100644 --- a/call.go +++ b/call.go @@ -25,6 +25,7 @@ import ( "golang.org/x/net/context" "golang.org/x/net/trace" + "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/peer" "google.golang.org/grpc/stats" @@ -135,7 +136,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli } func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) { - c := defaultCallInfo + c := defaultCallInfo() mc := cc.GetMethodConfig(method) if mc.WaitForReady != nil { c.failFast = !*mc.WaitForReady @@ -149,13 +150,13 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli opts = append(cc.dopts.callOptions, opts...) for _, o := range opts { - if err := o.before(&c); err != nil { + if err := o.before(c); err != nil { return toRPCErr(err) } } defer func() { for _, o := range opts { - o.after(&c) + o.after(c) } }() @@ -178,7 +179,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli } }() } - ctx = newContextWithRPCInfo(ctx) + ctx = newContextWithRPCInfo(ctx, c.failFast) sh := cc.dopts.copts.StatsHandler if sh != nil { ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast}) @@ -208,7 +209,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli stream *transport.Stream // Record the put handler from Balancer.Get(...). It is called once the // RPC has completed or failed. - put func() + put func(balancer.DoneInfo) ) // TODO(zhaoq): Need a formal spec of fail-fast. callHdr := &transport.CallHdr{ @@ -252,7 +253,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli // If error is not connection error, we are sure nothing has been sent. 
updateRPCInfoInContext(ctx, rpcInfo{bytesSent: true, bytesReceived: false}) } - put() + put(balancer.DoneInfo{Err: err}) } if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast { continue @@ -262,14 +263,14 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli if peer, ok := peer.FromContext(stream.Context()); ok { c.peer = peer } - err = sendRequest(ctx, cc.dopts, cc.dopts.cp, &c, callHdr, stream, t, args, topts) + err = sendRequest(ctx, cc.dopts, cc.dopts.cp, c, callHdr, stream, t, args, topts) if err != nil { if put != nil { updateRPCInfoInContext(ctx, rpcInfo{ bytesSent: stream.BytesSent(), bytesReceived: stream.BytesReceived(), }) - put() + put(balancer.DoneInfo{Err: err}) } // Retry a non-failfast RPC when // i) there is a connection error; or @@ -279,14 +280,14 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli } return toRPCErr(err) } - err = recvResponse(ctx, cc.dopts, t, &c, stream, reply) + err = recvResponse(ctx, cc.dopts, t, c, stream, reply) if err != nil { if put != nil { updateRPCInfoInContext(ctx, rpcInfo{ bytesSent: stream.BytesSent(), bytesReceived: stream.BytesReceived(), }) - put() + put(balancer.DoneInfo{Err: err}) } if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast { continue @@ -302,7 +303,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli bytesSent: stream.BytesSent(), bytesReceived: stream.BytesReceived(), }) - put() + put(balancer.DoneInfo{Err: err}) } return stream.Status().Err() } diff --git a/clientconn.go b/clientconn.go index 98ec57a2..45ea87eb 100644 --- a/clientconn.go +++ b/clientconn.go @@ -22,16 +22,19 @@ import ( "errors" "math" "net" + "reflect" "strings" "sync" "time" "golang.org/x/net/context" "golang.org/x/net/trace" + "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/resolver" "google.golang.org/grpc/stats" "google.golang.org/grpc/transport" ) @@ -79,13 +82,14 @@ type dialOptions struct { cp Compressor dc Decompressor bs backoffStrategy - balancer Balancer block bool insecure bool timeout time.Duration scChan <-chan ServiceConfig copts transport.ConnectOptions callOptions []CallOption + // This is to support v1 balancer. + balancerBuilder balancer.Builder } const ( @@ -147,10 +151,14 @@ func WithDecompressor(dc Decompressor) DialOption { } } -// WithBalancer returns a DialOption which sets a load balancer. +// WithBalancer returns a DialOption which sets a load balancer with the v1 API. +// Name resolver will be ignored if this DialOption is specified. +// Deprecated: use the new balancer APIs in balancer package instead. 
func WithBalancer(b Balancer) DialOption { return func(o *dialOptions) { - o.balancer = b + o.balancerBuilder = &balancerWrapperBuilder{ + b: b, + } } } @@ -314,7 +322,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * cc := &ClientConn{ target: target, csMgr: &connectivityStateManager{}, - conns: make(map[Address]*addrConn), + conns: make(map[*addrConn]struct{}), } cc.csEvltr = &connectivityStateEvaluator{csMgr: cc.csMgr} cc.ctx, cc.cancel = context.WithCancel(context.Background()) @@ -383,49 +391,42 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } else { cc.authority = target } - waitC := make(chan error, 1) - go func() { - defer close(waitC) - if cc.dopts.balancer == nil && cc.sc.LB != nil { - cc.dopts.balancer = cc.sc.LB + + // TODO(bar) parse scheme and start resolver. + if cc.dopts.balancerBuilder != nil { + var credsClone credentials.TransportCredentials + if creds != nil { + credsClone = creds.Clone() } - if cc.dopts.balancer != nil { - var credsClone credentials.TransportCredentials - if creds != nil { - credsClone = creds.Clone() - } - config := BalancerConfig{ - DialCreds: credsClone, - Dialer: cc.dopts.copts.Dialer, - } - if err := cc.dopts.balancer.Start(target, config); err != nil { + buildOpts := balancer.BuildOptions{ + DialCreds: credsClone, + Dialer: cc.dopts.copts.Dialer, + } + // Build should not take long time. So it's ok to not have a goroutine for it. + // TODO(bar) init balancer after first resolver result to support service config balancer. + cc.balancer = cc.dopts.balancerBuilder.Build(&ccBalancerWrapper{cc: cc}, buildOpts) + } else { + waitC := make(chan error, 1) + go func() { + defer close(waitC) + // No balancer, or no resolver within the balancer. Connect directly. + ac, err := cc.newAddrConn([]resolver.Address{{Addr: target}}) + if err != nil { waitC <- err return } - ch := cc.dopts.balancer.Notify() - if ch != nil { - if cc.dopts.block { - doneChan := make(chan struct{}) - go cc.lbWatcher(doneChan) - <-doneChan - } else { - go cc.lbWatcher(nil) - } + if err := ac.connect(cc.dopts.block); err != nil { + waitC <- err return } - } - // No balancer, or no resolver within the balancer. Connect directly. - if err := cc.resetAddrConn([]Address{{Addr: target}}, cc.dopts.block, nil); err != nil { - waitC <- err - return - } - }() - select { - case <-ctx.Done(): - return nil, ctx.Err() - case err := <-waitC: - if err != nil { - return nil, err + }() + select { + case <-ctx.Done(): + return nil, ctx.Err() + case err := <-waitC: + if err != nil { + return nil, err + } } } if cc.dopts.scChan != nil && !scSet { @@ -443,6 +444,27 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * go cc.scWatcher() } + if cc.balancer != nil { + // Unblock balancer initialization with a fake resolver update. + // The balancer wrapper will not read the addresses, so an empty list works. + // TODO(bar) remove this after the real resolver is started. + cc.balancer.HandleResolvedAddrs([]resolver.Address{}, nil) + } + + // A blocking dial blocks until the clientConn is ready. + if cc.dopts.block { + for { + s := cc.GetState() + if s == connectivity.Ready { + break + } + if !cc.WaitForStateChange(ctx, s) { + // ctx got timeout or canceled. + return nil, ctx.Err() + } + } + } + return cc, nil } @@ -548,9 +570,15 @@ type ClientConn struct { csMgr *connectivityStateManager csEvltr *connectivityStateEvaluator // This will eventually be part of balancer. 
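+
+	// balancer is built from dopts.balancerBuilder at dial time; it receives
+	// resolved addresses and SubConn state changes, and supplies the picker.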
+ balancer balancer.Balancer + + // TODO(bar) move the mutex and picker into a struct that does blocking pick(). + pmu sync.Mutex + picker balancer.Picker + mu sync.RWMutex sc ServiceConfig - conns map[Address]*addrConn + conns map[*addrConn]struct{} // Keepalive parameter can be updated if a GoAway is received. mkp keepalive.ClientParameters } @@ -577,84 +605,6 @@ func (cc *ClientConn) GetState() connectivity.State { return cc.csMgr.getState() } -// lbWatcher watches the Notify channel of the balancer in cc and manages -// connections accordingly. If doneChan is not nil, it is closed after the -// first successfull connection is made. -func (cc *ClientConn) lbWatcher(doneChan chan struct{}) { - defer func() { - // In case channel from cc.dopts.balancer.Notify() gets closed before a - // successful connection gets established, don't forget to notify the - // caller. - if doneChan != nil { - close(doneChan) - } - }() - - _, isPickFirst := cc.dopts.balancer.(*pickFirst) - for addrs := range cc.dopts.balancer.Notify() { - if isPickFirst { - if len(addrs) == 0 { - // No address can be connected, should teardown current addrconn if exists - cc.mu.Lock() - if len(cc.conns) != 0 { - cc.pickFirstAddrConnTearDown() - } - cc.mu.Unlock() - } else { - cc.resetAddrConn(addrs, true, nil) - if doneChan != nil { - close(doneChan) - doneChan = nil - } - } - } else { - // Not pickFirst, create a new addrConn for each address. - var ( - add []Address // Addresses need to setup connections. - del []*addrConn // Connections need to tear down. - ) - cc.mu.Lock() - for _, a := range addrs { - if _, ok := cc.conns[a]; !ok { - add = append(add, a) - } - } - for k, c := range cc.conns { - var keep bool - for _, a := range addrs { - if k == a { - keep = true - break - } - } - if !keep { - del = append(del, c) - delete(cc.conns, k) - } - } - cc.mu.Unlock() - for _, a := range add { - var err error - if doneChan != nil { - err = cc.resetAddrConn([]Address{a}, true, nil) - if err == nil { - close(doneChan) - doneChan = nil - } - } else { - err = cc.resetAddrConn([]Address{a}, false, nil) - } - if err != nil { - grpclog.Warningf("Error creating connection to %v. Err: %v", a, err) - } - } - for _, c := range del { - c.tearDown(errConnDrain) - } - } - } -} - func (cc *ClientConn) scWatcher() { for { select { @@ -673,72 +623,8 @@ func (cc *ClientConn) scWatcher() { } } -// pickFirstUpdateAddresses checks whether current address in the updating list, Update the list if true. -// It is only used when the balancer is pick first. -func (cc *ClientConn) pickFirstUpdateAddresses(addrs []Address) bool { - if len(cc.conns) == 0 { - // No addrconn. Should go resetting addrconn. - return false - } - var currentAc *addrConn - for _, currentAc = range cc.conns { - break - } - var addrInNewSlice bool - for _, addr := range addrs { - if strings.Compare(addr.Addr, currentAc.curAddr.Addr) == 0 { - addrInNewSlice = true - break - } - } - if addrInNewSlice { - cc.conns = make(map[Address]*addrConn) - for _, addr := range addrs { - cc.conns[addr] = currentAc - } - currentAc.addrs = addrs - return true - } - return false -} - -// pickFirstAddrConnTearDown() should be called after lock. -func (cc *ClientConn) pickFirstAddrConnTearDown() { - if len(cc.conns) == 0 { - return - } - var currentAc *addrConn - for _, currentAc = range cc.conns { - break - } - for k := range cc.conns { - delete(cc.conns, k) - } - currentAc.tearDown(errConnDrain) -} - -// resetAddrConn creates an addrConn for addr and adds it to cc.conns. 
-// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason. -// If tearDownErr is nil, errConnDrain will be used instead. -// -// We should never need to replace an addrConn with a new one. This function is only used -// as newAddrConn to create new addrConn. -// TODO rename this function and clean up the code. -func (cc *ClientConn) resetAddrConn(addrs []Address, block bool, tearDownErr error) error { - // if current transport in addrs, just change lists to update order and new addresses - // not work for roundrobin - cc.mu.Lock() - if _, isPickFirst := cc.dopts.balancer.(*pickFirst); isPickFirst { - // If Current address in use in the updating list, just update the list. - // Otherwise, teardown current addrconn and create a new one. - if cc.pickFirstUpdateAddresses(addrs) { - cc.mu.Unlock() - return nil - } - cc.pickFirstAddrConnTearDown() - } - cc.mu.Unlock() - +// newAddrConn creates an addrConn for addrs and adds it to cc.conns. +func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) { ac := &addrConn{ cc: cc, addrs: addrs, @@ -746,6 +632,42 @@ func (cc *ClientConn) resetAddrConn(addrs []Address, block bool, tearDownErr err } ac.ctx, ac.cancel = context.WithCancel(cc.ctx) ac.csEvltr = cc.csEvltr + // Track ac in cc. This needs to be done before any getTransport(...) is called. + cc.mu.Lock() + if cc.conns == nil { + cc.mu.Unlock() + return nil, ErrClientConnClosing + } + cc.conns[ac] = struct{}{} + cc.mu.Unlock() + return ac, nil +} + +// removeAddrConn removes the addrConn in the subConn from clientConn. +// It also tears down the ac with the given error. +func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) { + cc.mu.Lock() + if cc.conns == nil { + cc.mu.Unlock() + return + } + delete(cc.conns, ac) + cc.mu.Unlock() + ac.tearDown(err) +} + +// connect starts to creating transport and also starts the transport monitor +// goroutine for this ac. +// TODO(bar) Move this to the addrConn section. +// This was part of resetAddrConn, keep it here to make the diff look clean. +func (ac *addrConn) connect(block bool) error { + ac.mu.Lock() + if ac.state == connectivity.Shutdown { + ac.mu.Unlock() + return errConnClosing + } + ac.mu.Unlock() + if EnableTracing { ac.events = trace.NewEventLog("grpc.ClientConn", ac.addrs[0].Addr) } @@ -763,37 +685,10 @@ func (cc *ClientConn) resetAddrConn(addrs []Address, block bool, tearDownErr err } } } - // Track ac in cc. This needs to be done before any getTransport(...) is called. - cc.mu.Lock() - if cc.conns == nil { - cc.mu.Unlock() - return ErrClientConnClosing - } - stale := cc.conns[ac.addrs[0]] - for _, a := range ac.addrs { - cc.conns[a] = ac - } - cc.mu.Unlock() - if stale != nil { - // There is an addrConn alive on ac.addr already. This could be due to - // a buggy Balancer that reports duplicated Addresses. - if tearDownErr == nil { - // tearDownErr is nil if resetAddrConn is called by - // 1) Dial - // 2) lbWatcher - // In both cases, the stale ac should drain, not close. - stale.tearDown(errConnDrain) - } else { - stale.tearDown(tearDownErr) - } - } + if block { if err := ac.resetTransport(false); err != nil { if err != errConnClosing { - // Tear down ac and delete it from cc.conns. 
- cc.mu.Lock() - delete(cc.conns, ac.addrs[0]) - cc.mu.Unlock() ac.tearDown(err) } if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { @@ -820,6 +715,36 @@ func (cc *ClientConn) resetAddrConn(addrs []Address, block bool, tearDownErr err return nil } +// tryUpdateAddrs tries to update ac.addrs with the new addresses list. +// +// It checks whether current connected address of ac is in the new addrs list. +// - If true, it updates ac.addrs and returns true. The ac will keep using +// the existing connection. +// - If false, it does nothing and returns false. +func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool { + ac.mu.Lock() + defer ac.mu.Unlock() + grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs) + if ac.state == connectivity.Shutdown { + ac.addrs = addrs + return true + } + + var curAddrFound bool + for _, a := range addrs { + if reflect.DeepEqual(ac.curAddr, a) { + curAddrFound = true + break + } + } + grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound) + if curAddrFound { + ac.addrs = addrs + } + + return curAddrFound +} + // GetMethodConfig gets the method config of the input method. // If there's an exact match for input method (i.e. /service/method), we return // the corresponding MethodConfig. @@ -839,54 +764,52 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { return m } -func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) { +func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) { var ( ac *addrConn - ok bool - put func() + put func(balancer.DoneInfo) ) - if cc.dopts.balancer == nil { + if cc.balancer == nil { // If balancer is nil, there should be only one addrConn available. cc.mu.RLock() if cc.conns == nil { cc.mu.RUnlock() return nil, nil, toRPCErr(ErrClientConnClosing) } - for _, ac = range cc.conns { + for ac = range cc.conns { // Break after the first iteration to get the first addrConn. - ok = true break } cc.mu.RUnlock() } else { + cc.pmu.Lock() + // TODO(bar) call pick on struct blockPicker instead of the real picker. 
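+		// Snapshot the picker under pmu so that Pick itself runs outside the
+		// lock and a concurrent updatePicker is not blocked behind an RPC.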
+ p := cc.picker + cc.pmu.Unlock() + var ( - addr Address - err error + err error + sc balancer.SubConn ) - addr, put, err = cc.dopts.balancer.Get(ctx, opts) + sc, put, err = p.Pick(ctx, balancer.PickOptions{}) if err != nil { return nil, nil, toRPCErr(err) } - cc.mu.RLock() - if cc.conns == nil { - cc.mu.RUnlock() - return nil, nil, toRPCErr(ErrClientConnClosing) - } - ac, ok = cc.conns[addr] - cc.mu.RUnlock() - } - if !ok { - if put != nil { + if acbw, ok := sc.(*acBalancerWrapper); ok { + ac = acbw.getAddrConn() + } else if put != nil { updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false}) - put() + put(balancer.DoneInfo{Err: errors.New("SubConn returned by pick cannot be recognized")}) } + } + if ac == nil { return nil, nil, errConnClosing } - t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait) + t, err := ac.wait(ctx, cc.balancer != nil, !opts.BlockingWait) if err != nil { if put != nil { updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false}) - put() + put(balancer.DoneInfo{Err: err}) } return nil, nil, err } @@ -906,10 +829,10 @@ func (cc *ClientConn) Close() error { cc.conns = nil cc.csMgr.updateState(connectivity.Shutdown) cc.mu.Unlock() - if cc.dopts.balancer != nil { - cc.dopts.balancer.Close() + if cc.balancer != nil { + cc.balancer.Close() } - for _, ac := range conns { + for ac := range conns { ac.tearDown(ErrClientConnClosing) } return nil @@ -921,16 +844,16 @@ type addrConn struct { cancel context.CancelFunc cc *ClientConn - curAddr Address - addrs []Address + curAddr resolver.Address + addrs []resolver.Address dopts dialOptions events trace.EventLog + acbw balancer.SubConn csEvltr *connectivityStateEvaluator mu sync.Mutex state connectivity.State - down func(error) // the handler called when a connection is down. // ready is closed and becomes nil when a new transport is up or failed // due to timeout. ready chan struct{} @@ -974,20 +897,24 @@ func (ac *addrConn) errorf(format string, a ...interface{}) { // For the old transport: // - if drain is true, it will be gracefully closed. // - otherwise, it will be closed. +// TODO(bar) make sure all state transitions are valid. func (ac *addrConn) resetTransport(drain bool) error { ac.mu.Lock() if ac.state == connectivity.Shutdown { ac.mu.Unlock() return errConnClosing } - ac.printf("connecting") - if ac.down != nil { - ac.down(downErrorf(false, true, "%v", errNetworkIO)) - ac.down = nil - } oldState := ac.state ac.state = connectivity.Connecting ac.csEvltr.recordTransition(oldState, ac.state) + if ac.cc.balancer != nil { + ac.cc.balancer.HandleSubConnStateChange(ac.acbw, ac.state) + } + // TODO(bar) don't call balancer functions to handle subconn state change if ac.acbw is nil. 
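+	// Closing ready wakes any goroutine blocked in ac.wait() so that it
+	// re-checks the new connection state.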
+ if ac.ready != nil { + close(ac.ready) + ac.ready = nil + } t := ac.transport ac.transport = nil ac.mu.Unlock() @@ -998,15 +925,27 @@ func (ac *addrConn) resetTransport(drain bool) error { ac.dopts.copts.KeepaliveParams = ac.cc.mkp ac.cc.mu.RUnlock() for retries := 0; ; retries++ { - ac.mu.Lock() sleepTime := ac.dopts.bs.backoff(retries) timeout := minConnectTimeout + ac.mu.Lock() if timeout < time.Duration(int(sleepTime)/len(ac.addrs)) { timeout = time.Duration(int(sleepTime) / len(ac.addrs)) } connectTime := time.Now() + if ac.state == connectivity.Shutdown { + ac.mu.Unlock() + return errConnClosing + } + ac.printf("connecting") + oldState := ac.state + ac.state = connectivity.Connecting + ac.csEvltr.recordTransition(oldState, ac.state) + // TODO(bar) remove condition once we always have a balancer. + if ac.cc.balancer != nil { + ac.cc.balancer.HandleSubConnStateChange(ac.acbw, ac.state) + } // copy ac.addrs in case of race - addrsIter := make([]Address, len(ac.addrs)) + addrsIter := make([]resolver.Address, len(ac.addrs)) copy(addrsIter, ac.addrs) ac.mu.Unlock() for _, addr := range addrsIter { @@ -1038,14 +977,6 @@ func (ac *addrConn) resetTransport(drain bool) error { ac.mu.Unlock() return errConnClosing } - ac.errorf("transient failure: %v", err) - oldState = ac.state - ac.state = connectivity.TransientFailure - ac.csEvltr.recordTransition(oldState, ac.state) - if ac.ready != nil { - close(ac.ready) - ac.ready = nil - } ac.mu.Unlock() continue } @@ -1060,18 +991,30 @@ func (ac *addrConn) resetTransport(drain bool) error { oldState = ac.state ac.state = connectivity.Ready ac.csEvltr.recordTransition(oldState, ac.state) + if ac.cc.balancer != nil { + ac.cc.balancer.HandleSubConnStateChange(ac.acbw, ac.state) + } ac.transport = newTransport + ac.curAddr = addr if ac.ready != nil { close(ac.ready) ac.ready = nil } - if ac.cc.dopts.balancer != nil { - ac.down = ac.cc.dopts.balancer.Up(addr) - } - ac.curAddr = addr ac.mu.Unlock() return nil } + ac.mu.Lock() + oldState = ac.state + ac.state = connectivity.TransientFailure + ac.csEvltr.recordTransition(oldState, ac.state) + if ac.cc.balancer != nil { + ac.cc.balancer.HandleSubConnStateChange(ac.acbw, ac.state) + } + if ac.ready != nil { + close(ac.ready) + ac.ready = nil + } + ac.mu.Unlock() timer := time.NewTimer(sleepTime - time.Since(connectTime)) select { case <-timer.C: @@ -1128,26 +1071,8 @@ func (ac *addrConn) transportMonitor() { return case <-t.GoAway(): ac.adjustParams(t.GetGoAwayReason()) - if err := ac.resetTransport(false); err != nil { - grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err) - if err != errConnClosing { - // Keep this ac in cc.conns, to get the reason it's torn down. - ac.tearDown(err) - } - return - } default: } - ac.mu.Lock() - if ac.state == connectivity.Shutdown { - // ac has been shutdown. - ac.mu.Unlock() - return - } - oldState := ac.state - ac.state = connectivity.TransientFailure - ac.csEvltr.recordTransition(oldState, ac.state) - ac.mu.Unlock() if err := ac.resetTransport(false); err != nil { grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err) ac.mu.Lock() @@ -1213,12 +1138,8 @@ func (ac *addrConn) tearDown(err error) { ac.cancel() ac.mu.Lock() - ac.curAddr = Address{} + ac.curAddr = resolver.Address{} defer ac.mu.Unlock() - if ac.down != nil { - ac.down(downErrorf(false, false, "%v", err)) - ac.down = nil - } if err == errConnDrain && ac.transport != nil { // GracefulClose(...) 
may be executed multiple times when // i) receiving multiple GoAway frames from the server; or @@ -1233,6 +1154,9 @@ func (ac *addrConn) tearDown(err error) { ac.state = connectivity.Shutdown ac.tearDownErr = err ac.csEvltr.recordTransition(oldState, ac.state) + if ac.cc.balancer != nil { + ac.cc.balancer.HandleSubConnStateChange(ac.acbw, ac.state) + } if ac.events != nil { ac.events.Finish() ac.events = nil @@ -1246,3 +1170,9 @@ func (ac *addrConn) tearDown(err error) { } return } + +func (ac *addrConn) getState() connectivity.State { + ac.mu.Lock() + defer ac.mu.Unlock() + return ac.state +} diff --git a/clientconn_test.go b/clientconn_test.go index bdaf3df1..3491eb13 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -376,19 +376,3 @@ func TestClientUpdatesParamsAfterGoAway(t *testing.T) { t.Fatalf("cc.dopts.copts.Keepalive.Time = %v , want 100ms", v) } } - -func TestClientLBWatcherWithClosedBalancer(t *testing.T) { - b := newBlockingBalancer() - cc := &ClientConn{dopts: dialOptions{balancer: b}} - - doneChan := make(chan struct{}) - go cc.lbWatcher(doneChan) - // Balancer closes before any successful connections. - b.Close() - - select { - case <-doneChan: - case <-time.After(100 * time.Millisecond): - t.Fatal("lbWatcher with closed balancer didn't close doneChan after 100ms") - } -} diff --git a/grpclb.go b/grpclb.go index afe49828..c5b1907a 100644 --- a/grpclb.go +++ b/grpclb.go @@ -73,7 +73,7 @@ func (x *balanceLoadClientStream) Recv() (*lbmpb.LoadBalanceResponse, error) { // NewGRPCLBBalancer creates a grpclb load balancer. func NewGRPCLBBalancer(r naming.Resolver) Balancer { - return &balancer{ + return &grpclbBalancer{ r: r, } } @@ -96,7 +96,7 @@ type grpclbAddrInfo struct { dropForLoadBalancing bool } -type balancer struct { +type grpclbBalancer struct { r naming.Resolver target string mu sync.Mutex @@ -113,7 +113,7 @@ type balancer struct { clientStats lbmpb.ClientStats } -func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error { +func (b *grpclbBalancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error { updates, err := w.Next() if err != nil { grpclog.Warningf("grpclb: failed to get next addr update from watcher: %v", err) @@ -187,7 +187,7 @@ func convertDuration(d *lbmpb.Duration) time.Duration { return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond } -func (b *balancer) processServerList(l *lbmpb.ServerList, seq int) { +func (b *grpclbBalancer) processServerList(l *lbmpb.ServerList, seq int) { if l == nil { return } @@ -230,7 +230,7 @@ func (b *balancer) processServerList(l *lbmpb.ServerList, seq int) { return } -func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) { +func (b *grpclbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) { ticker := time.NewTicker(interval) defer ticker.Stop() for { @@ -259,10 +259,13 @@ func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Dura } } -func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) { - ctx, cancel := context.WithCancel(context.Background()) +func (b *grpclbBalancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) { + // This RPC should be a failfast RPC. It should only fail if the server is down. + // We cannot make it failfast because the first few RPCs on a clientConn always fail. 
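+	// Instead, the one-second deadline below bounds how long this
+	// wait-for-ready call can block.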
+	// TODO(bar) make this failfast when the failfast behavior in clientConn is fixed.
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 	defer cancel()
-	stream, err := lbc.BalanceLoad(ctx)
+	stream, err := lbc.BalanceLoad(ctx, FailFast(false))
 	if err != nil {
 		grpclog.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
 		return
@@ -332,7 +335,7 @@ func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry b
 	return true
 }
 
-func (b *balancer) Start(target string, config BalancerConfig) error {
+func (b *grpclbBalancer) Start(target string, config BalancerConfig) error {
 	b.rand = rand.New(rand.NewSource(time.Now().Unix()))
 	// TODO: Fall back to the basic direct connection if there is no name resolver.
 	if b.r == nil {
@@ -488,7 +491,7 @@ func (b *balancer) Start(target string, config BalancerConfig) error {
 	return nil
 }
 
-func (b *balancer) down(addr Address, err error) {
+func (b *grpclbBalancer) down(addr Address, err error) {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 	for _, a := range b.addrs {
@@ -499,7 +502,7 @@ func (b *balancer) down(addr Address, err error) {
 	}
 }
 
-func (b *balancer) Up(addr Address) func(error) {
+func (b *grpclbBalancer) Up(addr Address) func(error) {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 	if b.done {
@@ -527,7 +530,7 @@ func (b *balancer) Up(addr Address) func(error) {
 	}
 }
 
-func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
+func (b *grpclbBalancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
 	var ch chan struct{}
 	b.mu.Lock()
 	if b.done {
@@ -597,17 +600,10 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre
 		}
 	}
 	if !opts.BlockingWait {
-		if len(b.addrs) == 0 {
-			b.clientStats.NumCallsFinished++
-			b.clientStats.NumCallsFinishedWithClientFailedToSend++
-			b.mu.Unlock()
-			err = Errorf(codes.Unavailable, "there is no address available")
-			return
-		}
-		// Returns the next addr on b.addrs for a failfast RPC.
-		addr = b.addrs[b.next].addr
-		b.next++
+		b.clientStats.NumCallsFinished++
+		b.clientStats.NumCallsFinishedWithClientFailedToSend++
 		b.mu.Unlock()
+		err = Errorf(codes.Unavailable, "there is no address available")
 		return
 	}
 	// Wait on b.waitCh for non-failfast RPCs.
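Aside: with this change, a fail-fast Get now returns Unavailable whenever grpclb has no resolved address, instead of handing out a possibly stale one. A caller that would rather wait for the balancer opts out per call; a minimal sketch, where testC stands in for any generated stub:

	// Fail-fast (the default): fails with codes.Unavailable while no
	// address is known.
	_, err := testC.EmptyCall(ctx, &testpb.Empty{})

	// Wait-for-ready: blocks on b.waitCh until the balancer reports an
	// address (or ctx expires).
	_, err = testC.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false))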
@@ -684,11 +680,11 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre } } -func (b *balancer) Notify() <-chan []Address { +func (b *grpclbBalancer) Notify() <-chan []Address { return b.addrCh } -func (b *balancer) Close() error { +func (b *grpclbBalancer) Close() error { b.mu.Lock() defer b.mu.Unlock() if b.done { diff --git a/grpclb/grpclb_test.go b/grpclb/grpclb_test.go index fd861af1..5ad468df 100644 --- a/grpclb/grpclb_test.go +++ b/grpclb/grpclb_test.go @@ -42,6 +42,8 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/naming" testpb "google.golang.org/grpc/test/grpc_testing" + + _ "google.golang.org/grpc/grpclog/glogger" ) var ( @@ -404,7 +406,7 @@ func TestGRPCLB(t *testing.T) { t.Fatalf("Failed to dial to the backend %v", err) } testC := testpb.NewTestServiceClient(cc) - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } cc.Close() @@ -442,6 +444,15 @@ func TestDropRequest(t *testing.T) { t.Fatalf("Failed to dial to the backend %v", err) } testC := testpb.NewTestServiceClient(cc) + // Wait until the first connection is up. + // The first one has Drop set to true, error should contain "drop requests". + for { + if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + if strings.Contains(err.Error(), "drops requests") { + break + } + } + } // The 1st, non-fail-fast RPC should succeed. This ensures both server // connections are made, because the first one has DropForLoadBalancing set to true. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { diff --git a/reflection/serverreflection_test.go b/reflection/serverreflection_test.go index 2c31a4b3..90858908 100644 --- a/reflection/serverreflection_test.go +++ b/reflection/serverreflection_test.go @@ -203,7 +203,7 @@ func TestReflectionEnd2end(t *testing.T) { defer conn.Close() c := rpb.NewServerReflectionClient(conn) - stream, err := c.ServerReflectionInfo(context.Background()) + stream, err := c.ServerReflectionInfo(context.Background(), grpc.FailFast(false)) if err != nil { t.Fatalf("cannot get ServerReflectionInfo: %v", err) } diff --git a/resolver/resolver.go b/resolver/resolver.go new file mode 100644 index 00000000..918a6c7d --- /dev/null +++ b/resolver/resolver.go @@ -0,0 +1,128 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package resolver defines APIs for name resolution in gRPC. +// All APIs in this package are experimental. +package resolver + +var ( + // m is a map from scheme to resolver builder. + m = make(map[string]Builder) + // defaultScheme is the default scheme to use. + defaultScheme string +) + +// TODO(bar) install dns resolver in init(){}. + +// Register registers the resolver builder to the resolver map. 
+// b.Scheme will be used as the scheme registered with this builder.
+func Register(b Builder) {
+	m[b.Scheme()] = b
+}
+
+// Get returns the resolver builder registered with the given scheme.
+// If no builder is registered with the scheme, the default scheme will
+// be used.
+// If the default scheme is not modified, "dns" will be the default
+// scheme, and the preinstalled dns resolver will be used.
+// If the default scheme is modified, and a resolver is registered with
+// the scheme, that resolver will be returned.
+// If the default scheme is modified, and no resolver is registered with
+// the scheme, nil will be returned.
+func Get(scheme string) Builder {
+	if b, ok := m[scheme]; ok {
+		return b
+	}
+	if b, ok := m[defaultScheme]; ok {
+		return b
+	}
+	return nil
+}
+
+// SetDefaultScheme sets the default scheme that will be used.
+// The initial default scheme is "dns".
+func SetDefaultScheme(scheme string) {
+	defaultScheme = scheme
+}
+
+// AddressType indicates the address type returned by name resolution.
+type AddressType uint8
+
+const (
+	// Backend indicates the address is for a backend server.
+	Backend AddressType = iota
+	// GRPCLB indicates the address is for a grpclb load balancer.
+	GRPCLB
+)
+
+// Address represents a server the client connects to.
+// This is the EXPERIMENTAL API and may be changed or extended in the future.
+type Address struct {
+	// Addr is the server address on which a connection will be established.
+	Addr string
+	// Type is the type of this address.
+	Type AddressType
+	// ServerName is the name of this address.
+	// It's the name of the gRPC load balancer, which will be used for authentication.
+	ServerName string
+	// Metadata is the information associated with Addr, which may be used
+	// to make load balancing decisions.
+	Metadata interface{}
+}
+
+// BuildOption includes additional information for the builder to create
+// the resolver.
+type BuildOption struct {
+}
+
+// ClientConn contains the callbacks for the resolver to notify any updates
+// to the gRPC ClientConn.
+type ClientConn interface {
+	// NewAddress is called by the resolver to notify the ClientConn of a new
+	// list of resolved addresses.
+	// The address list should be the complete list of resolved addresses.
+	NewAddress(addresses []Address)
+	// NewServiceConfig is called by the resolver to notify the ClientConn of a
+	// new service config. The service config should be provided as a JSON string.
+	NewServiceConfig(serviceConfig string)
+}
+
+// Builder creates a resolver that will be used to watch name resolution updates.
+type Builder interface {
+	// Build creates a new resolver for the given target.
+	//
+	// gRPC dial calls Build synchronously, and fails if the returned error is
+	// not nil.
+	Build(target string, cc ClientConn, opts BuildOption) (Resolver, error)
+	// Scheme returns the scheme supported by this resolver.
+	// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
+	Scheme() string
+}
+
+// ResolveNowOption includes additional information for ResolveNow.
+type ResolveNowOption struct{}
+
+// Resolver watches for the updates on the specified target.
+// Updates include address updates and service config updates.
+type Resolver interface {
+	// ResolveNow will be called by gRPC to try to resolve the target name again.
+	// It's just a hint; the resolver can ignore it if it's not necessary.
+	ResolveNow(ResolveNowOption)
+	// Close closes the resolver.
+ Close() +} diff --git a/rpc_util.go b/rpc_util.go index 09bf2d64..423d7d3c 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -132,7 +132,9 @@ type callInfo struct { creds credentials.PerRPCCredentials } -var defaultCallInfo = callInfo{failFast: true} +func defaultCallInfo() *callInfo { + return &callInfo{failFast: true} +} // CallOption configures a Call before it starts or extracts information from // a Call after it completes. @@ -384,14 +386,15 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{ } type rpcInfo struct { + failfast bool bytesSent bool bytesReceived bool } type rpcInfoContextKey struct{} -func newContextWithRPCInfo(ctx context.Context) context.Context { - return context.WithValue(ctx, rpcInfoContextKey{}, &rpcInfo{}) +func newContextWithRPCInfo(ctx context.Context, failfast bool) context.Context { + return context.WithValue(ctx, rpcInfoContextKey{}, &rpcInfo{failfast: failfast}) } func rpcInfoFromContext(ctx context.Context) (s *rpcInfo, ok bool) { @@ -401,7 +404,8 @@ func rpcInfoFromContext(ctx context.Context) (s *rpcInfo, ok bool) { func updateRPCInfoInContext(ctx context.Context, s rpcInfo) { if ss, ok := rpcInfoFromContext(ctx); ok { - *ss = s + ss.bytesReceived = s.bytesReceived + ss.bytesSent = s.bytesSent } return } diff --git a/stream.go b/stream.go index 6f29a975..ce35b556 100644 --- a/stream.go +++ b/stream.go @@ -27,6 +27,7 @@ import ( "golang.org/x/net/context" "golang.org/x/net/trace" + "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" @@ -106,10 +107,10 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth var ( t transport.ClientTransport s *transport.Stream - put func() + put func(balancer.DoneInfo) cancel context.CancelFunc ) - c := defaultCallInfo + c := defaultCallInfo() mc := cc.GetMethodConfig(method) if mc.WaitForReady != nil { c.failFast = !*mc.WaitForReady @@ -126,7 +127,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth opts = append(cc.dopts.callOptions, opts...) for _, o := range opts { - if err := o.before(&c); err != nil { + if err := o.before(c); err != nil { return nil, toRPCErr(err) } } @@ -167,7 +168,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } }() } - ctx = newContextWithRPCInfo(ctx) + ctx = newContextWithRPCInfo(ctx, c.failFast) sh := cc.dopts.copts.StatsHandler if sh != nil { ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast}) @@ -217,7 +218,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth updateRPCInfoInContext(ctx, rpcInfo{bytesSent: true, bytesReceived: false}) } if put != nil { - put() + put(balancer.DoneInfo{Err: err}) put = nil } if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast { @@ -283,7 +284,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth // clientStream implements a client side Stream. type clientStream struct { opts []CallOption - c callInfo + c *callInfo t transport.ClientTransport s *transport.Stream p *parser @@ -297,7 +298,7 @@ type clientStream struct { tracing bool // set to EnableTracing when the clientStream is created. 
 	mu       sync.Mutex
-	put      func()
+	put      func(balancer.DoneInfo)
 	closed   bool
 	finished bool
 	// trInfo.tr is set when the clientStream is created (if EnableTracing is true),
@@ -494,14 +495,14 @@ func (cs *clientStream) finish(err error) {
 		}
 	}()
 	for _, o := range cs.opts {
-		o.after(&cs.c)
+		o.after(cs.c)
 	}
 	if cs.put != nil {
 		updateRPCInfoInContext(cs.s.Context(), rpcInfo{
 			bytesSent:     cs.s.BytesSent(),
 			bytesReceived: cs.s.BytesReceived(),
 		})
-		cs.put()
+		cs.put(balancer.DoneInfo{Err: err})
 		cs.put = nil
 	}
 	if cs.statsHandler != nil {
diff --git a/test/end2end_test.go b/test/end2end_test.go
index 93763f5e..606c602c 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -451,6 +451,10 @@ type test struct {
 	clientInitialConnWindowSize int32
 	perRPCCreds                 credentials.PerRPCCredentials
 
+	// All test dialing is blocking by default. Set this to true if dial
+	// should be non-blocking.
+	nonBlockingDial bool
+
 	// srv and srvAddr are set once startServer is called.
 	srv     *grpc.Server
 	srvAddr string
@@ -643,6 +647,10 @@ func (te *test) clientConn() *grpc.ClientConn {
 	if te.customCodec != nil {
 		opts = append(opts, grpc.WithCodec(te.customCodec))
 	}
+	if !te.nonBlockingDial && te.srvAddr != "" {
+		// Only do a blocking dial if the server is up.
+		opts = append(opts, grpc.WithBlock())
+	}
 	var err error
 	te.cc, err = grpc.Dial(te.srvAddr, opts...)
 	if err != nil {
@@ -4040,11 +4048,20 @@ func TestFailFastRPCErrorOnBadCertificates(t *testing.T) {
 	te.startServer(&testServer{security: te.e.security})
 	defer te.tearDown()
 
+	te.nonBlockingDial = true
 	// Connection will never be successful because server is not set up correctly.
 	cc := te.clientConn()
 	tc := testpb.NewTestServiceClient(cc)
-	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); !strings.Contains(err.Error(), clientAlwaysFailCredErrorMsg) {
-		te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want err.Error() contains %q", err, clientAlwaysFailCredErrorMsg)
+	var err error
+	for i := 0; i < 1000; i++ {
+		// This loop runs for roughly one second at most (1000 iterations with a 1ms sleep).
+		// The first several RPCs will fail with Unavailable because the connection hasn't started.
+		// Once the first connection attempt fails with the creds error, subsequent RPCs should fail with the expected error.
+		if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); strings.Contains(err.Error(), clientAlwaysFailCredErrorMsg) {
+			return
+		}
+		time.Sleep(time.Millisecond)
 	}
+	te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want err.Error() contains %q", err, clientAlwaysFailCredErrorMsg)
 }
 
 func TestFailFastRPCWithNoBalancerErrorOnBadCertificates(t *testing.T) {
@@ -4052,11 +4069,20 @@ func TestFailFastRPCWithNoBalancerErrorOnBadCertificates(t *testing.T) {
 	te.startServer(&testServer{security: te.e.security})
 	defer te.tearDown()
 
+	te.nonBlockingDial = true
 	cc := te.clientConn()
 	tc := testpb.NewTestServiceClient(cc)
-	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); !strings.Contains(err.Error(), clientAlwaysFailCredErrorMsg) {
-		te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want err.Error() contains %q", err, clientAlwaysFailCredErrorMsg)
+	var err error
+	for i := 0; i < 1000; i++ {
+		// This loop runs for roughly one second at most (1000 iterations with a 1ms sleep).
+		// The first several RPCs will fail with Unavailable because the connection hasn't started.
+		// Once the first connection attempt fails with the creds error, subsequent RPCs should fail with the expected error.
+		if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); strings.Contains(err.Error(), clientAlwaysFailCredErrorMsg) {
+			return
+		}
+		time.Sleep(time.Millisecond)
 	}
+	te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want err.Error() contains %q", err, clientAlwaysFailCredErrorMsg)
 }
 
 func TestNonFailFastRPCWithNoBalancerErrorOnBadCertificates(t *testing.T) {
@@ -4064,6 +4090,7 @@ func TestNonFailFastRPCWithNoBalancerErrorOnBadCertificates(t *testing.T) {
 	te.startServer(&testServer{security: te.e.security})
 	defer te.tearDown()
 
+	te.nonBlockingDial = true
 	cc := te.clientConn()
 	tc := testpb.NewTestServiceClient(cc)
 	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); !strings.Contains(err.Error(), clientAlwaysFailCredErrorMsg) {
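Note for reviewers: the sketch below is not part of this patch. It is a minimal, hypothetical implementation of the new resolver API added in resolver/resolver.go above, showing how a third-party resolver would plug in via resolver.Register. The "static" scheme, the staticBuilder/staticResolver names, the staticresolver package, and the hard-coded address are illustrative assumptions only; how the ClientConn picks up a registered builder for a scheme-based target (per the naming doc linked above) is part of the gRFC L9 wiring and not shown here.

// Package staticresolver sketches a resolver that reports a fixed
// address list for every target under the "static" scheme.
package staticresolver

import "google.golang.org/grpc/resolver"

// staticBuilder builds resolvers that report a fixed address list.
type staticBuilder struct {
	addrs []resolver.Address
}

// Build is called synchronously during dialing for targets that use
// this builder's scheme. It must not block.
func (b *staticBuilder) Build(target string, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
	// Report the complete address list once; a real resolver would
	// watch the name service and call cc.NewAddress on every update.
	cc.NewAddress(b.addrs)
	return &staticResolver{cc: cc, addrs: b.addrs}, nil
}

// Scheme registers this builder for "static:///..." targets.
func (b *staticBuilder) Scheme() string { return "static" }

type staticResolver struct {
	cc    resolver.ClientConn
	addrs []resolver.Address
}

// ResolveNow is only a hint; a static list never changes, so it's a no-op.
func (r *staticResolver) ResolveNow(o resolver.ResolveNowOption) {}

// Close has nothing to release for a static resolver.
func (r *staticResolver) Close() {}

func init() {
	// Make the builder available under its scheme, mirroring how the
	// preinstalled dns resolver is expected to be registered.
	resolver.Register(&staticBuilder{
		addrs: []resolver.Address{
			{Addr: "localhost:50051", Type: resolver.Backend},
		},
	})
}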