From 086f6de8a8e82afada100da3da6c1eba7c6633d2 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 27 Jan 2016 16:39:34 -0800 Subject: [PATCH] force flush headers frame for server streaming and bi-di streaming --- stream.go | 1 + transport/http2_client.go | 8 ++++++-- transport/transport.go | 4 ++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/stream.go b/stream.go index 63f934dd..9b42b9e7 100644 --- a/stream.go +++ b/stream.go @@ -113,6 +113,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth callHdr := &transport.CallHdr{ Host: cc.authority, Method: method, + Flush: desc.ServerStreams, } if cp != nil { callHdr.SendCompress = cp.Type() diff --git a/transport/http2_client.go b/transport/http2_client.go index 7006cd87..7cf700fe 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -353,6 +353,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea } else { endHeaders = true } + var flush bool + if endHeaders && (hasMD || callHdr.Flush) { + flush = true + } if first { // Sends a HeadersFrame to server to start a new stream. p := http2.HeadersFrameParam{ @@ -364,11 +368,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea // Do a force flush for the buffered frames iff it is the last headers frame // and there is header metadata to be sent. Otherwise, there is flushing until // the corresponding data frame is written. - err = t.framer.writeHeaders(hasMD && endHeaders, p) + err = t.framer.writeHeaders(flush, p) first = false } else { // Sends Continuation frames for the leftover headers. - err = t.framer.writeContinuation(hasMD && endHeaders, s.id, endHeaders, t.hBuf.Next(size)) + err = t.framer.writeContinuation(flush, s.id, endHeaders, t.hBuf.Next(size)) } if err != nil { t.notifyError(err) diff --git a/transport/transport.go b/transport/transport.go index 79564796..6c3b9437 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -369,6 +369,10 @@ type CallHdr struct { RecvCompress string // SendCompress specifies the compression algorithm applied on outbound message. SendCompress string + // Flush indicates if new stream command should be sent to the peer without + // waiting for the first data. This is a hint though. The transport may modify + // the flush decision for performance purpose. + Flush bool } // ClientTransport is the common interface for all gRPC client side transport