Merge branch 'master' of https://github.com/grpc/grpc-go
call.go

@@ -179,7 +179,10 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
             put()
             put = nil
         }
-        if _, ok := err.(transport.ConnectionError); ok {
+        // Retry a non-failfast RPC when
+        // i) there is a connection error; or
+        // ii) the server started to drain before this RPC was initiated.
+        if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
             if c.failFast {
                 return toRPCErr(err)
             }
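(Not part of the diff.) The retry path added above only applies when the caller disables fail-fast for the RPC. A minimal sketch of such a call through the Invoke API is shown below; the method name and message values are placeholders, and the x/net/context import reflects the Go releases this commit targeted.

package example

import (
    "time"

    "golang.org/x/net/context"
    "google.golang.org/grpc"
)

// callNonFailFast issues a unary RPC with fail-fast disabled, so Invoke may
// retry it after a transport.ConnectionError or after the server starts
// draining (transport.ErrStreamDrain), per the change above.
func callNonFailFast(cc *grpc.ClientConn, req, reply interface{}) error {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    // "/example.Service/Method" is an illustrative method name; req and reply
    // would be generated protobuf messages in a real program.
    return grpc.Invoke(ctx, "/example.Service/Method", req, reply, cc, grpc.FailFast(false))
}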
@@ -625,6 +625,10 @@ func (ac *addrConn) transportMonitor() {
         // the addrConn is idle (i.e., no RPC in flight).
         case <-ac.shutdownChan:
             return
+        case <-t.GoAway():
+            ac.tearDown(errConnDrain)
+            ac.cc.newAddrConn(ac.addr, true)
+            return
         case <-t.Error():
             ac.mu.Lock()
             if ac.state == Shutdown {

@@ -693,14 +697,21 @@ func (ac *addrConn) tearDown(err error) {
         }
         ac.cc.mu.Unlock()
     }()
-    if ac.state == Shutdown {
-        return
-    }
-    ac.state = Shutdown
     if ac.down != nil {
         ac.down(downErrorf(false, false, "%v", err))
         ac.down = nil
     }
+    if err == errConnDrain && ac.transport != nil {
+        // GracefulClose(...) may be executed multiple times when
+        // i) receiving multiple GoAway frames from the server; or
+        // ii) there are concurrent name resolver/Balancer triggered
+        // address removal and GoAway.
+        ac.transport.GracefulClose()
+    }
+    if ac.state == Shutdown {
+        return
+    }
+    ac.state = Shutdown
     ac.stateCV.Broadcast()
     if ac.events != nil {
         ac.events.Finish()

@@ -710,12 +721,8 @@ func (ac *addrConn) tearDown(err error) {
         close(ac.ready)
         ac.ready = nil
     }
-    if ac.transport != nil {
-        if err == errConnDrain {
-            ac.transport.GracefulClose()
-        } else {
-            ac.transport.Close()
-        }
+    if ac.transport != nil && err != errConnDrain {
+        ac.transport.Close()
     }
     if ac.shutdownChan != nil {
         close(ac.shutdownChan)
server.go

@@ -89,9 +89,13 @@ type service struct {
 type Server struct {
     opts options

     mu    sync.Mutex // guards following
     lis   map[net.Listener]bool
     conns map[io.Closer]bool
+    drain bool
+    // A CondVar to let GracefulStop() blocks until all the pending RPCs are finished
+    // and all the transport goes away.
+    cv     *sync.Cond
     m      map[string]*service // service name -> service info
     events trace.EventLog
 }

@@ -196,6 +200,7 @@ func NewServer(opt ...ServerOption) *Server {
         conns: make(map[io.Closer]bool),
         m:     make(map[string]*service),
     }
+    s.cv = sync.NewCond(&s.mu)
     if EnableTracing {
         _, file, line, _ := runtime.Caller(1)
         s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))

@@ -478,7 +483,7 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea
 func (s *Server) addConn(c io.Closer) bool {
     s.mu.Lock()
     defer s.mu.Unlock()
-    if s.conns == nil {
+    if s.conns == nil || s.drain {
         return false
     }
     s.conns[c] = true

@@ -490,6 +495,7 @@ func (s *Server) removeConn(c io.Closer) {
     defer s.mu.Unlock()
     if s.conns != nil {
         delete(s.conns, c)
+        s.cv.Signal()
     }
 }

@@ -782,14 +788,14 @@ func (s *Server) Stop() {
     s.mu.Lock()
     listeners := s.lis
     s.lis = nil
-    cs := s.conns
+    st := s.conns
     s.conns = nil
     s.mu.Unlock()

     for lis := range listeners {
         lis.Close()
     }
-    for c := range cs {
+    for c := range st {
         c.Close()
     }

@@ -801,6 +807,33 @@ func (s *Server) Stop() {
     s.mu.Unlock()
 }

+// GracefulStop stops the gRPC server gracefully. It stops the server to accept new
+// connections and RPCs and blocks until all the pending RPCs are finished.
+func (s *Server) GracefulStop() {
+    s.mu.Lock()
+    if s.drain == true || s.conns == nil {
+        s.mu.Unlock()
+        return
+    }
+    s.drain = true
+    for lis := range s.lis {
+        lis.Close()
+    }
+    for c := range s.conns {
+        c.(transport.ServerTransport).Drain()
+    }
+    for len(s.conns) != 0 {
+        s.cv.Wait()
+    }
+    s.lis = nil
+    s.conns = nil
+    if s.events != nil {
+        s.events.Finish()
+        s.events = nil
+    }
+    s.mu.Unlock()
+}
+
 func init() {
     internal.TestingCloseConns = func(arg interface{}) {
         arg.(*Server).testingCloseConns()
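(Not part of the diff.) GracefulStop is typically driven from a signal handler so in-flight RPCs can finish while new connections and RPCs are refused. A sketch under assumed names follows; the address, signal choice, and register callback are illustrative.

package example

import (
    "net"
    "os"
    "os/signal"
    "syscall"

    "google.golang.org/grpc"
    "google.golang.org/grpc/grpclog"
)

// serveWithGracefulStop runs a gRPC server and, on SIGTERM, stops accepting
// new connections and RPCs while blocking until pending RPCs have finished.
func serveWithGracefulStop(register func(*grpc.Server)) error {
    lis, err := net.Listen("tcp", ":50051") // address is illustrative
    if err != nil {
        return err
    }
    s := grpc.NewServer()
    register(s) // register services, e.g. a generated Register...Server(s, impl)

    sigc := make(chan os.Signal, 1)
    signal.Notify(sigc, syscall.SIGTERM)
    go func() {
        <-sigc
        grpclog.Println("received SIGTERM, draining the server")
        s.GracefulStop() // returns once all pending RPCs are done
    }()
    return s.Serve(lis)
}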
@@ -195,6 +195,9 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
             cs.finish(Errorf(s.StatusCode(), "%s", s.StatusDesc()))
         }
         cs.closeTransportStream(nil)
+    case <-s.GoAway():
+        cs.finish(errConnDrain)
+        cs.closeTransportStream(errConnDrain)
     case <-s.Context().Done():
         err := s.Context().Err()
         cs.finish(err)
@@ -565,6 +565,127 @@ func testTimeoutOnDeadServer(t *testing.T, e env) {
     awaitNewConnLogOutput()
 }

+func TestServerGoAway(t *testing.T) {
+    defer leakCheck(t)()
+    for _, e := range listTestEnv() {
+        if e.name == "handler-tls" {
+            continue
+        }
+        testServerGoAway(t, e)
+    }
+}
+
+func testServerGoAway(t *testing.T, e env) {
+    te := newTest(t, e)
+    te.userAgent = testAppUA
+    te.declareLogNoise(
+        "transport: http2Client.notifyError got notified that the client transport was broken EOF",
+        "grpc: Conn.transportMonitor exits due to: grpc: the client connection is closing",
+        "grpc: Conn.resetTransport failed to create client transport: connection error",
+        "grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport: dial unix",
+    )
+    te.startServer(&testServer{security: e.security})
+    defer te.tearDown()
+
+    cc := te.clientConn()
+    tc := testpb.NewTestServiceClient(cc)
+    // Finish an RPC to make sure the connection is good.
+    if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+        t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
+    }
+    ch := make(chan struct{})
+    go func() {
+        te.srv.GracefulStop()
+        close(ch)
+    }()
+    // Loop until the server side GoAway signal is propagated to the client.
+    for {
+        ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
+        if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil {
+            continue
+        }
+        break
+    }
+    // A new RPC should fail with Unavailable error.
+    if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err == nil || grpc.Code(err) != codes.Unavailable {
+        t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, error code: %d", err, codes.Unavailable)
+    }
+    <-ch
+    awaitNewConnLogOutput()
+}
+
+func TestServerGoAwayPendingRPC(t *testing.T) {
+    defer leakCheck(t)()
+    for _, e := range listTestEnv() {
+        if e.name == "handler-tls" {
+            continue
+        }
+        testServerGoAwayPendingRPC(t, e)
+    }
+}
+
+func testServerGoAwayPendingRPC(t *testing.T, e env) {
+    te := newTest(t, e)
+    te.userAgent = testAppUA
+    te.declareLogNoise(
+        "transport: http2Client.notifyError got notified that the client transport was broken EOF",
+        "grpc: Conn.transportMonitor exits due to: grpc: the client connection is closing",
+        "grpc: Conn.resetTransport failed to create client transport: connection error",
+        "grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport: dial unix",
+    )
+    te.startServer(&testServer{security: e.security})
+    defer te.tearDown()
+
+    cc := te.clientConn()
+    tc := testpb.NewTestServiceClient(cc)
+    ctx, cancel := context.WithCancel(context.Background())
+    stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
+    if err != nil {
+        t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
+    }
+    // Finish an RPC to make sure the connection is good.
+    if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+        t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
+    }
+    ch := make(chan struct{})
+    go func() {
+        te.srv.GracefulStop()
+        close(ch)
+    }()
+    // Loop until the server side GoAway signal is propagated to the client.
+    for {
+        ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
+        if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil {
+            continue
+        }
+        break
+    }
+    respParam := []*testpb.ResponseParameters{
+        {
+            Size: proto.Int32(1),
+        },
+    }
+    payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
+    if err != nil {
+        t.Fatal(err)
+    }
+    req := &testpb.StreamingOutputCallRequest{
+        ResponseType:       testpb.PayloadType_COMPRESSABLE.Enum(),
+        ResponseParameters: respParam,
+        Payload:            payload,
+    }
+    // The existing RPC should be still good to proceed.
+    if err := stream.Send(req); err != nil {
+        t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
+    }
+    if _, err := stream.Recv(); err != nil {
+        t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
+    }
+    cancel()
+    <-ch
+    awaitNewConnLogOutput()
+}
+
 func TestFailFast(t *testing.T) {
     defer leakCheck(t)()
     for _, e := range listTestEnv() {
@@ -1993,6 +2114,7 @@ func interestingGoroutines() (gs []string) {

         if stack == "" ||
             strings.Contains(stack, "testing.Main(") ||
+            strings.Contains(stack, "testing.tRunner(") ||
             strings.Contains(stack, "runtime.goexit") ||
             strings.Contains(stack, "created by runtime.gc") ||
             strings.Contains(stack, "created by google3/base/go/log.init") ||
@@ -72,6 +72,11 @@ type resetStream struct {

 func (*resetStream) item() {}

+type goAway struct {
+}
+
+func (*goAway) item() {}
+
 type flushIO struct {
 }

@@ -370,6 +370,10 @@ func (ht *serverHandlerTransport) runStream() {
     }
 }

+func (ht *serverHandlerTransport) Drain() {
+    panic("Drain() is not implemented")
+}
+
 // mapRecvMsgError returns the non-nil err into the appropriate
 // error value as expected by callers of *grpc.parser.recvMsg.
 // In particular, in can only be:
@@ -72,6 +72,9 @@ type http2Client struct {
     shutdownChan chan struct{}
     // errorChan is closed to notify the I/O error to the caller.
     errorChan chan struct{}
+    // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
+    // that the server sent GoAway on this transport.
+    goAway chan struct{}

     framer *framer
     hBuf   *bytes.Buffer // the buffer for HPACK encoding

@@ -98,6 +101,10 @@ type http2Client struct {
     maxStreams int
     // the per-stream outbound flow control window size set by the peer.
     streamSendQuota uint32
+    // goAwayID records the Last-Stream-ID in the GoAway frame from the server.
+    goAwayID uint32
+    // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
+    prevGoAwayID uint32
 }

 // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2

@@ -148,6 +155,7 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e
         writableChan: make(chan int, 1),
         shutdownChan: make(chan struct{}),
         errorChan:    make(chan struct{}),
+        goAway:       make(chan struct{}),
         framer:       newFramer(conn),
         hBuf:         &buf,
         hEnc:         hpack.NewEncoder(&buf),

@@ -204,6 +212,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
     s := &Stream{
         id:           t.nextID,
         done:         make(chan struct{}),
+        goAway:       make(chan struct{}),
         method:       callHdr.Method,
         sendCompress: callHdr.SendCompress,
         buf:          newRecvBuffer(),

@@ -218,8 +227,9 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
     // Make a stream be able to cancel the pending operations by itself.
     s.ctx, s.cancel = context.WithCancel(ctx)
     s.dec = &recvBufferReader{
         ctx:    s.ctx,
-        recv:   s.buf,
+        goAway: s.goAway,
+        recv:   s.buf,
     }
     return s
 }

@@ -280,7 +290,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
     checkStreamsQuota := t.streamsQuota != nil
     t.mu.Unlock()
     if checkStreamsQuota {
-        sq, err := wait(ctx, nil, t.shutdownChan, t.streamsQuota.acquire())
+        sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
         if err != nil {
             return nil, err
         }

@@ -289,7 +299,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
             t.streamsQuota.add(sq - 1)
         }
     }
-    if _, err := wait(ctx, nil, t.shutdownChan, t.writableChan); err != nil {
+    if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
         // Return the quota back now because there is no stream returned to the caller.
         if _, ok := err.(StreamError); ok && checkStreamsQuota {
             t.streamsQuota.add(1)

@@ -405,13 +415,13 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
     if t.streamsQuota != nil {
         updateStreams = true
     }
-    if t.state == draining && len(t.activeStreams) == 1 {
+    delete(t.activeStreams, s.id)
+    if t.state == draining && len(t.activeStreams) == 0 {
         // The transport is draining and s is the last live stream on t.
         t.mu.Unlock()
         t.Close()
         return
     }
-    delete(t.activeStreams, s.id)
     t.mu.Unlock()
     if updateStreams {
         t.streamsQuota.add(1)

@@ -442,13 +452,13 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
 // accessed any more.
 func (t *http2Client) Close() (err error) {
     t.mu.Lock()
-    if t.state == reachable {
-        close(t.errorChan)
-    }
     if t.state == closing {
         t.mu.Unlock()
         return
     }
+    if t.state == reachable {
+        close(t.errorChan)
+    }
     t.state = closing
     t.mu.Unlock()
     close(t.shutdownChan)
@@ -476,6 +486,24 @@ func (t *http2Client) GracefulClose() error {
         t.mu.Unlock()
         return nil
     }
+    // Notify the streams which were initiated after the server sent GOAWAY.
+    select {
+    case <-t.goAway:
+        n := t.prevGoAwayID
+        if n == 0 && t.nextID > 1 {
+            n = t.nextID - 2
+        }
+        m := t.goAwayID + 2
+        if m == 2 {
+            m = 1
+        }
+        for i := m; i <= n; i += 2 {
+            if s, ok := t.activeStreams[i]; ok {
+                close(s.goAway)
+            }
+        }
+    default:
+    }
     if t.state == draining {
         t.mu.Unlock()
         return nil
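(Not part of the diff.) To make the stream-ID arithmetic above concrete: client-initiated HTTP/2 stream IDs are odd, so GracefulClose walks the odd IDs from goAwayID+2 up to the newest ID the client has used (or up to prevGoAwayID after a later GOAWAY) and closes each such stream's goAway channel. A small standalone sketch of the same bounds, with purely illustrative values in the trailing comment:

package example

// goAwayRange mirrors the bounds computed in GracefulClose above. It returns
// the first (m) and last (n) client-initiated stream IDs whose goAway channel
// should be closed after a GOAWAY; the parameters follow the http2Client
// fields of the same names.
func goAwayRange(prevGoAwayID, goAwayID, nextID uint32) (m, n uint32) {
    n = prevGoAwayID
    if n == 0 && nextID > 1 {
        n = nextID - 2 // newest stream ID handed out so far
    }
    m = goAwayID + 2 // first stream ID the server refused to process
    if m == 2 {
        m = 1
    }
    return m, n
}

// Illustrative values: prevGoAwayID = 0, goAwayID = 3, nextID = 9 (streams
// 1, 3, 5, 7 are open). goAwayRange returns m = 5, n = 7, so streams 5 and 7
// are told the server will not process them, while 1 and 3 run to completion.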
@@ -501,13 +529,13 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
         size := http2MaxFrameLen
         s.sendQuotaPool.add(0)
         // Wait until the stream has some quota to send the data.
-        sq, err := wait(s.ctx, s.done, t.shutdownChan, s.sendQuotaPool.acquire())
+        sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
         if err != nil {
             return err
         }
         t.sendQuotaPool.add(0)
         // Wait until the transport has some quota to send the data.
-        tq, err := wait(s.ctx, s.done, t.shutdownChan, t.sendQuotaPool.acquire())
+        tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire())
         if err != nil {
             if _, ok := err.(StreamError); ok || err == io.EOF {
                 t.sendQuotaPool.cancel()

@@ -541,7 +569,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
         // Indicate there is a writer who is about to write a data frame.
         t.framer.adjustNumWriters(1)
         // Got some quota. Try to acquire writing privilege on the transport.
-        if _, err := wait(s.ctx, s.done, t.shutdownChan, t.writableChan); err != nil {
+        if _, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.writableChan); err != nil {
             if _, ok := err.(StreamError); ok || err == io.EOF {
                 // Return the connection quota back.
                 t.sendQuotaPool.add(len(p))

@@ -725,7 +753,32 @@ func (t *http2Client) handlePing(f *http2.PingFrame) {
 }

 func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
-    // TODO(zhaoq): GoAwayFrame handler to be implemented
+    t.mu.Lock()
+    if t.state == reachable || t.state == draining {
+        if f.LastStreamID > 0 && f.LastStreamID%2 != 1 {
+            t.mu.Unlock()
+            t.notifyError(ConnectionErrorf("received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID))
+            return
+        }
+        select {
+        case <-t.goAway:
+            id := t.goAwayID
+            // t.goAway has been closed (i.e.,multiple GoAways).
+            if id < f.LastStreamID {
+                t.mu.Unlock()
+                t.notifyError(ConnectionErrorf("received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID))
+                return
+            }
+            t.prevGoAwayID = id
+            t.goAwayID = f.LastStreamID
+            t.mu.Unlock()
+            return
+        default:
+        }
+        t.goAwayID = f.LastStreamID
+        close(t.goAway)
+    }
+    t.mu.Unlock()
 }

 func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {

@@ -934,6 +987,10 @@ func (t *http2Client) Error() <-chan struct{} {
     return t.errorChan
 }

+func (t *http2Client) GoAway() <-chan struct{} {
+    return t.goAway
+}
+
 func (t *http2Client) notifyError(err error) {
     t.mu.Lock()
     defer t.mu.Unlock()
@@ -142,7 +142,7 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
 }

 // operateHeader takes action on the decoded headers.
-func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) {
+func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (close bool) {
     buf := newRecvBuffer()
     s := &Stream{
         id:  frame.Header().StreamID,

@@ -205,6 +205,13 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
         t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
         return
     }
+    if s.id%2 != 1 || s.id <= t.maxStreamID {
+        t.mu.Unlock()
+        // illegal gRPC stream id.
+        grpclog.Println("transport: http2Server.HandleStreams received an illegal stream id: ", s.id)
+        return true
+    }
+    t.maxStreamID = s.id
     s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
     t.activeStreams[s.id] = s
     t.mu.Unlock()

@@ -212,6 +219,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
         t.updateWindow(s, uint32(n))
     }
     handle(s)
+    return
 }

 // HandleStreams receives incoming streams using the given handler. This is

@@ -262,15 +270,10 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
         }
         switch frame := frame.(type) {
         case *http2.MetaHeadersFrame:
-            id := frame.Header().StreamID
-            if id%2 != 1 || id <= t.maxStreamID {
-                // illegal gRPC stream id.
-                grpclog.Println("transport: http2Server.HandleStreams received an illegal stream id: ", id)
+            if t.operateHeaders(frame, handle) {
                 t.Close()
                 break
             }
-            t.maxStreamID = id
-            t.operateHeaders(frame, handle)
         case *http2.DataFrame:
             t.handleData(frame)
         case *http2.RSTStreamFrame:

@@ -282,7 +285,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
         case *http2.WindowUpdateFrame:
             t.handleWindowUpdate(frame)
         case *http2.GoAwayFrame:
-            break
+            // TODO: Handle GoAway from the client appropriately.
         default:
             grpclog.Printf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
         }

@@ -451,7 +454,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
     }
     s.headerOk = true
     s.mu.Unlock()
-    if _, err := wait(s.ctx, nil, t.shutdownChan, t.writableChan); err != nil {
+    if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
         return err
     }
     t.hBuf.Reset()

@@ -491,7 +494,7 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
         headersSent = true
     }
     s.mu.Unlock()
-    if _, err := wait(s.ctx, nil, t.shutdownChan, t.writableChan); err != nil {
+    if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
         return err
     }
     t.hBuf.Reset()

@@ -540,7 +543,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
     }
     s.mu.Unlock()
     if writeHeaderFrame {
-        if _, err := wait(s.ctx, nil, t.shutdownChan, t.writableChan); err != nil {
+        if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
             return err
         }
         t.hBuf.Reset()

@@ -568,13 +571,13 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
         size := http2MaxFrameLen
         s.sendQuotaPool.add(0)
         // Wait until the stream has some quota to send the data.
-        sq, err := wait(s.ctx, nil, t.shutdownChan, s.sendQuotaPool.acquire())
+        sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
         if err != nil {
             return err
         }
         t.sendQuotaPool.add(0)
         // Wait until the transport has some quota to send the data.
-        tq, err := wait(s.ctx, nil, t.shutdownChan, t.sendQuotaPool.acquire())
+        tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire())
         if err != nil {
             if _, ok := err.(StreamError); ok {
                 t.sendQuotaPool.cancel()

@@ -600,7 +603,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
         t.framer.adjustNumWriters(1)
         // Got some quota. Try to acquire writing privilege on the
         // transport.
-        if _, err := wait(s.ctx, nil, t.shutdownChan, t.writableChan); err != nil {
+        if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
             if _, ok := err.(StreamError); ok {
                 // Return the connection quota back.
                 t.sendQuotaPool.add(ps)

@@ -675,6 +678,12 @@ func (t *http2Server) controller() {
                 }
             case *resetStream:
                 t.framer.writeRSTStream(true, i.streamID, i.code)
+            case *goAway:
+                t.mu.Lock()
+                sid := t.maxStreamID
+                t.state = draining
+                t.mu.Unlock()
+                t.framer.writeGoAway(true, sid, http2.ErrCodeNo, nil)
             case *flushIO:
                 t.framer.flushWrite()
             case *ping:

@@ -720,6 +729,9 @@ func (t *http2Server) Close() (err error) {
 func (t *http2Server) closeStream(s *Stream) {
     t.mu.Lock()
     delete(t.activeStreams, s.id)
+    if t.state == draining && len(t.activeStreams) == 0 {
+        defer t.Close()
+    }
     t.mu.Unlock()
     // In case stream sending and receiving are invoked in separate
     // goroutines (e.g., bi-directional streaming), cancel needs to be

@@ -742,3 +754,7 @@ func (t *http2Server) closeStream(s *Stream) {
 func (t *http2Server) RemoteAddr() net.Addr {
     return t.conn.RemoteAddr()
 }
+
+func (t *http2Server) Drain() {
+    t.controlBuf.put(&goAway{})
+}
@@ -121,10 +121,11 @@ func (b *recvBuffer) get() <-chan item {
 // recvBufferReader implements io.Reader interface to read the data from
 // recvBuffer.
 type recvBufferReader struct {
     ctx    context.Context
-    recv   *recvBuffer
-    last   *bytes.Reader // Stores the remaining data in the previous calls.
-    err    error
+    goAway chan struct{}
+    recv   *recvBuffer
+    last   *bytes.Reader // Stores the remaining data in the previous calls.
+    err    error
 }

 // Read reads the next len(p) bytes from last. If last is drained, it tries to

@@ -142,6 +143,8 @@ func (r *recvBufferReader) Read(p []byte) (n int, err error) {
     select {
     case <-r.ctx.Done():
         return 0, ContextErr(r.ctx.Err())
+    case <-r.goAway:
+        return 0, ErrStreamDrain
     case i := <-r.recv.get():
         r.recv.load()
         m := i.(*recvMsg)

@@ -172,6 +175,8 @@ type Stream struct {
     cancel context.CancelFunc
     // done is closed when the final status arrives.
     done chan struct{}
+    // goAway is closed when the server sent GoAways signal before this stream was initiated.
+    goAway chan struct{}
     // method records the associated RPC method of the stream.
     method       string
     recvCompress string

@@ -217,10 +222,18 @@ func (s *Stream) SetSendCompress(str string) {
     s.sendCompress = str
 }

+// Done returns a chanel which is closed when it receives the final status
+// from the server.
 func (s *Stream) Done() <-chan struct{} {
     return s.done
 }

+// GoAway returns a channel which is closed when the server sent GoAways signal
+// before this stream was initiated.
+func (s *Stream) GoAway() <-chan struct{} {
+    return s.goAway
+}
+
 // Header acquires the key-value pairs of header metadata once it
 // is available. It blocks until i) the metadata is ready or ii) there is no
 // header metadata or iii) the stream is cancelled/expired.

@@ -424,6 +437,11 @@ type ClientTransport interface {
     // and create a new one) in error case. It should not return nil
     // once the transport is initiated.
     Error() <-chan struct{}
+
+    // GoAway returns a channel that is closed when ClientTranspor
+    // receives the draining signal from the server (e.g., GOAWAY frame in
+    // HTTP/2).
+    GoAway() <-chan struct{}
 }

 // ServerTransport is the common interface for all gRPC server-side transport

@@ -455,6 +473,9 @@ type ServerTransport interface {

     // RemoteAddr returns the remote network address.
     RemoteAddr() net.Addr
+
+    // Drain notifies the client this ServerTransport stops accepting new RPCs.
+    Drain()
 }

 // StreamErrorf creates an StreamError with the specified error code and description.

@@ -482,8 +503,13 @@ func (e ConnectionError) Error() string {
     return fmt.Sprintf("connection error: desc = %q", e.Desc)
 }

-// ErrConnClosing indicates that the transport is closing.
-var ErrConnClosing = ConnectionError{Desc: "transport is closing"}
+var (
+    // ErrConnClosing indicates that the transport is closing.
+    ErrConnClosing = ConnectionError{Desc: "transport is closing"}
+    // ErrStreamDrain indicates that the stream is rejected by the server because
+    // the server stops accepting new RPCs.
+    ErrStreamDrain = StreamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
+)

 // StreamError is an error that only affects one stream within a connection.
 type StreamError struct {

@@ -510,9 +536,10 @@ func ContextErr(err error) StreamError {
 // If it receives from ctx.Done, it returns 0, the StreamError for ctx.Err.
 // If it receives from done, it returns 0, io.EOF if ctx is not done; otherwise
 // it return the StreamError for ctx.Err.
+// If it receives from goAway, it returns 0, ErrStreamDrain.
 // If it receives from closing, it returns 0, ErrConnClosing.
 // If it receives from proceed, it returns the received integer, nil.
-func wait(ctx context.Context, done, closing <-chan struct{}, proceed <-chan int) (int, error) {
+func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <-chan int) (int, error) {
     select {
     case <-ctx.Done():
         return 0, ContextErr(ctx.Err())

@@ -524,6 +551,8 @@ func wait(ctx context.Context, done, closing <-chan struct{}, proceed <-chan int
         default:
         }
         return 0, io.EOF
+    case <-goAway:
+        return 0, ErrStreamDrain
     case <-closing:
         return 0, ErrConnClosing
     case i := <-proceed:
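(Not part of the diff.) The extra goAway parameter threaded through wait follows a common Go pattern: select over several closed-to-signal channels plus a quota channel and translate whichever fires into an error. A simplified standalone sketch of that shape is below; it omits the nested ctx/done check of the real helper, and the error values are stand-ins rather than the transport package's exported errors.

package example

import (
    "errors"

    "golang.org/x/net/context"
)

var (
    errStreamDrain = errors.New("the server stops accepting new RPCs")
    errConnClosing = errors.New("transport is closing")
)

// waitQuota blocks until quota arrives on proceed or one of the cancellation
// channels fires, mirroring the shape of transport.wait after this change.
// A nil channel never fires, so callers can pass nil for signals they do not
// care about, exactly as the server-side call sites above do.
func waitQuota(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <-chan int) (int, error) {
    select {
    case <-ctx.Done():
        return 0, ctx.Err()
    case <-done:
        return 0, errors.New("stream done")
    case <-goAway:
        return 0, errStreamDrain
    case <-closing:
        return 0, errConnClosing
    case i := <-proceed:
        return i, nil
    }
}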