grpc: implementation of PreparedMsg API
grpc: implementation of PreparedMsg API
This commit is contained in:

committed by
Can Guler

parent
d7af56a5e4
commit
8260df7a61
@ -22,7 +22,7 @@ Package main provides benchmark with setting flags.
|
|||||||
An example to run some benchmarks with profiling enabled:
|
An example to run some benchmarks with profiling enabled:
|
||||||
|
|
||||||
go run benchmark/benchmain/main.go -benchtime=10s -workloads=all \
|
go run benchmark/benchmain/main.go -benchtime=10s -workloads=all \
|
||||||
-compression=on -maxConcurrentCalls=1 -trace=off \
|
-compression=gzip -maxConcurrentCalls=1 -trace=off \
|
||||||
-reqSizeBytes=1,1048576 -respSizeBytes=1,1048576 -networkMode=Local \
|
-reqSizeBytes=1,1048576 -respSizeBytes=1,1048576 -networkMode=Local \
|
||||||
-cpuProfile=cpuProf -memProfile=memProf -memProfileRate=10000 -resultFile=result
|
-cpuProfile=cpuProf -memProfile=memProf -memProfileRate=10000 -resultFile=result
|
||||||
|
|
||||||
@ -74,10 +74,16 @@ const (
|
|||||||
modeOn = "on"
|
modeOn = "on"
|
||||||
modeOff = "off"
|
modeOff = "off"
|
||||||
modeBoth = "both"
|
modeBoth = "both"
|
||||||
|
|
||||||
|
// compression modes
|
||||||
|
modeAll = "all"
|
||||||
|
modeGzip = "gzip"
|
||||||
|
modeNop = "nop"
|
||||||
)
|
)
|
||||||
|
|
||||||
var allCompressionModes = []string{modeOn, modeOff, modeBoth}
|
var allCompressionModes = []string{modeOff, modeGzip, modeNop, modeAll}
|
||||||
var allTraceModes = []string{modeOn, modeOff, modeBoth}
|
var allTraceModes = []string{modeOn, modeOff, modeBoth}
|
||||||
|
var allPreloaderModes = []string{modeOn, modeOff, modeBoth}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
workloadsUnary = "unary"
|
workloadsUnary = "unary"
|
||||||
@ -102,7 +108,8 @@ var (
|
|||||||
benchtime time.Duration
|
benchtime time.Duration
|
||||||
memProfile, cpuProfile string
|
memProfile, cpuProfile string
|
||||||
memProfileRate int
|
memProfileRate int
|
||||||
enableCompressor []bool
|
modeCompressor []string
|
||||||
|
enablePreloader []bool
|
||||||
enableChannelz []bool
|
enableChannelz []bool
|
||||||
networkMode string
|
networkMode string
|
||||||
benchmarkResultFile string
|
benchmarkResultFile string
|
||||||
@ -127,7 +134,13 @@ func streamBenchmark(startTimer func(), stopTimer func(uint64), benchFeatures st
|
|||||||
}
|
}
|
||||||
|
|
||||||
func unconstrainedStreamBenchmark(benchFeatures stats.Features, warmuptime, benchtime time.Duration) (uint64, uint64) {
|
func unconstrainedStreamBenchmark(benchFeatures stats.Features, warmuptime, benchtime time.Duration) (uint64, uint64) {
|
||||||
sender, recver, cleanup := makeFuncUnconstrainedStream(benchFeatures)
|
var sender, recver func(int)
|
||||||
|
var cleanup func()
|
||||||
|
if benchFeatures.EnablePreloader {
|
||||||
|
sender, recver, cleanup = makeFuncUnconstrainedStreamPreloaded(benchFeatures)
|
||||||
|
} else {
|
||||||
|
sender, recver, cleanup = makeFuncUnconstrainedStream(benchFeatures)
|
||||||
|
}
|
||||||
defer cleanup()
|
defer cleanup()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -177,7 +190,7 @@ func makeClient(benchFeatures stats.Features) (testpb.BenchmarkServiceClient, fu
|
|||||||
nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
|
nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
|
||||||
opts := []grpc.DialOption{}
|
opts := []grpc.DialOption{}
|
||||||
sopts := []grpc.ServerOption{}
|
sopts := []grpc.ServerOption{}
|
||||||
if benchFeatures.EnableCompressor {
|
if benchFeatures.ModeCompressor == "nop" {
|
||||||
sopts = append(sopts,
|
sopts = append(sopts,
|
||||||
grpc.RPCCompressor(nopCompressor{}),
|
grpc.RPCCompressor(nopCompressor{}),
|
||||||
grpc.RPCDecompressor(nopDecompressor{}),
|
grpc.RPCDecompressor(nopDecompressor{}),
|
||||||
@ -187,6 +200,16 @@ func makeClient(benchFeatures stats.Features) (testpb.BenchmarkServiceClient, fu
|
|||||||
grpc.WithDecompressor(nopDecompressor{}),
|
grpc.WithDecompressor(nopDecompressor{}),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
if benchFeatures.ModeCompressor == "gzip" {
|
||||||
|
sopts = append(sopts,
|
||||||
|
grpc.RPCCompressor(grpc.NewGZIPCompressor()),
|
||||||
|
grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
|
||||||
|
)
|
||||||
|
opts = append(opts,
|
||||||
|
grpc.WithCompressor(grpc.NewGZIPCompressor()),
|
||||||
|
grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
|
||||||
|
)
|
||||||
|
}
|
||||||
sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
|
sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
|
||||||
opts = append(opts, grpc.WithInsecure())
|
opts = append(opts, grpc.WithInsecure())
|
||||||
|
|
||||||
@ -242,7 +265,36 @@ func makeFuncStream(benchFeatures stats.Features) (func(int), func()) {
|
|||||||
}, cleanup
|
}, cleanup
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func makeFuncUnconstrainedStreamPreloaded(benchFeatures stats.Features) (func(int), func(int), func()) {
|
||||||
|
streams, req, cleanup := setupUnconstrainedStream(benchFeatures)
|
||||||
|
|
||||||
|
preparedMsg := make([]*grpc.PreparedMsg, len(streams))
|
||||||
|
for i, stream := range streams {
|
||||||
|
preparedMsg[i] = &grpc.PreparedMsg{}
|
||||||
|
err := preparedMsg[i].Encode(stream, req)
|
||||||
|
if err != nil {
|
||||||
|
grpclog.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[i], req, stream, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(pos int) {
|
||||||
|
streams[pos].SendMsg(preparedMsg[pos])
|
||||||
|
}, func(pos int) {
|
||||||
|
streams[pos].Recv()
|
||||||
|
}, cleanup
|
||||||
|
}
|
||||||
|
|
||||||
func makeFuncUnconstrainedStream(benchFeatures stats.Features) (func(int), func(int), func()) {
|
func makeFuncUnconstrainedStream(benchFeatures stats.Features) (func(int), func(int), func()) {
|
||||||
|
streams, req, cleanup := setupUnconstrainedStream(benchFeatures)
|
||||||
|
|
||||||
|
return func(pos int) {
|
||||||
|
streams[pos].Send(req)
|
||||||
|
}, func(pos int) {
|
||||||
|
streams[pos].Recv()
|
||||||
|
}, cleanup
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupUnconstrainedStream(benchFeatures stats.Features) ([]testpb.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, func()) {
|
||||||
tc, cleanup := makeClient(benchFeatures)
|
tc, cleanup := makeClient(benchFeatures)
|
||||||
|
|
||||||
streams := make([]testpb.BenchmarkService_StreamingCallClient, benchFeatures.MaxConcurrentCalls)
|
streams := make([]testpb.BenchmarkService_StreamingCallClient, benchFeatures.MaxConcurrentCalls)
|
||||||
@ -261,11 +313,7 @@ func makeFuncUnconstrainedStream(benchFeatures stats.Features) (func(int), func(
|
|||||||
Payload: pl,
|
Payload: pl,
|
||||||
}
|
}
|
||||||
|
|
||||||
return func(pos int) {
|
return streams, req, cleanup
|
||||||
streams[pos].Send(req)
|
|
||||||
}, func(pos int) {
|
|
||||||
streams[pos].Recv()
|
|
||||||
}, cleanup
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) {
|
func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) {
|
||||||
@ -323,6 +371,7 @@ var useBufconn = flag.Bool("bufconn", false, "Use in-memory connection instead o
|
|||||||
func init() {
|
func init() {
|
||||||
var (
|
var (
|
||||||
workloads, traceMode, compressorMode, readLatency, channelzOn string
|
workloads, traceMode, compressorMode, readLatency, channelzOn string
|
||||||
|
preloaderMode string
|
||||||
readKbps, readMtu, readMaxConcurrentCalls intSliceType
|
readKbps, readMtu, readMaxConcurrentCalls intSliceType
|
||||||
readReqSizeBytes, readRespSizeBytes intSliceType
|
readReqSizeBytes, readRespSizeBytes intSliceType
|
||||||
)
|
)
|
||||||
@ -345,6 +394,8 @@ func init() {
|
|||||||
flag.StringVar(&cpuProfile, "cpuProfile", "", "Enables CPU profiling output to the filename provided")
|
flag.StringVar(&cpuProfile, "cpuProfile", "", "Enables CPU profiling output to the filename provided")
|
||||||
flag.StringVar(&compressorMode, "compression", modeOff,
|
flag.StringVar(&compressorMode, "compression", modeOff,
|
||||||
fmt.Sprintf("Compression mode - One of: %v", strings.Join(allCompressionModes, ", ")))
|
fmt.Sprintf("Compression mode - One of: %v", strings.Join(allCompressionModes, ", ")))
|
||||||
|
flag.StringVar(&preloaderMode, "preloader", modeOff,
|
||||||
|
fmt.Sprintf("Preloader mode - One of: %v", strings.Join(allPreloaderModes, ", ")))
|
||||||
flag.StringVar(&benchmarkResultFile, "resultFile", "", "Save the benchmark result into a binary file")
|
flag.StringVar(&benchmarkResultFile, "resultFile", "", "Save the benchmark result into a binary file")
|
||||||
flag.StringVar(&networkMode, "networkMode", "", "Network mode includes LAN, WAN, Local and Longhaul")
|
flag.StringVar(&networkMode, "networkMode", "", "Network mode includes LAN, WAN, Local and Longhaul")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
@ -372,7 +423,8 @@ func init() {
|
|||||||
log.Fatalf("Unknown workloads setting: %v (want one of: %v)",
|
log.Fatalf("Unknown workloads setting: %v (want one of: %v)",
|
||||||
workloads, strings.Join(allWorkloads, ", "))
|
workloads, strings.Join(allWorkloads, ", "))
|
||||||
}
|
}
|
||||||
enableCompressor = setMode(compressorMode)
|
modeCompressor = setModeCompressor(compressorMode)
|
||||||
|
enablePreloader = setMode(preloaderMode)
|
||||||
enableTrace = setMode(traceMode)
|
enableTrace = setMode(traceMode)
|
||||||
enableChannelz = setMode(channelzOn)
|
enableChannelz = setMode(channelzOn)
|
||||||
// Time input formats as (time + unit).
|
// Time input formats as (time + unit).
|
||||||
@ -400,11 +452,28 @@ func setMode(name string) []bool {
|
|||||||
return []bool{false, true}
|
return []bool{false, true}
|
||||||
default:
|
default:
|
||||||
log.Fatalf("Unknown %s setting: %v (want one of: %v)",
|
log.Fatalf("Unknown %s setting: %v (want one of: %v)",
|
||||||
name, name, strings.Join(allCompressionModes, ", "))
|
name, name, strings.Join(allTraceModes, ", "))
|
||||||
return []bool{}
|
return []bool{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func setModeCompressor(name string) []string {
|
||||||
|
switch name {
|
||||||
|
case modeNop:
|
||||||
|
return []string{"nop"}
|
||||||
|
case modeGzip:
|
||||||
|
return []string{"gzip"}
|
||||||
|
case modeAll:
|
||||||
|
return []string{"off", "nop", "gzip"}
|
||||||
|
case modeOff:
|
||||||
|
return []string{"off"}
|
||||||
|
default:
|
||||||
|
log.Fatalf("Unknown %s setting: %v (want one of: %v)",
|
||||||
|
name, name, strings.Join(allCompressionModes, ", "))
|
||||||
|
return []string{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type intSliceType []int
|
type intSliceType []int
|
||||||
|
|
||||||
func (intSlice *intSliceType) String() string {
|
func (intSlice *intSliceType) String() string {
|
||||||
@ -456,10 +525,10 @@ func printThroughput(requestCount uint64, requestSize int, responseCount uint64,
|
|||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
before()
|
before()
|
||||||
featuresPos := make([]int, 9)
|
featuresPos := make([]int, 10)
|
||||||
// 0:enableTracing 1:ltc 2:kbps 3:mtu 4:maxC 5:reqSize 6:respSize
|
// 0:enableTracing 1:ltc 2:kbps 3:mtu 4:maxC 5:reqSize 6:respSize
|
||||||
featuresNum := []int{len(enableTrace), len(ltc), len(kbps), len(mtu),
|
featuresNum := []int{len(enableTrace), len(ltc), len(kbps), len(mtu),
|
||||||
len(maxConcurrentCalls), len(reqSizeBytes), len(respSizeBytes), len(enableCompressor), len(enableChannelz)}
|
len(maxConcurrentCalls), len(reqSizeBytes), len(respSizeBytes), len(modeCompressor), len(enableChannelz), len(enablePreloader)}
|
||||||
initalPos := make([]int, len(featuresPos))
|
initalPos := make([]int, len(featuresPos))
|
||||||
s := stats.NewStats(10)
|
s := stats.NewStats(10)
|
||||||
s.SortLatency()
|
s.SortLatency()
|
||||||
@ -499,8 +568,9 @@ func main() {
|
|||||||
MaxConcurrentCalls: maxConcurrentCalls[featuresPos[4]],
|
MaxConcurrentCalls: maxConcurrentCalls[featuresPos[4]],
|
||||||
ReqSizeBytes: reqSizeBytes[featuresPos[5]],
|
ReqSizeBytes: reqSizeBytes[featuresPos[5]],
|
||||||
RespSizeBytes: respSizeBytes[featuresPos[6]],
|
RespSizeBytes: respSizeBytes[featuresPos[6]],
|
||||||
EnableCompressor: enableCompressor[featuresPos[7]],
|
ModeCompressor: modeCompressor[featuresPos[7]],
|
||||||
EnableChannelz: enableChannelz[featuresPos[8]],
|
EnableChannelz: enableChannelz[featuresPos[8]],
|
||||||
|
EnablePreloader: enablePreloader[featuresPos[9]],
|
||||||
}
|
}
|
||||||
|
|
||||||
grpc.EnableTracing = enableTrace[featuresPos[0]]
|
grpc.EnableTracing = enableTrace[featuresPos[0]]
|
||||||
|
@ -39,21 +39,22 @@ type Features struct {
|
|||||||
MaxConcurrentCalls int
|
MaxConcurrentCalls int
|
||||||
ReqSizeBytes int
|
ReqSizeBytes int
|
||||||
RespSizeBytes int
|
RespSizeBytes int
|
||||||
EnableCompressor bool
|
ModeCompressor string
|
||||||
EnableChannelz bool
|
EnableChannelz bool
|
||||||
|
EnablePreloader bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// String returns the textual output of the Features as string.
|
// String returns the textual output of the Features as string.
|
||||||
func (f Features) String() string {
|
func (f Features) String() string {
|
||||||
return fmt.Sprintf("traceMode_%t-latency_%s-kbps_%#v-MTU_%#v-maxConcurrentCalls_"+
|
return fmt.Sprintf("traceMode_%t-latency_%s-kbps_%#v-MTU_%#v-maxConcurrentCalls_"+
|
||||||
"%#v-reqSize_%#vB-respSize_%#vB-Compressor_%t", f.EnableTrace,
|
"%#v-reqSize_%#vB-respSize_%#vB-Compressor_%s-Preloader_%t", f.EnableTrace,
|
||||||
f.Latency.String(), f.Kbps, f.Mtu, f.MaxConcurrentCalls, f.ReqSizeBytes, f.RespSizeBytes, f.EnableCompressor)
|
f.Latency.String(), f.Kbps, f.Mtu, f.MaxConcurrentCalls, f.ReqSizeBytes, f.RespSizeBytes, f.ModeCompressor, f.EnablePreloader)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConciseString returns the concise textual output of the Features as string, skipping
|
// ConciseString returns the concise textual output of the Features as string, skipping
|
||||||
// setting with default value.
|
// setting with default value.
|
||||||
func (f Features) ConciseString() string {
|
func (f Features) ConciseString() string {
|
||||||
noneEmptyPos := []bool{f.EnableTrace, f.Latency != 0, f.Kbps != 0, f.Mtu != 0, true, true, true, f.EnableCompressor, f.EnableChannelz}
|
noneEmptyPos := []bool{f.EnableTrace, f.Latency != 0, f.Kbps != 0, f.Mtu != 0, true, true, true, f.ModeCompressor != "off", f.EnableChannelz, f.EnablePreloader}
|
||||||
return PartialPrintString(noneEmptyPos, f, false)
|
return PartialPrintString(noneEmptyPos, f, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -99,11 +100,14 @@ func PartialPrintString(noneEmptyPos []bool, f Features, shared bool) string {
|
|||||||
s += fmt.Sprintf("%srespSize%s%#vB%s", prefix, linker, f.RespSizeBytes, suffix)
|
s += fmt.Sprintf("%srespSize%s%#vB%s", prefix, linker, f.RespSizeBytes, suffix)
|
||||||
}
|
}
|
||||||
if noneEmptyPos[7] {
|
if noneEmptyPos[7] {
|
||||||
s += fmt.Sprintf("%sCompressor%s%t%s", prefix, linker, f.EnableCompressor, suffix)
|
s += fmt.Sprintf("%sCompressor%s%s%s", prefix, linker, f.ModeCompressor, suffix)
|
||||||
}
|
}
|
||||||
if noneEmptyPos[8] {
|
if noneEmptyPos[8] {
|
||||||
s += fmt.Sprintf("%sChannelz%s%t%s", prefix, linker, f.EnableChannelz, suffix)
|
s += fmt.Sprintf("%sChannelz%s%t%s", prefix, linker, f.EnableChannelz, suffix)
|
||||||
}
|
}
|
||||||
|
if noneEmptyPos[9] {
|
||||||
|
s += fmt.Sprintf("%sPreloader%s%t%s", prefix, linker, f.EnablePreloader, suffix)
|
||||||
|
}
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
66
preloader.go
Normal file
66
preloader.go
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
/*
|
||||||
|
*
|
||||||
|
* Copyright 2019 gRPC authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package grpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PreparedMsg is responsible for creating a Marshalled and Compressed object
|
||||||
|
type PreparedMsg struct {
|
||||||
|
// Struct for preparing msg before sending them
|
||||||
|
encodedData []byte
|
||||||
|
hdr []byte
|
||||||
|
payload []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// Encode is responsible for preprocessing the data using relevant information
|
||||||
|
// from the stream's Context
|
||||||
|
// TODO(prannayk) : if something changes then mark prepared msg as old
|
||||||
|
// Encode : marshal and compresses data based on stream context
|
||||||
|
// Returns error in case of error
|
||||||
|
func (p *PreparedMsg) Encode(s Stream, msg interface{}) error {
|
||||||
|
ctx := s.Context()
|
||||||
|
rpcInfo, ok := rpcInfoFromContext(ctx)
|
||||||
|
if !ok {
|
||||||
|
return status.Errorf(codes.Internal, "grpc: unable to get rpcInfo")
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if the context has the relevant information to prepareMsg
|
||||||
|
if rpcInfo.preloaderInfo == nil {
|
||||||
|
return status.Errorf(codes.Internal, "grpc: rpcInfo.preloaderInfo is nil")
|
||||||
|
}
|
||||||
|
if rpcInfo.preloaderInfo.codec == nil {
|
||||||
|
return status.Errorf(codes.Internal, "grpc: rpcInfo.preloaderInfo.codec is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
// prepare the msg
|
||||||
|
data, err := encode(rpcInfo.preloaderInfo.codec, msg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
p.encodedData = data
|
||||||
|
compData, err := compress(data, rpcInfo.preloaderInfo.cp, rpcInfo.preloaderInfo.comp)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
p.hdr, p.payload = msgHeader(data, compData)
|
||||||
|
return nil
|
||||||
|
}
|
24
rpc_util.go
24
rpc_util.go
@ -694,14 +694,34 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Information about RPC
|
||||||
type rpcInfo struct {
|
type rpcInfo struct {
|
||||||
failfast bool
|
failfast bool
|
||||||
|
preloaderInfo *compressorInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
// Information about Preloader
|
||||||
|
// Responsible for storing codec, and compressors
|
||||||
|
// If stream (s) has context s.Context which stores rpcInfo that has non nil
|
||||||
|
// pointers to codec, and compressors, then we can use preparedMsg for Async message prep
|
||||||
|
// and reuse marshalled bytes
|
||||||
|
type compressorInfo struct {
|
||||||
|
codec baseCodec
|
||||||
|
cp Compressor
|
||||||
|
comp encoding.Compressor
|
||||||
}
|
}
|
||||||
|
|
||||||
type rpcInfoContextKey struct{}
|
type rpcInfoContextKey struct{}
|
||||||
|
|
||||||
func newContextWithRPCInfo(ctx context.Context, failfast bool) context.Context {
|
func newContextWithRPCInfo(ctx context.Context, failfast bool, codec baseCodec, cp Compressor, comp encoding.Compressor) context.Context {
|
||||||
return context.WithValue(ctx, rpcInfoContextKey{}, &rpcInfo{failfast: failfast})
|
return context.WithValue(ctx, rpcInfoContextKey{}, &rpcInfo{
|
||||||
|
failfast: failfast,
|
||||||
|
preloaderInfo: &compressorInfo{
|
||||||
|
codec: codec,
|
||||||
|
cp: cp,
|
||||||
|
comp: comp,
|
||||||
|
},
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func rpcInfoFromContext(ctx context.Context) (s *rpcInfo, ok bool) {
|
func rpcInfoFromContext(ctx context.Context) (s *rpcInfo, ok bool) {
|
||||||
|
53
stream.go
53
stream.go
@ -245,7 +245,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
|||||||
trInfo.tr.LazyLog(&trInfo.firstLine, false)
|
trInfo.tr.LazyLog(&trInfo.firstLine, false)
|
||||||
ctx = trace.NewContext(ctx, trInfo.tr)
|
ctx = trace.NewContext(ctx, trInfo.tr)
|
||||||
}
|
}
|
||||||
ctx = newContextWithRPCInfo(ctx, c.failFast)
|
ctx = newContextWithRPCInfo(ctx, c.failFast, c.codec, cp, comp)
|
||||||
sh := cc.dopts.copts.StatsHandler
|
sh := cc.dopts.copts.StatsHandler
|
||||||
var beginTime time.Time
|
var beginTime time.Time
|
||||||
if sh != nil {
|
if sh != nil {
|
||||||
@ -677,15 +677,13 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
|
|||||||
if !cs.desc.ClientStreams {
|
if !cs.desc.ClientStreams {
|
||||||
cs.sentLast = true
|
cs.sentLast = true
|
||||||
}
|
}
|
||||||
data, err := encode(cs.codec, m)
|
|
||||||
|
// load hdr, payload, data
|
||||||
|
hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
compData, err := compress(data, cs.cp, cs.comp)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
hdr, payload := msgHeader(data, compData)
|
|
||||||
// TODO(dfawley): should we be checking len(data) instead?
|
// TODO(dfawley): should we be checking len(data) instead?
|
||||||
if len(payload) > *cs.callInfo.maxSendMessageSize {
|
if len(payload) > *cs.callInfo.maxSendMessageSize {
|
||||||
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
|
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
|
||||||
@ -1150,15 +1148,13 @@ func (as *addrConnStream) SendMsg(m interface{}) (err error) {
|
|||||||
if !as.desc.ClientStreams {
|
if !as.desc.ClientStreams {
|
||||||
as.sentLast = true
|
as.sentLast = true
|
||||||
}
|
}
|
||||||
data, err := encode(as.codec, m)
|
|
||||||
|
// load hdr, payload, data
|
||||||
|
hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
compData, err := compress(data, as.cp, as.comp)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
hdr, payld := msgHeader(data, compData)
|
|
||||||
// TODO(dfawley): should we be checking len(data) instead?
|
// TODO(dfawley): should we be checking len(data) instead?
|
||||||
if len(payld) > *as.callInfo.maxSendMessageSize {
|
if len(payld) > *as.callInfo.maxSendMessageSize {
|
||||||
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
|
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
|
||||||
@ -1395,15 +1391,13 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
|
|||||||
ss.t.IncrMsgSent()
|
ss.t.IncrMsgSent()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
data, err := encode(ss.codec, m)
|
|
||||||
|
// load hdr, payload, data
|
||||||
|
hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
compData, err := compress(data, ss.cp, ss.comp)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
hdr, payload := msgHeader(data, compData)
|
|
||||||
// TODO(dfawley): should we be checking len(data) instead?
|
// TODO(dfawley): should we be checking len(data) instead?
|
||||||
if len(payload) > ss.maxSendMessageSize {
|
if len(payload) > ss.maxSendMessageSize {
|
||||||
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
|
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
|
||||||
@ -1496,3 +1490,24 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
|
|||||||
func MethodFromServerStream(stream ServerStream) (string, bool) {
|
func MethodFromServerStream(stream ServerStream) (string, bool) {
|
||||||
return Method(stream.Context())
|
return Method(stream.Context())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// prepareMsg returns the hdr, payload and data
|
||||||
|
// using the compressors passed or using the
|
||||||
|
// passed preparedmsg
|
||||||
|
func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
|
||||||
|
if preparedMsg, ok := m.(*PreparedMsg); ok {
|
||||||
|
return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil
|
||||||
|
}
|
||||||
|
// The input interface is not a prepared msg.
|
||||||
|
// Marshal and Compress the data at this point
|
||||||
|
data, err = encode(codec, m)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, err
|
||||||
|
}
|
||||||
|
compData, err := compress(data, cp, comp)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, err
|
||||||
|
}
|
||||||
|
hdr, payload = msgHeader(data, compData)
|
||||||
|
return hdr, payload, data, nil
|
||||||
|
}
|
||||||
|
@ -2076,6 +2076,80 @@ func (s) TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s) TestPreloaderClientSend(t *testing.T) {
|
||||||
|
for _, e := range listTestEnv() {
|
||||||
|
testPreloaderClientSend(t, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testPreloaderClientSend(t *testing.T, e env) {
|
||||||
|
te := newTest(t, e)
|
||||||
|
te.userAgent = testAppUA
|
||||||
|
te.declareLogNoise(
|
||||||
|
"transport: http2Client.notifyError got notified that the client transport was broken EOF",
|
||||||
|
"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
|
||||||
|
"grpc: addrConn.resetTransport failed to create client transport: connection error",
|
||||||
|
"Failed to dial : context canceled; please retry.",
|
||||||
|
)
|
||||||
|
te.startServer(&testServer{security: e.security})
|
||||||
|
|
||||||
|
defer te.tearDown()
|
||||||
|
tc := testpb.NewTestServiceClient(te.clientConn())
|
||||||
|
|
||||||
|
// Test for streaming RPC recv.
|
||||||
|
// Set context for send with proper RPC Information
|
||||||
|
stream, err := tc.FullDuplexCall(te.ctx, grpc.UseCompressor("gzip"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
|
||||||
|
}
|
||||||
|
var index int
|
||||||
|
for index < len(reqSizes) {
|
||||||
|
respParam := []*testpb.ResponseParameters{
|
||||||
|
{
|
||||||
|
Size: int32(respSizes[index]),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req := &testpb.StreamingOutputCallRequest{
|
||||||
|
ResponseType: testpb.PayloadType_COMPRESSABLE,
|
||||||
|
ResponseParameters: respParam,
|
||||||
|
Payload: payload,
|
||||||
|
}
|
||||||
|
preparedMsg := &grpc.PreparedMsg{}
|
||||||
|
err = preparedMsg.Encode(stream, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("PrepareMsg failed for size %d : %v", reqSizes[index], err)
|
||||||
|
}
|
||||||
|
if err := stream.SendMsg(preparedMsg); err != nil {
|
||||||
|
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
|
||||||
|
}
|
||||||
|
reply, err := stream.Recv()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
|
||||||
|
}
|
||||||
|
pt := reply.GetPayload().GetType()
|
||||||
|
if pt != testpb.PayloadType_COMPRESSABLE {
|
||||||
|
t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE)
|
||||||
|
}
|
||||||
|
size := len(reply.GetPayload().GetBody())
|
||||||
|
if size != int(respSizes[index]) {
|
||||||
|
t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
|
||||||
|
}
|
||||||
|
index++
|
||||||
|
}
|
||||||
|
if err := stream.CloseSend(); err != nil {
|
||||||
|
t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
|
||||||
|
}
|
||||||
|
if _, err := stream.Recv(); err != io.EOF {
|
||||||
|
t.Fatalf("%v failed to complele the ping pong test: %v", stream, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s) TestMaxMsgSizeClientDefault(t *testing.T) {
|
func (s) TestMaxMsgSizeClientDefault(t *testing.T) {
|
||||||
for _, e := range listTestEnv() {
|
for _, e := range listTestEnv() {
|
||||||
testMaxMsgSizeClientDefault(t, e)
|
testMaxMsgSizeClientDefault(t, e)
|
||||||
|
Reference in New Issue
Block a user