populate initReq target name and fix IP []byte type in grpclb (#1145)
This commit is contained in:
@ -40,6 +40,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -96,6 +97,7 @@ type addrInfo struct {
|
|||||||
|
|
||||||
type balancer struct {
|
type balancer struct {
|
||||||
r naming.Resolver
|
r naming.Resolver
|
||||||
|
target string
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
seq int // a sequence number to make sure addrCh does not get stale addresses.
|
seq int // a sequence number to make sure addrCh does not get stale addresses.
|
||||||
w naming.Watcher
|
w naming.Watcher
|
||||||
@ -224,7 +226,7 @@ func (b *balancer) processServerList(l *lbpb.ServerList, seq int) {
|
|||||||
for _, s := range servers {
|
for _, s := range servers {
|
||||||
md := metadata.Pairs("lb-token", s.LoadBalanceToken)
|
md := metadata.Pairs("lb-token", s.LoadBalanceToken)
|
||||||
addr := grpc.Address{
|
addr := grpc.Address{
|
||||||
Addr: fmt.Sprintf("%s:%d", s.IpAddress, s.Port),
|
Addr: fmt.Sprintf("%s:%d", net.IP(s.IpAddress), s.Port),
|
||||||
Metadata: &md,
|
Metadata: &md,
|
||||||
}
|
}
|
||||||
sl = append(sl, &addrInfo{
|
sl = append(sl, &addrInfo{
|
||||||
@ -272,7 +274,9 @@ func (b *balancer) callRemoteBalancer(lbc lbpb.LoadBalancerClient, seq int) (ret
|
|||||||
b.mu.Unlock()
|
b.mu.Unlock()
|
||||||
initReq := &lbpb.LoadBalanceRequest{
|
initReq := &lbpb.LoadBalanceRequest{
|
||||||
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
|
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
|
||||||
InitialRequest: new(lbpb.InitialLoadBalanceRequest),
|
InitialRequest: &lbpb.InitialLoadBalanceRequest{
|
||||||
|
Name: b.target,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err := stream.Send(initReq); err != nil {
|
if err := stream.Send(initReq); err != nil {
|
||||||
@ -322,6 +326,7 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error {
|
|||||||
if b.r == nil {
|
if b.r == nil {
|
||||||
return errors.New("there is no name resolver installed")
|
return errors.New("there is no name resolver installed")
|
||||||
}
|
}
|
||||||
|
b.target = target
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
if b.done {
|
if b.done {
|
||||||
b.mu.Unlock()
|
b.mu.Unlock()
|
||||||
|
@ -180,6 +180,14 @@ func (b *remoteBalancer) stop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (b *remoteBalancer) BalanceLoad(stream lbpb.LoadBalancer_BalanceLoadServer) error {
|
func (b *remoteBalancer) BalanceLoad(stream lbpb.LoadBalancer_BalanceLoadServer) error {
|
||||||
|
req, err := stream.Recv()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
initReq := req.GetInitialRequest()
|
||||||
|
if initReq.Name != besn {
|
||||||
|
return grpc.Errorf(codes.InvalidArgument, "invalid service name: %v", initReq.Name)
|
||||||
|
}
|
||||||
resp := &lbpb.LoadBalanceResponse{
|
resp := &lbpb.LoadBalanceResponse{
|
||||||
LoadBalanceResponseType: &lbpb.LoadBalanceResponse_InitialResponse{
|
LoadBalanceResponseType: &lbpb.LoadBalanceResponse_InitialResponse{
|
||||||
InitialResponse: new(lbpb.InitialLoadBalanceResponse),
|
InitialResponse: new(lbpb.InitialLoadBalanceResponse),
|
||||||
@ -264,7 +272,7 @@ func TestGRPCLB(t *testing.T) {
|
|||||||
t.Fatalf("Failed to generate the port number %v", err)
|
t.Fatalf("Failed to generate the port number %v", err)
|
||||||
}
|
}
|
||||||
be := &lbpb.Server{
|
be := &lbpb.Server{
|
||||||
IpAddress: []byte(beAddr[0]),
|
IpAddress: beLis.Addr().(*net.TCPAddr).IP,
|
||||||
Port: int32(bePort),
|
Port: int32(bePort),
|
||||||
LoadBalanceToken: lbToken,
|
LoadBalanceToken: lbToken,
|
||||||
}
|
}
|
||||||
@ -334,12 +342,12 @@ func TestDropRequest(t *testing.T) {
|
|||||||
}
|
}
|
||||||
sls := []*lbpb.ServerList{{
|
sls := []*lbpb.ServerList{{
|
||||||
Servers: []*lbpb.Server{{
|
Servers: []*lbpb.Server{{
|
||||||
IpAddress: []byte(beAddr1[0]),
|
IpAddress: beLis1.Addr().(*net.TCPAddr).IP,
|
||||||
Port: int32(bePort1),
|
Port: int32(bePort1),
|
||||||
LoadBalanceToken: lbToken,
|
LoadBalanceToken: lbToken,
|
||||||
DropRequest: true,
|
DropRequest: true,
|
||||||
}, {
|
}, {
|
||||||
IpAddress: []byte(beAddr2[0]),
|
IpAddress: beLis2.Addr().(*net.TCPAddr).IP,
|
||||||
Port: int32(bePort2),
|
Port: int32(bePort2),
|
||||||
LoadBalanceToken: lbToken,
|
LoadBalanceToken: lbToken,
|
||||||
DropRequest: false,
|
DropRequest: false,
|
||||||
@ -410,7 +418,7 @@ func TestDropRequestFailedNonFailFast(t *testing.T) {
|
|||||||
t.Fatalf("Failed to generate the port number %v", err)
|
t.Fatalf("Failed to generate the port number %v", err)
|
||||||
}
|
}
|
||||||
be := &lbpb.Server{
|
be := &lbpb.Server{
|
||||||
IpAddress: []byte(beAddr[0]),
|
IpAddress: beLis.Addr().(*net.TCPAddr).IP,
|
||||||
Port: int32(bePort),
|
Port: int32(bePort),
|
||||||
LoadBalanceToken: lbToken,
|
LoadBalanceToken: lbToken,
|
||||||
DropRequest: true,
|
DropRequest: true,
|
||||||
@ -473,7 +481,7 @@ func TestServerExpiration(t *testing.T) {
|
|||||||
t.Fatalf("Failed to generate the port number %v", err)
|
t.Fatalf("Failed to generate the port number %v", err)
|
||||||
}
|
}
|
||||||
be := &lbpb.Server{
|
be := &lbpb.Server{
|
||||||
IpAddress: []byte(beAddr[0]),
|
IpAddress: beLis.Addr().(*net.TCPAddr).IP,
|
||||||
Port: int32(bePort),
|
Port: int32(bePort),
|
||||||
LoadBalanceToken: lbToken,
|
LoadBalanceToken: lbToken,
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user