From 199ad57e0d0effc1189cf5ecb2b4625a418670c8 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 29 Jul 2015 10:51:07 -0700 Subject: [PATCH] update --- stream.go | 4 ++ test/end2end_test.go | 4 +- transport/transport_test.go | 77 +++++++++++++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 2 deletions(-) diff --git a/stream.go b/stream.go index 89e3e361..c06ba767 100644 --- a/stream.go +++ b/stream.go @@ -44,6 +44,8 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/transport" + + "google.golang.org/grpc/grpclog" ) type streamHandler func(srv interface{}, stream ServerStream) error @@ -187,7 +189,9 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } func (cs *clientStream) RecvMsg(m interface{}) (err error) { + grpclog.Println("start recv") err = recv(cs.p, cs.codec, m) + grpclog.Println("recv sth") defer func() { // err != nil indicates the termination of the stream. if err != nil { diff --git a/test/end2end_test.go b/test/end2end_test.go index 9dc667c0..f91b1202 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -826,9 +826,9 @@ func TestFailedServerStreaming(t *testing.T) { } func testFailedServerStreaming(t *testing.T, e env) { - s, cc := setUp(nil, math.MaxUint32, "", e) + _, cc := setUp(nil, math.MaxUint32, "", e) tc := testpb.NewTestServiceClient(cc) - defer tearDown(s, cc) + //defer tearDown(s, cc) respParam := make([]*testpb.ResponseParameters, len(respSizes)) for i, s := range respSizes { respParam[i] = &testpb.ResponseParameters{ diff --git a/transport/transport_test.go b/transport/transport_test.go index 8529e2af..20893519 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -355,6 +355,83 @@ func TestLargeMessageSuspension(t *testing.T) { server.stop() } +func TestMaxStreams(t *testing.T) { + server, ct := setUp(t, 0, 1, suspended) + callHdr := &CallHdr{ + Host: "localhost", + Method: "foo.Large", + } + // Have a pending stream which takes all streams quota. + s, err := ct.NewStream(context.Background(), callHdr) + if err != nil { + t.Fatalf("failed to open stream: %v", err) + } + cc, ok := ct.(*http2Client) + if !ok { + t.Fatalf("Failed to convert %v to *http2Client", ct) + } + done := make(chan struct{}) + ch := make(chan int) + go func() { + for { + select { + case <-time.After(5 * time.Millisecond): + ch <- 0 + case <-time.After(5 * time.Second): + close(done) + return + } + } + }() + for { + select { + case <-ch: + case <-done: + t.Fatalf("Client has not received the max stream setting in 5 seconds.") + } + cc.mu.Lock() + // cc.streamsQuota should be initialized once receiving the 1st setting frame from + // the server. + if cc.streamsQuota != nil { + cc.mu.Unlock() + select { + case <-cc.streamsQuota.acquire(): + t.Fatalf("streamsQuota.acquire() becomes readable mistakenly.") + default: + if cc.streamsQuota.quota != 0 { + t.Fatalf("streamsQuota.quota got non-zero quota mistakenly.") + } + } + break + } + cc.mu.Unlock() + } + // Close the pending stream so that the streams quota becomes available for the next new stream. + ct.CloseStream(s, nil) + for { + select { + case <-ch: + case <-done: + t.Fatalf("Client has not received the max stream setting in 5 seconds.") + } + select { + case i:=<-cc.streamsQuota.acquire(): + if i != 1 { + t.Fatalf("fadfa") + } + cc.streamsQuota.add(i) + default: + t.Fatalf("43092") + } + break + } + if _, err := ct.NewStream(context.Background(), callHdr); err != nil { + t.Fatalf("failed to open stream: %v", err) + } + ct.Close() + server.stop() +} + func TestServerWithMisbehavedClient(t *testing.T) { server, ct := setUp(t, 0, math.MaxUint32, suspended) callHdr := &CallHdr{