Register incoming stream with loopy as soon as it gets created. (#2144)

This commit is contained in:
mmukhi
2018-06-18 10:52:49 -07:00
committed by GitHub
parent 7e6dc36bea
commit fd9db51e64
2 changed files with 43 additions and 21 deletions

View File

@ -84,6 +84,13 @@ func (il *itemList) isEmpty() bool {
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, streaming resetting, etc.
// registerStream is used to register an incoming stream with loopy writer.
type registerStream struct {
streamID uint32
wq *writeQuota
}
// headerFrame is also used to register stream on the client-side.
type headerFrame struct {
streamID uint32
hf []hpack.HeaderField
@ -457,30 +464,39 @@ func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
return l.framer.fr.WriteSettingsAck()
}
func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
str := &outStream{
id: h.streamID,
state: empty,
itl: &itemList{},
wq: h.wq,
}
l.estdStreams[h.streamID] = str
return nil
}
func (l *loopyWriter) headerHandler(h *headerFrame) error {
if l.side == serverSide {
if h.endStream { // Case 1.A: Server wants to close stream.
// Make sure it's not a trailers only response.
if str, ok := l.estdStreams[h.streamID]; ok {
if str.state != empty { // either active or waiting on stream quota.
// add it str's list of items.
str.itl.enqueue(h)
return nil
}
}
if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
return err
}
return l.cleanupStreamHandler(h.cleanup)
str, ok := l.estdStreams[h.streamID]
if !ok {
warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
return nil
}
// Case 1.B: Server is responding back with headers.
str := &outStream{
state: empty,
itl: &itemList{},
wq: h.wq,
// Case 1.A: Server is responding back with headers.
if !h.endStream {
return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
}
l.estdStreams[h.streamID] = str
return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
// else: Case 1.B: Server wants to close stream.
if str.state != empty { // either active or waiting on stream quota.
// add it str's list of items.
str.itl.enqueue(h)
return nil
}
if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
return err
}
return l.cleanupStreamHandler(h.cleanup)
}
// Case 2: Client wants to originate stream.
str := &outStream{
@ -639,6 +655,8 @@ func (l *loopyWriter) handle(i interface{}) error {
return l.outgoingSettingsHandler(i)
case *headerFrame:
return l.headerHandler(i)
case *registerStream:
return l.registerStreamHandler(i)
case *cleanupStream:
return l.cleanupStreamHandler(i)
case *incomingGoAway:

View File

@ -416,6 +416,11 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
t.updateWindow(s, uint32(n))
},
}
// Register the stream with loopy.
t.controlBuf.put(&registerStream{
streamID: s.id,
wq: s.wq,
})
handle(s)
return
}
@ -733,7 +738,6 @@ func (t *http2Server) writeHeaderLocked(s *Stream) {
onWrite: func() {
atomic.StoreUint32(&t.resetPingStrikes, 1)
},
wq: s.wq,
})
if t.stats != nil {
// Note: WireLength is not set in outHeader.