From 7fc29d0caa400efc53212b222356f99efe87d7d5 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 28 Mar 2017 12:10:54 -0700 Subject: [PATCH] populate initReq target name and fix IP []byte type in grpclb (#1145) --- grpclb/grpclb.go | 9 +++++++-- grpclb/grpclb_test.go | 18 +++++++++++++----- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/grpclb/grpclb.go b/grpclb/grpclb.go index 4f6b63d2..b699a525 100644 --- a/grpclb/grpclb.go +++ b/grpclb/grpclb.go @@ -40,6 +40,7 @@ import ( "errors" "fmt" "math/rand" + "net" "sync" "time" @@ -96,6 +97,7 @@ type addrInfo struct { type balancer struct { r naming.Resolver + target string mu sync.Mutex seq int // a sequence number to make sure addrCh does not get stale addresses. w naming.Watcher @@ -224,7 +226,7 @@ func (b *balancer) processServerList(l *lbpb.ServerList, seq int) { for _, s := range servers { md := metadata.Pairs("lb-token", s.LoadBalanceToken) addr := grpc.Address{ - Addr: fmt.Sprintf("%s:%d", s.IpAddress, s.Port), + Addr: fmt.Sprintf("%s:%d", net.IP(s.IpAddress), s.Port), Metadata: &md, } sl = append(sl, &addrInfo{ @@ -272,7 +274,9 @@ func (b *balancer) callRemoteBalancer(lbc lbpb.LoadBalancerClient, seq int) (ret b.mu.Unlock() initReq := &lbpb.LoadBalanceRequest{ LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{ - InitialRequest: new(lbpb.InitialLoadBalanceRequest), + InitialRequest: &lbpb.InitialLoadBalanceRequest{ + Name: b.target, + }, }, } if err := stream.Send(initReq); err != nil { @@ -322,6 +326,7 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error { if b.r == nil { return errors.New("there is no name resolver installed") } + b.target = target b.mu.Lock() if b.done { b.mu.Unlock() diff --git a/grpclb/grpclb_test.go b/grpclb/grpclb_test.go index 767f400d..2a62a87f 100644 --- a/grpclb/grpclb_test.go +++ b/grpclb/grpclb_test.go @@ -180,6 +180,14 @@ func (b *remoteBalancer) stop() { } func (b *remoteBalancer) BalanceLoad(stream lbpb.LoadBalancer_BalanceLoadServer) error { + req, err := stream.Recv() + if err != nil { + return err + } + initReq := req.GetInitialRequest() + if initReq.Name != besn { + return grpc.Errorf(codes.InvalidArgument, "invalid service name: %v", initReq.Name) + } resp := &lbpb.LoadBalanceResponse{ LoadBalanceResponseType: &lbpb.LoadBalanceResponse_InitialResponse{ InitialResponse: new(lbpb.InitialLoadBalanceResponse), @@ -264,7 +272,7 @@ func TestGRPCLB(t *testing.T) { t.Fatalf("Failed to generate the port number %v", err) } be := &lbpb.Server{ - IpAddress: []byte(beAddr[0]), + IpAddress: beLis.Addr().(*net.TCPAddr).IP, Port: int32(bePort), LoadBalanceToken: lbToken, } @@ -334,12 +342,12 @@ func TestDropRequest(t *testing.T) { } sls := []*lbpb.ServerList{{ Servers: []*lbpb.Server{{ - IpAddress: []byte(beAddr1[0]), + IpAddress: beLis1.Addr().(*net.TCPAddr).IP, Port: int32(bePort1), LoadBalanceToken: lbToken, DropRequest: true, }, { - IpAddress: []byte(beAddr2[0]), + IpAddress: beLis2.Addr().(*net.TCPAddr).IP, Port: int32(bePort2), LoadBalanceToken: lbToken, DropRequest: false, @@ -410,7 +418,7 @@ func TestDropRequestFailedNonFailFast(t *testing.T) { t.Fatalf("Failed to generate the port number %v", err) } be := &lbpb.Server{ - IpAddress: []byte(beAddr[0]), + IpAddress: beLis.Addr().(*net.TCPAddr).IP, Port: int32(bePort), LoadBalanceToken: lbToken, DropRequest: true, @@ -473,7 +481,7 @@ func TestServerExpiration(t *testing.T) { t.Fatalf("Failed to generate the port number %v", err) } be := &lbpb.Server{ - IpAddress: []byte(beAddr[0]), + IpAddress: beLis.Addr().(*net.TCPAddr).IP, Port: int32(bePort), LoadBalanceToken: lbToken, }