From a1c5391b5474c74f011844a418958b2407cda0f3 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Fri, 7 Aug 2015 15:06:59 -0700 Subject: [PATCH 01/16] add interop empty_stream test and timeout_on_sleeping_server test --- interop/client/client.go | 41 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/interop/client/client.go b/interop/client/client.go index cc599cf4..1346de36 100644 --- a/interop/client/client.go +++ b/interop/client/client.go @@ -40,6 +40,7 @@ import ( "net" "strconv" "strings" + "time" "github.com/golang/protobuf/proto" "golang.org/x/net/context" @@ -68,6 +69,8 @@ var ( client_streaming : request streaming with single response; server_streaming : single request with response streaming; ping_pong : full-duplex streaming; + empty_stream : full-duplex streaming with zero message; + timeout_on_sleeping_server: fullduplex streaming; compute_engine_creds: large_unary with compute engine auth; service_account_creds: large_unary with service account auth; cancel_after_begin: cancellation after metadata has been sent but before payloads are sent; @@ -245,6 +248,40 @@ func doPingPong(tc testpb.TestServiceClient) { grpclog.Println("Pingpong done") } +func doEmptyStream(tc testpb.TestServiceClient) { + stream, err := tc.FullDuplexCall(context.Background()) + if err != nil { + grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err) + } + if err := stream.CloseSend(); err != nil { + grpclog.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) + } + if _, err := stream.Recv(); err != io.EOF { + grpclog.Fatalf("%v failed to complete the empty stream test: %v", stream, err) + } + grpclog.Println("Emptystream done") +} + +func doTimeoutOnSleepingServer(tc testpb.TestServiceClient) { + ctx, _ := context.WithTimeout(context.Background(), 1*time.Millisecond) + stream, err := tc.FullDuplexCall(ctx) + if err != nil { + grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err) + } + pl := newPayload(testpb.PayloadType_COMPRESSABLE, 27182) + req := &testpb.StreamingOutputCallRequest{ + ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), + Payload: pl, + } + if err := stream.Send(req); err != nil { + grpclog.Fatalf("%v.Send(%v) = %v", stream, req, err) + } + if _, err := stream.Recv(); grpc.Code(err) != codes.DeadlineExceeded { + grpclog.Fatalf("%v got the error %v, want error code: %d", stream, err, codes.DeadlineExceeded) + } + grpclog.Println("TimeoutOnSleepingServer done") +} + func doComputeEngineCreds(tc testpb.TestServiceClient) { pl := newPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize) req := &testpb.SimpleRequest{ @@ -400,6 +437,10 @@ func main() { doServerStreaming(tc) case "ping_pong": doPingPong(tc) + case "empty_stream": + doEmptyStream(tc) + case "timeout_on_sleeping_server": + doTimeoutOnSleepingServer(tc) case "compute_engine_creds": if !*useTLS { grpclog.Fatalf("TLS is not enabled. TLS is required to execute compute_engine_creds test case.") From 62388cbe3000f7998d37e3271441e811425c7b0d Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Fri, 7 Aug 2015 15:26:32 -0700 Subject: [PATCH 02/16] add 2 interop tests --- interop/client/.client.go.swp | Bin 0 -> 16384 bytes 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 interop/client/.client.go.swp diff --git a/interop/client/.client.go.swp b/interop/client/.client.go.swp new file mode 100644 index 0000000000000000000000000000000000000000..9e4c4dccf68bc8fdd187bbc443ba9576f0dad8bd GIT binary patch literal 16384 zcmeHOU5w*a6*h%Z0&OXjN03faRb-cJGP_i4JKI&2Njx)#Op@BpPFMX&9s4HZu4B9Y znb|26+8200DiT5hA%s{V-gp4@i9XQsC!SCX^aY{9L#6V-L*FZv@||lZ@nm*Z;_ zD#$fv$$rBQqOdUyf~hC)JX_@ZazU^;b(6hx++4eF?R+zi>}D=M4Lp?E2%>3o8ijV# z^%D^VVbk_p;V1bG$`?w@hmwJkf%nM3ZOUiQon2$$yQ_E6ogW{*$9$z&$w0|K$w0|K z$w0|K$w0|K$w0}#{{sU_c(d{qIC4|&%pkwNZ{_}Oeueq_hgQ=6k*9n4`(Lc2zmumw zmA^le7cej9{rP*jIl=SySJLz3@>ViXGEg#5GEg#5GEg#5GEg#5GEg#5GEg#5GVpIO zU^$AyTkXgApF0#~zW;ycql)r2@Mqvxz{|kXz#32k?g3uET~WRVJOX?Sc=a|#c?oz9 zxF2}uR_Fj;0DRyc;PsCv%C~?4a0~GB4=c(6&;Z`7D9RhaUxB{>zXx6geha(={0#Up z@FU>6z&C)e0bd2iz$Q=!ZUx@_kfOW@dp?UR-bP^JCE!cI{lM+OYY3M70C*Jm6!0>*n*a`A1CmRwi+wAinxdRiDz@d@ z!W&r=e9bVLxKW&pA`ypyAB%O$5L*4C3}Km z5$%hp@x-d;v6#C4brsG;VmVe~(Vi94SO_0F{Dj6SASOX1D6|f}z;a@;@PiXCC>1Wp zwZ+J??I87&ksS#qUMJ6rrec)(R&+=Q_>M!`13Aj&$V!vF3ku~Iz5xl-M2v(F>%@sl zaxu!pavhaQ;?6{nCZoU~#hwr$NEv1RuhS&;JSPo3ah2jE64nffFuNjVVRAUiUQs4? zW>qe;{Aqk$GUs<`u`C3&Gd(HuQZzFqFN-T|onqWPfjWFD6{tPQ(7d*bb}sml-?ki4 zG0(g3D(88wIcqvgD~dI8%mB-Gs3s*&!Kl@&8CO|lu9N&lHA&0(FcC3l$^2k7%j}iP z6YapCxYHEP#c?MS5pjb;jsy)l7IA|vS)S`q5OTYg&1zk#aC#fbbKkT3MVfflv|%I> z9IRDYXv9jYb*dIztJa~wiw)WOd)92JWko?=DSH9*1n4P|>quQ!8?3r|+zGh}xiR@clCjvJ zKKg2O;Krg(XU-ICVlforG^*&p+515dB}GnhhcrK_Rr8!7L}2dOHC#un<`nYN*;qtN zjoYyKIvVF37tCP}&YnBpSi_&QD}=D;k`-3$h>4YYiII&8t!z-B74CB(*+&aw2kT|1 zGt2cTn9x>RrCfG({NS(3?&no(|FKjsjAu*PX8%;WiQltvVQ0Sp#GTQ^txSZIkCw#XzpKq#J zmk-BulH~jgKmOcr#LmMx5#k}A~5Ns^ZS%7=Xn_XxUeIt$=RkD+--%0{Bb)O)|GNgN**Bx!cm!I;oe` z9%-A@>Ri$cwUbGxH0T?KwxM-3^D@6OhV5IEv|hVA>}b6$>Zw~@ZAqy2=!3z2#F|>!1)Az{a#_(9%tmn6V77aG*&a+Xwp-mUR}P9ET-LR%ZD~-E9ZuBXnFhAdl=K!{QY8gDs@~ql z&%8b*!>ANl6)>3%sL%xmg&cd2Vn5JDp)1%(oT3 z(}aRR7BRE14dL;TZQ94$$jO$MNMwzeJr5&yx`&BBNH7o9GnPCQ#O1)Mjny4)?vc7Yb+Ecs0KEG&kP zP*dbBlHMIPi+F*J$(Q-bt>ZTZs^WT9PuwK4mYSQ^@;s6?7V`?7m&cCuzP+f)b8Bbs ztry!9=WL!n(__REiA|IPdV`T6?`*xx@3d>Z%!@EZ2_uK?c!o(H}TJPzytcLN06 z3cP`R|0}?Yzz>1vfdk+%Km|Sn+zBYaTZjkz3HS|g4R{W)0Rwmov4Ot>uL3^>t^p4K z_W_>-6yR^L`Io>o;M;%&JPxogj{xOWGEg#5GEg#5GEg#5GEg$`zh&SVrGmeeJYv(> zL{JN{r=cIGAxA5)T;_0(j25pGVo|60CkKa@p+-cED^Adsrzb5wJlW*46Fm=N*AiO> z*|27=-r$6#dQ8g(pOc)JTb`oinU2Of+XyYiSw897QQs!DDhJ-?F<{>sdsv5Op)_8Y zKb5-3FL#Gmg^d%Dc>wX~`O1Kp0IG4|LZ>@*%;E6}n_1TfhTBqk}xP zjKh}WM0GwNiW`kaUKAHs;a7;sTVaSWN{xTjv&_5L^$tX&58B+U^B`~S$%90;@>9@J z$@=B7j>#rxeGfea;Sc)!u1R>DD>>pu<+vl2%Dj|?J2JM+eCAx{X9}B$>TuM*cAC9B qcGiGlZh5r920xR9*FoK4{G*E(PwL@J>c33T!qRqeWT>42VgCe<$u7$P literal 0 HcmV?d00001 From 8b80bf491fa69fe3671a5cc70dbcedf07f92a9c7 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Fri, 7 Aug 2015 15:31:13 -0700 Subject: [PATCH 03/16] fix the space --- interop/client/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/interop/client/client.go b/interop/client/client.go index 1346de36..74cbad9c 100644 --- a/interop/client/client.go +++ b/interop/client/client.go @@ -69,8 +69,8 @@ var ( client_streaming : request streaming with single response; server_streaming : single request with response streaming; ping_pong : full-duplex streaming; - empty_stream : full-duplex streaming with zero message; - timeout_on_sleeping_server: fullduplex streaming; + empty_stream : full-duplex streaming with zero message; + timeout_on_sleeping_server: fullduplex streaming; compute_engine_creds: large_unary with compute engine auth; service_account_creds: large_unary with service account auth; cancel_after_begin: cancellation after metadata has been sent but before payloads are sent; From 2a4442cda2d99dad88ccbcaf39f8975fcf85747c Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Fri, 7 Aug 2015 16:10:08 -0700 Subject: [PATCH 04/16] Delete client.go.swp --- interop/client/.client.go.swp | Bin 16384 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 interop/client/.client.go.swp diff --git a/interop/client/.client.go.swp b/interop/client/.client.go.swp deleted file mode 100644 index 9e4c4dccf68bc8fdd187bbc443ba9576f0dad8bd..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 16384 zcmeHOU5w*a6*h%Z0&OXjN03faRb-cJGP_i4JKI&2Njx)#Op@BpPFMX&9s4HZu4B9Y znb|26+8200DiT5hA%s{V-gp4@i9XQsC!SCX^aY{9L#6V-L*FZv@||lZ@nm*Z;_ zD#$fv$$rBQqOdUyf~hC)JX_@ZazU^;b(6hx++4eF?R+zi>}D=M4Lp?E2%>3o8ijV# z^%D^VVbk_p;V1bG$`?w@hmwJkf%nM3ZOUiQon2$$yQ_E6ogW{*$9$z&$w0|K$w0|K z$w0|K$w0|K$w0}#{{sU_c(d{qIC4|&%pkwNZ{_}Oeueq_hgQ=6k*9n4`(Lc2zmumw zmA^le7cej9{rP*jIl=SySJLz3@>ViXGEg#5GEg#5GEg#5GEg#5GEg#5GEg#5GVpIO zU^$AyTkXgApF0#~zW;ycql)r2@Mqvxz{|kXz#32k?g3uET~WRVJOX?Sc=a|#c?oz9 zxF2}uR_Fj;0DRyc;PsCv%C~?4a0~GB4=c(6&;Z`7D9RhaUxB{>zXx6geha(={0#Up z@FU>6z&C)e0bd2iz$Q=!ZUx@_kfOW@dp?UR-bP^JCE!cI{lM+OYY3M70C*Jm6!0>*n*a`A1CmRwi+wAinxdRiDz@d@ z!W&r=e9bVLxKW&pA`ypyAB%O$5L*4C3}Km z5$%hp@x-d;v6#C4brsG;VmVe~(Vi94SO_0F{Dj6SASOX1D6|f}z;a@;@PiXCC>1Wp zwZ+J??I87&ksS#qUMJ6rrec)(R&+=Q_>M!`13Aj&$V!vF3ku~Iz5xl-M2v(F>%@sl zaxu!pavhaQ;?6{nCZoU~#hwr$NEv1RuhS&;JSPo3ah2jE64nffFuNjVVRAUiUQs4? zW>qe;{Aqk$GUs<`u`C3&Gd(HuQZzFqFN-T|onqWPfjWFD6{tPQ(7d*bb}sml-?ki4 zG0(g3D(88wIcqvgD~dI8%mB-Gs3s*&!Kl@&8CO|lu9N&lHA&0(FcC3l$^2k7%j}iP z6YapCxYHEP#c?MS5pjb;jsy)l7IA|vS)S`q5OTYg&1zk#aC#fbbKkT3MVfflv|%I> z9IRDYXv9jYb*dIztJa~wiw)WOd)92JWko?=DSH9*1n4P|>quQ!8?3r|+zGh}xiR@clCjvJ zKKg2O;Krg(XU-ICVlforG^*&p+515dB}GnhhcrK_Rr8!7L}2dOHC#un<`nYN*;qtN zjoYyKIvVF37tCP}&YnBpSi_&QD}=D;k`-3$h>4YYiII&8t!z-B74CB(*+&aw2kT|1 zGt2cTn9x>RrCfG({NS(3?&no(|FKjsjAu*PX8%;WiQltvVQ0Sp#GTQ^txSZIkCw#XzpKq#J zmk-BulH~jgKmOcr#LmMx5#k}A~5Ns^ZS%7=Xn_XxUeIt$=RkD+--%0{Bb)O)|GNgN**Bx!cm!I;oe` z9%-A@>Ri$cwUbGxH0T?KwxM-3^D@6OhV5IEv|hVA>}b6$>Zw~@ZAqy2=!3z2#F|>!1)Az{a#_(9%tmn6V77aG*&a+Xwp-mUR}P9ET-LR%ZD~-E9ZuBXnFhAdl=K!{QY8gDs@~ql z&%8b*!>ANl6)>3%sL%xmg&cd2Vn5JDp)1%(oT3 z(}aRR7BRE14dL;TZQ94$$jO$MNMwzeJr5&yx`&BBNH7o9GnPCQ#O1)Mjny4)?vc7Yb+Ecs0KEG&kP zP*dbBlHMIPi+F*J$(Q-bt>ZTZs^WT9PuwK4mYSQ^@;s6?7V`?7m&cCuzP+f)b8Bbs ztry!9=WL!n(__REiA|IPdV`T6?`*xx@3d>Z%!@EZ2_uK?c!o(H}TJPzytcLN06 z3cP`R|0}?Yzz>1vfdk+%Km|Sn+zBYaTZjkz3HS|g4R{W)0Rwmov4Ot>uL3^>t^p4K z_W_>-6yR^L`Io>o;M;%&JPxogj{xOWGEg#5GEg#5GEg#5GEg$`zh&SVrGmeeJYv(> zL{JN{r=cIGAxA5)T;_0(j25pGVo|60CkKa@p+-cED^Adsrzb5wJlW*46Fm=N*AiO> z*|27=-r$6#dQ8g(pOc)JTb`oinU2Of+XyYiSw897QQs!DDhJ-?F<{>sdsv5Op)_8Y zKb5-3FL#Gmg^d%Dc>wX~`O1Kp0IG4|LZ>@*%;E6}n_1TfhTBqk}xP zjKh}WM0GwNiW`kaUKAHs;a7;sTVaSWN{xTjv&_5L^$tX&58B+U^B`~S$%90;@>9@J z$@=B7j>#rxeGfea;Sc)!u1R>DD>>pu<+vl2%Dj|?J2JM+eCAx{X9}B$>TuM*cAC9B qcGiGlZh5r920xR9*FoK4{G*E(PwL@J>c33T!qRqeWT>42VgCe<$u7$P From 4a21e4ecb9aa7e2a07d9fb64d5b58dfa79a3c2cf Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 31 Jul 2015 14:16:02 -0700 Subject: [PATCH 05/16] Switch ALPN/NPN to advertise only h2 --- clientconn.go | 40 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/clientconn.go b/clientconn.go index 4ef00553..42221882 100644 --- a/clientconn.go +++ b/clientconn.go @@ -144,6 +144,7 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { // Set the default codec. cc.dopts.codec = protoCodec{} } + cc.stateCV = sync.NewCond(&cc.mu) if cc.dopts.block { if err := cc.resetTransport(false); err != nil { cc.Close() @@ -188,8 +189,9 @@ type ClientConn struct { dopts dialOptions shutdownChan chan struct{} - mu sync.Mutex - state ConnectivityState + mu sync.Mutex + state ConnectivityState + stateCV *sync.Cond // ready is closed and becomes nil when a new transport is up or failed // due to timeout. ready chan struct{} @@ -200,6 +202,40 @@ type ClientConn struct { transport transport.ClientTransport } +// State returns the connectivity state of the ClientConn +func (cc *ClientConn) State() ConnectivityState { + cc.mu.Lock() + defer cc.mu.Unlock() + return cc.state +} + +// WaitForStateChange returns true when the state changes to something other than the +// sourceState and false if timeout fires. +func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool { + cc.mu.Lock() + defer cc.mu.Unlock() + if sourceState != cc.state { + return true + } + // Shutdown state is a sink -- once it is entered, no furhter state change could happen. + if sourceState == Shutdown { + return false + } + done := make(chan struct{}) + go func() { + select { + case <-time.After(timeout): + cc.stateCV.Broadcast() + case <-done: + } + }() + for sourceState == cc.state { + cc.stateCV.Wait() + } + close(done) + return true +} + func (cc *ClientConn) resetTransport(closeTransport bool) error { var retries int start := time.Now() From 3878d99912fda6a3c4c44a168d5a278ec867c907 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 31 Jul 2015 19:00:43 -0700 Subject: [PATCH 06/16] channel state API --- clientconn.go | 39 ++++++++++++++++++++++++++++++++------ credentials/credentials.go | 2 +- test/end2end_test.go | 37 +++++++++++++++++++++++++++++++++++- 3 files changed, 70 insertions(+), 8 deletions(-) diff --git a/clientconn.go b/clientconn.go index 42221882..040206a1 100644 --- a/clientconn.go +++ b/clientconn.go @@ -35,6 +35,7 @@ package grpc import ( "errors" + "fmt" "net" "strings" "sync" @@ -182,6 +183,23 @@ const ( Shutdown ) +func (s ConnectivityState) String() string { + switch s { + case Idle: + return "IDLE" + case Connecting: + return "CONNECTING" + case Ready: + return "READY" + case TransientFailure: + return "TRANSIENT_FAILURE" + case Shutdown: + return "SHUTDOWN" + default: + panic(fmt.Sprintf("unknown connectivity state: %d", s)) + } +} + // ClientConn represents a client connection to an RPC service. type ClientConn struct { target string @@ -212,27 +230,31 @@ func (cc *ClientConn) State() ConnectivityState { // WaitForStateChange returns true when the state changes to something other than the // sourceState and false if timeout fires. func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool { + start := time.Now() cc.mu.Lock() defer cc.mu.Unlock() if sourceState != cc.state { return true } - // Shutdown state is a sink -- once it is entered, no furhter state change could happen. - if sourceState == Shutdown { - return false - } done := make(chan struct{}) + expired := timeout <= time.Since(start) go func() { select { - case <-time.After(timeout): + case <-time.After(timeout-time.Since(start)): + cc.mu.Lock() + expired = true cc.stateCV.Broadcast() + cc.mu.Unlock() case <-done: } }() + defer close(done) for sourceState == cc.state { cc.stateCV.Wait() + if expired { + return false + } } - close(done) return true } @@ -242,6 +264,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { for { cc.mu.Lock() cc.state = Connecting + cc.stateCV.Broadcast() t := cc.transport ts := cc.transportSeq // Avoid wait() picking up a dying transport unnecessarily. @@ -280,6 +303,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { if err != nil { cc.mu.Lock() cc.state = TransientFailure + cc.stateCV.Broadcast() cc.mu.Unlock() sleepTime -= time.Since(connectTime) if sleepTime < 0 { @@ -304,6 +328,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { return ErrClientConnClosing } cc.state = Ready + cc.stateCV.Broadcast() cc.transport = newTransport cc.transportSeq = ts + 1 if cc.ready != nil { @@ -327,6 +352,7 @@ func (cc *ClientConn) transportMonitor() { case <-cc.transport.Error(): cc.mu.Lock() cc.state = TransientFailure + cc.stateCV.Broadcast() cc.mu.Unlock() if err := cc.resetTransport(true); err != nil { // The ClientConn is closing. @@ -381,6 +407,7 @@ func (cc *ClientConn) Close() error { return ErrClientConnClosing } cc.state = Shutdown + cc.stateCV.Broadcast() if cc.ready != nil { close(cc.ready) cc.ready = nil diff --git a/credentials/credentials.go b/credentials/credentials.go index c1a331e8..0c2b24c0 100644 --- a/credentials/credentials.go +++ b/credentials/credentials.go @@ -51,7 +51,7 @@ import ( var ( // alpnProtoStr are the specified application level protocols for gRPC. - alpnProtoStr = []string{"h2"} + alpnProtoStr = []string{"h2", "h2-14", "h2-15", "h2-16"} ) // Credentials defines the common interface all supported credentials must diff --git a/test/end2end_test.go b/test/end2end_test.go index 9dc667c0..6462c080 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -354,6 +354,15 @@ func TestTimeoutOnDeadServer(t *testing.T) { func testTimeoutOnDeadServer(t *testing.T, e env) { s, cc := setUp(nil, math.MaxUint32, "", e) tc := testpb.NewTestServiceClient(cc) + if ok := cc.WaitForStateChange(time.Second, grpc.Idle); !ok { + t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want true", grpc.Idle, ok) + } + if ok := cc.WaitForStateChange(time.Second, grpc.Connecting); !ok { + t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want true", grpc.Connecting, ok) + } + if cc.State() != grpc.Ready { + t.Fatalf("cc.State() = %s, want %s", cc.State(), grpc.Ready) + } s.Stop() // Set -1 as the timeout to make sure if transportMonitor gets error // notification in time the failure path of the 1st invoke of @@ -362,6 +371,13 @@ func testTimeoutOnDeadServer(t *testing.T, e env) { if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(%v, _) = _, error %v, want _, error code: %d", ctx, err, codes.DeadlineExceeded) } + if ok := cc.WaitForStateChange(time.Second, grpc.Ready); !ok { + t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want true", grpc.Connecting, ok) + } + state := cc.State() + if state != grpc.Connecting && state != grpc.TransientFailure { + t.Fatalf("cc.State() = %s, want %s or %s", state, grpc.Connecting, grpc.TransientFailure) + } cc.Close() } @@ -461,8 +477,20 @@ func TestEmptyUnaryWithUserAgent(t *testing.T) { func testEmptyUnaryWithUserAgent(t *testing.T, e env) { s, cc := setUp(nil, math.MaxUint32, testAppUA, e) + // Wait until cc is connected. + if ok := cc.WaitForStateChange(time.Second, grpc.Idle); !ok { + t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want true", grpc.Idle, ok) + } + if ok := cc.WaitForStateChange(10 * time.Second, grpc.Connecting); !ok { + t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want true", grpc.Connecting, ok) + } + if cc.State() != grpc.Ready { + t.Fatalf("cc.State() = %s, want %s", cc.State(), grpc.Ready) + } + if ok := cc.WaitForStateChange(time.Second, grpc.Ready); ok { + t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want false", grpc.Ready, ok) + } tc := testpb.NewTestServiceClient(cc) - defer tearDown(s, cc) var header metadata.MD reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Header(&header)) if err != nil || !proto.Equal(&testpb.Empty{}, reply) { @@ -471,6 +499,13 @@ func testEmptyUnaryWithUserAgent(t *testing.T, e env) { if v, ok := header["ua"]; !ok || v != testAppUA { t.Fatalf("header[\"ua\"] = %q, %t, want %q, true", v, ok, testAppUA) } + tearDown(s, cc) + if ok := cc.WaitForStateChange(5 * time.Second, grpc.Ready); !ok { + t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want true", grpc.Ready, ok) + } + if cc.State() != grpc.Shutdown { + t.Fatalf("cc.State() = %s, want %s", cc.State(), grpc.Shutdown) + } } func TestFailedEmptyUnary(t *testing.T) { From b8ef5b44d26f667fa79afda6e13060837cf4f0a2 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 3 Aug 2015 11:19:25 -0700 Subject: [PATCH 07/16] polish the func comment --- clientconn.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clientconn.go b/clientconn.go index 040206a1..5b8d76a7 100644 --- a/clientconn.go +++ b/clientconn.go @@ -227,8 +227,8 @@ func (cc *ClientConn) State() ConnectivityState { return cc.state } -// WaitForStateChange returns true when the state changes to something other than the -// sourceState and false if timeout fires. +// WaitForStateChange blocks until the state changes to something other than the sourceState +// or timeout fires. It returns false if timeout fires and true otherwise. func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool { start := time.Now() cc.mu.Lock() From b3a7314a90cf8b1fbb4bc370e8e0b21b11e03664 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 3 Aug 2015 11:29:27 -0700 Subject: [PATCH 08/16] add a shortcut logic --- clientconn.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/clientconn.go b/clientconn.go index 5b8d76a7..766d758d 100644 --- a/clientconn.go +++ b/clientconn.go @@ -238,6 +238,9 @@ func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState Conn } done := make(chan struct{}) expired := timeout <= time.Since(start) + if expired { + return false + } go func() { select { case <-time.After(timeout-time.Since(start)): From 0fb69761f0219c1aaf730eaf38da79f45e9f12dc Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 3 Aug 2015 11:45:42 -0700 Subject: [PATCH 09/16] small fix --- clientconn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clientconn.go b/clientconn.go index 766d758d..5c20f71e 100644 --- a/clientconn.go +++ b/clientconn.go @@ -236,11 +236,11 @@ func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState Conn if sourceState != cc.state { return true } - done := make(chan struct{}) expired := timeout <= time.Since(start) if expired { return false } + done := make(chan struct{}) go func() { select { case <-time.After(timeout-time.Since(start)): From 2d4ac52b42ca4263c619f3529dc603fac6784383 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 3 Aug 2015 13:09:15 -0700 Subject: [PATCH 10/16] add a bit more test --- test/end2end_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 6462c080..b5d4f3df 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -363,6 +363,9 @@ func testTimeoutOnDeadServer(t *testing.T, e env) { if cc.State() != grpc.Ready { t.Fatalf("cc.State() = %s, want %s", cc.State(), grpc.Ready) } + if ok := cc.WaitForStateChange(time.Millisecond, grpc.Ready); ok { + t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want false", grpc.Ready, ok) + } s.Stop() // Set -1 as the timeout to make sure if transportMonitor gets error // notification in time the failure path of the 1st invoke of @@ -372,7 +375,7 @@ func testTimeoutOnDeadServer(t *testing.T, e env) { t.Fatalf("TestService/EmptyCall(%v, _) = _, error %v, want _, error code: %d", ctx, err, codes.DeadlineExceeded) } if ok := cc.WaitForStateChange(time.Second, grpc.Ready); !ok { - t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want true", grpc.Connecting, ok) + t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want true", grpc.Ready, ok) } state := cc.State() if state != grpc.Connecting && state != grpc.TransientFailure { From 931f7e7de7e9ad7c3769dd1a1bebbd680f9531f7 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 3 Aug 2015 13:11:00 -0700 Subject: [PATCH 11/16] small fix --- clientconn.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/clientconn.go b/clientconn.go index 5c20f71e..8d4e1f2f 100644 --- a/clientconn.go +++ b/clientconn.go @@ -241,16 +241,16 @@ func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState Conn return false } done := make(chan struct{}) - go func() { + go func(expired *bool) { select { case <-time.After(timeout-time.Since(start)): cc.mu.Lock() - expired = true + *expired = true cc.stateCV.Broadcast() cc.mu.Unlock() case <-done: } - }() + }(&expired) defer close(done) for sourceState == cc.state { cc.stateCV.Wait() From 4297e9bd91c9b33962d65d2da22ae9e4b826baa2 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 3 Aug 2015 13:18:25 -0700 Subject: [PATCH 12/16] revert some unnecessary changes --- clientconn.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/clientconn.go b/clientconn.go index 8d4e1f2f..5c20f71e 100644 --- a/clientconn.go +++ b/clientconn.go @@ -241,16 +241,16 @@ func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState Conn return false } done := make(chan struct{}) - go func(expired *bool) { + go func() { select { case <-time.After(timeout-time.Since(start)): cc.mu.Lock() - *expired = true + expired = true cc.stateCV.Broadcast() cc.mu.Unlock() case <-done: } - }(&expired) + }() defer close(done) for sourceState == cc.state { cc.stateCV.Wait() From e6e95f0f216f574506d71cbeace7bb2708fe105d Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 3 Aug 2015 13:24:14 -0700 Subject: [PATCH 13/16] gofmt -w --- clientconn.go | 2 +- test/end2end_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/clientconn.go b/clientconn.go index 5c20f71e..52d47a57 100644 --- a/clientconn.go +++ b/clientconn.go @@ -243,7 +243,7 @@ func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState Conn done := make(chan struct{}) go func() { select { - case <-time.After(timeout-time.Since(start)): + case <-time.After(timeout - time.Since(start)): cc.mu.Lock() expired = true cc.stateCV.Broadcast() diff --git a/test/end2end_test.go b/test/end2end_test.go index b5d4f3df..fb83b452 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -484,7 +484,7 @@ func testEmptyUnaryWithUserAgent(t *testing.T, e env) { if ok := cc.WaitForStateChange(time.Second, grpc.Idle); !ok { t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want true", grpc.Idle, ok) } - if ok := cc.WaitForStateChange(10 * time.Second, grpc.Connecting); !ok { + if ok := cc.WaitForStateChange(10*time.Second, grpc.Connecting); !ok { t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want true", grpc.Connecting, ok) } if cc.State() != grpc.Ready { @@ -503,7 +503,7 @@ func testEmptyUnaryWithUserAgent(t *testing.T, e env) { t.Fatalf("header[\"ua\"] = %q, %t, want %q, true", v, ok, testAppUA) } tearDown(s, cc) - if ok := cc.WaitForStateChange(5 * time.Second, grpc.Ready); !ok { + if ok := cc.WaitForStateChange(5*time.Second, grpc.Ready); !ok { t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want true", grpc.Ready, ok) } if cc.State() != grpc.Shutdown { From f0e1a2ac753552553bef73c00a312ca005ac2bbf Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Fri, 7 Aug 2015 15:06:59 -0700 Subject: [PATCH 14/16] add 2 interop tests add 2 interop tests fix the space delete swp --- interop/client/client.go | 41 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/interop/client/client.go b/interop/client/client.go index cc599cf4..74cbad9c 100644 --- a/interop/client/client.go +++ b/interop/client/client.go @@ -40,6 +40,7 @@ import ( "net" "strconv" "strings" + "time" "github.com/golang/protobuf/proto" "golang.org/x/net/context" @@ -68,6 +69,8 @@ var ( client_streaming : request streaming with single response; server_streaming : single request with response streaming; ping_pong : full-duplex streaming; + empty_stream : full-duplex streaming with zero message; + timeout_on_sleeping_server: fullduplex streaming; compute_engine_creds: large_unary with compute engine auth; service_account_creds: large_unary with service account auth; cancel_after_begin: cancellation after metadata has been sent but before payloads are sent; @@ -245,6 +248,40 @@ func doPingPong(tc testpb.TestServiceClient) { grpclog.Println("Pingpong done") } +func doEmptyStream(tc testpb.TestServiceClient) { + stream, err := tc.FullDuplexCall(context.Background()) + if err != nil { + grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err) + } + if err := stream.CloseSend(); err != nil { + grpclog.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) + } + if _, err := stream.Recv(); err != io.EOF { + grpclog.Fatalf("%v failed to complete the empty stream test: %v", stream, err) + } + grpclog.Println("Emptystream done") +} + +func doTimeoutOnSleepingServer(tc testpb.TestServiceClient) { + ctx, _ := context.WithTimeout(context.Background(), 1*time.Millisecond) + stream, err := tc.FullDuplexCall(ctx) + if err != nil { + grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err) + } + pl := newPayload(testpb.PayloadType_COMPRESSABLE, 27182) + req := &testpb.StreamingOutputCallRequest{ + ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), + Payload: pl, + } + if err := stream.Send(req); err != nil { + grpclog.Fatalf("%v.Send(%v) = %v", stream, req, err) + } + if _, err := stream.Recv(); grpc.Code(err) != codes.DeadlineExceeded { + grpclog.Fatalf("%v got the error %v, want error code: %d", stream, err, codes.DeadlineExceeded) + } + grpclog.Println("TimeoutOnSleepingServer done") +} + func doComputeEngineCreds(tc testpb.TestServiceClient) { pl := newPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize) req := &testpb.SimpleRequest{ @@ -400,6 +437,10 @@ func main() { doServerStreaming(tc) case "ping_pong": doPingPong(tc) + case "empty_stream": + doEmptyStream(tc) + case "timeout_on_sleeping_server": + doTimeoutOnSleepingServer(tc) case "compute_engine_creds": if !*useTLS { grpclog.Fatalf("TLS is not enabled. TLS is required to execute compute_engine_creds test case.") From d1b30b71950a0373a91e936a5bf0f2010f3ce883 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Wed, 12 Aug 2015 12:34:38 -0700 Subject: [PATCH 15/16] modify the error information --- interop/client/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/interop/client/client.go b/interop/client/client.go index 74cbad9c..f49acae2 100644 --- a/interop/client/client.go +++ b/interop/client/client.go @@ -277,7 +277,7 @@ func doTimeoutOnSleepingServer(tc testpb.TestServiceClient) { grpclog.Fatalf("%v.Send(%v) = %v", stream, req, err) } if _, err := stream.Recv(); grpc.Code(err) != codes.DeadlineExceeded { - grpclog.Fatalf("%v got the error %v, want error code: %d", stream, err, codes.DeadlineExceeded) + grpclog.Fatalf("%v.Recv() = _, %v, want error code %d", stream, err, codes.DeadlineExceeded) } grpclog.Println("TimeoutOnSleepingServer done") } From 6eb790420c8cbcde4a61ee833279ad62598849eb Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Wed, 12 Aug 2015 12:49:51 -0700 Subject: [PATCH 16/16] fix merge conflict --- credentials/credentials.go | 2 +- interop/client/client.go | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/credentials/credentials.go b/credentials/credentials.go index 0c2b24c0..c1a331e8 100644 --- a/credentials/credentials.go +++ b/credentials/credentials.go @@ -51,7 +51,7 @@ import ( var ( // alpnProtoStr are the specified application level protocols for gRPC. - alpnProtoStr = []string{"h2", "h2-14", "h2-15", "h2-16"} + alpnProtoStr = []string{"h2"} ) // Credentials defines the common interface all supported credentials must diff --git a/interop/client/client.go b/interop/client/client.go index 90c91872..f49acae2 100644 --- a/interop/client/client.go +++ b/interop/client/client.go @@ -277,11 +277,7 @@ func doTimeoutOnSleepingServer(tc testpb.TestServiceClient) { grpclog.Fatalf("%v.Send(%v) = %v", stream, req, err) } if _, err := stream.Recv(); grpc.Code(err) != codes.DeadlineExceeded { -<<<<<<< HEAD grpclog.Fatalf("%v.Recv() = _, %v, want error code %d", stream, err, codes.DeadlineExceeded) -======= - grpclog.Fatalf("%v got the error %v, want error code: %d", stream, err, codes.DeadlineExceeded) ->>>>>>> 2a4442cda2d99dad88ccbcaf39f8975fcf85747c } grpclog.Println("TimeoutOnSleepingServer done") }