Set peer before sending request (#1423)
This commit is contained in:
6
call.go
6
call.go
@ -74,9 +74,6 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran
|
|||||||
dopts.copts.StatsHandler.HandleRPC(ctx, inPayload)
|
dopts.copts.StatsHandler.HandleRPC(ctx, inPayload)
|
||||||
}
|
}
|
||||||
c.trailerMD = stream.Trailer()
|
c.trailerMD = stream.Trailer()
|
||||||
if peer, ok := peer.FromContext(stream.Context()); ok {
|
|
||||||
c.peer = peer
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -262,6 +259,9 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
|
|||||||
}
|
}
|
||||||
return toRPCErr(err)
|
return toRPCErr(err)
|
||||||
}
|
}
|
||||||
|
if peer, ok := peer.FromContext(stream.Context()); ok {
|
||||||
|
c.peer = peer
|
||||||
|
}
|
||||||
err = sendRequest(ctx, cc.dopts, cc.dopts.cp, &c, callHdr, stream, t, args, topts)
|
err = sendRequest(ctx, cc.dopts, cc.dopts.cp, &c, callHdr, stream, t, args, topts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if put != nil {
|
if put != nil {
|
||||||
|
|||||||
@ -2263,6 +2263,62 @@ func testPeerNegative(t *testing.T, e env) {
|
|||||||
tc.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer))
|
tc.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPeerFailedRPC(t *testing.T) {
|
||||||
|
defer leakCheck(t)()
|
||||||
|
for _, e := range listTestEnv() {
|
||||||
|
testPeerFailedRPC(t, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testPeerFailedRPC(t *testing.T, e env) {
|
||||||
|
te := newTest(t, e)
|
||||||
|
te.maxServerReceiveMsgSize = newInt(1 * 1024)
|
||||||
|
te.startServer(&testServer{security: e.security})
|
||||||
|
|
||||||
|
defer te.tearDown()
|
||||||
|
tc := testpb.NewTestServiceClient(te.clientConn())
|
||||||
|
|
||||||
|
// first make a successful request to the server
|
||||||
|
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
|
||||||
|
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// make a second request that will be rejected by the server
|
||||||
|
const largeSize = 5 * 1024
|
||||||
|
largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
req := &testpb.SimpleRequest{
|
||||||
|
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
|
||||||
|
Payload: largePayload,
|
||||||
|
}
|
||||||
|
|
||||||
|
peer := new(peer.Peer)
|
||||||
|
if _, err := tc.UnaryCall(context.Background(), req, grpc.Peer(peer)); err == nil || grpc.Code(err) != codes.ResourceExhausted {
|
||||||
|
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
|
||||||
|
} else {
|
||||||
|
pa := peer.Addr.String()
|
||||||
|
if e.network == "unix" {
|
||||||
|
if pa != te.srvAddr {
|
||||||
|
t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_, pp, err := net.SplitHostPort(pa)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to parse address from peer.")
|
||||||
|
}
|
||||||
|
_, sp, err := net.SplitHostPort(te.srvAddr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to parse address of test server.")
|
||||||
|
}
|
||||||
|
if pp != sp {
|
||||||
|
t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestMetadataUnaryRPC(t *testing.T) {
|
func TestMetadataUnaryRPC(t *testing.T) {
|
||||||
defer leakCheck(t)()
|
defer leakCheck(t)()
|
||||||
for _, e := range listTestEnv() {
|
for _, e := range listTestEnv() {
|
||||||
|
|||||||
Reference in New Issue
Block a user