experimental commit

This commit is contained in:
Mahak Mukhi
2017-02-17 15:00:07 -08:00
parent 888efe30a6
commit 7f74821ff2
2 changed files with 14 additions and 2 deletions

View File

@ -491,8 +491,14 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
return return
} }
t.mu.Unlock() t.mu.Unlock()
defer t.streamsQuota.add(1) var rstStream bool
defer func() {
if !rstStream {
t.streamsQuota.add(1)
}
}()
s.mu.Lock() s.mu.Lock()
rstStream = s.rstStream
if q := s.fc.resetPendingData(); q > 0 { if q := s.fc.resetPendingData(); q > 0 {
if n := t.fc.onRead(q); n > 0 { if n := t.fc.onRead(q); n > 0 {
t.controlBuf.put(&windowUpdate{0, n}) t.controlBuf.put(&windowUpdate{0, n})
@ -509,6 +515,7 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
s.state = streamDone s.state = streamDone
s.mu.Unlock() s.mu.Unlock()
if se, ok := err.(StreamError); ok && se.Code != codes.DeadlineExceeded { if se, ok := err.(StreamError); ok && se.Code != codes.DeadlineExceeded {
rstStream = true
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel}) t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel})
} }
} }
@ -750,6 +757,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
s.state = streamDone s.state = streamDone
s.statusCode = codes.Internal s.statusCode = codes.Internal
s.statusDesc = err.Error() s.statusDesc = err.Error()
s.rstStream = true
close(s.done) close(s.done)
s.mu.Unlock() s.mu.Unlock()
s.write(recvMsg{err: io.EOF}) s.write(recvMsg{err: io.EOF})
@ -1060,6 +1068,7 @@ func (t *http2Client) controller() {
t.framer.writeSettings(true, i.ss...) t.framer.writeSettings(true, i.ss...)
} }
case *resetStream: case *resetStream:
t.streamsQuota.add(1)
t.framer.writeRSTStream(true, i.streamID, i.code) t.framer.writeRSTStream(true, i.streamID, i.code)
case *flushIO: case *flushIO:
t.framer.flushWrite() t.framer.flushWrite()

View File

@ -35,7 +35,7 @@
Package transport defines and implements message oriented communication channel Package transport defines and implements message oriented communication channel
to complete various transactions (e.g., an RPC). to complete various transactions (e.g., an RPC).
*/ */
package transport // import "google.golang.org/grpc/transport" package transport // externally used as import "google.golang.org/grpc/transport"
import ( import (
"bytes" "bytes"
@ -213,6 +213,9 @@ type Stream struct {
// the status received from the server. // the status received from the server.
statusCode codes.Code statusCode codes.Code
statusDesc string statusDesc string
// rstStream is a flag that is true when a RST stream frame
// is sent to the server signifying that this stream is closing.
rstStream bool
} }
// RecvCompress returns the compression algorithm applied to the inbound // RecvCompress returns the compression algorithm applied to the inbound