some minor fixes

This commit is contained in:
iamqizhao
2016-09-19 19:22:02 -07:00
parent f02984b7c6
commit ac49e6fc55

View File

@ -1,6 +1,6 @@
/*
*
* Copyright 2014, Google Inc.
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -37,12 +37,12 @@
package grpclb
import (
"errors"
"fmt"
"sync"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/naming"
@ -172,7 +172,70 @@ func (b *balancer) processServerList(l *lbpb.ServerList) {
return
}
func (b *balancer) init(target string, creds credentials.TransportCredentials) error {
func (b *balancer) callRemoteBalancer(lbc lbpb.LoadBalancerClient) (retry bool) {
b.mu.Lock()
if b.done {
b.mu.Unlock()
return
}
b.mu.Unlock()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := lbc.BalanceLoad(ctx, grpc.FailFast(false))
if err != nil {
grpclog.Printf("Failed to perform RPC to the remote balancer %v", err)
return
}
initReq := &lbpb.LoadBalanceRequest{
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
InitialRequest: new(lbpb.InitialLoadBalanceRequest),
},
}
if err := stream.Send(initReq); err != nil {
// TODO: backoff on retry?
return true
}
reply, err := stream.Recv()
if err != nil {
// TODO: backoff on retry?
return true
}
initResp := reply.GetInitialResponse()
if initResp == nil {
grpclog.Println("Failed to receive the initial response from the remote balancer.")
return
}
// TODO: Support delegation.
if initResp.LoadBalancerDelegate != "" {
// delegation
grpclog.Println("TODO: Delegation is not supported yet.")
return
}
// Retrieve the server list.
for {
reply, err := stream.Recv()
if err != nil {
break
}
if serverList := reply.GetServerList(); serverList != nil {
b.processServerList(serverList)
}
}
return true
}
func (b *balancer) Start(target string, config grpc.BalancerConfig) error {
// TODO: Fall back to the basic direct connection if there is no name resolver.
if b.r == nil {
return errors.New("there is no name resolver installed")
}
b.mu.Lock()
if b.done {
b.mu.Unlock()
return grpc.ErrClientConnClosing
}
b.addrCh = make(chan []grpc.Address)
b.mu.Unlock()
w, err := b.r.Resolve(target)
if err != nil {
return err
@ -205,6 +268,7 @@ func (b *balancer) init(target string, creds credentials.TransportCredentials) e
//
// TODO: override the server name in creds using Metadata in addr.
var err error
creds := config.DialCreds
if creds == nil {
cc, err = grpc.Dial(rb.addr.Addr, grpc.WithBlock(), grpc.WithInsecure())
} else {
@ -217,56 +281,9 @@ func (b *balancer) init(target string, creds credentials.TransportCredentials) e
lbc := lbpb.NewLoadBalancerClient(cc)
go func() {
for {
b.mu.Lock()
if b.done {
b.mu.Unlock()
if retry := b.callRemoteBalancer(lbc); !retry {
return
}
b.mu.Unlock()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := lbc.BalanceLoad(ctx, grpc.FailFast(false))
if err != nil {
grpclog.Printf("Failed to perform RPC to the remote balancer %v", err)
return
}
initReq := &lbpb.LoadBalanceRequest{
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
InitialRequest: new(lbpb.InitialLoadBalanceRequest),
},
}
if err := stream.Send(initReq); err != nil {
cancel()
// TODO: backoff on retry?
continue
}
reply, err := stream.Recv()
if err != nil {
cancel()
// TODO: backoff on retry?
continue
}
initResp := reply.GetInitialResponse()
if initResp == nil {
grpclog.Println("Failed to receive the initial response from the remote balancer.")
return
}
// TODO: Support delegation.
if initResp.LoadBalancerDelegate != "" {
// delegation
grpclog.Println("TODO: Delegation is not supported yet.")
return
}
// Retrieve the server list.
for {
reply, err := stream.Recv()
if err != nil {
break
}
if serverList := reply.GetServerList(); serverList != nil {
b.processServerList(serverList)
}
}
}
}()
}
@ -274,20 +291,6 @@ func (b *balancer) init(target string, creds credentials.TransportCredentials) e
return nil
}
func (b *balancer) Start(target string, config grpc.BalancerConfig) error {
b.mu.Lock()
if b.done {
b.mu.Unlock()
return grpc.ErrClientConnClosing
}
b.addrCh = make(chan []grpc.Address)
b.mu.Unlock()
if err := b.init(target, config.DialCreds); err != nil {
return err
}
return nil
}
func (b *balancer) down(addr grpc.Address, err error) {
b.mu.Lock()
defer b.mu.Unlock()