Merge branch 'master' of https://github.com/grpc/grpc-go
This commit is contained in:
@ -70,7 +70,8 @@ Metadata can be retrieved from context using `FromContext`:
|
|||||||
|
|
||||||
```go
|
```go
|
||||||
func (s *server) SomeRPC(ctx context.Context, in *pb.SomeRequest) (*pb.SomeResponse, err) {
|
func (s *server) SomeRPC(ctx context.Context, in *pb.SomeRequest) (*pb.SomeResponse, err) {
|
||||||
md := metadata.FromContext(ctx)
|
md, ok := metadata.FromContext(ctx)
|
||||||
|
// do something with metadata
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -1598,6 +1598,61 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStreamsQuotaRecovery(t *testing.T) {
|
||||||
|
defer leakCheck(t)()
|
||||||
|
for _, e := range listTestEnv() {
|
||||||
|
testStreamsQuotaRecovery(t, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testStreamsQuotaRecovery(t *testing.T, e env) {
|
||||||
|
te := newTest(t, e)
|
||||||
|
te.declareLogNoise(
|
||||||
|
"http2Client.notifyError got notified that the client transport was broken",
|
||||||
|
"Conn.resetTransport failed to create client transport",
|
||||||
|
"grpc: the client connection is closing",
|
||||||
|
)
|
||||||
|
te.maxStream = 1 // Allows 1 live stream.
|
||||||
|
te.startServer()
|
||||||
|
defer te.tearDown()
|
||||||
|
|
||||||
|
cc := te.clientConn()
|
||||||
|
tc := testpb.NewTestServiceClient(cc)
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
if _, err := tc.StreamingInputCall(ctx); err != nil {
|
||||||
|
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
|
||||||
|
}
|
||||||
|
// Loop until the new max stream setting is effective.
|
||||||
|
for {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
defer cancel()
|
||||||
|
_, err := tc.StreamingInputCall(ctx)
|
||||||
|
if err == nil {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if grpc.Code(err) == codes.DeadlineExceeded {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %d", tc, err, codes.DeadlineExceeded)
|
||||||
|
}
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
if _, err := tc.StreamingInputCall(ctx); err != nil {
|
||||||
|
t.Errorf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
|
||||||
|
}
|
||||||
|
cancel()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
func TestCompressServerHasNoSupport(t *testing.T) {
|
func TestCompressServerHasNoSupport(t *testing.T) {
|
||||||
defer leakCheck(t)()
|
defer leakCheck(t)()
|
||||||
for _, e := range listTestEnv() {
|
for _, e := range listTestEnv() {
|
||||||
|
@ -293,7 +293,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if _, err := wait(ctx, t.shutdownChan, t.writableChan); err != nil {
|
if _, err := wait(ctx, t.shutdownChan, t.writableChan); err != nil {
|
||||||
// t.streamsQuota will be updated when t.CloseStream is invoked.
|
// Return the quota back now because there is no stream returned to the caller.
|
||||||
|
if _, ok := err.(StreamError); ok && checkStreamsQuota {
|
||||||
|
t.streamsQuota.add(1)
|
||||||
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
|
Reference in New Issue
Block a user