From 9c2d8546bf9fdbae6679bf11f1e420730277659a Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 6 May 2016 15:47:09 -0700 Subject: [PATCH] load balancer --- balancer.go | 137 +++++ call.go | 6 +- clientconn.go | 593 ++++++++++++++-------- examples/pipe/main.go | 67 +++ picker.go | 243 --------- picker_test.go | 188 ------- stream.go | 10 +- test/end2end_test.go | 101 ++-- test/out.txt | 1015 +++++++++++++++++++++++++++++++++++++ transport/http2_client.go | 20 + transport/transport.go | 2 + 11 files changed, 1685 insertions(+), 697 deletions(-) create mode 100644 balancer.go create mode 100644 examples/pipe/main.go delete mode 100644 picker.go delete mode 100644 picker_test.go create mode 100644 test/out.txt diff --git a/balancer.go b/balancer.go new file mode 100644 index 00000000..998c7373 --- /dev/null +++ b/balancer.go @@ -0,0 +1,137 @@ +package grpc + +import ( + "sync" + + "golang.org/x/net/context" + "google.golang.org/grpc/transport" +) + +type Address struct { + // Addr is the peer address on which a connection will be established. + Addr string + // Metadata is the information associated with Addr, which may be used + // to make load balancing decision. This is from the metadata attached + // in the address updates from name resolver. + Metadata interface{} +} + +// Balancer chooses network addresses for RPCs. +type Balancer interface { + // Up informs the balancer that gRPC has a connection to the server at + // addr. It returns down which will be called once the connection gets + // lost. Once down is called, addr may no longer be returned by Get. + Up(addr Address) (down func(error)) + // Get gets the address of a server for the rpc corresponding to ctx. + // It may block if there is no server available. It respects the + // timeout or cancellation of ctx when blocking. It returns put which + // is called once the rpc has completed or failed. put can collect and + // report rpc stats to remote load balancer. + Get(ctx context.Context) (addr Address, put func(), err error) + // Close shuts down the balancer. + Close() error +} + +func RoundRobin() Balancer { + return &roundRobin{} +} + +type roundRobin struct { + mu sync.Mutex + addrs []Address + next int + waitCh chan struct{} + pending int +} + +func (rr *roundRobin) Up(addr Address) func() { + rr.mu.Lock() + defer rr.mu.Unlock() + for _, a := range rr.addrs { + if a == addr { + return nil + } + } + rr.addrs = append(rr.addrs, addr) + if len(rr.addrs) == 1 { + if rr.waitCh != nil { + close(rr.waitCh) + rr.waitCh = nil + } + } + return func() { + rr.down(addr) + } +} + +func (rr *roundRobin) down(addr Address) { + rr.mu.Lock() + defer rr.mu.Unlock() + for i, a := range rr.addrs { + if a == addr { + copy(rr.addrs[i:], rr.addrs[i+1:]) + rr.addrs = rr.addrs[:len(rr.addrs)-1] + return + } + } +} + +func (rr *roundRobin) Get(ctx context.Context) (addr Address, put func(), err error) { + var ch chan struct{} + rr.mu.Lock() + if rr.next >= len(rr.addrs) { + rr.next = 0 + } + if len(rr.addrs) > 0 { + addr = rr.addrs[rr.next] + rr.next++ + rr.pending++ + rr.mu.Unlock() + put = func() { + rr.put(ctx, addr) + } + return + } + if rr.waitCh == nil { + ch = make(chan struct{}) + rr.waitCh = ch + } else { + ch = rr.waitCh + } + rr.mu.Unlock() + for { + select { + case <-ctx.Done(): + err = transport.ContextErr(ctx.Err()) + return + case <-ch: + rr.mu.Lock() + if len(rr.addrs) == 0 { + // The newly added addr got removed by Down() again. + rr.mu.Unlock() + continue + } + if rr.next >= len(rr.addrs) { + rr.next = 0 + } + addr = rr.addrs[rr.next] + rr.next++ + rr.pending++ + rr.mu.Unlock() + put = func() { + rr.put(ctx, addr) + } + return + } + } +} + +func (rr *roundRobin) put(ctx context.Context, addr Address) { + rr.mu.Lock() + defer rr.mu.Unlock() + rr.pending-- +} + +func (rr *roundRobin) Close() error { + return nil +} diff --git a/call.go b/call.go index 9d0fc8ee..67604f77 100644 --- a/call.go +++ b/call.go @@ -134,6 +134,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli } var ( lastErr error // record the error that happened + put func() ) for { var ( @@ -152,7 +153,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli if cc.dopts.cp != nil { callHdr.SendCompress = cc.dopts.cp.Type() } - t, err = cc.dopts.picker.Pick(ctx) + t, put, err = cc.getTransport(ctx) if err != nil { if lastErr != nil { // This was a retry; return the error from the last attempt. @@ -165,6 +166,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli } stream, err = sendRequest(ctx, cc.dopts.codec, cc.dopts.cp, callHdr, t, args, topts) if err != nil { + put() if _, ok := err.(transport.ConnectionError); ok { lastErr = err continue @@ -177,12 +179,14 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli // Receive the response lastErr = recvResponse(cc.dopts, t, &c, stream, reply) if _, ok := lastErr.(transport.ConnectionError); ok { + put() continue } if c.traceInfo.tr != nil { c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true) } t.CloseStream(stream, lastErr) + put() if lastErr != nil { return toRPCErr(lastErr) } diff --git a/clientconn.go b/clientconn.go index 6de86e9e..312e07a9 100644 --- a/clientconn.go +++ b/clientconn.go @@ -43,14 +43,14 @@ import ( "golang.org/x/net/context" "golang.org/x/net/trace" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/naming" "google.golang.org/grpc/transport" ) var ( - // ErrUnspecTarget indicates that the target address is unspecified. - ErrUnspecTarget = errors.New("grpc: target is unspecified") // ErrNoTransportSecurity indicates that there is no transport security // being set for ClientConn. Users should either set one or explicitly // call WithInsecure DialOption to disable security. @@ -60,11 +60,16 @@ var ( // connection. ErrCredentialsMisuse = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportAuthenticator() to set)") // ErrClientConnClosing indicates that the operation is illegal because - // the session is closing. + // the ClientConn is closing. ErrClientConnClosing = errors.New("grpc: the client connection is closing") // ErrClientConnTimeout indicates that the connection could not be // established or re-established within the specified timeout. ErrClientConnTimeout = errors.New("grpc: timed out trying to connect") + // ErrNetworkIP indicates that the connection is down due to some network I/O error. + ErrNetworkIO = errors.New("grpc: failed with network I/O error") + // ErrConnDrain indicates that the connection starts to be drained and does not accept any new RPCs. + ErrConnDrain = errors.New("grpc: ") + errConnClosing = errors.New("grpc: the addrConn is closing") // minimum time to give a connection to complete minConnectTimeout = 20 * time.Second ) @@ -76,7 +81,8 @@ type dialOptions struct { cp Compressor dc Decompressor bs backoffStrategy - picker Picker + resolver naming.Resolver + balancer Balancer block bool insecure bool copts transport.ConnectOptions @@ -108,10 +114,9 @@ func WithDecompressor(dc Decompressor) DialOption { } } -// WithPicker returns a DialOption which sets a picker for connection selection. -func WithPicker(p Picker) DialOption { +func WithResolver(r naming.Resolver) DialOption { return func(o *dialOptions) { - o.picker = p + o.resolver = r } } @@ -201,6 +206,7 @@ func WithUserAgent(s string) DialOption { func Dial(target string, opts ...DialOption) (*ClientConn, error) { cc := &ClientConn{ target: target, + infos: make(map[Address]*addrInfo), } for _, opt := range opts { opt(&cc.dopts) @@ -214,14 +220,44 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { cc.dopts.bs = DefaultBackoffConfig } - if cc.dopts.picker == nil { - cc.dopts.picker = &unicastPicker{ - target: target, + cc.balancer = cc.dopts.balancer + if cc.balancer == nil { + cc.balancer = RoundRobin() + } + + // Start the first connection + if cc.dopts.resolver == nil { + addr := Address{ + Addr: cc.target, } + ac, err := cc.newAddrConn(addr) + if err != nil { + return nil, err + } + cc.mu.Lock() + cc.infos[addr] = &addrInfo{ + ac: ac, + } + cc.mu.Unlock() + } else { + w, err := cc.dopts.resolver.Resolve(cc.target) + if err != nil { + return nil, err + } + cc.watcher = w + // Get the initial name resolution which starts dialing. + if err := cc.watchAddrUpdates(); err != nil { + return nil, err + } + go func() { + for { + if err := cc.watchAddrUpdates(); err != nil { + return + } + } + }() } - if err := cc.dopts.picker.Init(cc); err != nil { - return nil, err - } + colonPos := strings.LastIndex(target, ":") if colonPos == -1 { colonPos = len(target) @@ -263,61 +299,88 @@ func (s ConnectivityState) String() string { } } +type addrInfo struct { + ac *addrConn +} + // ClientConn represents a client connection to an RPC service. type ClientConn struct { target string + watcher naming.Watcher + balancer Balancer authority string dopts dialOptions + + mu sync.RWMutex + infos map[Address]*addrInfo } -// State returns the connectivity state of cc. -// This is EXPERIMENTAL API. -func (cc *ClientConn) State() (ConnectivityState, error) { - return cc.dopts.picker.State() -} - -// WaitForStateChange blocks until the state changes to something other than the sourceState. -// It returns the new state or error. -// This is EXPERIMENTAL API. -func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) { - return cc.dopts.picker.WaitForStateChange(ctx, sourceState) -} - -// Close starts to tear down the ClientConn. -func (cc *ClientConn) Close() error { - return cc.dopts.picker.Close() -} - -// Conn is a client connection to a single destination. -type Conn struct { - target string - dopts dialOptions - resetChan chan int - shutdownChan chan struct{} - events trace.EventLog - - mu sync.Mutex - state ConnectivityState - stateCV *sync.Cond - // ready is closed and becomes nil when a new transport is up or failed - // due to timeout. - ready chan struct{} - transport transport.ClientTransport -} - -// NewConn creates a Conn. -func NewConn(cc *ClientConn) (*Conn, error) { - if cc.target == "" { - return nil, ErrUnspecTarget +func (cc *ClientConn) watchAddrUpdates() error { + updates, err := cc.watcher.Next() + if err != nil { + return err } - c := &Conn{ - target: cc.target, - dopts: cc.dopts, - resetChan: make(chan int, 1), + for _, update := range updates { + switch update.Op { + case naming.Add: + cc.mu.Lock() + addr := Address{ + Addr: update.Addr, + Metadata: update.Metadata, + } + if _, ok := cc.infos[addr]; ok { + cc.mu.Unlock() + grpclog.Println("grpc: The name resolver wanted to add an existing address: ", addr) + continue + } + cc.mu.Unlock() + ac, err := cc.newAddrConn(addr) + if err != nil { + cc.mu.Unlock() + return err + } + cc.mu.Lock() + cc.infos[addr] = &addrInfo{ + ac: ac, + } + cc.mu.Unlock() + case naming.Delete: + cc.mu.Lock() + addr := Address{ + Addr: update.Addr, + Metadata: update.Metadata, + } + i, ok := cc.infos[addr] + if !ok { + cc.mu.Unlock() + grpclog.Println("grpc: The name resolver wanted to delete a non-exist address: ", addr) + continue + } + delete(cc.infos, addr) + cc.mu.Unlock() + i.ac.startDrain() + default: + grpclog.Println("Unknown update.Op ", update.Op) + } + } + return nil +} + +func (cc *ClientConn) newAddrConn(addr Address) (*addrConn, error) { + /* + if cc.target == "" { + return nil, ErrUnspecTarget + } + */ + c := &addrConn{ + cc: cc, + addr: addr, + dopts: cc.dopts, + //resetChan: make(chan int, 1), shutdownChan: make(chan struct{}), } if EnableTracing { - c.events = trace.NewEventLog("grpc.ClientConn", c.target) + c.events = trace.NewEventLog("grpc.ClientConn", c.addr.Addr) } if !c.dopts.insecure { var ok bool @@ -339,7 +402,7 @@ func NewConn(cc *ClientConn) (*Conn, error) { c.stateCV = sync.NewCond(&c.mu) if c.dopts.block { if err := c.resetTransport(false); err != nil { - c.Close() + c.tearDown(err) return nil, err } // Start to monitor the error status of transport. @@ -348,108 +411,200 @@ func NewConn(cc *ClientConn) (*Conn, error) { // Start a goroutine connecting to the server asynchronously. go func() { if err := c.resetTransport(false); err != nil { - grpclog.Printf("Failed to dial %s: %v; please retry.", c.target, err) - c.Close() + grpclog.Printf("Failed to dial %s: %v; please retry.", c.addr.Addr, err) + c.tearDown(err) return } + grpclog.Println("DEBUG ugh here resetTransport") c.transportMonitor() }() } return c, nil } -// printf records an event in cc's event log, unless cc has been closed. -// REQUIRES cc.mu is held. -func (cc *Conn) printf(format string, a ...interface{}) { - if cc.events != nil { - cc.events.Printf(format, a...) +func (cc *ClientConn) getTransport(ctx context.Context) (transport.ClientTransport, func(), error) { + addr, put, err := cc.balancer.Get(ctx) + if err != nil { + return nil, nil, err } + cc.mu.RLock() + if cc.infos == nil { + cc.mu.RUnlock() + return nil, nil, ErrClientConnClosing + } + info, ok := cc.infos[addr] + cc.mu.RUnlock() + if !ok { + put() + return nil, nil, transport.StreamErrorf(codes.Internal, "grpc: failed to find the transport to send the rpc") + } + t, err := info.ac.wait(ctx) + if err != nil { + put() + return nil, nil, err + } + return t, put, nil } -// errorf records an error in cc's event log, unless cc has been closed. -// REQUIRES cc.mu is held. -func (cc *Conn) errorf(format string, a ...interface{}) { - if cc.events != nil { - cc.events.Errorf(format, a...) - } -} - -// State returns the connectivity state of the Conn -func (cc *Conn) State() ConnectivityState { - cc.mu.Lock() - defer cc.mu.Unlock() - return cc.state +/* +// State returns the connectivity state of cc. +// This is EXPERIMENTAL API. +func (cc *ClientConn) State() (ConnectivityState, error) { + return cc.dopts.picker.State() } // WaitForStateChange blocks until the state changes to something other than the sourceState. -func (cc *Conn) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) { +// It returns the new state or error. +// This is EXPERIMENTAL API. +func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) { + return cc.dopts.picker.WaitForStateChange(ctx, sourceState) +} +*/ + +// Close starts to tear down the ClientConn. +func (cc *ClientConn) Close() error { cc.mu.Lock() - defer cc.mu.Unlock() - if sourceState != cc.state { - return cc.state, nil + if cc.infos == nil { + cc.mu.Unlock() + return ErrClientConnClosing + } + infos := cc.infos + cc.infos = nil + cc.mu.Unlock() + cc.balancer.Close() + if cc.watcher != nil { + cc.watcher.Close() + } + for _, i := range infos { + i.ac.tearDown(ErrClientClosing) + } + return nil +} + +// addrConn is a network connection to a given address. +type addrConn struct { + cc *ClientConn + addr Address + dopts dialOptions + //resetChan chan int + shutdownChan chan struct{} + events trace.EventLog + + mu sync.Mutex + state ConnectivityState + stateCV *sync.Cond + down func(error) // the handler called when a connection is down. + drain bool + // ready is closed and becomes nil when a new transport is up or failed + // due to timeout. + ready chan struct{} + transport transport.ClientTransport +} + +func (ac *addrConn) startDrain() { + ac.mu.Lock() + t := ac.transport + ac.drain = true + if ac.down != nil { + ac.down(ErrConnDrain) + ac.down = nil + } + ac.mu.Unlock() + t.GracefulClose() +} + +// printf records an event in ac's event log, unless ac has been closed. +// REQUIRES ac.mu is held. +func (ac *addrConn) printf(format string, a ...interface{}) { + if ac.events != nil { + ac.events.Printf(format, a...) + } +} + +// errorf records an error in ac's event log, unless ac has been closed. +// REQUIRES ac.mu is held. +func (ac *addrConn) errorf(format string, a ...interface{}) { + if ac.events != nil { + ac.events.Errorf(format, a...) + } +} + +// getState returns the connectivity state of the Conn +func (ac *addrConn) getState() ConnectivityState { + ac.mu.Lock() + defer ac.mu.Unlock() + return ac.state +} + +// waitForStateChange blocks until the state changes to something other than the sourceState. +func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) { + ac.mu.Lock() + defer ac.mu.Unlock() + if sourceState != ac.state { + return ac.state, nil } done := make(chan struct{}) var err error go func() { select { case <-ctx.Done(): - cc.mu.Lock() + ac.mu.Lock() err = ctx.Err() - cc.stateCV.Broadcast() - cc.mu.Unlock() + ac.stateCV.Broadcast() + ac.mu.Unlock() case <-done: } }() defer close(done) - for sourceState == cc.state { - cc.stateCV.Wait() + for sourceState == ac.state { + ac.stateCV.Wait() if err != nil { - return cc.state, err + return ac.state, err } } - return cc.state, nil + return ac.state, nil } -// NotifyReset tries to signal the underlying transport needs to be reset due to -// for example a name resolution change in flight. -func (cc *Conn) NotifyReset() { - select { - case cc.resetChan <- 0: - default: - } -} - -func (cc *Conn) resetTransport(closeTransport bool) error { +func (ac *addrConn) resetTransport(closeTransport bool) error { var retries int start := time.Now() for { - cc.mu.Lock() - cc.printf("connecting") - if cc.state == Shutdown { - // cc.Close() has been invoked. - cc.mu.Unlock() - return ErrClientConnClosing + ac.mu.Lock() + ac.printf("connecting") + if ac.state == Shutdown { + // ac.tearDown(...) has been invoked. + ac.mu.Unlock() + return errConnClosing } - cc.state = Connecting - cc.stateCV.Broadcast() - cc.mu.Unlock() - if closeTransport { - cc.transport.Close() + if ac.drain { + ac.mu.Unlock() + return nil + } + if ac.down != nil { + ac.down(ErrNetworkIO) + ac.down = nil + } + ac.state = Connecting + ac.stateCV.Broadcast() + t := ac.transport + ac.mu.Unlock() + if closeTransport && t != nil { + t.Close() } // Adjust timeout for the current try. - copts := cc.dopts.copts + copts := ac.dopts.copts if copts.Timeout < 0 { - cc.Close() + ac.tearDown(ErrClientConnTimeout) return ErrClientConnTimeout } if copts.Timeout > 0 { copts.Timeout -= time.Since(start) if copts.Timeout <= 0 { - cc.Close() + ac.tearDown(ErrClientConnTimeout) return ErrClientConnTimeout } } - sleepTime := cc.dopts.bs.backoff(retries) + sleepTime := ac.dopts.bs.backoff(retries) timeout := sleepTime if timeout < minConnectTimeout { timeout = minConnectTimeout @@ -458,130 +613,134 @@ func (cc *Conn) resetTransport(closeTransport bool) error { copts.Timeout = timeout } connectTime := time.Now() - addr, err := cc.dopts.picker.PickAddr() - var newTransport transport.ClientTransport - if err == nil { - newTransport, err = transport.NewClientTransport(addr, &copts) - } + grpclog.Println("DEBUG reach inside resetTransport 1") + newTransport, err := transport.NewClientTransport(ac.addr.Addr, &copts) if err != nil { - cc.mu.Lock() - if cc.state == Shutdown { - // cc.Close() has been invoked. - cc.mu.Unlock() - return ErrClientConnClosing + ac.mu.Lock() + if ac.state == Shutdown { + // ac.tearDown(...) has been invoked. + ac.mu.Unlock() + return errConnClosing } - cc.errorf("transient failure: %v", err) - cc.state = TransientFailure - cc.stateCV.Broadcast() - if cc.ready != nil { - close(cc.ready) - cc.ready = nil + ac.errorf("transient failure: %v", err) + ac.state = TransientFailure + ac.stateCV.Broadcast() + if ac.ready != nil { + close(ac.ready) + ac.ready = nil } - cc.mu.Unlock() + ac.mu.Unlock() sleepTime -= time.Since(connectTime) if sleepTime < 0 { sleepTime = 0 } // Fail early before falling into sleep. - if cc.dopts.copts.Timeout > 0 && cc.dopts.copts.Timeout < sleepTime+time.Since(start) { - cc.mu.Lock() - cc.errorf("connection timeout") - cc.mu.Unlock() - cc.Close() + if ac.dopts.copts.Timeout > 0 && ac.dopts.copts.Timeout < sleepTime+time.Since(start) { + ac.mu.Lock() + ac.errorf("connection timeout") + ac.mu.Unlock() + ac.tearDown(ErrClientTimeout) return ErrClientConnTimeout } closeTransport = false time.Sleep(sleepTime) retries++ - grpclog.Printf("grpc: Conn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, cc.target) + grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, ac.addr) continue } - cc.mu.Lock() - cc.printf("ready") - if cc.state == Shutdown { - // cc.Close() has been invoked. - cc.mu.Unlock() + ac.mu.Lock() + grpclog.Println("DEBUG reach inside resetTransport 2") + ac.printf("ready") + if ac.state == Shutdown { + // ac.tearDown(...) has been invoked. + ac.mu.Unlock() newTransport.Close() - return ErrClientConnClosing + return errConnClosing } - cc.state = Ready - cc.stateCV.Broadcast() - cc.transport = newTransport - if cc.ready != nil { - close(cc.ready) - cc.ready = nil + grpclog.Println("DEBUG reach inside resetTransport 3: ", ac.addr) + ac.state = Ready + ac.stateCV.Broadcast() + ac.transport = newTransport + ac.down = ac.cc.balancer.Up(ac.addr) + if ac.ready != nil { + close(ac.ready) + ac.ready = nil } - cc.mu.Unlock() + ac.mu.Unlock() return nil } } -func (cc *Conn) reconnect() bool { - cc.mu.Lock() - if cc.state == Shutdown { - // cc.Close() has been invoked. - cc.mu.Unlock() - return false - } - cc.state = TransientFailure - cc.stateCV.Broadcast() - cc.mu.Unlock() - if err := cc.resetTransport(true); err != nil { - // The ClientConn is closing. - cc.mu.Lock() - cc.printf("transport exiting: %v", err) - cc.mu.Unlock() - grpclog.Printf("grpc: Conn.transportMonitor exits due to: %v", err) - return false - } - return true -} - // Run in a goroutine to track the error in transport and create the // new transport if an error happens. It returns when the channel is closing. -func (cc *Conn) transportMonitor() { +func (ac *addrConn) transportMonitor() { for { + ac.mu.Lock() + t := ac.transport + ac.mu.Unlock() select { // shutdownChan is needed to detect the teardown when - // the ClientConn is idle (i.e., no RPC in flight). - case <-cc.shutdownChan: + // the addrConn is idle (i.e., no RPC in flight). + case <-ac.shutdownChan: return - case <-cc.resetChan: - if !cc.reconnect() { + /* + case <-ac.resetChan: + if !ac.reconnect() { + return + } + */ + case <-t.Error(): + ac.mu.Lock() + if ac.state == Shutdown { + // ac.tearDown(...) has been invoked. + ac.mu.Unlock() return } - case <-cc.transport.Error(): - if !cc.reconnect() { + ac.state = TransientFailure + ac.stateCV.Broadcast() + ac.mu.Unlock() + if err := ac.resetTransport(true); err != nil { + ac.mu.Lock() + ac.printf("transport exiting: %v", err) + ac.mu.Unlock() + grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err) return } - // Tries to drain reset signal if there is any since it is out-dated. - select { - case <-cc.resetChan: - default: - } + /* + if !ac.reconnect() { + return + } + */ + /* + // Tries to drain reset signal if there is any since it is out-dated. + select { + case <-ac.resetChan: + default: + } + */ } } } -// Wait blocks until i) the new transport is up or ii) ctx is done or iii) cc is closed. -func (cc *Conn) Wait(ctx context.Context) (transport.ClientTransport, error) { +// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed. +func (ac *addrConn) wait(ctx context.Context) (transport.ClientTransport, error) { for { - cc.mu.Lock() + ac.mu.Lock() switch { - case cc.state == Shutdown: - cc.mu.Unlock() - return nil, ErrClientConnClosing - case cc.state == Ready: - ct := cc.transport - cc.mu.Unlock() + case ac.state == Shutdown: + ac.mu.Unlock() + return nil, errConnClosing + case ac.state == Ready: + ct := ac.transport + ac.mu.Unlock() return ct, nil default: - ready := cc.ready + ready := ac.ready if ready == nil { ready = make(chan struct{}) - cc.ready = ready + ac.ready = ready } - cc.mu.Unlock() + ac.mu.Unlock() select { case <-ctx.Done(): return nil, transport.ContextErr(ctx.Err()) @@ -592,32 +751,36 @@ func (cc *Conn) Wait(ctx context.Context) (transport.ClientTransport, error) { } } -// Close starts to tear down the Conn. Returns ErrClientConnClosing if +// tearDown starts to tear down the Conn. Returns errConnClosing if // it has been closed (mostly due to dial time-out). // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in -// some edge cases (e.g., the caller opens and closes many ClientConn's in a +// some edge cases (e.g., the caller opens and closes many addrConn's in a // tight loop. -func (cc *Conn) Close() error { - cc.mu.Lock() - defer cc.mu.Unlock() - if cc.state == Shutdown { - return ErrClientConnClosing +func (ac *addrConn) tearDown(err error) { + ac.mu.Lock() + defer ac.mu.Unlock() + if ac.down != nil { + ac.down(err) + ac.down = nil } - cc.state = Shutdown - cc.stateCV.Broadcast() - if cc.events != nil { - cc.events.Finish() - cc.events = nil + if ac.state == Shutdown { + return } - if cc.ready != nil { - close(cc.ready) - cc.ready = nil + ac.state = Shutdown + ac.stateCV.Broadcast() + if ac.events != nil { + ac.events.Finish() + ac.events = nil } - if cc.transport != nil { - cc.transport.Close() + if ac.ready != nil { + close(ac.ready) + ac.ready = nil } - if cc.shutdownChan != nil { - close(cc.shutdownChan) + if ac.transport != nil { + ac.transport.Close() } - return nil + if ac.shutdownChan != nil { + close(ac.shutdownChan) + } + return } diff --git a/examples/pipe/main.go b/examples/pipe/main.go new file mode 100644 index 00000000..84a10726 --- /dev/null +++ b/examples/pipe/main.go @@ -0,0 +1,67 @@ +package main + +import ( + "fmt" + "log" + "net" + "time" + + "google.golang.org/grpc" +) + +type memAddr string + +func (a memAddr) Network() string { return "mem" } +func (a memAddr) String() string { return string(a) } + +type memListener struct { + c chan net.Conn +} + +func (ln *memListener) Accept() (net.Conn, error) { + conn, ok := <-ln.c + if !ok { + return nil, fmt.Errorf("closed") + } + return conn, nil +} + +func (ln *memListener) Addr() net.Addr { + return memAddr(fmt.Sprintf("%p", ln)) +} + +func (ln *memListener) Close() error { + close(ln.c) + return nil +} + +func main() { + grpc.EnableTracing = true + + ln := &memListener{ + c: make(chan net.Conn, 1), + } + + go func() { + s := grpc.NewServer() + log.Fatal(s.Serve(ln)) + }() + + log.Printf("Dialing to server over a synchronous pipe...") + serverConn, err := grpc.Dial("inmemory", + grpc.WithInsecure(), + grpc.WithBlock(), + grpc.WithDialer(func(_ string, _ time.Duration) (net.Conn, error) { + c1, c2 := net.Pipe() + log.Printf("Pipe created: %v %v", c1, c2) + ln.c <- c2 + log.Printf("Pipe accepted: %v %v", c1, c2) + return c1, nil + })) + if err != nil { + log.Fatal(err) + } + + // BUG: never reached + log.Printf("SUCCESS! Connected to server: %v", serverConn) +} diff --git a/picker.go b/picker.go deleted file mode 100644 index 50f315b4..00000000 --- a/picker.go +++ /dev/null @@ -1,243 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -package grpc - -import ( - "container/list" - "fmt" - "sync" - - "golang.org/x/net/context" - "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/naming" - "google.golang.org/grpc/transport" -) - -// Picker picks a Conn for RPC requests. -// This is EXPERIMENTAL and please do not implement your own Picker for now. -type Picker interface { - // Init does initial processing for the Picker, e.g., initiate some connections. - Init(cc *ClientConn) error - // Pick blocks until either a transport.ClientTransport is ready for the upcoming RPC - // or some error happens. - Pick(ctx context.Context) (transport.ClientTransport, error) - // PickAddr picks a peer address for connecting. This will be called repeated for - // connecting/reconnecting. - PickAddr() (string, error) - // State returns the connectivity state of the underlying connections. - State() (ConnectivityState, error) - // WaitForStateChange blocks until the state changes to something other than - // the sourceState. It returns the new state or error. - WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) - // Close closes all the Conn's owned by this Picker. - Close() error -} - -// unicastPicker is the default Picker which is used when there is no custom Picker -// specified by users. It always picks the same Conn. -type unicastPicker struct { - target string - conn *Conn -} - -func (p *unicastPicker) Init(cc *ClientConn) error { - c, err := NewConn(cc) - if err != nil { - return err - } - p.conn = c - return nil -} - -func (p *unicastPicker) Pick(ctx context.Context) (transport.ClientTransport, error) { - return p.conn.Wait(ctx) -} - -func (p *unicastPicker) PickAddr() (string, error) { - return p.target, nil -} - -func (p *unicastPicker) State() (ConnectivityState, error) { - return p.conn.State(), nil -} - -func (p *unicastPicker) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) { - return p.conn.WaitForStateChange(ctx, sourceState) -} - -func (p *unicastPicker) Close() error { - if p.conn != nil { - return p.conn.Close() - } - return nil -} - -// unicastNamingPicker picks an address from a name resolver to set up the connection. -type unicastNamingPicker struct { - cc *ClientConn - resolver naming.Resolver - watcher naming.Watcher - mu sync.Mutex - // The list of the addresses are obtained from watcher. - addrs *list.List - // It tracks the current picked addr by PickAddr(). The next PickAddr may - // push it forward on addrs. - pickedAddr *list.Element - conn *Conn -} - -// NewUnicastNamingPicker creates a Picker to pick addresses from a name resolver -// to connect. -func NewUnicastNamingPicker(r naming.Resolver) Picker { - return &unicastNamingPicker{ - resolver: r, - addrs: list.New(), - } -} - -type addrInfo struct { - addr string - // Set to true if this addrInfo needs to be deleted in the next PickAddrr() call. - deleting bool -} - -// processUpdates calls Watcher.Next() once and processes the obtained updates. -func (p *unicastNamingPicker) processUpdates() error { - updates, err := p.watcher.Next() - if err != nil { - return err - } - for _, update := range updates { - switch update.Op { - case naming.Add: - p.mu.Lock() - p.addrs.PushBack(&addrInfo{ - addr: update.Addr, - }) - p.mu.Unlock() - // Initial connection setup - if p.conn == nil { - conn, err := NewConn(p.cc) - if err != nil { - return err - } - p.conn = conn - } - case naming.Delete: - p.mu.Lock() - for e := p.addrs.Front(); e != nil; e = e.Next() { - if update.Addr == e.Value.(*addrInfo).addr { - if e == p.pickedAddr { - // Do not remove the element now if it is the current picked - // one. We leave the deletion to the next PickAddr() call. - e.Value.(*addrInfo).deleting = true - // Notify Conn to close it. All the live RPCs on this connection - // will be aborted. - p.conn.NotifyReset() - } else { - p.addrs.Remove(e) - } - } - } - p.mu.Unlock() - default: - grpclog.Println("Unknown update.Op ", update.Op) - } - } - return nil -} - -// monitor runs in a standalone goroutine to keep watching name resolution updates until the watcher -// is closed. -func (p *unicastNamingPicker) monitor() { - for { - if err := p.processUpdates(); err != nil { - return - } - } -} - -func (p *unicastNamingPicker) Init(cc *ClientConn) error { - w, err := p.resolver.Resolve(cc.target) - if err != nil { - return err - } - p.watcher = w - p.cc = cc - // Get the initial name resolution. - if err := p.processUpdates(); err != nil { - return err - } - go p.monitor() - return nil -} - -func (p *unicastNamingPicker) Pick(ctx context.Context) (transport.ClientTransport, error) { - return p.conn.Wait(ctx) -} - -func (p *unicastNamingPicker) PickAddr() (string, error) { - p.mu.Lock() - defer p.mu.Unlock() - if p.pickedAddr == nil { - p.pickedAddr = p.addrs.Front() - } else { - pa := p.pickedAddr - p.pickedAddr = pa.Next() - if pa.Value.(*addrInfo).deleting { - p.addrs.Remove(pa) - } - if p.pickedAddr == nil { - p.pickedAddr = p.addrs.Front() - } - } - if p.pickedAddr == nil { - return "", fmt.Errorf("there is no address available to pick") - } - return p.pickedAddr.Value.(*addrInfo).addr, nil -} - -func (p *unicastNamingPicker) State() (ConnectivityState, error) { - return 0, fmt.Errorf("State() is not supported for unicastNamingPicker") -} - -func (p *unicastNamingPicker) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) { - return 0, fmt.Errorf("WaitForStateChange is not supported for unicastNamingPciker") -} - -func (p *unicastNamingPicker) Close() error { - p.watcher.Close() - p.conn.Close() - return nil -} diff --git a/picker_test.go b/picker_test.go deleted file mode 100644 index dd29497b..00000000 --- a/picker_test.go +++ /dev/null @@ -1,188 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -package grpc - -import ( - "fmt" - "math" - "testing" - "time" - - "golang.org/x/net/context" - "google.golang.org/grpc/naming" -) - -type testWatcher struct { - // the channel to receives name resolution updates - update chan *naming.Update - // the side channel to get to know how many updates in a batch - side chan int - // the channel to notifiy update injector that the update reading is done - readDone chan int -} - -func (w *testWatcher) Next() (updates []*naming.Update, err error) { - n := <-w.side - if n == 0 { - return nil, fmt.Errorf("w.side is closed") - } - for i := 0; i < n; i++ { - u := <-w.update - if u != nil { - updates = append(updates, u) - } - } - w.readDone <- 0 - return -} - -func (w *testWatcher) Close() { -} - -func (w *testWatcher) inject(updates []*naming.Update) { - w.side <- len(updates) - for _, u := range updates { - w.update <- u - } - <-w.readDone -} - -type testNameResolver struct { - w *testWatcher - addr string -} - -func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) { - r.w = &testWatcher{ - update: make(chan *naming.Update, 1), - side: make(chan int, 1), - readDone: make(chan int), - } - r.w.side <- 1 - r.w.update <- &naming.Update{ - Op: naming.Add, - Addr: r.addr, - } - go func() { - <-r.w.readDone - }() - return r.w, nil -} - -func startServers(t *testing.T, numServers, port int, maxStreams uint32) ([]*server, *testNameResolver) { - var servers []*server - for i := 0; i < numServers; i++ { - s := newTestServer() - servers = append(servers, s) - go s.start(t, port, maxStreams) - s.wait(t, 2*time.Second) - } - // Point to server1 - addr := "127.0.0.1:" + servers[0].port - return servers, &testNameResolver{ - addr: addr, - } -} - -func TestNameDiscovery(t *testing.T) { - // Start 3 servers on 3 ports. - servers, r := startServers(t, 3, 0, math.MaxUint32) - cc, err := Dial("foo.bar.com", WithPicker(NewUnicastNamingPicker(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) - if err != nil { - t.Fatalf("Failed to create ClientConn: %v", err) - } - var reply string - if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse { - t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want ", err) - } - // Inject name resolution change to point to the second server now. - var updates []*naming.Update - updates = append(updates, &naming.Update{ - Op: naming.Delete, - Addr: "127.0.0.1:" + servers[0].port, - }) - updates = append(updates, &naming.Update{ - Op: naming.Add, - Addr: "127.0.0.1:" + servers[1].port, - }) - r.w.inject(updates) - servers[0].stop() - if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse { - t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want ", err) - } - // Add another server address (server#3) to name resolution - updates = nil - updates = append(updates, &naming.Update{ - Op: naming.Add, - Addr: "127.0.0.1:" + servers[2].port, - }) - r.w.inject(updates) - // Stop server#2. The library should direct to server#3 automatically. - servers[1].stop() - if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse { - t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want ", err) - } - cc.Close() - servers[2].stop() -} - -func TestEmptyAddrs(t *testing.T) { - servers, r := startServers(t, 1, 0, math.MaxUint32) - cc, err := Dial("foo.bar.com", WithPicker(NewUnicastNamingPicker(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) - if err != nil { - t.Fatalf("Failed to create ClientConn: %v", err) - } - var reply string - if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse { - t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want ", err) - } - // Inject name resolution change to remove the server address so that there is no address - // available after that. - var updates []*naming.Update - updates = append(updates, &naming.Update{ - Op: naming.Delete, - Addr: "127.0.0.1:" + servers[0].port, - }) - r.w.inject(updates) - // Loop until the above updates apply. - for { - time.Sleep(10 * time.Millisecond) - ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond) - if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc); err != nil { - break - } - } - cc.Close() - servers[0].stop() -} diff --git a/stream.go b/stream.go index 22e49cb5..51735e19 100644 --- a/stream.go +++ b/stream.go @@ -103,8 +103,10 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth var ( t transport.ClientTransport err error + put func() ) - t, err = cc.dopts.picker.Pick(ctx) + //t, err = cc.dopts.picker.Pick(ctx) + t, put, err = cc.getTransport(ctx) if err != nil { return nil, toRPCErr(err) } @@ -119,6 +121,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } cs := &clientStream{ desc: desc, + put: put, codec: cc.dopts.codec, cp: cc.dopts.cp, dc: cc.dopts.dc, @@ -174,6 +177,7 @@ type clientStream struct { tracing bool // set to EnableTracing when the clientStream is created. mu sync.Mutex + put func() closed bool // trInfo.tr is set when the clientStream is created (if EnableTracing is true), // and is set to nil when the clientStream's finish method is called. @@ -311,6 +315,10 @@ func (cs *clientStream) finish(err error) { } cs.mu.Lock() defer cs.mu.Unlock() + if cs.put != nil { + cs.put() + cs.put = nil + } if cs.trInfo.tr != nil { if err == nil || err == io.EOF { cs.trInfo.tr.LazyPrintf("RPC: [OK]") diff --git a/test/end2end_test.go b/test/end2end_test.go index d3be1eb0..0bfa48d1 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -332,14 +332,15 @@ func TestReconnectTimeout(t *testing.T) { ResponseSize: proto.Int32(respSize), Payload: payload, } - if _, err := tc.UnaryCall(context.Background(), req); err == nil { + ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond) + if _, err := tc.UnaryCall(ctx, req); err == nil { t.Errorf("TestService/UnaryCall(_, _) = _, , want _, non-nil") return } }() // Block until reconnect times out. <-waitC - if err := conn.Close(); err != grpc.ErrClientConnClosing { + if err := conn.Close(); err != nil { t.Fatalf("%v.Close() = %v, want %v", conn, err, grpc.ErrClientConnClosing) } } @@ -594,36 +595,36 @@ func testTimeoutOnDeadServer(t *testing.T, e env) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) - ctx, _ := context.WithTimeout(context.Background(), time.Second) - if _, err := cc.WaitForStateChange(ctx, grpc.Idle); err != nil { - t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, ", grpc.Idle, err) - } - ctx, _ = context.WithTimeout(context.Background(), time.Second) - if _, err := cc.WaitForStateChange(ctx, grpc.Connecting); err != nil { - t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, ", grpc.Connecting, err) - } - if state, err := cc.State(); err != nil || state != grpc.Ready { - t.Fatalf("cc.State() = %s, %v, want %s, ", state, err, grpc.Ready) - } - ctx, _ = context.WithTimeout(context.Background(), time.Second) - if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err != context.DeadlineExceeded { - t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, %v", grpc.Ready, err, context.DeadlineExceeded) - } + //ctx, _ := context.WithTimeout(context.Background(), time.Second) + //if _, err := cc.WaitForStateChange(ctx, grpc.Idle); err != nil { + // t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, ", grpc.Idle, err) + //} + //ctx, _ = context.WithTimeout(context.Background(), time.Second) + //if _, err := cc.WaitForStateChange(ctx, grpc.Connecting); err != nil { + // t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, ", grpc.Connecting, err) + //} + //if state, err := cc.State(); err != nil || state != grpc.Ready { + // t.Fatalf("cc.State() = %s, %v, want %s, ", state, err, grpc.Ready) + //} + //ctx, _ = context.WithTimeout(context.Background(), time.Second) + //if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err != context.DeadlineExceeded { + // t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, %v", grpc.Ready, err, context.DeadlineExceeded) + //} te.srv.Stop() // Set -1 as the timeout to make sure if transportMonitor gets error // notification in time the failure path of the 1st invoke of // ClientConn.wait hits the deadline exceeded error. - ctx, _ = context.WithTimeout(context.Background(), -1) + ctx, _ := context.WithTimeout(context.Background(), -1) if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(%v, _) = _, error %v, want _, error code: %d", ctx, err, codes.DeadlineExceeded) } - ctx, _ = context.WithTimeout(context.Background(), time.Second) - if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err != nil { - t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, ", grpc.Ready, err) - } - if state, err := cc.State(); err != nil || (state != grpc.Connecting && state != grpc.TransientFailure) { - t.Fatalf("cc.State() = %s, %v, want %s or %s, ", state, err, grpc.Connecting, grpc.TransientFailure) - } + //ctx, _ = context.WithTimeout(context.Background(), time.Second) + //if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err != nil { + // t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, ", grpc.Ready, err) + //} + //if state, err := cc.State(); err != nil || (state != grpc.Connecting && state != grpc.TransientFailure) { + // t.Fatalf("cc.State() = %s, %v, want %s or %s, ", state, err, grpc.Connecting, grpc.TransientFailure) + //} cc.Close() awaitNewConnLogOutput() } @@ -784,21 +785,23 @@ func testEmptyUnaryWithUserAgent(t *testing.T, e env) { cc := te.clientConn() // Wait until cc is connected. - ctx, _ := context.WithTimeout(context.Background(), time.Second) - if _, err := cc.WaitForStateChange(ctx, grpc.Idle); err != nil { - t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, ", grpc.Idle, err) - } - ctx, _ = context.WithTimeout(context.Background(), time.Second) - if _, err := cc.WaitForStateChange(ctx, grpc.Connecting); err != nil { - t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, ", grpc.Connecting, err) - } - if state, err := cc.State(); err != nil || state != grpc.Ready { - t.Fatalf("cc.State() = %s, %v, want %s, ", state, err, grpc.Ready) - } - ctx, _ = context.WithTimeout(context.Background(), time.Second) - if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err == nil { - t.Fatalf("cc.WaitForStateChange(_, %s) = _, , want _, %v", grpc.Ready, context.DeadlineExceeded) - } + //ctx, _ := context.WithTimeout(context.Background(), time.Second) + //if _, err := cc.WaitForStateChange(ctx, grpc.Idle); err != nil { + // t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, ", grpc.Idle, err) + //} + //ctx, _ = context.WithTimeout(context.Background(), time.Second) + //if _, err := cc.WaitForStateChange(ctx, grpc.Connecting); err != nil { + // t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, ", grpc.Connecting, err) + //} + //if state, err := cc.State(); err != nil || state != grpc.Ready { + // t.Fatalf("cc.State() = %s, %v, want %s, ", state, err, grpc.Ready) + //} + /* + ctx, _ = context.WithTimeout(context.Background(), time.Second) + if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err == nil { + t.Fatalf("cc.WaitForStateChange(_, %s) = _, , want _, %v", grpc.Ready, context.DeadlineExceeded) + } + */ tc := testpb.NewTestServiceClient(cc) var header metadata.MD reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Header(&header)) @@ -811,14 +814,15 @@ func testEmptyUnaryWithUserAgent(t *testing.T, e env) { te.srv.Stop() cc.Close() - - ctx, _ = context.WithTimeout(context.Background(), 5*time.Second) - if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err != nil { - t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, ", grpc.Ready, err) - } - if state, err := cc.State(); err != nil || state != grpc.Shutdown { - t.Fatalf("cc.State() = %s, %v, want %s, ", state, err, grpc.Shutdown) - } + /* + ctx, _ = context.WithTimeout(context.Background(), 5*time.Second) + if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err != nil { + t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, ", grpc.Ready, err) + } + if state, err := cc.State(); err != nil || state != grpc.Shutdown { + t.Fatalf("cc.State() = %s, %v, want %s, ", state, err, grpc.Shutdown) + } + */ } func TestFailedEmptyUnary(t *testing.T) { @@ -1000,7 +1004,6 @@ func testRetry(t *testing.T, e env) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) - var wg sync.WaitGroup numRPC := 1000 diff --git a/test/out.txt b/test/out.txt new file mode 100644 index 00000000..ad188a8d --- /dev/null +++ b/test/out.txt @@ -0,0 +1,1015 @@ +=== RUN TestRetry +2016/05/05 17:56:45 DEBUG ugh here +2016/05/05 17:56:45 DEBUG reach inside resetTransport -2 +2016/05/05 17:56:45 DEBUG reach inside resetTransport -1 +2016/05/05 17:56:45 DEBUG reach inside resetTransport 0 +2016/05/05 17:56:45 DEBUG reach inside resetTransport 1 +2016/05/05 17:56:45 DEBUG reach inside resetTransport 2 +2016/05/05 17:56:45 DEBUG reach inside resetTransport 3: {localhost:46298 } +2016/05/05 17:56:45 DEBUG ugh here resetTransport +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:45 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG reach inside resetTransport -2 +2016/05/05 17:56:46 DEBUG reach inside resetTransport -1 +2016/05/05 17:56:46 DEBUG reach inside resetTransport 0 +2016/05/05 17:56:46 DEBUG reach inside resetTransport 1 +2016/05/05 17:56:46 DEBUG reach inside resetTransport 2 +2016/05/05 17:56:46 DEBUG reach inside resetTransport 3: {localhost:46298 } +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:46 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: +2016/05/05 17:56:47 DEBUG get transport: diff --git a/transport/http2_client.go b/transport/http2_client.go index 8e916b00..77b2e522 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -272,6 +272,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea } } t.mu.Lock() + if t.activeStreams == nil { + t.mu.Unlock() + return nil, ErrConnClosing + } if t.state != reachable { t.mu.Unlock() return nil, ErrConnClosing @@ -390,6 +394,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea func (t *http2Client) CloseStream(s *Stream, err error) { var updateStreams bool t.mu.Lock() + if t.activeStreams == nil { + t.mu.Unlock() + t.Close() + return + } if t.streamsQuota != nil { updateStreams = true } @@ -457,6 +466,17 @@ func (t *http2Client) Close() (err error) { return } +func (t *http2Client) GracefulClose() error { + t.mu.Lock() + active := len(t.activeStreams) + t.activeStreams = nil + t.mu.Unlock() + if active == 0 { + return t.Close() + } + return nil +} + // Write formats the data into HTTP2 data frame(s) and sends it out. The caller // should proceed only if Write returns nil. // TODO(zhaoq): opts.Delay is ignored in this implementation. Support it later diff --git a/transport/transport.go b/transport/transport.go index 87fdf532..b85c0ac4 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -391,6 +391,8 @@ type ClientTransport interface { // is called only once. Close() error + GracefulClose() error + // Write sends the data for the given stream. A nil stream indicates // the write is to be performed on the transport as a whole. Write(s *Stream, data []byte, opts *Options) error