update the merge of client api and sc
This commit is contained in:
48
call.go
48
call.go
@ -152,42 +152,20 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
|
|||||||
|
|
||||||
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
|
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
|
||||||
c := defaultCallInfo
|
c := defaultCallInfo
|
||||||
maxReceiveMessageSize := defaultClientMaxReceiveMessageSize
|
mc := cc.GetMethodConfig(method)
|
||||||
maxSendMessageSize := defaultClientMaxSendMessageSize
|
if mc.WaitForReady != nil {
|
||||||
if mc, ok := cc.GetMethodConfig(method); ok {
|
c.failFast = !*mc.WaitForReady
|
||||||
if mc.WaitForReady != nil {
|
|
||||||
c.failFast = !*mc.WaitForReady
|
|
||||||
}
|
|
||||||
|
|
||||||
if mc.Timeout != nil && *mc.Timeout >= 0 {
|
|
||||||
var cancel context.CancelFunc
|
|
||||||
ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
|
|
||||||
defer cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
if mc.MaxReqSize != nil && cc.dopts.maxSendMessageSize >= 0 {
|
|
||||||
maxSendMessageSize = min(*mc.MaxReqSize, cc.dopts.maxSendMessageSize)
|
|
||||||
} else if mc.MaxReqSize != nil {
|
|
||||||
maxSendMessageSize = *mc.MaxReqSize
|
|
||||||
} else if mc.MaxReqSize == nil && cc.dopts.maxSendMessageSize >= 0 {
|
|
||||||
maxSendMessageSize = cc.dopts.maxSendMessageSize
|
|
||||||
}
|
|
||||||
|
|
||||||
if mc.MaxRespSize != nil && cc.dopts.maxReceiveMessageSize >= 0 {
|
|
||||||
maxReceiveMessageSize = min(*mc.MaxRespSize, cc.dopts.maxReceiveMessageSize)
|
|
||||||
} else if mc.MaxRespSize != nil {
|
|
||||||
maxReceiveMessageSize = *mc.MaxRespSize
|
|
||||||
} else if mc.MaxRespSize == nil && cc.dopts.maxReceiveMessageSize >= 0 {
|
|
||||||
maxReceiveMessageSize = cc.dopts.maxReceiveMessageSize
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if cc.dopts.maxSendMessageSize >= 0 {
|
|
||||||
maxSendMessageSize = cc.dopts.maxSendMessageSize
|
|
||||||
}
|
|
||||||
if cc.dopts.maxReceiveMessageSize >= 0 {
|
|
||||||
maxReceiveMessageSize = cc.dopts.maxReceiveMessageSize
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if mc.Timeout != nil && *mc.Timeout >= 0 {
|
||||||
|
var cancel context.CancelFunc
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
|
||||||
|
defer cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
maxSendMessageSize := getMaxSize(mc.MaxReqSize, cc.dopts.maxSendMessageSize, defaultClientMaxSendMessageSize)
|
||||||
|
maxReceiveMessageSize := getMaxSize(mc.MaxRespSize, cc.dopts.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
|
||||||
|
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
if err := o.before(&c); err != nil {
|
if err := o.before(&c); err != nil {
|
||||||
return toRPCErr(err)
|
return toRPCErr(err)
|
||||||
|
@ -98,8 +98,8 @@ type dialOptions struct {
|
|||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
scChan <-chan ServiceConfig
|
scChan <-chan ServiceConfig
|
||||||
copts transport.ConnectOptions
|
copts transport.ConnectOptions
|
||||||
maxReceiveMessageSize int
|
maxReceiveMessageSize *int
|
||||||
maxSendMessageSize int
|
maxSendMessageSize *int
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -120,14 +120,14 @@ func WithMaxMsgSize(s int) DialOption {
|
|||||||
// WithMaxReceiveMessageSize returns a DialOption which sets the maximum message size the client can receive. Negative input is invalid and has the same effect as not setting the field.
|
// WithMaxReceiveMessageSize returns a DialOption which sets the maximum message size the client can receive. Negative input is invalid and has the same effect as not setting the field.
|
||||||
func WithMaxReceiveMessageSize(s int) DialOption {
|
func WithMaxReceiveMessageSize(s int) DialOption {
|
||||||
return func(o *dialOptions) {
|
return func(o *dialOptions) {
|
||||||
o.maxReceiveMessageSize = s
|
*o.maxReceiveMessageSize = s
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithMaxSendMessageSize returns a DialOption which sets the maximum message size the client can send. Negative input is invalid and has the same effect as not seeting the field.
|
// WithMaxSendMessageSize returns a DialOption which sets the maximum message size the client can send. Negative input is invalid and has the same effect as not seeting the field.
|
||||||
func WithMaxSendMessageSize(s int) DialOption {
|
func WithMaxSendMessageSize(s int) DialOption {
|
||||||
return func(o *dialOptions) {
|
return func(o *dialOptions) {
|
||||||
o.maxSendMessageSize = s
|
*o.maxSendMessageSize = s
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -324,10 +324,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
|||||||
}
|
}
|
||||||
cc.ctx, cc.cancel = context.WithCancel(context.Background())
|
cc.ctx, cc.cancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
// initialize maxReceiveMessageSize and maxSendMessageSize to -1 before applying DialOption functions to distinguish whether the user set the message limit or not.
|
|
||||||
cc.dopts.maxReceiveMessageSize = -1
|
|
||||||
cc.dopts.maxSendMessageSize = -1
|
|
||||||
|
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
opt(&cc.dopts)
|
opt(&cc.dopts)
|
||||||
}
|
}
|
||||||
@ -646,13 +642,13 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error)
|
|||||||
|
|
||||||
// GetMethodConfig gets the method config of the input method. If there's no exact match for the input method (i.e. /service/method), we will return the default config for all methods under the service (/service/).
|
// GetMethodConfig gets the method config of the input method. If there's no exact match for the input method (i.e. /service/method), we will return the default config for all methods under the service (/service/).
|
||||||
// TODO: Avoid the locking here.
|
// TODO: Avoid the locking here.
|
||||||
func (cc *ClientConn) GetMethodConfig(method string) (m MethodConfig, ok bool) {
|
func (cc *ClientConn) GetMethodConfig(method string) (m MethodConfig) {
|
||||||
cc.mu.RLock()
|
cc.mu.RLock()
|
||||||
defer cc.mu.RUnlock()
|
defer cc.mu.RUnlock()
|
||||||
m, ok = cc.sc.Methods[method]
|
m, ok := cc.sc.Methods[method]
|
||||||
if !ok {
|
if !ok {
|
||||||
i := strings.LastIndex(method, "/")
|
i := strings.LastIndex(method, "/")
|
||||||
m, ok = cc.sc.Methods[method[:i+1]]
|
m, _ = cc.sc.Methods[method[:i+1]]
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
13
rpc_util.go
13
rpc_util.go
@ -483,4 +483,17 @@ func min(a, b int) int {
|
|||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getMaxSize(mcMax, doptMax *int, defaultVal int) int {
|
||||||
|
if mcMax == nil && doptMax == nil {
|
||||||
|
return defaultVal
|
||||||
|
}
|
||||||
|
if mcMax != nil && doptMax != nil {
|
||||||
|
return min(*mcMax, *doptMax)
|
||||||
|
}
|
||||||
|
if mcMax != nil {
|
||||||
|
return *mcMax
|
||||||
|
}
|
||||||
|
return *doptMax
|
||||||
|
}
|
||||||
|
|
||||||
const grpcUA = "grpc-go/" + Version
|
const grpcUA = "grpc-go/" + Version
|
||||||
|
47
stream.go
47
stream.go
@ -113,42 +113,19 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
|||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
)
|
)
|
||||||
c := defaultCallInfo
|
c := defaultCallInfo
|
||||||
maxReceiveMessageSize := defaultClientMaxReceiveMessageSize
|
mc := cc.GetMethodConfig(method)
|
||||||
maxSendMessageSize := defaultClientMaxSendMessageSize
|
if mc.WaitForReady != nil {
|
||||||
if mc, ok := cc.GetMethodConfig(method); ok {
|
c.failFast = !*mc.WaitForReady
|
||||||
if mc.WaitForReady != nil {
|
|
||||||
c.failFast = !*mc.WaitForReady
|
|
||||||
}
|
|
||||||
|
|
||||||
if mc.Timeout != nil && *mc.Timeout >= 0 {
|
|
||||||
var cancel context.CancelFunc
|
|
||||||
ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
|
|
||||||
defer cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
if mc.MaxReqSize != nil && cc.dopts.maxSendMessageSize >= 0 {
|
|
||||||
maxSendMessageSize = min(*mc.MaxReqSize, cc.dopts.maxSendMessageSize)
|
|
||||||
} else if mc.MaxReqSize != nil {
|
|
||||||
maxSendMessageSize = *mc.MaxReqSize
|
|
||||||
} else if mc.MaxReqSize == nil && cc.dopts.maxSendMessageSize >= 0 {
|
|
||||||
maxSendMessageSize = cc.dopts.maxSendMessageSize
|
|
||||||
}
|
|
||||||
|
|
||||||
if mc.MaxRespSize != nil && cc.dopts.maxReceiveMessageSize >= 0 {
|
|
||||||
maxReceiveMessageSize = min(*mc.MaxRespSize, cc.dopts.maxReceiveMessageSize)
|
|
||||||
} else if mc.MaxRespSize != nil {
|
|
||||||
maxReceiveMessageSize = *mc.MaxRespSize
|
|
||||||
} else if mc.MaxRespSize == nil && cc.dopts.maxReceiveMessageSize >= 0 {
|
|
||||||
maxReceiveMessageSize = cc.dopts.maxReceiveMessageSize
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if cc.dopts.maxSendMessageSize >= 0 {
|
|
||||||
maxSendMessageSize = cc.dopts.maxSendMessageSize
|
|
||||||
}
|
|
||||||
if cc.dopts.maxReceiveMessageSize >= 0 {
|
|
||||||
maxReceiveMessageSize = cc.dopts.maxReceiveMessageSize
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if mc.Timeout != nil && *mc.Timeout >= 0 {
|
||||||
|
var cancel context.CancelFunc
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
|
||||||
|
defer cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
maxSendMessageSize := getMaxSize(mc.MaxReqSize, cc.dopts.maxSendMessageSize, defaultClientMaxSendMessageSize)
|
||||||
|
maxReceiveMessageSize := getMaxSize(mc.MaxRespSize, cc.dopts.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
if err := o.before(&c); err != nil {
|
if err := o.before(&c); err != nil {
|
||||||
return nil, toRPCErr(err)
|
return nil, toRPCErr(err)
|
||||||
|
Reference in New Issue
Block a user