705 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			705 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
|  *
 | |
|  * Copyright 2016 gRPC authors.
 | |
|  *
 | |
|  * Licensed under the Apache License, Version 2.0 (the "License");
 | |
|  * you may not use this file except in compliance with the License.
 | |
|  * You may obtain a copy of the License at
 | |
|  *
 | |
|  *     http://www.apache.org/licenses/LICENSE-2.0
 | |
|  *
 | |
|  * Unless required by applicable law or agreed to in writing, software
 | |
|  * distributed under the License is distributed on an "AS IS" BASIS,
 | |
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|  * See the License for the specific language governing permissions and
 | |
|  * limitations under the License.
 | |
|  *
 | |
|  */
 | |
| 
 | |
| package grpc
 | |
| 
 | |
| import (
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"math/rand"
 | |
| 	"net"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"golang.org/x/net/context"
 | |
| 	"google.golang.org/grpc/codes"
 | |
| 	lbmpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
 | |
| 	"google.golang.org/grpc/grpclog"
 | |
| 	"google.golang.org/grpc/metadata"
 | |
| 	"google.golang.org/grpc/naming"
 | |
| )
 | |
| 
 | |
| // Client API for LoadBalancer service.
 | |
| // Mostly copied from generated pb.go file.
 | |
| // To avoid circular dependency.
 | |
| type loadBalancerClient struct {
 | |
| 	cc *ClientConn
 | |
| }
 | |
| 
 | |
