add logs to grpclb on send and recv (#1235)
This commit is contained in:
16
grpclb.go
16
grpclb.go
@ -152,6 +152,7 @@ type balancer struct {
|
|||||||
func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error {
|
func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error {
|
||||||
updates, err := w.Next()
|
updates, err := w.Next()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
grpclog.Printf("grpclb: failed to get next addr update from watcher: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
@ -306,6 +307,7 @@ func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Dura
|
|||||||
ClientStats: &stats,
|
ClientStats: &stats,
|
||||||
},
|
},
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
|
grpclog.Printf("grpclb: failed to send load report: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -316,7 +318,7 @@ func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry b
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
stream, err := lbc.BalanceLoad(ctx)
|
stream, err := lbc.BalanceLoad(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
grpclog.Printf("Failed to perform RPC to the remote balancer %v", err)
|
grpclog.Printf("grpclb: failed to perform RPC to the remote balancer %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
@ -333,17 +335,19 @@ func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry b
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err := stream.Send(initReq); err != nil {
|
if err := stream.Send(initReq); err != nil {
|
||||||
|
grpclog.Printf("grpclb: failed to send init request: %v", err)
|
||||||
// TODO: backoff on retry?
|
// TODO: backoff on retry?
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
reply, err := stream.Recv()
|
reply, err := stream.Recv()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
grpclog.Printf("grpclb: failed to recv init response: %v", err)
|
||||||
// TODO: backoff on retry?
|
// TODO: backoff on retry?
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
initResp := reply.GetInitialResponse()
|
initResp := reply.GetInitialResponse()
|
||||||
if initResp == nil {
|
if initResp == nil {
|
||||||
grpclog.Println("Failed to receive the initial response from the remote balancer.")
|
grpclog.Println("grpclb: reply from remote balancer did not include initial response.")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// TODO: Support delegation.
|
// TODO: Support delegation.
|
||||||
@ -364,6 +368,7 @@ func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry b
|
|||||||
for {
|
for {
|
||||||
reply, err := stream.Recv()
|
reply, err := stream.Recv()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
grpclog.Printf("grpclb: failed to recv server list: %v", err)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
@ -397,6 +402,7 @@ func (b *balancer) Start(target string, config BalancerConfig) error {
|
|||||||
w, err := b.r.Resolve(target)
|
w, err := b.r.Resolve(target)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.mu.Unlock()
|
b.mu.Unlock()
|
||||||
|
grpclog.Printf("grpclb: failed to resolve address: %v, err: %v", target, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
b.w = w
|
b.w = w
|
||||||
@ -406,7 +412,7 @@ func (b *balancer) Start(target string, config BalancerConfig) error {
|
|||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
if err := b.watchAddrUpdates(w, balancerAddrsCh); err != nil {
|
if err := b.watchAddrUpdates(w, balancerAddrsCh); err != nil {
|
||||||
grpclog.Printf("grpc: the naming watcher stops working due to %v.\n", err)
|
grpclog.Printf("grpclb: the naming watcher stops working due to %v.\n", err)
|
||||||
close(balancerAddrsCh)
|
close(balancerAddrsCh)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -497,7 +503,7 @@ func (b *balancer) Start(target string, config BalancerConfig) error {
|
|||||||
if creds := config.DialCreds; creds != nil {
|
if creds := config.DialCreds; creds != nil {
|
||||||
if rb.name != "" {
|
if rb.name != "" {
|
||||||
if err := creds.OverrideServerName(rb.name); err != nil {
|
if err := creds.OverrideServerName(rb.name); err != nil {
|
||||||
grpclog.Printf("Failed to override the server name in the credentials: %v", err)
|
grpclog.Printf("grpclb: failed to override the server name in the credentials: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -512,7 +518,7 @@ func (b *balancer) Start(target string, config BalancerConfig) error {
|
|||||||
ccError = make(chan struct{})
|
ccError = make(chan struct{})
|
||||||
cc, err = Dial(rb.addr, dopts...)
|
cc, err = Dial(rb.addr, dopts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
grpclog.Printf("Failed to setup a connection to the remote balancer %v: %v", rb.addr, err)
|
grpclog.Printf("grpclb: failed to setup a connection to the remote balancer %v: %v", rb.addr, err)
|
||||||
close(ccError)
|
close(ccError)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user