refactor a bit
This commit is contained in:
@ -166,15 +166,16 @@ func newHTTP2Client(addr string, opts *DialOptions) (_ ClientTransport, err erro
|
|||||||
return t, nil
|
return t, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *http2Client) newStream(ctx context.Context, streamID uint32, callHdr *CallHdr) *Stream {
|
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
|
||||||
// TODO(zhaoq): Handle uint32 overflow.
|
// TODO(zhaoq): Handle uint32 overflow of Stream.id.
|
||||||
s := &Stream{
|
s := &Stream{
|
||||||
id: streamID,
|
id: t.nextID,
|
||||||
method: callHdr.Method,
|
method: callHdr.Method,
|
||||||
buf: newRecvBuffer(),
|
buf: newRecvBuffer(),
|
||||||
sendQuotaPool: newQuotaPool(initialWindowSize),
|
sendQuotaPool: newQuotaPool(initialWindowSize),
|
||||||
headerChan: make(chan struct{}),
|
headerChan: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
t.nextID += 2
|
||||||
s.windowHandler = func(n int) {
|
s.windowHandler = func(n int) {
|
||||||
t.addRecvQuota(s, n)
|
t.addRecvQuota(s, n)
|
||||||
}
|
}
|
||||||
@ -211,6 +212,18 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||||||
if _, err := wait(ctx, t.shutdownChan, t.writableChan); err != nil {
|
if _, err := wait(ctx, t.shutdownChan, t.writableChan); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
t.mu.Lock()
|
||||||
|
if t.state != reachable {
|
||||||
|
t.mu.Unlock()
|
||||||
|
return nil, ErrConnClosing
|
||||||
|
}
|
||||||
|
if uint32(len(t.activeStreams)) >= t.maxStreams {
|
||||||
|
t.mu.Unlock()
|
||||||
|
return nil, StreamErrorf(codes.Unavailable, "transport: failed to create new stream because the limit has been reached.")
|
||||||
|
}
|
||||||
|
s := t.newStream(ctx, callHdr)
|
||||||
|
t.activeStreams[s.id] = s
|
||||||
|
t.mu.Unlock()
|
||||||
// HPACK encodes various headers. Note that once WriteField(...) is
|
// HPACK encodes various headers. Note that once WriteField(...) is
|
||||||
// called, the corresponding headers/continuation frame has to be sent
|
// called, the corresponding headers/continuation frame has to be sent
|
||||||
// because hpack.Encoder is stateful.
|
// because hpack.Encoder is stateful.
|
||||||
@ -238,8 +251,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
first := true
|
first := true
|
||||||
streamID := t.nextID
|
|
||||||
t.nextID += 2
|
|
||||||
// Sends the headers in a single batch even when they span multiple frames.
|
// Sends the headers in a single batch even when they span multiple frames.
|
||||||
for !endHeaders {
|
for !endHeaders {
|
||||||
size := t.hBuf.Len()
|
size := t.hBuf.Len()
|
||||||
@ -251,7 +262,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||||||
if first {
|
if first {
|
||||||
// Sends a HeadersFrame to server to start a new stream.
|
// Sends a HeadersFrame to server to start a new stream.
|
||||||
p := http2.HeadersFrameParam{
|
p := http2.HeadersFrameParam{
|
||||||
StreamID: streamID,
|
StreamID: s.id,
|
||||||
BlockFragment: t.hBuf.Next(size),
|
BlockFragment: t.hBuf.Next(size),
|
||||||
EndStream: false,
|
EndStream: false,
|
||||||
EndHeaders: endHeaders,
|
EndHeaders: endHeaders,
|
||||||
@ -263,7 +274,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||||||
first = false
|
first = false
|
||||||
} else {
|
} else {
|
||||||
// Sends Continuation frames for the leftover headers.
|
// Sends Continuation frames for the leftover headers.
|
||||||
err = t.framer.writeContinuation(hasMD && endHeaders, t.nextID, endHeaders, t.hBuf.Next(size))
|
err = t.framer.writeContinuation(hasMD && endHeaders, s.id, endHeaders, t.hBuf.Next(size))
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.notifyError(err)
|
t.notifyError(err)
|
||||||
@ -271,18 +282,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
t.writableChan <- 0
|
t.writableChan <- 0
|
||||||
s := t.newStream(ctx, streamID, callHdr)
|
|
||||||
t.mu.Lock()
|
|
||||||
if t.state != reachable {
|
|
||||||
t.mu.Unlock()
|
|
||||||
return nil, ErrConnClosing
|
|
||||||
}
|
|
||||||
if uint32(len(t.activeStreams)) >= t.maxStreams {
|
|
||||||
t.mu.Unlock()
|
|
||||||
return nil, StreamErrorf(codes.Unavailable, "transport: failed to create new stream because the limit has been reached.")
|
|
||||||
}
|
|
||||||
t.activeStreams[s.id] = s
|
|
||||||
t.mu.Unlock()
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user