force flush headers frame for server streaming and bi-di streaming
This commit is contained in:
@ -113,6 +113,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
|||||||
callHdr := &transport.CallHdr{
|
callHdr := &transport.CallHdr{
|
||||||
Host: cc.authority,
|
Host: cc.authority,
|
||||||
Method: method,
|
Method: method,
|
||||||
|
Flush: desc.ServerStreams,
|
||||||
}
|
}
|
||||||
if cp != nil {
|
if cp != nil {
|
||||||
callHdr.SendCompress = cp.Type()
|
callHdr.SendCompress = cp.Type()
|
||||||
|
@ -353,6 +353,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||||||
} else {
|
} else {
|
||||||
endHeaders = true
|
endHeaders = true
|
||||||
}
|
}
|
||||||
|
var flush bool
|
||||||
|
if endHeaders && (hasMD || callHdr.Flush) {
|
||||||
|
flush = true
|
||||||
|
}
|
||||||
if first {
|
if first {
|
||||||
// Sends a HeadersFrame to server to start a new stream.
|
// Sends a HeadersFrame to server to start a new stream.
|
||||||
p := http2.HeadersFrameParam{
|
p := http2.HeadersFrameParam{
|
||||||
@ -364,11 +368,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||||||
// Do a force flush for the buffered frames iff it is the last headers frame
|
// Do a force flush for the buffered frames iff it is the last headers frame
|
||||||
// and there is header metadata to be sent. Otherwise, there is flushing until
|
// and there is header metadata to be sent. Otherwise, there is flushing until
|
||||||
// the corresponding data frame is written.
|
// the corresponding data frame is written.
|
||||||
err = t.framer.writeHeaders(hasMD && endHeaders, p)
|
err = t.framer.writeHeaders(flush, p)
|
||||||
first = false
|
first = false
|
||||||
} else {
|
} else {
|
||||||
// Sends Continuation frames for the leftover headers.
|
// Sends Continuation frames for the leftover headers.
|
||||||
err = t.framer.writeContinuation(hasMD && endHeaders, s.id, endHeaders, t.hBuf.Next(size))
|
err = t.framer.writeContinuation(flush, s.id, endHeaders, t.hBuf.Next(size))
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.notifyError(err)
|
t.notifyError(err)
|
||||||
|
@ -369,6 +369,10 @@ type CallHdr struct {
|
|||||||
RecvCompress string
|
RecvCompress string
|
||||||
// SendCompress specifies the compression algorithm applied on outbound message.
|
// SendCompress specifies the compression algorithm applied on outbound message.
|
||||||
SendCompress string
|
SendCompress string
|
||||||
|
// Flush indicates if new stream command should be sent to the peer without
|
||||||
|
// waiting for the first data. This is a hint though. The transport may modify
|
||||||
|
// the flush decision for performance purpose.
|
||||||
|
Flush bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// ClientTransport is the common interface for all gRPC client side transport
|
// ClientTransport is the common interface for all gRPC client side transport
|
||||||
|
Reference in New Issue
Block a user