Update module google.golang.org/grpc to v1.76.0

Signed-off-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
This commit is contained in:
renovate[bot]
2025-11-10 13:47:59 +00:00
committed by GitHub
parent 80840578be
commit d0d10c4ccd
75 changed files with 15174 additions and 962 deletions

View File

@@ -101,9 +101,9 @@ type ClientStream interface {
// It must only be called after stream.CloseAndRecv has returned, or
// stream.Recv has returned a non-nil error (including io.EOF).
Trailer() metadata.MD
// CloseSend closes the send direction of the stream. It closes the stream
// when non-nil error is met. It is also not safe to call CloseSend
// concurrently with SendMsg.
// CloseSend closes the send direction of the stream. This method always
// returns a nil error. The status of the stream may be discovered using
// RecvMsg. It is also not safe to call CloseSend concurrently with SendMsg.
CloseSend() error
// Context returns the context for this stream.
//
@@ -212,14 +212,15 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
// Provide an opportunity for the first RPC to see the first service config
// provided by the resolver.
if err := cc.waitForResolvedAddrs(ctx); err != nil {
nameResolutionDelayed, err := cc.waitForResolvedAddrs(ctx)
if err != nil {
return nil, err
}
var mc serviceconfig.MethodConfig
var onCommit func()
newStream := func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, nameResolutionDelayed, opts...)
}
rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
@@ -257,7 +258,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
return newStream(ctx, func() {})
}
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), nameResolutionDelayed bool, opts ...CallOption) (_ iresolver.ClientStream, err error) {
callInfo := defaultCallInfo()
if mc.WaitForReady != nil {
callInfo.failFast = !*mc.WaitForReady
@@ -296,6 +297,7 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
Method: method,
ContentSubtype: callInfo.contentSubtype,
DoneFunc: doneFunc,
Authority: callInfo.authority,
}
// Set our outgoing compression according to the UseCompressor CallOption, if
@@ -321,19 +323,20 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
}
cs := &clientStream{
callHdr: callHdr,
ctx: ctx,
methodConfig: &mc,
opts: opts,
callInfo: callInfo,
cc: cc,
desc: desc,
codec: callInfo.codec,
compressorV0: compressorV0,
compressorV1: compressorV1,
cancel: cancel,
firstAttempt: true,
onCommit: onCommit,
callHdr: callHdr,
ctx: ctx,
methodConfig: &mc,
opts: opts,
callInfo: callInfo,
cc: cc,
desc: desc,
codec: callInfo.codec,
compressorV0: compressorV0,
compressorV1: compressorV1,
cancel: cancel,
firstAttempt: true,
onCommit: onCommit,
nameResolutionDelay: nameResolutionDelayed,
}
if !cc.dopts.disableRetry {
cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
@@ -417,7 +420,7 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
var beginTime time.Time
shs := cs.cc.dopts.copts.StatsHandlers
for _, sh := range shs {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast, NameResolutionDelay: cs.nameResolutionDelay})
beginTime = time.Now()
begin := &stats.Begin{
Client: true,
@@ -466,8 +469,9 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
func (a *csAttempt) getTransport() error {
cs := a.cs
var err error
a.transport, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
pickInfo := balancer.PickInfo{Ctx: a.ctx, FullMethodName: cs.callHdr.Method}
pick, err := cs.cc.pickerWrapper.pick(a.ctx, cs.callInfo.failFast, pickInfo)
a.transport, a.pickResult = pick.transport, pick.result
if err != nil {
if de, ok := err.(dropError); ok {
err = de.error
@@ -478,6 +482,11 @@ func (a *csAttempt) getTransport() error {
if a.trInfo != nil {
a.trInfo.firstLine.SetRemoteAddr(a.transport.RemoteAddr())
}
if pick.blocked {
for _, sh := range a.statsHandlers {
sh.HandleRPC(a.ctx, &stats.DelayedPickComplete{})
}
}
return nil
}
@@ -540,6 +549,8 @@ type clientStream struct {
sentLast bool // sent an end stream
receivedFirstMsg bool // set after the first message is received
methodConfig *MethodConfig
ctx context.Context // the application's context, wrapped by stats/tracing
@@ -573,6 +584,9 @@ type clientStream struct {
onCommit func()
replayBuffer []replayOp // operations to replay on retry
replayBufferSize int // current size of replayBuffer
// nameResolutionDelay indicates if there was a delay in the name resolution.
// This field is only valid on client side, it's always false on server side.
nameResolutionDelay bool
}
type replayOp struct {
@@ -987,7 +1001,7 @@ func (cs *clientStream) RecvMsg(m any) error {
func (cs *clientStream) CloseSend() error {
if cs.sentLast {
// TODO: return an error and finish the stream instead, due to API misuse?
// Return a nil error on repeated calls to this method.
return nil
}
cs.sentLast = true
@@ -1008,7 +1022,10 @@ func (cs *clientStream) CloseSend() error {
binlog.Log(cs.ctx, chc)
}
}
// We never returned an error here for reasons.
// We don't return an error here as we expect users to read all messages
// from the stream and get the RPC status from RecvMsg(). Note that
// SendMsg() must return an error when one occurs so the application
// knows to stop sending messages, but that does not apply here.
return nil
}
@@ -1129,11 +1146,16 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
if statusErr := a.transportStream.Status().Err(); statusErr != nil {
return statusErr
}
// Received no msg and status OK for non-server streaming rpcs.
if !cs.desc.ServerStreams && !cs.receivedFirstMsg {
return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC")
}
return io.EOF // indicates successful end of stream.
}
return toRPCErr(err)
}
cs.receivedFirstMsg = true
if a.trInfo != nil {
a.mu.Lock()
if a.trInfo.tr != nil {
@@ -1162,7 +1184,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
} else if err != nil {
return toRPCErr(err)
}
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
}
func (a *csAttempt) finish(err error) {
@@ -1344,6 +1366,7 @@ type addrConnStream struct {
transport transport.ClientTransport
ctx context.Context
sentLast bool
receivedFirstMsg bool
desc *StreamDesc
codec baseCodec
sendCompressorV0 Compressor
@@ -1372,7 +1395,7 @@ func (as *addrConnStream) Trailer() metadata.MD {
func (as *addrConnStream) CloseSend() error {
if as.sentLast {
// TODO: return an error and finish the stream instead, due to API misuse?
// Return a nil error on repeated calls to this method.
return nil
}
as.sentLast = true
@@ -1469,10 +1492,15 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
if statusErr := as.transportStream.Status().Err(); statusErr != nil {
return statusErr
}
// Received no msg and status OK for non-server streaming rpcs.
if !as.desc.ServerStreams && !as.receivedFirstMsg {
return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC")
}
return io.EOF // indicates successful end of stream.
}
return toRPCErr(err)
}
as.receivedFirstMsg = true
if as.desc.ServerStreams {
// Subsequent messages should be received by subsequent RecvMsg calls.
@@ -1486,7 +1514,7 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
} else if err != nil {
return toRPCErr(err)
}
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
}
func (as *addrConnStream) finish(err error) {
@@ -1571,6 +1599,7 @@ type serverStream struct {
s *transport.ServerStream
p *parser
codec baseCodec
desc *StreamDesc
compressorV0 Compressor
compressorV1 encoding.Compressor
@@ -1579,6 +1608,8 @@ type serverStream struct {
sendCompressorName string
recvFirstMsg bool // set after the first message is received
maxReceiveMessageSize int
maxSendMessageSize int
trInfo *traceInfo
@@ -1765,6 +1796,10 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
binlog.Log(ss.ctx, chc)
}
}
// Received no request msg for non-client streaming rpcs.
if !ss.desc.ClientStreams && !ss.recvFirstMsg {
return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-streaming RPC")
}
return err
}
if err == io.ErrUnexpectedEOF {
@@ -1772,6 +1807,7 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
}
return toRPCErr(err)
}
ss.recvFirstMsg = true
if len(ss.statsHandler) != 0 {
for _, sh := range ss.statsHandler {
sh.HandleRPC(ss.s.Context(), &stats.InPayload{
@@ -1791,7 +1827,19 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
binlog.Log(ss.ctx, cm)
}
}
return nil
if ss.desc.ClientStreams {
// Subsequent messages should be received by subsequent RecvMsg calls.
return nil
}
// Special handling for non-client-stream rpcs.
// This recv expects EOF or errors, so we don't collect inPayload.
if err := recv(ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, nil, ss.decompressorV1, true); err == io.EOF {
return nil
} else if err != nil {
return err
}
return status.Error(codes.Internal, "cardinality violation: received multiple request messages for non-client-streaming RPC")
}
// MethodFromServerStream returns the method string for the input stream.