some minor fixes

This commit is contained in:
iamqizhao
2016-05-18 16:26:12 -07:00
parent 162d8d2d33
commit 5d62215b41
4 changed files with 12 additions and 4 deletions

View File

@ -132,12 +132,12 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
Last: true, Last: true,
Delay: false, Delay: false,
} }
var put func()
for { for {
var ( var (
err error err error
t transport.ClientTransport t transport.ClientTransport
stream *transport.Stream stream *transport.Stream
put func()
) )
// TODO(zhaoq): Need a formal spec of retry strategy for non-failFast rpcs. // TODO(zhaoq): Need a formal spec of retry strategy for non-failFast rpcs.
callHdr := &transport.CallHdr{ callHdr := &transport.CallHdr{
@ -149,6 +149,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
} }
t, put, err = cc.getTransport(ctx) t, put, err = cc.getTransport(ctx)
if err != nil { if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
if err == ErrClientConnClosing { if err == ErrClientConnClosing {
return toRPCErr(err) return toRPCErr(err)
} }

View File

@ -384,6 +384,7 @@ func (cc *ClientConn) newAddrConn(addr Address) error {
} }
} }
} }
// Insert ac into ac.cc.conns. This needs to be done before any getTransport(...) is called.
ac.cc.mu.Lock() ac.cc.mu.Lock()
if ac.cc.conns == nil { if ac.cc.conns == nil {
ac.cc.mu.Unlock() ac.cc.mu.Unlock()
@ -620,11 +621,11 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
ac.state = Ready ac.state = Ready
ac.stateCV.Broadcast() ac.stateCV.Broadcast()
ac.transport = newTransport ac.transport = newTransport
ac.down = ac.cc.balancer.Up(ac.addr)
if ac.ready != nil { if ac.ready != nil {
close(ac.ready) close(ac.ready)
ac.ready = nil ac.ready = nil
} }
ac.down = ac.cc.balancer.Up(ac.addr)
ac.mu.Unlock() ac.mu.Unlock()
return nil return nil
} }

View File

@ -340,7 +340,7 @@ func TestReconnectTimeout(t *testing.T) {
// Block until reconnect times out. // Block until reconnect times out.
<-waitC <-waitC
if err := conn.Close(); err != nil { if err := conn.Close(); err != nil {
t.Fatalf("%v.Close() = %v, want %v", conn, err, grpc.ErrClientConnClosing) t.Fatalf("%v.Close() = %v, want <nil>", conn, err)
} }
} }
@ -586,6 +586,7 @@ func TestTimeoutOnDeadServer(t *testing.T) {
func testTimeoutOnDeadServer(t *testing.T, e env) { func testTimeoutOnDeadServer(t *testing.T, e env) {
te := newTest(t, e) te := newTest(t, e)
te.userAgent = testAppUA
te.declareLogNoise( te.declareLogNoise(
"transport: http2Client.notifyError got notified that the client transport was broken EOF", "transport: http2Client.notifyError got notified that the client transport was broken EOF",
"grpc: Conn.transportMonitor exits due to: grpc: the client connection is closing", "grpc: Conn.transportMonitor exits due to: grpc: the client connection is closing",
@ -597,13 +598,16 @@ func testTimeoutOnDeadServer(t *testing.T, e env) {
cc := te.clientConn() cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
te.srv.Stop() te.srv.Stop()
// Set -1 as the timeout to make sure if transportMonitor gets error // Set -1 as the timeout to make sure if transportMonitor gets error
// notification in time the failure path of the 1st invoke of // notification in time the failure path of the 1st invoke of
// ClientConn.wait hits the deadline exceeded error. // ClientConn.wait hits the deadline exceeded error.
ctx, _ := context.WithTimeout(context.Background(), -1) ctx, _ := context.WithTimeout(context.Background(), -1)
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded { if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/EmptyCall(%v, _) = _, error %v, want _, error code: %d", ctx, err, codes.DeadlineExceeded) t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %d", ctx, err, codes.DeadlineExceeded)
} }
awaitNewConnLogOutput() awaitNewConnLogOutput()
} }

View File

@ -392,6 +392,8 @@ type ClientTransport interface {
// is called only once. // is called only once.
Close() error Close() error
// GracefulClose starts to tear down the transport. It stops accepting
// new RPCs and wait the completion of the pending RPCs.
GracefulClose() error GracefulClose() error
// Write sends the data for the given stream. A nil stream indicates // Write sends the data for the given stream. A nil stream indicates