| func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...CallOption) (*balanceLoadClientStream, error) {
 | |
| 	desc := &StreamDesc{
 | |
| 		StreamName:    "BalanceLoad",
 | |
| 		ServerStreams: true,
 | |
| 		ClientStreams: true,
 | |
| 	}
 | |
| 	stream, err := NewClientStream(ctx, desc, c.cc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	x := &balanceLoadClientStream{stream}
 | |
| 	return x, nil
 | |
| }
 | |
| 
 | |
| type balanceLoadClientStream struct {
 | |
| 	ClientStream
 | |
| }
 | |
| 
 | |
| func (x *balanceLoadClientStream) Send(m *lbmpb.LoadBalanceRequest) error {
 | |
| 	return x.ClientStream.SendMsg(m)
 | |
| }
 | |
| 
 | |
| func (x *balanceLoadClientStream) Recv() (*lbmpb.LoadBalanceResponse, error) {
 | |
| 	m := new(lbmpb.LoadBalanceResponse)
 | |
| 	if err := x.ClientStream.RecvMsg(m); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return m, nil
 | |
| }
 | |
| 
 | |
| // NewGRPCLBBalancer creates a grpclb load balancer.
 | |
| func NewGRPCLBBalancer(r naming.Resolver) Balancer {
 | |
| 	return &grpclbBalancer{
 | |
| 		r: r,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| type remoteBalancerInfo struct {
 | |
| 	addr string
 | |
| 	// the server name used for authentication with the remote LB server.
 | |
| 	name string
 | |
| }
 | |
| 
 | |
| // grpclbAddrInfo consists of the information of a backend server.
 | |
| type grpclbAddrInfo struct {
 | |
| 	addr      Address
 | |
| 	connected bool
 | |
| 	// dropForRateLimiting indicates whether this particular request should be
 | |
| 	// dropped by the client for rate limiting.
 | |
| 	dropForRateLimiting bool
 | |
| 	// dropForLoadBalancing indicates whether this particular request should be
 | |
| 	// dropped by the client for load balancing.
 | |
| 	dropForLoadBalancing bool
 | |
| }
 | |
| 
 | |
| type grpclbBalancer struct {
 | |
| 	r      naming.Resolver
 | |
| 	target string
 | |
| 	mu     sync.Mutex
 | |
| 	seq    int // a sequence number to make sure addrCh does not get stale addresses.
 | |
| 	w      naming.Watcher
 | |
| 	addrCh chan []Address
 | |
| 	rbs    []remoteBalancerInfo
 | |
| 	addrs  []*grpclbAddrInfo
 | |
| 	next   int
 | |
| 	waitCh chan struct{}
 | |
| 	done   bool
 | |
| 	rand   *rand.Rand
 | |
| 
 | |
| 	clientStats lbmpb.ClientStats
 | |
| }
 | |
| 
 | |
| func (b *grpclbBalancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error {
 | |
| 	updates, err := w.Next()
 | |
| 	if err != nil {
 | |
| 		grpclog.Warningf("grpclb: failed to get next addr update from watcher: %v", err)
 | |
| 		return err
 | |
| 	}
 | |
| 	b.mu.Lock()
 | |
| 	defer b.mu.Unlock()
 | |
| 	if b.done {
 | |
| 		return ErrClientConnClosing
 | |
| 	}
 | |
| 	for _, update := range updates {
 | |
| 		switch update.Op {
 | |
| 		case naming.Add:
 | |
| 			var exist bool
 | |
| 			for _, v := range b.rbs {
 | |
| 				// TODO: Is the same addr with different server name a different balancer?
 | |
| 				if update.Addr == v.addr {
 | |
| 					exist = true
 | |
| 					break
 | |
| 				}
 | |
| 			}
 | |
| 			if exist {
 | |
| 				continue
 | |
| 			}
 | |
| 			md, ok := update.Metadata.(*naming.AddrMetadataGRPCLB)
 | |
| 			if !ok {
 | |
| 				// TODO: Revisit the handling here and may introduce some fallback mechanism.
 | |
| 				grpclog.Errorf("The name resolution contains unexpected metadata %v", update.Metadata)
 | |
| 				continue
 | |
| 			}
 | |
| 			switch md.AddrType {
 | |
| 			case naming.Backend:
 | |
| 				// TODO: Revisit the handling here and may introduce some fallback mechanism.
 | |
| 				grpclog.Errorf("The name resolution does not give grpclb addresses")
 | |
| 				continue
 | |
| 			case naming.GRPCLB:
 | |
| 				b.rbs = append(b.rbs, remoteBalancerInfo{
 | |
| 					addr: update.Addr,
 | |
| 					name: md.ServerName,
 | |
| 				})
 | |
| 			default:
 | |
| 				grpclog.Errorf("Received unknow address type %d", md.AddrType)
 | |
| 				continue
 | |
| 			}
 | |
| 		case naming.Delete:
 | |
| 			for i, v := range b.rbs {
 | |
| 				if update.Addr == v.addr {
 | |
| 					copy(b.rbs[i:], b.rbs[i+1:])
 | |
| 					b.rbs = b.rbs[:len(b.rbs)-1]
 | |
| 					break
 | |
| 				}
 | |
| 			}
 | |
| 		default:
 | |
| 			grpclog.Errorf("Unknown update.Op %v", update.Op)
 | |
| 		}
 | |
| 	}
 | |
| 	// TODO: Fall back to the basic round-robin load balancing if the resulting address is
 | |
| 	// not a load balancer.
 | |
| 	select {
 | |
| 	case <-ch:
 | |
| 	default:
 | |
| 	}
 | |
| 	ch <- b.rbs
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func convertDuration(d *lbmpb.Duration) time.Duration {
 | |
| 	if d == nil {
 | |
| 		return 0
 | |
| 	}
 | |
| 	return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
 | |
| }
 | |
| 
 | |
| func (b *grpclbBalancer) processServerList(l *lbmpb.ServerList, seq int) {
 | |
| 	if l == nil {
 | |
| 		return
 | |
| 	}
 | |
| 	servers := l.GetServers()
 | |
| 	var (
 | |
| 		sl    []*grpclbAddrInfo
 | |
| 		addrs []Address
 | |
| 	)
 | |
| 	for _, s := range servers {
 | |
| 		md := metadata.Pairs("lb-token", s.LoadBalanceToken)
 | |
| 		ip := net.IP(s.IpAddress)
 | |
| 		ipStr := ip.String()
 | |
| 		if ip.To4() == nil {
 | |
| 			// Add square brackets to ipv6 addresses, otherwise net.Dial() and
 | |
| 			// net.SplitHostPort() will return too many colons error.
 | |
| 			ipStr = fmt.Sprintf("[%s]", ipStr)
 | |
| 		}
 | |
| 		addr := Address{
 | |
| 			Addr:     fmt.Sprintf("%s:%d", ipStr, s.Port),
 | |
| 			Metadata: &md,
 | |
| 		}
 | |
| 		sl = append(sl, &grpclbAddrInfo{
 | |
| 			addr:                 addr,
 | |
| 			dropForRateLimiting:  s.DropForRateLimiting,
 | |
| 			dropForLoadBalancing: s.DropForLoadBalancing,
 | |
| 		})
 | |
| 		addrs = append(addrs, addr)
 | |
| 	}
 | |
| 	b.mu.Lock()
 | |
| 	defer b.mu.Unlock()
 | |
| 	if b.done || seq < b.seq {
 | |
| 		return
 | |
| 	}
 | |
| 	if len(sl) > 0 {
 | |
| 		// reset b.next to 0 when replacing the server list.
 | |
| 		b.next = 0
 | |
| 		b.addrs = sl
 | |
| 		b.addrCh <- addrs
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (b *grpclbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) {
 | |
| 	ticker := time.NewTicker(interval)
 | |
| 	defer ticker.Stop()
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ticker.C:
 | |
| 		case <-done:
 | |
| 			return
 | |
| 		}
 | |
| 		b.mu.Lock()
 | |
| 		stats := b.clientStats
 | |
| 		b.clientStats = lbmpb.ClientStats{} // Clear the stats.
 | |
| 		b.mu.Unlock()
 | |
| 		t := time.Now()
 | |
| 		stats.Timestamp = &lbmpb.Timestamp{
 | |
| 			Seconds: t.Unix(),
 | |
| 			Nanos:   int32(t.Nanosecond()),
 | |
| 		}
 | |
| 		if err := s.Send(&lbmpb.LoadBalanceRequest{
 | |
| 			LoadBalanceRequestType: &lbmpb.LoadBalanceRequest_ClientStats{
 | |
| 				ClientStats: &stats,
 | |
| 			},
 | |
| 		}); err != nil {
 | |
| 			grpclog.Errorf("grpclb: failed to send load report: %v", err)
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (b *grpclbBalancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) {
 | |
| 	ctx, cancel := context.WithCancel(context.Background())
 | |
| 	defer cancel()
 | |
| 	stream, err := lbc.BalanceLoad(ctx)
 | |
| 	if err != nil {
 | |
| 		grpclog.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
 | |
| 		return
 | |
| 	}
 | |
| 	b.mu.Lock()
 | |
| 	if b.done {
 | |
| 		b.mu.Unlock()
 | |
| 		return
 | |
| 	}
 | |
| 	b.mu.Unlock()
 | |
| 	initReq := &lbmpb.LoadBalanceRequest{
 | |
| 		LoadBalanceRequestType: &lbmpb.LoadBalanceRequest_InitialRequest{
 | |
| 			InitialRequest: &lbmpb.InitialLoadBalanceRequest{
 | |
| 				Name: b.target,
 | |
| 			},
 | |
| 		},
 | |
| 	}
 | |
| 	if err := stream.Send(initReq); err != nil {
 | |
| 		grpclog.Errorf("grpclb: failed to send init request: %v", err)
 | |
| 		// TODO: backoff on retry?
 | |
| 		return true
 | |
| 	}
 | |
| 	reply, err := stream.Recv()
 | |
| 	if err != nil {
 | |
| 		grpclog.Errorf("grpclb: failed to recv init response: %v", err)
 | |
| 		// TODO: backoff on retry?
 | |
| 		return true
 | |
| 	}
 | |
| 	initResp := reply.GetInitialResponse()
 | |
| 	if initResp == nil {
 | |
| 		grpclog.Errorf("grpclb: reply from remote balancer did not include initial response.")
 | |
| 		return
 | |
| 	}
 | |
| 	// TODO: Support delegation.
 | |
| 	if initResp.LoadBalancerDelegate != "" {
 | |
| 		// delegation
 | |
| 		grpclog.Errorf("TODO: Delegation is not supported yet.")
 | |
| 		return
 | |
| 	}
 | |
| 	streamDone := make(chan struct{})
 | |
| 	defer close(streamDone)
 | |
| 	b.mu.Lock()
 | |
| 	b.clientStats = lbmpb.ClientStats{} // Clear client stats.
 | |
| 	b.mu.Unlock()
 | |
| 	if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
 | |
| 		go b.sendLoadReport(stream, d, streamDone)
 | |
| 	}
 | |
| 	// Retrieve the server list.
 | |
| 	for {
 | |
| 		reply, err := stream.Recv()
 | |
| 		if err != nil {
 | |
| 			grpclog.Errorf("grpclb: failed to recv server list: %v", err)
 | |
| 			break
 | |
| 		}
 | |
| 		b.mu.Lock()
 | |
| 		if b.done || seq < b.seq {
 | |
| 			b.mu.Unlock()
 | |
| 			return
 | |
| 		}
 | |
| 		b.seq++ // tick when receiving a new list of servers.
 | |
| 		seq = b.seq
 | |
| 		b.mu.Unlock()
 | |
| 		if serverList := reply.GetServerList(); serverList != nil {
 | |
| 			b.processServerList(serverList, seq)
 | |
| 		}
 | |
| 	}
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| func (b *grpclbBalancer) Start(target string, config BalancerConfig) error {
 | |
| 	b.rand = rand.New(rand.NewSource(time.Now().Unix()))
 | |
| 	// TODO: Fall back to the basic direct connection if there is no name resolver.
 | |
| 	if b.r == nil {
 | |
| 		return errors.New("there is no name resolver installed")
 | |
| 	}
 | |
| 	b.target = target
 | |
| 	b.mu.Lock()
 | |
| 	if b.done {
 | |
| 		b.mu.Unlock()
 | |
| 		return ErrClientConnClosing
 | |
| 	}
 | |
| 	b.addrCh = make(chan []Address)
 | |
| 	w, err := b.r.Resolve(target)
 | |
| 	if err != nil {
 | |
| 		b.mu.Unlock()
 | |
| 		grpclog.Errorf("grpclb: failed to resolve address: %v, err: %v", target, err)
 | |
| 		return err
 | |
| 	}
 | |
| 	b.w = w
 | |
| 	b.mu.Unlock()
 | |
| 	balancerAddrsCh := make(chan []remoteBalancerInfo, 1)
 | |
| 	// Spawn a goroutine to monitor the name resolution of remote load balancer.
 | |
| 	go func() {
 | |
| 		for {
 | |
| 			if err := b.watchAddrUpdates(w, balancerAddrsCh); err != nil {
 | |
| 				grpclog.Warningf("grpclb: the naming watcher stops working due to %v.\n", err)
 | |
| 				close(balancerAddrsCh)
 | |
| 				return
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| 	// Spawn a goroutine to talk to the remote load balancer.
 | |
| 	go func() {
 | |
| 		var (
 | |
| 			cc *ClientConn
 | |
| 			// ccError is closed when there is an error in the current cc.
 | |
| 			// A new rb should be picked from rbs and connected.
 | |
| 			ccError chan struct{}
 | |
| 			rb      *remoteBalancerInfo
 | |
| 			rbs     []remoteBalancerInfo
 | |
| 			rbIdx   int
 | |
| 		)
 | |
| 
 | |
| 		defer func() {
 | |
| 			if ccError != nil {
 | |
| 				select {
 | |
| 				case <-ccError:
 | |
| 				default:
 | |
| 					close(ccError)
 | |
| 				}
 | |
| 			}
 | |
| 			if cc != nil {
 | |
| 				cc.Close()
 | |
| 			}
 | |
| 		}()
 | |
| 
 | |
| 		for {
 | |
| 			var ok bool
 | |
| 			select {
 | |
| 			case rbs, ok = <-balancerAddrsCh:
 | |
| 				if !ok {
 | |
| 					return
 | |
| 				}
 | |
| 				foundIdx := -1
 | |
| 				if rb != nil {
 | |
| 					for i, trb := range rbs {
 | |
| 						if trb == *rb {
 | |
| 							foundIdx = i
 | |
| 							break
 | |
| 						}
 | |
| 					}
 | |
| 				}
 | |
| 				if foundIdx >= 0 {
 | |
| 					if foundIdx >= 1 {
 | |
| 						// Move the address in use to the beginning of the list.
 | |
| 						b.rbs[0], b.rbs[foundIdx] = b.rbs[foundIdx], b.rbs[0]
 | |
| 						rbIdx = 0
 | |
| 					}
 | |
| 					continue // If found, don't dial new cc.
 | |
| 				} else if len(rbs) > 0 {
 | |
| 					// Pick a random one from the list, instead of always using the first one.
 | |
| 					if l := len(rbs); l > 1 && rb != nil {
 | |
| 						tmpIdx := b.rand.Intn(l - 1)
 | |
| 						b.rbs[0], b.rbs[tmpIdx] = b.rbs[tmpIdx], b.rbs[0]
 | |
| 					}
 | |
| 					rbIdx = 0
 | |
| 					rb = &rbs[0]
 | |
| 				} else {
 | |
| 					// foundIdx < 0 && len(rbs) <= 0.
 | |
| 					rb = nil
 | |
| 				}
 | |
| 			case <-ccError:
 | |
| 				ccError = nil
 | |
| 				if rbIdx < len(rbs)-1 {
 | |
| 					rbIdx++
 | |
| 					rb = &rbs[rbIdx]
 | |
| 				} else {
 | |
| 					rb = nil
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 			if rb == nil {
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			if cc != nil {
 | |
| 				cc.Close()
 | |
| 			}
 | |
| 			// Talk to the remote load balancer to get the server list.
 | |
| 			var (
 | |
| 				err   error
 | |
| 				dopts []DialOption
 | |
| 			)
 | |
| 			if creds := config.DialCreds; creds != nil {
 | |
| 				if rb.name != "" {
 | |
| 					if err := creds.OverrideServerName(rb.name); err != nil {
 | |
| 						grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v", err)
 | |
| 						continue
 | |
| 					}
 | |
| 				}
 | |
| 				dopts = append(dopts, WithTransportCredentials(creds))
 | |
| 			} else {
 | |
| 				dopts = append(dopts, WithInsecure())
 | |
| 			}
 | |
| 			if dialer := config.Dialer; dialer != nil {
 | |
| 				// WithDialer takes a different type of function, so we instead use a special DialOption here.
 | |
| 				dopts = append(dopts, func(o *dialOptions) { o.copts.Dialer = dialer })
 | |
| 			}
 | |
| 			dopts = append(dopts, WithBlock())
 | |
| 			ccError = make(chan struct{})
 | |
| 			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 | |
| 			cc, err = DialContext(ctx, rb.addr, dopts...)
 | |
| 			cancel()
 | |
| 			if err != nil {
 | |
| 				grpclog.Warningf("grpclb: failed to setup a connection to the remote balancer %v: %v", rb.addr, err)
 | |
| 				close(ccError)
 | |
| 				continue
 | |
| 			}
 | |
| 			b.mu.Lock()
 | |
| 			b.seq++ // tick when getting a new balancer address
 | |
| 			seq := b.seq
 | |
| 			b.next = 0
 | |
| 			b.mu.Unlock()
 | |
| 			go func(cc *ClientConn, ccError chan struct{}) {
 | |
| 				lbc := &loadBalancerClient{cc}
 | |
| 				b.callRemoteBalancer(lbc, seq)
 | |
| 				cc.Close()
 | |
| 				select {
 | |
| 				case <-ccError:
 | |
| 				default:
 | |
| 					close(ccError)
 | |
| 				}
 | |
| 			}(cc, ccError)
 | |
| 		}
 | |
| 	}()
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (b *grpclbBalancer) down(addr Address, err error) {
 | |
| 	b.mu.Lock()
 | |
| 	defer b.mu.Unlock()
 | |
| 	for _, a := range b.addrs {
 | |
| 		if addr == a.addr {
 | |
| 			a.connected = false
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (b *grpclbBalancer) Up(addr Address) func(error) {
 | |
| 	b.mu.Lock()
 | |
| 	defer b.mu.Unlock()
 | |
| 	if b.done {
 | |
| 		return nil
 | |
| 	}
 | |
| 	var cnt int
 | |
| 	for _, a := range b.addrs {
 | |
| 		if a.addr == addr {
 | |
| 			if a.connected {
 | |
| 				return nil
 | |
| 			}
 | |
| 			a.connected = true
 | |
| 		}
 | |
| 		if a.connected && !a.dropForRateLimiting && !a.dropForLoadBalancing {
 | |
| 			cnt++
 | |
| 		}
 | |
| 	}
 | |
| 	// addr is the only one which is connected. Notify the Get() callers who are blocking.
 | |
| 	if cnt == 1 && b.waitCh != nil {
 | |
| 		close(b.waitCh)
 | |
| 		b.waitCh = nil
 | |
| 	}
 | |
| 	return func(err error) {
 | |
| 		b.down(addr, err)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (b *grpclbBalancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
 | |
| 	var ch chan struct{}
 | |
| 	b.mu.Lock()
 | |
| 	if b.done {
 | |
| 		b.mu.Unlock()
 | |
| 		err = ErrClientConnClosing
 | |
| 		return
 | |
| 	}
 | |
| 	seq := b.seq
 | |
| 
 | |
| 	defer func() {
 | |
| 		if err != nil {
 | |
| 			return
 | |
| 		}
 | |
| 		put = func() {
 | |
| 			s, ok := rpcInfoFromContext(ctx)
 | |
| 			if !ok {
 | |
| 				return
 | |
| 			}
 | |
| 			b.mu.Lock()
 | |
| 			defer b.mu.Unlock()
 | |
| 			if b.done || seq < b.seq {
 | |
| 				return
 | |
| 			}
 | |
| 			b.clientStats.NumCallsFinished++
 | |
| 			if !s.bytesSent {
 | |
| 				b.clientStats.NumCallsFinishedWithClientFailedToSend++
 | |
| 			} else if s.bytesReceived {
 | |
| 				b.clientStats.NumCallsFinishedKnownReceived++
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	b.clientStats.NumCallsStarted++
 | |
| 	if len(b.addrs) > 0 {
 | |
| 		if b.next >= len(b.addrs) {
 | |
| 			b.next = 0
 | |
| 		}
 | |
| 		next := b.next
 | |
| 		for {
 | |
| 			a := b.addrs[next]
 | |
| 			next = (next + 1) % len(b.addrs)
 | |
| 			if a.connected {
 | |
| 				if !a.dropForRateLimiting && !a.dropForLoadBalancing {
 | |
| 					addr = a.addr
 | |
| 					b.next = next
 | |
| 					b.mu.Unlock()
 | |
| 					return
 | |
| 				}
 | |
| 				if !opts.BlockingWait {
 | |
| 					b.next = next
 | |
| 					if a.dropForLoadBalancing {
 | |
| 						b.clientStats.NumCallsFinished++
 | |
| 						b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
 | |
| 					} else if a.dropForRateLimiting {
 | |
| 						b.clientStats.NumCallsFinished++
 | |
| 						b.clientStats.NumCallsFinishedWithDropForRateLimiting++
 | |
| 					}
 | |
| 					b.mu.Unlock()
 | |
| 					err = Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr)
 | |
| 					return
 | |
| 				}
 | |
| 			}
 | |
| 			if next == b.next {
 | |
| 				// Has iterated all the possible address but none is connected.
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	if !opts.BlockingWait {
 | |
| 		b.clientStats.NumCallsFinished++
 | |
| 		b.clientStats.NumCallsFinishedWithClientFailedToSend++
 | |
| 		b.mu.Unlock()
 | |
| 		err = Errorf(codes.Unavailable, "there is no address available")
 | |
| 		return
 | |
| 	}
 | |
| 	// Wait on b.waitCh for non-failfast RPCs.
 | |
| 	if b.waitCh == nil {
 | |
| 		ch = make(chan struct{})
 | |
| 		b.waitCh = ch
 | |
| 	} else {
 | |
| 		ch = b.waitCh
 | |
| 	}
 | |
| 	b.mu.Unlock()
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			b.mu.Lock()
 | |
| 			b.clientStats.NumCallsFinished++
 | |
| 			b.clientStats.NumCallsFinishedWithClientFailedToSend++
 | |
| 			b.mu.Unlock()
 | |
| 			err = ctx.Err()
 | |
| 			return
 | |
| 		case <-ch:
 | |
| 			b.mu.Lock()
 | |
| 			if b.done {
 | |
| 				b.clientStats.NumCallsFinished++
 | |
| 				b.clientStats.NumCallsFinishedWithClientFailedToSend++
 | |
| 				b.mu.Unlock()
 | |
| 				err = ErrClientConnClosing
 | |
| 				return
 | |
| 			}
 | |
| 
 | |
| 			if len(b.addrs) > 0 {
 | |
| 				if b.next >= len(b.addrs) {
 | |
| 					b.next = 0
 | |
| 				}
 | |
| 				next := b.next
 | |
| 				for {
 | |
| 					a := b.addrs[next]
 | |
| 					next = (next + 1) % len(b.addrs)
 | |
| 					if a.connected {
 | |
| 						if !a.dropForRateLimiting && !a.dropForLoadBalancing {
 | |
| 							addr = a.addr
 | |
| 							b.next = next
 | |
| 							b.mu.Unlock()
 | |
| 							return
 | |
| 						}
 | |
| 						if !opts.BlockingWait {
 | |
| 							b.next = next
 | |
| 							if a.dropForLoadBalancing {
 | |
| 								b.clientStats.NumCallsFinished++
 | |
| 								b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
 | |
| 							} else if a.dropForRateLimiting {
 | |
| 								b.clientStats.NumCallsFinished++
 | |
| 								b.clientStats.NumCallsFinishedWithDropForRateLimiting++
 | |
| 							}
 | |
| 							b.mu.Unlock()
 | |
| 							err = Errorf(codes.Unavailable, "drop requests for the addreess %s", a.addr.Addr)
 | |
| 							return
 | |
| 						}
 | |
| 					}
 | |
| 					if next == b.next {
 | |
| 						// Has iterated all the possible address but none is connected.
 | |
| 						break
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 			// The newly added addr got removed by Down() again.
 | |
| 			if b.waitCh == nil {
 | |
| 				ch = make(chan struct{})
 | |
| 				b.waitCh = ch
 | |
| 			} else {
 | |
| 				ch = b.waitCh
 | |
| 			}
 | |
| 			b.mu.Unlock()
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (b *grpclbBalancer) Notify() <-chan []Address {
 | |
| 	return b.addrCh
 | |
| }
 | |
| 
 | |
| func (b *grpclbBalancer) Close() error {
 | |
| 	b.mu.Lock()
 | |
| 	defer b.mu.Unlock()
 | |
| 	if b.done {
 | |
| 		return errBalancerClosed
 | |
| 	}
 | |
| 	b.done = true
 | |
| 	if b.waitCh != nil {
 | |
| 		close(b.waitCh)
 | |
| 	}
 | |
| 	if b.addrCh != nil {
 | |
| 		close(b.addrCh)
 | |
| 	}
 | |
| 	if b.w != nil {
 | |
| 		b.w.Close()
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | 
