Fix review comments
This commit is contained in:
3
call.go
3
call.go
@ -84,7 +84,8 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
|
|||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if e, ok := err.(transport.ConnectionError); !ok || !e.Temporary() {
|
// If err is connection error, t will be closed, no need to close stream here.
|
||||||
|
if _, ok := err.(transport.ConnectionError); !ok {
|
||||||
t.CloseStream(stream, err)
|
t.CloseStream(stream, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -283,9 +283,9 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
|
|||||||
cc.Close()
|
cc.Close()
|
||||||
return nil, ErrClientConnTimeout
|
return nil, ErrClientConnTimeout
|
||||||
}
|
}
|
||||||
|
// If balancer is nil or balancer.Notify() is nil, ok will be false here.
|
||||||
|
// The lbWatcher goroutine will not be created.
|
||||||
if ok {
|
if ok {
|
||||||
// If balancer is nil or balancer.Notify() is nil, ok will false here.
|
|
||||||
// Then this goroutine will not be created.
|
|
||||||
go cc.lbWatcher()
|
go cc.lbWatcher()
|
||||||
}
|
}
|
||||||
colonPos := strings.LastIndex(target, ":")
|
colonPos := strings.LastIndex(target, ":")
|
||||||
@ -434,13 +434,14 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err
|
|||||||
if ac.dopts.block && !skipWait {
|
if ac.dopts.block && !skipWait {
|
||||||
if err := ac.resetTransport(false); err != nil {
|
if err := ac.resetTransport(false); err != nil {
|
||||||
if err != errConnClosing {
|
if err != errConnClosing {
|
||||||
ac.cc.mu.Lock()
|
// Tear down ac and delete it from cc.conns.
|
||||||
delete(ac.cc.conns, ac.addr)
|
cc.mu.Lock()
|
||||||
ac.cc.mu.Unlock()
|
delete(cc.conns, ac.addr)
|
||||||
|
cc.mu.Unlock()
|
||||||
ac.tearDown(err)
|
ac.tearDown(err)
|
||||||
}
|
}
|
||||||
if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
|
if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
|
||||||
return e.OriginalError()
|
return e.Origin()
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -452,9 +453,7 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err
|
|||||||
if err := ac.resetTransport(false); err != nil {
|
if err := ac.resetTransport(false); err != nil {
|
||||||
grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
|
grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
|
||||||
if err != errConnClosing {
|
if err != errConnClosing {
|
||||||
ac.cc.mu.Lock()
|
// Keep this ac in cc.conns, to get the reason it's torn down.
|
||||||
delete(ac.cc.conns, ac.addr)
|
|
||||||
ac.cc.mu.Unlock()
|
|
||||||
ac.tearDown(err)
|
ac.tearDown(err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
@ -475,7 +474,7 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
|
|||||||
// If balancer is nil, there should be only one addrConn available.
|
// If balancer is nil, there should be only one addrConn available.
|
||||||
cc.mu.RLock()
|
cc.mu.RLock()
|
||||||
for _, ac = range cc.conns {
|
for _, ac = range cc.conns {
|
||||||
// Break after the first loop to get the first addrConn.
|
// Break after the first iteration to get the first addrConn.
|
||||||
ok = true
|
ok = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -747,6 +746,7 @@ func (ac *addrConn) transportMonitor() {
|
|||||||
ac.mu.Unlock()
|
ac.mu.Unlock()
|
||||||
grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err)
|
grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err)
|
||||||
if err != errConnClosing {
|
if err != errConnClosing {
|
||||||
|
// Keep this ac in cc.conns, to get the reason it's torn down.
|
||||||
ac.tearDown(err)
|
ac.tearDown(err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
@ -762,8 +762,9 @@ func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTr
|
|||||||
ac.mu.Lock()
|
ac.mu.Lock()
|
||||||
switch {
|
switch {
|
||||||
case ac.state == Shutdown:
|
case ac.state == Shutdown:
|
||||||
|
err := ac.tearDownErr
|
||||||
ac.mu.Unlock()
|
ac.mu.Unlock()
|
||||||
return nil, ac.tearDownErr
|
return nil, err
|
||||||
case ac.state == Ready:
|
case ac.state == Ready:
|
||||||
ct := ac.transport
|
ct := ac.transport
|
||||||
ac.mu.Unlock()
|
ac.mu.Unlock()
|
||||||
|
@ -489,7 +489,7 @@ func ConnectionErrorf(temp bool, e error, format string, a ...interface{}) Conne
|
|||||||
return ConnectionError{
|
return ConnectionError{
|
||||||
Desc: fmt.Sprintf(format, a...),
|
Desc: fmt.Sprintf(format, a...),
|
||||||
temp: temp,
|
temp: temp,
|
||||||
origErr: e,
|
err: e,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -498,7 +498,7 @@ func ConnectionErrorf(temp bool, e error, format string, a ...interface{}) Conne
|
|||||||
type ConnectionError struct {
|
type ConnectionError struct {
|
||||||
Desc string
|
Desc string
|
||||||
temp bool
|
temp bool
|
||||||
origErr error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e ConnectionError) Error() string {
|
func (e ConnectionError) Error() string {
|
||||||
@ -510,14 +510,14 @@ func (e ConnectionError) Temporary() bool {
|
|||||||
return e.temp
|
return e.temp
|
||||||
}
|
}
|
||||||
|
|
||||||
// OriginalError returns the original error of this connection error.
|
// Origin returns the original error of this connection error.
|
||||||
func (e ConnectionError) OriginalError() error {
|
func (e ConnectionError) Origin() error {
|
||||||
// Never return nil error here.
|
// Never return nil error here.
|
||||||
// If original error is nil, return itself.
|
// If the original error is nil, return itself.
|
||||||
if e.origErr == nil {
|
if e.err == nil {
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
return e.origErr
|
return e.err
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
Reference in New Issue
Block a user