From ac49e6fc559c683277a4d6a874e22e73bd96ccc1 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 19 Sep 2016 19:22:02 -0700 Subject: [PATCH] some minor fixes --- grpclb/grpclb.go | 133 ++++++++++++++++++++++++----------------------- 1 file changed, 68 insertions(+), 65 deletions(-) diff --git a/grpclb/grpclb.go b/grpclb/grpclb.go index f3c73466..5864cce0 100644 --- a/grpclb/grpclb.go +++ b/grpclb/grpclb.go @@ -1,6 +1,6 @@ /* * - * Copyright 2014, Google Inc. + * Copyright 2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -37,12 +37,12 @@ package grpclb import ( + "errors" "fmt" "sync" "golang.org/x/net/context" "google.golang.org/grpc" - "google.golang.org/grpc/credentials" lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/naming" @@ -172,7 +172,70 @@ func (b *balancer) processServerList(l *lbpb.ServerList) { return } -func (b *balancer) init(target string, creds credentials.TransportCredentials) error { +func (b *balancer) callRemoteBalancer(lbc lbpb.LoadBalancerClient) (retry bool) { + b.mu.Lock() + if b.done { + b.mu.Unlock() + return + } + b.mu.Unlock() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stream, err := lbc.BalanceLoad(ctx, grpc.FailFast(false)) + if err != nil { + grpclog.Printf("Failed to perform RPC to the remote balancer %v", err) + return + } + initReq := &lbpb.LoadBalanceRequest{ + LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{ + InitialRequest: new(lbpb.InitialLoadBalanceRequest), + }, + } + if err := stream.Send(initReq); err != nil { + // TODO: backoff on retry? + return true + } + reply, err := stream.Recv() + if err != nil { + // TODO: backoff on retry? + return true + } + initResp := reply.GetInitialResponse() + if initResp == nil { + grpclog.Println("Failed to receive the initial response from the remote balancer.") + return + } + // TODO: Support delegation. + if initResp.LoadBalancerDelegate != "" { + // delegation + grpclog.Println("TODO: Delegation is not supported yet.") + return + } + // Retrieve the server list. + for { + reply, err := stream.Recv() + if err != nil { + break + } + if serverList := reply.GetServerList(); serverList != nil { + b.processServerList(serverList) + } + } + return true +} + +func (b *balancer) Start(target string, config grpc.BalancerConfig) error { + // TODO: Fall back to the basic direct connection if there is no name resolver. + if b.r == nil { + return errors.New("there is no name resolver installed") + } + b.mu.Lock() + if b.done { + b.mu.Unlock() + return grpc.ErrClientConnClosing + } + b.addrCh = make(chan []grpc.Address) + b.mu.Unlock() w, err := b.r.Resolve(target) if err != nil { return err @@ -205,6 +268,7 @@ func (b *balancer) init(target string, creds credentials.TransportCredentials) e // // TODO: override the server name in creds using Metadata in addr. var err error + creds := config.DialCreds if creds == nil { cc, err = grpc.Dial(rb.addr.Addr, grpc.WithBlock(), grpc.WithInsecure()) } else { @@ -217,56 +281,9 @@ func (b *balancer) init(target string, creds credentials.TransportCredentials) e lbc := lbpb.NewLoadBalancerClient(cc) go func() { for { - b.mu.Lock() - if b.done { - b.mu.Unlock() + if retry := b.callRemoteBalancer(lbc); !retry { return } - b.mu.Unlock() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stream, err := lbc.BalanceLoad(ctx, grpc.FailFast(false)) - if err != nil { - grpclog.Printf("Failed to perform RPC to the remote balancer %v", err) - return - } - initReq := &lbpb.LoadBalanceRequest{ - LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{ - InitialRequest: new(lbpb.InitialLoadBalanceRequest), - }, - } - if err := stream.Send(initReq); err != nil { - cancel() - // TODO: backoff on retry? - continue - } - reply, err := stream.Recv() - if err != nil { - cancel() - // TODO: backoff on retry? - continue - } - initResp := reply.GetInitialResponse() - if initResp == nil { - grpclog.Println("Failed to receive the initial response from the remote balancer.") - return - } - // TODO: Support delegation. - if initResp.LoadBalancerDelegate != "" { - // delegation - grpclog.Println("TODO: Delegation is not supported yet.") - return - } - // Retrieve the server list. - for { - reply, err := stream.Recv() - if err != nil { - break - } - if serverList := reply.GetServerList(); serverList != nil { - b.processServerList(serverList) - } - } } }() } @@ -274,20 +291,6 @@ func (b *balancer) init(target string, creds credentials.TransportCredentials) e return nil } -func (b *balancer) Start(target string, config grpc.BalancerConfig) error { - b.mu.Lock() - if b.done { - b.mu.Unlock() - return grpc.ErrClientConnClosing - } - b.addrCh = make(chan []grpc.Address) - b.mu.Unlock() - if err := b.init(target, config.DialCreds); err != nil { - return err - } - return nil -} - func (b *balancer) down(addr grpc.Address, err error) { b.mu.Lock() defer b.mu.Unlock()