call.go
@@ -90,7 +90,10 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
 		return nil, transport.StreamErrorf(codes.Internal, "grpc: %v", err)
 	}
 	err = t.Write(stream, outBuf, opts)
-	if err != nil {
+	// t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method
+	// does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following
+	// recvResponse to get the final status.
+	if err != nil && err != io.EOF {
 		return nil, err
 	}
 	// Sent successfully.
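The hunk above tolerates io.EOF from t.Write: when the server rejects the RPC at stream creation, the write path observes io.EOF and the authoritative status is left for the subsequent recvResponse to collect. A minimal sketch of that pattern, with hypothetical helper names (not the gRPC API):

package main

import "io"

// sendThenRecv defers an io.EOF send failure to the receive path,
// which reports the authoritative RPC status.
func sendThenRecv(send func() error, recv func() error) error {
	if err := send(); err != nil && err != io.EOF {
		return err // a genuine transport failure
	}
	// io.EOF only means the peer already terminated the stream;
	// recv surfaces the final status (e.g., "method does not exist").
	return recv()
}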
@@ -73,7 +73,7 @@ func TestCredentialsMisuse(t *testing.T) {
 		t.Fatalf("Failed to create authenticator %v", err)
 	}
 	// Two conflicting credential configurations
-	if _, err := Dial("Non-Existent.Server:80", WithTransportCredentials(tlsCreds), WithTimeout(time.Millisecond), WithBlock(), WithInsecure()); err != errCredentialsConflict {
+	if _, err := Dial("Non-Existent.Server:80", WithTransportCredentials(tlsCreds), WithBlock(), WithInsecure()); err != errCredentialsConflict {
 		t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errCredentialsConflict)
 	}
 	rpcCreds, err := oauth.NewJWTAccessFromKey(nil)
@@ -81,7 +81,7 @@ func TestCredentialsMisuse(t *testing.T) {
 		t.Fatalf("Failed to create credentials %v", err)
 	}
 	// security info on insecure connection
-	if _, err := Dial("Non-Existent.Server:80", WithPerRPCCredentials(rpcCreds), WithTimeout(time.Millisecond), WithBlock(), WithInsecure()); err != errTransportCredentialsMissing {
+	if _, err := Dial("Non-Existent.Server:80", WithPerRPCCredentials(rpcCreds), WithBlock(), WithInsecure()); err != errTransportCredentialsMissing {
 		t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errTransportCredentialsMissing)
 	}
 }
@@ -123,4 +123,5 @@ func testBackoffConfigSet(t *testing.T, expected *BackoffConfig, opts ...DialOpt
 	if actual != *expected {
 		t.Fatalf("unexpected backoff config on connection: %v, want %v", actual, expected)
 	}
+	conn.Close()
 }
stream.go
@@ -183,12 +183,21 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
 	cs.t = t
 	cs.s = s
 	cs.p = &parser{r: s}
-	// Listen on ctx.Done() to detect cancellation when there is no pending
-	// I/O operations on this stream.
+	// Listen on ctx.Done() to detect cancellation and s.Done() to detect normal termination
+	// when there is no pending I/O operations on this stream.
 	go func() {
 		select {
 		case <-t.Error():
 			// Incur transport error, simply exit.
+		case <-s.Done():
+			// TODO: The trace of the RPC is terminated here when there is no pending
+			// I/O, which is probably not the optimal solution.
+			if s.StatusCode() == codes.OK {
+				cs.finish(nil)
+			} else {
+				cs.finish(Errorf(s.StatusCode(), "%s", s.StatusDesc()))
+			}
+			cs.closeTransportStream(nil)
 		case <-s.Context().Done():
 			err := s.Context().Err()
 			cs.finish(err)
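The new s.Done() case lets the monitoring goroutine observe normal termination (the final status arriving) rather than only transport errors and cancellation. A sketch of the select pattern with stand-in channels instead of the real transport types:

package main

import "errors"

// monitor mimics the clientStream goroutine above: it waits for a
// transport error, the final status (done), or context cancellation,
// and finishes the RPC accordingly.
func monitor(transportErr, done, ctxDone <-chan struct{}, finish func(error)) {
	select {
	case <-transportErr:
		// Transport error; simply exit, cleanup happens elsewhere.
	case <-done:
		// Final status arrived with no pending I/O; report it.
		finish(nil)
	case <-ctxDone:
		finish(errors.New("context done"))
	}
}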
@@ -326,7 +335,7 @@ func (cs *clientStream) CloseSend() (err error) {
 		}
 	}()
 	if err == nil || err == io.EOF {
-		return
+		return nil
 	}
 	if _, ok := err.(transport.ConnectionError); !ok {
 		cs.closeTransportStream(err)
@@ -90,7 +90,8 @@ var (
 var raceMode bool // set by race_test.go in race mode
 
 type testServer struct {
 	security string // indicate the authentication protocol used by this server.
+	earlyFail bool // whether to error out the execution of a service handler prematurely.
 }
 
 func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
@@ -219,6 +220,9 @@ func (s *testServer) StreamingInputCall(stream testpb.TestService_StreamingInput
 		}
 		p := in.GetPayload().GetBody()
 		sum += len(p)
+		if s.earlyFail {
+			return grpc.Errorf(codes.NotFound, "not found")
+		}
 	}
 }
 
@@ -404,10 +408,9 @@ func (te *test) tearDown() {
 // modify it before calling its startServer and clientConn methods.
 func newTest(t *testing.T, e env) *test {
 	te := &test{
 		t: t,
 		e: e,
-		testServer: &testServer{security: e.security},
 		maxStream: math.MaxUint32,
 	}
 	te.ctx, te.cancel = context.WithCancel(context.Background())
 	return te
@@ -415,7 +418,8 @@ func newTest(t *testing.T, e env) *test {
 
 // startServer starts a gRPC server listening. Callers should defer a
 // call to te.tearDown to clean up.
-func (te *test) startServer() {
+func (te *test) startServer(ts testpb.TestServiceServer) {
+	te.testServer = ts
 	e := te.e
 	te.t.Logf("Running test in %s environment...", e.name)
 	sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)}
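After this refactor the server implementation is injected into startServer instead of being preset by newTest, which is what drives the long series of mechanical call-site updates below. A self-contained sketch of the shape of the change (TestServiceServer stands in for testpb.TestServiceServer):

package main

// TestServiceServer stands in for testpb.TestServiceServer here.
type TestServiceServer interface{}

type test struct{ testServer TestServiceServer }

// startServer now receives the implementation to register; passing nil
// registers no service, mirroring te.startServer(nil) in testNoService.
func (te *test) startServer(ts TestServiceServer) {
	te.testServer = ts
	// ... listen and serve as before ...
}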
@@ -545,7 +549,7 @@ func testTimeoutOnDeadServer(t *testing.T, e env) {
 		"grpc: Conn.resetTransport failed to create client transport: connection error",
 		"grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport: dial unix",
 	)
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 
 	cc := te.clientConn()
@@ -554,10 +558,7 @@ func testTimeoutOnDeadServer(t *testing.T, e env) {
 		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
 	}
 	te.srv.Stop()
-	// Set -1 as the timeout to make sure if transportMonitor gets error
-	// notification in time the failure path of the 1st invoke of
-	// ClientConn.wait hits the deadline exceeded error.
-	ctx, _ := context.WithTimeout(context.Background(), -1)
+	ctx, _ := context.WithTimeout(context.Background(), time.Millisecond)
 	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
 		t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %d", ctx, err, codes.DeadlineExceeded)
 	}
@@ -580,7 +581,7 @@ func testFailFast(t *testing.T, e env) {
 		"grpc: Conn.resetTransport failed to create client transport: connection error",
 		"grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport: dial unix",
 	)
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 
 	cc := te.clientConn()
@@ -629,7 +630,7 @@ func testHealthCheckOnSuccess(t *testing.T, e env) {
 	hs := health.NewHealthServer()
 	hs.SetServingStatus("grpc.health.v1.Health", 1)
 	te.healthServer = hs
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 
 	cc := te.clientConn()
@@ -655,7 +656,7 @@ func testHealthCheckOnFailure(t *testing.T, e env) {
 	hs := health.NewHealthServer()
 	hs.SetServingStatus("grpc.health.v1.HealthCheck", 1)
 	te.healthServer = hs
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 
 	cc := te.clientConn()
@@ -679,7 +680,7 @@ func TestHealthCheckOff(t *testing.T) {
 
 func testHealthCheckOff(t *testing.T, e env) {
 	te := newTest(t, e)
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 	want := grpc.Errorf(codes.Unimplemented, "unknown service grpc.health.v1.Health")
 	if _, err := healthCheck(1*time.Second, te.clientConn(), ""); !equalErrors(err, want) {
@@ -698,7 +699,7 @@ func testHealthCheckServingStatus(t *testing.T, e env) {
 	te := newTest(t, e)
 	hs := health.NewHealthServer()
 	te.healthServer = hs
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 
 	cc := te.clientConn()
@@ -741,7 +742,7 @@ func TestErrorChanNoIO(t *testing.T) {
 
 func testErrorChanNoIO(t *testing.T, e env) {
 	te := newTest(t, e)
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 
 	tc := testpb.NewTestServiceClient(te.clientConn())
@@ -760,7 +761,7 @@ func TestEmptyUnaryWithUserAgent(t *testing.T) {
 func testEmptyUnaryWithUserAgent(t *testing.T, e env) {
 	te := newTest(t, e)
 	te.userAgent = testAppUA
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 
 	cc := te.clientConn()
@@ -786,7 +787,7 @@ func TestFailedEmptyUnary(t *testing.T) {
 
 func testFailedEmptyUnary(t *testing.T, e env) {
 	te := newTest(t, e)
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 	tc := testpb.NewTestServiceClient(te.clientConn())
 
@@ -806,7 +807,7 @@ func TestLargeUnary(t *testing.T) {
 
 func testLargeUnary(t *testing.T, e env) {
 	te := newTest(t, e)
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 	tc := testpb.NewTestServiceClient(te.clientConn())
 
@@ -843,7 +844,7 @@ func TestMetadataUnaryRPC(t *testing.T) {
 
 func testMetadataUnaryRPC(t *testing.T, e env) {
 	te := newTest(t, e)
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 	tc := testpb.NewTestServiceClient(te.clientConn())
 
@@ -889,7 +890,7 @@ func TestMalformedHTTP2Metadata(t *testing.T) {
 
 func testMalformedHTTP2Metadata(t *testing.T, e env) {
 	te := newTest(t, e)
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 	tc := testpb.NewTestServiceClient(te.clientConn())
 
@@ -951,7 +952,7 @@ func TestRetry(t *testing.T) {
 func testRetry(t *testing.T, e env) {
 	te := newTest(t, e)
 	te.declareLogNoise("transport: http2Client.notifyError got notified that the client transport was broken")
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 
 	cc := te.clientConn()
@@ -1000,7 +1001,7 @@ func TestRPCTimeout(t *testing.T) {
 // TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism.
 func testRPCTimeout(t *testing.T, e env) {
 	te := newTest(t, e)
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 
 	cc := te.clientConn()
@@ -1037,7 +1038,7 @@ func TestCancel(t *testing.T) {
 func testCancel(t *testing.T, e env) {
 	te := newTest(t, e)
 	te.declareLogNoise("grpc: the client connection is closing; please retry")
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 
 	cc := te.clientConn()
@@ -1075,7 +1076,7 @@ func testCancelNoIO(t *testing.T, e env) {
 	te := newTest(t, e)
 	te.declareLogNoise("http2Client.notifyError got notified that the client transport was broken")
 	te.maxStream = 1 // Only allows 1 live stream per server transport.
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 
 	cc := te.clientConn()
@@ -1145,8 +1146,7 @@ func TestNoService(t *testing.T) {
 
 func testNoService(t *testing.T, e env) {
 	te := newTest(t, e)
-	te.testServer = nil // register nothing
-	te.startServer()
+	te.startServer(nil)
 	defer te.tearDown()
 
 	cc := te.clientConn()
@@ -1170,7 +1170,7 @@ func TestPingPong(t *testing.T) {
 
 func testPingPong(t *testing.T, e env) {
 	te := newTest(t, e)
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 	tc := testpb.NewTestServiceClient(te.clientConn())
 
@@ -1230,7 +1230,7 @@ func TestMetadataStreamingRPC(t *testing.T) {
 
 func testMetadataStreamingRPC(t *testing.T, e env) {
 	te := newTest(t, e)
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 	tc := testpb.NewTestServiceClient(te.clientConn())
 
@@ -1301,7 +1301,7 @@ func TestServerStreaming(t *testing.T) {
 
 func testServerStreaming(t *testing.T, e env) {
 	te := newTest(t, e)
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 	tc := testpb.NewTestServiceClient(te.clientConn())
 
@@ -1356,7 +1356,7 @@ func TestFailedServerStreaming(t *testing.T) {
 
 func testFailedServerStreaming(t *testing.T, e env) {
 	te := newTest(t, e)
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 	tc := testpb.NewTestServiceClient(te.clientConn())
 
@@ -1412,8 +1412,7 @@ func TestServerStreaming_Concurrent(t *testing.T) {
 
 func testServerStreaming_Concurrent(t *testing.T, e env) {
 	te := newTest(t, e)
-	te.testServer = concurrentSendServer{}
-	te.startServer()
+	te.startServer(concurrentSendServer{})
 	defer te.tearDown()
 
 	cc := te.clientConn()
@@ -1471,7 +1470,7 @@ func TestClientStreaming(t *testing.T) {
 
 func testClientStreaming(t *testing.T, e env) {
 	te := newTest(t, e)
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 	tc := testpb.NewTestServiceClient(te.clientConn())
 
@@ -1504,6 +1503,46 @@ func testClientStreaming(t *testing.T, e env) {
 	}
 }
 
+func TestClientStreamingError(t *testing.T) {
+	defer leakCheck(t)()
+	for _, e := range listTestEnv() {
+		testClientStreamingError(t, e)
+	}
+}
+
+func testClientStreamingError(t *testing.T, e env) {
+	te := newTest(t, e)
+	te.startServer(&testServer{security: e.security, earlyFail: true})
+	defer te.tearDown()
+	tc := testpb.NewTestServiceClient(te.clientConn())
+
+	stream, err := tc.StreamingInputCall(te.ctx)
+	if err != nil {
+		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err)
+	}
+	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	req := &testpb.StreamingInputCallRequest{
+		Payload: payload,
+	}
+	// The 1st request should go through.
+	if err := stream.Send(req); err != nil {
+		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
+	}
+	for {
+		if err := stream.Send(req); err != io.EOF {
+			continue
+		}
+		if _, err := stream.CloseAndRecv(); grpc.Code(err) != codes.NotFound {
+			t.Fatalf("%v.CloseAndRecv() = %v, want error %d", stream, err, codes.NotFound)
+		}
+		break
+	}
+}
+
 func TestExceedMaxStreamsLimit(t *testing.T) {
 	defer leakCheck(t)()
 	for _, e := range listTestEnv() {
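The new test exercises the behavior this commit introduces: once the server handler fails early, Send on the client stream eventually returns io.EOF, and the real error must be fetched from the receive side. A sketch of that client idiom, with hypothetical function parameters standing in for the generated stream methods:

package main

import "io"

// drainSend keeps sending until the transport reports io.EOF, then
// asks recvStatus (e.g. CloseAndRecv) for the RPC's actual error.
func drainSend(send func() error, recvStatus func() error) error {
	for {
		if err := send(); err != io.EOF {
			continue // message accepted; termination not yet observed
		}
		// io.EOF: the RPC is over; the status comes from the receive side.
		return recvStatus()
	}
}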
@@ -1519,7 +1558,7 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) {
 		"grpc: the client connection is closing",
 	)
 	te.maxStream = 1 // Only allows 1 live stream per server transport.
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 
 	cc := te.clientConn()
@@ -1560,7 +1599,7 @@ func testStreamsQuotaRecovery(t *testing.T, e env) {
 		"grpc: the client connection is closing",
 	)
 	te.maxStream = 1 // Allows 1 live stream.
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 
 	cc := te.clientConn()
@@ -1611,7 +1650,7 @@ func testCompressServerHasNoSupport(t *testing.T, e env) {
 	te := newTest(t, e)
 	te.serverCompression = false
 	te.clientCompression = true
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 	tc := testpb.NewTestServiceClient(te.clientConn())
 
@@ -1667,7 +1706,7 @@ func testCompressOK(t *testing.T, e env) {
 	te := newTest(t, e)
 	te.serverCompression = true
 	te.clientCompression = true
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 	tc := testpb.NewTestServiceClient(te.clientConn())
 
@@ -1730,7 +1769,7 @@ func errInjector(ctx context.Context, req interface{}, info *grpc.UnaryServerInf
 func testUnaryServerInterceptor(t *testing.T, e env) {
 	te := newTest(t, e)
 	te.unaryInt = errInjector
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 
 	tc := testpb.NewTestServiceClient(te.clientConn())
@@ -1761,7 +1800,7 @@ func fullDuplexOnly(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServ
 func testStreamServerInterceptor(t *testing.T, e env) {
 	te := newTest(t, e)
 	te.streamInt = fullDuplexOnly
-	te.startServer()
+	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 
 	tc := testpb.NewTestServiceClient(te.clientConn())
@@ -1825,12 +1864,12 @@ func TestClientRequestBodyError_UnexpectedEOF(t *testing.T) {
 
 func testClientRequestBodyError_UnexpectedEOF(t *testing.T, e env) {
 	te := newTest(t, e)
-	te.testServer = &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
+	ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
 		errUnexpectedCall := errors.New("unexpected call func server method")
 		t.Error(errUnexpectedCall)
 		return nil, errUnexpectedCall
 	}}
-	te.startServer()
+	te.startServer(ts)
 	defer te.tearDown()
 	te.withServerTester(func(st *serverTester) {
 		st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall")
@@ -1850,12 +1889,12 @@ func TestClientRequestBodyError_CloseAfterLength(t *testing.T) {
 func testClientRequestBodyError_CloseAfterLength(t *testing.T, e env) {
 	te := newTest(t, e)
 	te.declareLogNoise("Server.processUnaryRPC failed to write status")
-	te.testServer = &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
+	ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
 		errUnexpectedCall := errors.New("unexpected call func server method")
 		t.Error(errUnexpectedCall)
 		return nil, errUnexpectedCall
 	}}
-	te.startServer()
+	te.startServer(ts)
 	defer te.tearDown()
 	te.withServerTester(func(st *serverTester) {
 		st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall")
@@ -1875,11 +1914,11 @@ func TestClientRequestBodyError_Cancel(t *testing.T) {
 func testClientRequestBodyError_Cancel(t *testing.T, e env) {
 	te := newTest(t, e)
 	gotCall := make(chan bool, 1)
-	te.testServer = &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
+	ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
 		gotCall <- true
 		return new(testpb.SimpleResponse), nil
 	}}
-	te.startServer()
+	te.startServer(ts)
 	defer te.tearDown()
 	te.withServerTester(func(st *serverTester) {
 		st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall")
@@ -1912,12 +1951,12 @@ func TestClientRequestBodyError_Cancel_StreamingInput(t *testing.T) {
 func testClientRequestBodyError_Cancel_StreamingInput(t *testing.T, e env) {
 	te := newTest(t, e)
 	recvErr := make(chan error, 1)
-	te.testServer = &funcServer{streamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error {
+	ts := &funcServer{streamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error {
 		_, err := stream.Recv()
 		recvErr <- err
 		return nil
 	}}
-	te.startServer()
+	te.startServer(ts)
 	defer te.tearDown()
 	te.withServerTester(func(st *serverTester) {
 		st.writeHeadersGRPC(1, "/grpc.testing.TestService/StreamingInputCall")
@@ -202,6 +202,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
 	// TODO(zhaoq): Handle uint32 overflow of Stream.id.
 	s := &Stream{
 		id:           t.nextID,
+		done:         make(chan struct{}),
 		method:       callHdr.Method,
 		sendCompress: callHdr.SendCompress,
 		buf:          newRecvBuffer(),
@@ -278,7 +279,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
 	checkStreamsQuota := t.streamsQuota != nil
 	t.mu.Unlock()
 	if checkStreamsQuota {
-		sq, err := wait(ctx, t.shutdownChan, t.streamsQuota.acquire())
+		sq, err := wait(ctx, nil, t.shutdownChan, t.streamsQuota.acquire())
 		if err != nil {
 			return nil, err
 		}
@@ -287,7 +288,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
 			t.streamsQuota.add(sq - 1)
 		}
 	}
-	if _, err := wait(ctx, t.shutdownChan, t.writableChan); err != nil {
+	if _, err := wait(ctx, nil, t.shutdownChan, t.writableChan); err != nil {
 		// Return the quota back now because there is no stream returned to the caller.
 		if _, ok := err.(StreamError); ok && checkStreamsQuota {
 			t.streamsQuota.add(1)
@@ -414,11 +415,6 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
 	if updateStreams {
 		t.streamsQuota.add(1)
 	}
-	// In case stream sending and receiving are invoked in separate
-	// goroutines (e.g., bi-directional streaming), the caller needs
-	// to call cancel on the stream to interrupt the blocking on
-	// other goroutines.
-	s.cancel()
 	s.mu.Lock()
 	if q := s.fc.resetPendingData(); q > 0 {
 		if n := t.fc.onRead(q); n > 0 {
@@ -504,15 +500,15 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
 		size := http2MaxFrameLen
 		s.sendQuotaPool.add(0)
 		// Wait until the stream has some quota to send the data.
-		sq, err := wait(s.ctx, t.shutdownChan, s.sendQuotaPool.acquire())
+		sq, err := wait(s.ctx, s.done, t.shutdownChan, s.sendQuotaPool.acquire())
 		if err != nil {
 			return err
 		}
 		t.sendQuotaPool.add(0)
 		// Wait until the transport has some quota to send the data.
-		tq, err := wait(s.ctx, t.shutdownChan, t.sendQuotaPool.acquire())
+		tq, err := wait(s.ctx, s.done, t.shutdownChan, t.sendQuotaPool.acquire())
 		if err != nil {
-			if _, ok := err.(StreamError); ok {
+			if _, ok := err.(StreamError); ok || err == io.EOF {
 				t.sendQuotaPool.cancel()
 			}
 			return err
@@ -544,8 +540,8 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
 		// Indicate there is a writer who is about to write a data frame.
 		t.framer.adjustNumWriters(1)
 		// Got some quota. Try to acquire writing privilege on the transport.
-		if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
-			if _, ok := err.(StreamError); ok {
+		if _, err := wait(s.ctx, s.done, t.shutdownChan, t.writableChan); err != nil {
+			if _, ok := err.(StreamError); ok || err == io.EOF {
 				// Return the connection quota back.
 				t.sendQuotaPool.add(len(p))
 			}
@@ -593,11 +589,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
 	}
 	s.mu.Lock()
 	if s.state != streamDone {
-		if s.state == streamReadDone {
-			s.state = streamDone
-		} else {
-			s.state = streamWriteDone
-		}
+		s.state = streamWriteDone
 	}
 	s.mu.Unlock()
 	return nil
@@ -655,6 +647,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
 			s.state = streamDone
 			s.statusCode = codes.Internal
 			s.statusDesc = err.Error()
+			close(s.done)
 			s.mu.Unlock()
 			s.write(recvMsg{err: io.EOF})
 			t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
@@ -672,13 +665,14 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
 	// the read direction is closed, and set the status appropriately.
 	if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
 		s.mu.Lock()
-		if s.state == streamWriteDone {
-			s.state = streamDone
-		} else {
-			s.state = streamReadDone
+		if s.state == streamDone {
+			s.mu.Unlock()
+			return
 		}
+		s.state = streamDone
 		s.statusCode = codes.Internal
 		s.statusDesc = "server closed the stream without sending trailers"
+		close(s.done)
 		s.mu.Unlock()
 		s.write(recvMsg{err: io.EOF})
 	}
@@ -704,6 +698,7 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
 		grpclog.Println("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error ", f.ErrCode)
 		s.statusCode = codes.Unknown
 	}
+	close(s.done)
 	s.mu.Unlock()
 	s.write(recvMsg{err: io.EOF})
 }
@@ -780,11 +775,11 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
 	if len(state.mdata) > 0 {
 		s.trailer = state.mdata
 	}
-	s.state = streamDone
 	s.statusCode = state.statusCode
 	s.statusDesc = state.statusDesc
+	close(s.done)
+	s.state = streamDone
 	s.mu.Unlock()
 
 	s.write(recvMsg{err: io.EOF})
 }
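Across these client hunks, close(s.done) now runs on every terminal path (trailers, RST_STREAM, malformed data) while holding s.mu, and the state check guards against closing the channel twice. A self-contained sketch of the invariant, with simplified names:

package main

import "sync"

// stream sketches the close-once-under-mutex invariant the hunks
// above maintain, so that wait(..., s.done, ...) in Write wakes up
// exactly once when the stream reaches its final state.
type stream struct {
	mu    sync.Mutex
	state int
	done  chan struct{}
}

const streamDone = 3

func (s *stream) finish() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.state == streamDone {
		return // already terminated; close(done) already ran
	}
	s.state = streamDone
	close(s.done)
}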
@@ -364,11 +364,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
 		// Received the end of stream from the client.
 		s.mu.Lock()
 		if s.state != streamDone {
-			if s.state == streamWriteDone {
-				s.state = streamDone
-			} else {
-				s.state = streamReadDone
-			}
+			s.state = streamReadDone
 		}
 		s.mu.Unlock()
 		s.write(recvMsg{err: io.EOF})
@@ -455,7 +451,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
 	}
 	s.headerOk = true
 	s.mu.Unlock()
-	if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
+	if _, err := wait(s.ctx, nil, t.shutdownChan, t.writableChan); err != nil {
 		return err
 	}
 	t.hBuf.Reset()
@@ -495,7 +491,7 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
 		headersSent = true
 	}
 	s.mu.Unlock()
-	if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
+	if _, err := wait(s.ctx, nil, t.shutdownChan, t.writableChan); err != nil {
 		return err
 	}
 	t.hBuf.Reset()
@@ -544,7 +540,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
 	}
 	s.mu.Unlock()
 	if writeHeaderFrame {
-		if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
+		if _, err := wait(s.ctx, nil, t.shutdownChan, t.writableChan); err != nil {
 			return err
 		}
 		t.hBuf.Reset()
@@ -572,13 +568,13 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
 		size := http2MaxFrameLen
 		s.sendQuotaPool.add(0)
 		// Wait until the stream has some quota to send the data.
-		sq, err := wait(s.ctx, t.shutdownChan, s.sendQuotaPool.acquire())
+		sq, err := wait(s.ctx, nil, t.shutdownChan, s.sendQuotaPool.acquire())
 		if err != nil {
 			return err
 		}
 		t.sendQuotaPool.add(0)
 		// Wait until the transport has some quota to send the data.
-		tq, err := wait(s.ctx, t.shutdownChan, t.sendQuotaPool.acquire())
+		tq, err := wait(s.ctx, nil, t.shutdownChan, t.sendQuotaPool.acquire())
 		if err != nil {
 			if _, ok := err.(StreamError); ok {
 				t.sendQuotaPool.cancel()
@@ -604,7 +600,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
 		t.framer.adjustNumWriters(1)
 		// Got some quota. Try to acquire writing privilege on the
 		// transport.
-		if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
+		if _, err := wait(s.ctx, nil, t.shutdownChan, t.writableChan); err != nil {
 			if _, ok := err.(StreamError); ok {
 				// Return the connection quota back.
 				t.sendQuotaPool.add(ps)
 			}
@@ -158,7 +158,7 @@ const (
 	streamActive streamState = iota
 	streamWriteDone // EndStream sent
 	streamReadDone // EndStream received
-	streamDone // sendDone and recvDone or RSTStreamFrame is sent or received.
+	streamDone // the entire stream is finished.
 )
 
 // Stream represents an RPC in the transport layer.
@@ -169,6 +169,8 @@ type Stream struct {
 	// ctx is the associated context of the stream.
 	ctx context.Context
 	cancel context.CancelFunc
+	// done is closed when the final status arrives.
+	done chan struct{}
 	// method records the associated RPC method of the stream.
 	method string
 	recvCompress string
@@ -214,6 +216,10 @@ func (s *Stream) SetSendCompress(str string) {
 	s.sendCompress = str
 }
 
+func (s *Stream) Done() <-chan struct{} {
+	return s.done
+}
+
 // Header acquires the key-value pairs of header metadata once it
 // is available. It blocks until i) the metadata is ready or ii) there is no
 // header metadata or iii) the stream is cancelled/expired.
@@ -501,12 +507,22 @@ func ContextErr(err error) StreamError {
 
 // wait blocks until it can receive from ctx.Done, closing, or proceed.
 // If it receives from ctx.Done, it returns 0, the StreamError for ctx.Err.
+// If it receives from done, it returns 0, io.EOF if ctx is not done; otherwise
+// it returns the StreamError for ctx.Err.
 // If it receives from closing, it returns 0, ErrConnClosing.
 // If it receives from proceed, it returns the received integer, nil.
-func wait(ctx context.Context, closing <-chan struct{}, proceed <-chan int) (int, error) {
+func wait(ctx context.Context, done, closing <-chan struct{}, proceed <-chan int) (int, error) {
 	select {
 	case <-ctx.Done():
 		return 0, ContextErr(ctx.Err())
+	case <-done:
+		// User cancellation has precedence.
+		select {
+		case <-ctx.Done():
+			return 0, ContextErr(ctx.Err())
+		default:
+		}
+		return 0, io.EOF
 	case <-closing:
 		return 0, ErrConnClosing
 	case i := <-proceed:
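The new done parameter changes wait's contract: a closed done channel reports io.EOF, but only after re-checking ctx so user cancellation keeps precedence; callers that pass nil for done simply never select that case. A runnable, trimmed copy to demonstrate the precedence rules (plain errors stand in for StreamError and ErrConnClosing):

package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// wait is trimmed from the diff above: ctx.Done() wins over done,
// done alone yields io.EOF, and nil channels block forever in select.
func wait(ctx context.Context, done, closing <-chan struct{}, proceed <-chan int) (int, error) {
	select {
	case <-ctx.Done():
		return 0, ctx.Err()
	case <-done:
		// User cancellation has precedence.
		select {
		case <-ctx.Done():
			return 0, ctx.Err()
		default:
		}
		return 0, io.EOF
	case <-closing:
		return 0, errors.New("connection closing")
	case i := <-proceed:
		return i, nil
	}
}

func main() {
	done := make(chan struct{})
	close(done)
	_, err := wait(context.Background(), done, nil, nil)
	fmt.Println(err) // EOF: the stream finished, but the caller did not cancel
}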