Files
lotus/itests/gateway_test.go
Masih H. Derkani d9a93185e5 feat: expose /v2 APIs through Lotus Gateway (#13075)
Integrate the existing v2 APIs into Lotus Gateway such that they are
accessible via `/rpc/v2` endpoint similar to full node APIs except
through the gateway reverse proxy along with rate limiting and handling
of stateful connections over web socket.

The prior gateway implementation did not have a concept of per-uri
reverse proxy handling. The changes here separate reverse proxy
implementation in gateway from the gateway node to introduce separate
handling of v1 and v2 APIs. The work introduces a top-level
`v2api.Gateway` interface to capture the APIs exposed by the v2 gateway
and generate any relevant documentation based on the interface.

Specifically, the work here separates the handling of stateful
connections between v1 and v2 to avoid any cross-contamination between
Eth calls to either API. However, the total rate limiting has stayed
shared across both URIs.

The underused mock interface in v1 is now removed in favor of the
generated gomock interfaces to reduce the LOC in `gateway` package. The
interface was barely used by the tests and would have resulted in
unnecessary LOC to implement v2.

I have held back on unrelated refactoring of `gateway` package. The code
can use further restructuring, but for now the focus is to expose
existing /v2 APIs over the gateway.

The implementation of `Version` in /v2 APIs is also left as a TODO since
it requires implementation at the top level API and bubbles up into
`Common` APIs.
2025-04-29 10:02:48 +01:00

616 lines
20 KiB
Go

package itests
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"math"
"net"
"net/http"
"sync"
"testing"
"time"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-f3"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/abi"
init2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/init"
multisig2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/multisig"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/client"
"github.com/filecoin-project/lotus/api/v2api"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
"github.com/filecoin-project/lotus/gateway"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/itests/multisig"
res "github.com/filecoin-project/lotus/lib/result"
"github.com/filecoin-project/lotus/node"
)
const (
maxLookbackCap = time.Duration(math.MaxInt64)
maxMessageLookbackEpochs = stmgr.LookbackNoLimit
)
// TestGatewayWalletMsig tests that API calls to wallet and msig can be made on a lite
// node that is connected through a gateway to a full API node
func TestGatewayWalletMsig(t *testing.T) {
kit.QuietMiningLogs()
ctx := context.Background()
nodes := startNodes(ctx, t)
lite := nodes.lite
full := nodes.full
// The full node starts with a wallet
fullWalletAddr, err := full.WalletDefaultAddress(ctx)
require.NoError(t, err)
// Check the full node's wallet balance from the lite node
balance, err := lite.WalletBalance(ctx, fullWalletAddr)
require.NoError(t, err)
fmt.Println(balance)
// Create a wallet on the lite node
liteWalletAddr, err := lite.WalletNew(ctx, types.KTSecp256k1)
require.NoError(t, err)
// Send some funds from the full node to the lite node
err = sendFunds(ctx, full, fullWalletAddr, liteWalletAddr, types.NewInt(1e18))
require.NoError(t, err)
// Send some funds from the lite node back to the full node
err = sendFunds(ctx, lite, liteWalletAddr, fullWalletAddr, types.NewInt(100))
require.NoError(t, err)
// Sign some data with the lite node wallet address
data := []byte("hello")
sig, err := lite.WalletSign(ctx, liteWalletAddr, data)
require.NoError(t, err)
// Verify the signature
ok, err := lite.WalletVerify(ctx, liteWalletAddr, data, sig)
require.NoError(t, err)
require.True(t, ok)
// Create some wallets on the lite node to use for testing multisig
var walletAddrs []address.Address
for i := 0; i < 4; i++ {
addr, err := lite.WalletNew(ctx, types.KTSecp256k1)
require.NoError(t, err)
walletAddrs = append(walletAddrs, addr)
err = sendFunds(ctx, lite, liteWalletAddr, addr, types.NewInt(1e15))
require.NoError(t, err)
}
// Create an msig with three of the addresses and threshold of two sigs
msigAddrs := walletAddrs[:3]
amt := types.NewInt(1000)
proto, err := lite.MsigCreate(ctx, 2, msigAddrs, abi.ChainEpoch(50), amt, liteWalletAddr, types.NewInt(0))
require.NoError(t, err)
doSend := func(proto *api.MessagePrototype) (cid.Cid, error) {
if proto.ValidNonce {
sm, err := lite.WalletSignMessage(ctx, proto.Message.From, &proto.Message)
if err != nil {
return cid.Undef, err
}
return lite.MpoolPush(ctx, sm)
}
sm, err := lite.MpoolPushMessage(ctx, &proto.Message, nil)
if err != nil {
return cid.Undef, err
}
return sm.Cid(), nil
}
addProposal, err := doSend(proto)
require.NoError(t, err)
res, err := lite.StateWaitMsg(ctx, addProposal, 1, api.LookbackNoLimit, true)
require.NoError(t, err)
require.EqualValues(t, 0, res.Receipt.ExitCode)
var execReturn init2.ExecReturn
err = execReturn.UnmarshalCBOR(bytes.NewReader(res.Receipt.Return))
require.NoError(t, err)
// Get available balance of msig: should be greater than zero and less
// than initial amount
msig := execReturn.IDAddress
msigBalance, err := lite.MsigGetAvailableBalance(ctx, msig, types.EmptyTSK)
require.NoError(t, err)
require.GreaterOrEqual(t, msigBalance.Int64(), int64(0))
require.LessOrEqual(t, msigBalance.Int64(), amt.Int64())
// Propose to add a new address to the msig
proto, err = lite.MsigAddPropose(ctx, msig, walletAddrs[0], walletAddrs[3], false)
require.NoError(t, err)
addProposal, err = doSend(proto)
require.NoError(t, err)
res, err = lite.StateWaitMsg(ctx, addProposal, 1, api.LookbackNoLimit, true)
require.NoError(t, err)
require.EqualValues(t, 0, res.Receipt.ExitCode)
var proposeReturn multisig2.ProposeReturn
err = proposeReturn.UnmarshalCBOR(bytes.NewReader(res.Receipt.Return))
require.NoError(t, err)
// Approve proposal (proposer is first (implicit) signer, approver is
// second signer
txnID := uint64(proposeReturn.TxnID)
proto, err = lite.MsigAddApprove(ctx, msig, walletAddrs[1], txnID, walletAddrs[0], walletAddrs[3], false)
require.NoError(t, err)
approval1, err := doSend(proto)
require.NoError(t, err)
res, err = lite.StateWaitMsg(ctx, approval1, 1, api.LookbackNoLimit, true)
require.NoError(t, err)
require.EqualValues(t, 0, res.Receipt.ExitCode)
var approveReturn multisig2.ApproveReturn
err = approveReturn.UnmarshalCBOR(bytes.NewReader(res.Receipt.Return))
require.NoError(t, err)
require.True(t, approveReturn.Applied)
}
// TestGatewayMsigCLI tests that msig CLI calls can be made
// on a lite node that is connected through a gateway to a full API node
func TestGatewayMsigCLI(t *testing.T) {
kit.QuietMiningLogs()
ctx := context.Background()
nodes := startNodes(ctx, t, withFunds())
lite := nodes.lite
multisig.RunMultisigTests(t, lite)
}
type testNodes struct {
lite *kit.TestFullNode
full *kit.TestFullNode
miner *kit.TestMiner
gatewayAddr string
rpcCloser jsonrpc.ClientCloser
}
type startOptions struct {
blockTime time.Duration
lookbackCap time.Duration
maxMessageLookbackEpochs abi.ChainEpoch
fund bool
perConnectionAPIRateLimit int
perHostConnectionsPerMinute int
nodeOpts []kit.NodeOpt
}
type startOption func(*startOptions)
func newStartOptions(opts ...startOption) startOptions {
o := startOptions{
blockTime: 5 * time.Millisecond,
lookbackCap: maxLookbackCap,
maxMessageLookbackEpochs: maxMessageLookbackEpochs,
fund: false,
}
for _, opt := range opts {
opt(&o)
}
return o
}
func withFunds() startOption {
return func(opts *startOptions) {
opts.fund = true
}
}
func withPerConnectionAPIRateLimit(rateLimit int) startOption {
return func(opts *startOptions) {
opts.perConnectionAPIRateLimit = rateLimit
}
}
func withPerHostRequestsPerMinute(rateLimit int) startOption {
return func(opts *startOptions) {
opts.perHostConnectionsPerMinute = rateLimit
}
}
func withNodeOpts(nodeOpts ...kit.NodeOpt) startOption {
return func(opts *startOptions) {
opts.nodeOpts = nodeOpts
}
}
func startNodes(ctx context.Context, t *testing.T, opts ...startOption) *testNodes {
options := newStartOptions(opts...)
var (
full *kit.TestFullNode
miner *kit.TestMiner
lite kit.TestFullNode
)
// - Create one full node and one lite node
// - Put a gateway server in front of full node 1
// - Start full node 2 in lite mode
// - Connect lite node -> gateway server -> full node
// create the full node and the miner.
var ens *kit.Ensemble
full, miner, ens = kit.EnsembleMinimal(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(options.blockTime)
api.RunningNodeType = api.NodeFull
// Create a gateway server in front of the full node
v1EthSubHandler := gateway.NewEthSubHandler()
v2EthSubHandler := gateway.NewEthSubHandler()
gwapi := gateway.NewNode(
full,
full.V2,
gateway.WithV1EthSubHandler(v1EthSubHandler),
gateway.WithV2EthSubHandler(v2EthSubHandler),
gateway.WithMaxLookbackDuration(options.lookbackCap),
gateway.WithMaxMessageLookbackEpochs(options.maxMessageLookbackEpochs),
)
handler, err := gateway.Handler(
gwapi,
gateway.WithPerConnectionAPIRateLimit(options.perConnectionAPIRateLimit),
gateway.WithPerHostConnectionsPerMinute(options.perHostConnectionsPerMinute),
)
t.Cleanup(func() { _ = handler.Shutdown(ctx) })
require.NoError(t, err)
l, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
srv, _, _ := kit.CreateRPCServer(t, handler, l)
// Create a gateway client API that connects to the gateway server
gapiv1, v1RpcCloser, err := client.NewGatewayRPCV1(ctx, "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil,
jsonrpc.WithClientHandler("Filecoin", v1EthSubHandler),
jsonrpc.WithClientHandlerAlias("eth_subscription", "Filecoin.EthSubscription"),
)
require.NoError(t, err)
gapiv2, v2RpcCloser, err := client.NewGatewayRPCV2(ctx, "ws://"+srv.Listener.Addr().String()+"/rpc/v2", nil,
jsonrpc.WithClientHandler("Filecoin", v2EthSubHandler),
jsonrpc.WithClientHandlerAlias("eth_subscription", "Filecoin.EthSubscription"),
)
require.NoError(t, err)
var closeOnce sync.Once
closer := func() {
closeOnce.Do(func() {
v1RpcCloser()
v2RpcCloser()
})
}
t.Cleanup(closer)
nodeOpts := append([]kit.NodeOpt{
kit.LiteNode(),
kit.ThroughRPC(),
kit.ConstructorOpts(
node.Override(new(api.Gateway), gapiv1),
node.Override(new(v2api.Gateway), gapiv2),
),
}, options.nodeOpts...)
ens.FullNode(&lite, nodeOpts...).Start().InterconnectAll()
nodes := &testNodes{
lite: &lite,
full: full,
miner: miner,
gatewayAddr: srv.Listener.Addr().String(),
rpcCloser: closer,
}
if options.fund {
// The full node starts with a wallet
fullWalletAddr, err := nodes.full.WalletDefaultAddress(ctx)
require.NoError(t, err)
// Get the lite node default wallet address.
liteWalletAddr, err := nodes.lite.WalletDefaultAddress(ctx)
require.NoError(t, err)
// Send some funds from the full node to the lite node
err = sendFunds(ctx, nodes.full, fullWalletAddr, liteWalletAddr, types.NewInt(1e18))
require.NoError(t, err)
}
return nodes
}
func sendFunds(ctx context.Context, fromNode *kit.TestFullNode, fromAddr address.Address, toAddr address.Address, amt types.BigInt) error {
msg := &types.Message{
From: fromAddr,
To: toAddr,
Value: amt,
}
sm, err := fromNode.MpoolPushMessage(ctx, msg, nil)
if err != nil {
return err
}
res, err := fromNode.StateWaitMsg(ctx, sm.Cid(), 3, api.LookbackNoLimit, true)
if err != nil {
return err
}
if res.Receipt.ExitCode != 0 {
return xerrors.Errorf("send funds failed with exit code %d", res.Receipt.ExitCode)
}
return nil
}
func TestGatewayRateLimits(t *testing.T) {
t.Skip("this test is flaky and needs to be fixed")
// Fails on the RequireDuration check, e.g. error like:
// Max difference between 2025-02-10 12:05:34.63725116 +0000 UTC m=+30.240446844 and 2025-02-10 12:05:33.519935593 +0000 UTC m=+29.123131278 allowed is 800ms, but difference was 1.117315566s
// There may be additional calls going through the API that only show up at random and these
// aren't accounted for. See note below about paymentChannelSettler, which is one such call.
// Tracking issue: https://github.com/filecoin-project/lotus/issues/12566
req := require.New(t)
kit.QuietMiningLogs()
ctx := context.Background()
tokensPerSecond := 10
requestsPerMinute := 30 // http requests
nodes := startNodes(ctx, t,
withNodeOpts(kit.DisableEthRPC()),
withPerConnectionAPIRateLimit(tokensPerSecond),
withPerHostRequestsPerMinute(requestsPerMinute),
)
nodes.full.WaitTillChain(ctx, kit.HeightAtLeast(10)) // let's get going first
// ChainHead uses chainRateLimitTokens=2.
// But we're also competing with the paymentChannelSettler which listens to the chain uses
// ChainGetBlockMessages on each change, which also uses chainRateLimitTokens=2. But because we're
// rate limiting, it only gets a ChainGetBlockMessages in between our ChainHead calls, not on each
// 5ms block (which it wants). So each loop should be 4 tokens.
loops := 25
tokensPerLoop := 4
start := time.Now()
for i := 0; i < loops; i++ {
_, err := nodes.lite.ChainHead(ctx)
req.NoError(err)
}
tokensUsed := loops * tokensPerLoop
expectedEnd := start.Add(time.Duration(float64(tokensUsed) / float64(tokensPerSecond) * float64(time.Second)))
allowPad := time.Duration(float64(tokensPerLoop*2) / float64(tokensPerSecond) * float64(time.Second)) // add padding to account for slow test runs
t.Logf("expected end: %s, now: %s, allowPad: %s, actual delta: %s", expectedEnd, time.Now(), allowPad, time.Since(expectedEnd))
req.WithinDuration(expectedEnd, time.Now(), allowPad)
client := &http.Client{}
jsonPayload := []byte(`{"method":"Filecoin.ChainHead","params":[],"id":1,"jsonrpc":"2.0"}`)
var failed bool
for i := 0; i < requestsPerMinute*2 && !failed; i++ {
status, body := makeManualRpcCall(t, 1, client, nodes.gatewayAddr, string(jsonPayload))
if http.StatusOK == status {
result := map[string]interface{}{}
req.NoError(json.Unmarshal([]byte(body), &result))
req.NotNil(result["result"])
height, ok := result["result"].(map[string]interface{})["Height"].(float64)
req.True(ok)
req.Greater(int(height), 0)
} else {
req.Equal(http.StatusTooManyRequests, status)
req.LessOrEqual(i, requestsPerMinute+1)
failed = true
}
}
req.True(failed, "expected requests to fail due to rate limiting")
}
func makeManualRpcCall(t *testing.T, version int, client *http.Client, gatewayAddr, payload string) (int, string) {
// not available over plain http
url := fmt.Sprintf("http://%s/rpc/v%d", gatewayAddr, version)
request, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(payload)))
require.NoError(t, err)
request.Header.Set("Content-Type", "application/json")
response, err := client.Do(request)
require.NoError(t, err)
defer func() { _ = response.Body.Close() }()
body, err := io.ReadAll(response.Body)
require.NoError(t, err)
return response.StatusCode, string(body)
}
func TestStatefulCallHandling(t *testing.T) {
req := require.New(t)
kit.QuietMiningLogs()
ctx := context.Background()
nodes := startNodes(ctx, t)
t.Logf("Testing stateful call handling rejection via plain http")
for version := range 3 {
for _, typ := range []string{
"EthNewBlockFilter",
"EthNewPendingTransactionFilter",
"EthNewFilter",
"EthGetFilterChanges",
"EthGetFilterLogs",
"EthUninstallFilter",
"EthSubscribe",
"EthUnsubscribe",
} {
params := ""
expErr := typ + " not supported: stateful methods are only available on websocket connections"
switch typ {
case "EthNewFilter":
params = "{}"
case "EthGetFilterChanges", "EthGetFilterLogs", "EthUninstallFilter", "EthUnsubscribe":
params = `"0x0000000000000000000000000000000000000000000000000000000000000000"`
case "EthSubscribe":
params = `"newHeads"`
expErr = "EthSubscribe not supported: connection doesn't support callbacks"
}
status, body := makeManualRpcCall(
t,
version,
&http.Client{},
nodes.gatewayAddr,
`{"method":"Filecoin.`+typ+`","params":[`+params+`],"id":1,"jsonrpc":"2.0"}`,
)
req.Equal(http.StatusOK, status, "not ok for "+typ)
req.Contains(body, `{"error":{"code":1,"message":"`+expErr+`"},"id":1,"jsonrpc":"2.0"}`, "unexpected response for "+typ)
}
}
t.Logf("Testing subscriptions")
// subscribe twice, so we can unsub one over ws to check unsub works, then unsub after ws close to
// check that auto-cleanup happened
subId1, err := nodes.lite.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "newHeads"})).Assert(req.NoError))
req.NoError(err)
err = nodes.lite.EthSubRouter.AddSub(ctx, subId1, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error {
t.Logf("Received subscription response (sub1): %v", resp)
return nil
})
req.NoError(err)
subId2, err := nodes.lite.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "newHeads"})).Assert(req.NoError))
req.NoError(err)
err = nodes.lite.EthSubRouter.AddSub(ctx, subId2, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error {
t.Logf("Received subscription response (sub2): %v", resp)
return nil
})
req.NoError(err)
ok, err := nodes.lite.EthUnsubscribe(ctx, subId1) // unsub on lite node, should work
req.NoError(err)
req.True(ok)
ok, err = nodes.full.EthUnsubscribe(ctx, subId1) // unsub on full node, already done
req.NoError(err)
req.False(ok)
t.Logf("Installing a stateful filters via ws")
// install the variety of stateful filters we have, but only up to the max total
var (
blockFilterIds = make([]ethtypes.EthFilterID, gateway.DefaultEthMaxFiltersPerConn/3)
pendingFilterIds = make([]ethtypes.EthFilterID, gateway.DefaultEthMaxFiltersPerConn/3)
// matchFilterIds takes up the remainder, minus 1 because we still have 1 live subscription that counts
matchFilterIds = make([]ethtypes.EthFilterID, gateway.DefaultEthMaxFiltersPerConn-len(blockFilterIds)-len(pendingFilterIds)-1)
)
for i := 0; i < len(blockFilterIds); i++ {
fid, err := nodes.lite.EthNewBlockFilter(ctx)
req.NoError(err)
blockFilterIds[i] = fid
}
for i := 0; i < len(pendingFilterIds); i++ {
fid, err := nodes.lite.EthNewPendingTransactionFilter(ctx)
req.NoError(err)
pendingFilterIds[i] = fid
}
for i := 0; i < len(matchFilterIds); i++ {
fid, err := nodes.lite.EthNewFilter(ctx, &ethtypes.EthFilterSpec{})
req.NoError(err)
matchFilterIds[i] = fid
}
// sanity check we're actually doing something
req.Greater(len(blockFilterIds), 0)
req.Greater(len(pendingFilterIds), 0)
req.Greater(len(matchFilterIds), 0)
t.Logf("Testing 'too many filters' rejection")
_, err = nodes.lite.EthNewBlockFilter(ctx)
require.ErrorContains(t, err, "too many subscriptions and filters per connection")
_, err = nodes.lite.EthNewPendingTransactionFilter(ctx)
require.ErrorContains(t, err, "too many subscriptions and filters per connection")
_, err = nodes.lite.EthNewFilter(ctx, &ethtypes.EthFilterSpec{})
require.ErrorContains(t, err, "too many subscriptions and filters per connection")
_, err = nodes.lite.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "newHeads"})).Assert(req.NoError))
require.ErrorContains(t, err, "too many subscriptions and filters per connection")
t.Logf("Shutting down the lite node")
req.NoError(nodes.lite.Stop(ctx))
nodes.rpcCloser()
// Once the websocket connection is closed, the server should clean up the filters for us.
// Unfortunately we have no other way to check for completeness of shutdown and cleanup.
// * Asynchronously the rpcCloser() call will end the client websockets connection to the gateway.
// * When the gateway recognises the end of the HTTP connection, it will asynchronously make calls
// to the fullnode to clean up the filters.
// * The fullnode will then uninstall the filters and we can finally move on to check it directly
// that this has happened.
// This should happen quickly, but we have no way to synchronously check for it. So we just wait a bit.
time.Sleep(time.Second)
t.Logf("Checking that all filters and subs were cleared up by directly talking to full node")
ok, err = nodes.full.EthUnsubscribe(ctx, subId2) // unsub on full node, already done
req.NoError(err)
req.False(ok) // already unsubscribed because of auto-cleanup
for _, fid := range blockFilterIds {
_, err = nodes.full.EthGetFilterChanges(ctx, fid)
req.ErrorContains(err, "filter not found")
}
for _, fid := range pendingFilterIds {
_, err = nodes.full.EthGetFilterChanges(ctx, fid)
req.ErrorContains(err, "filter not found")
}
for _, fid := range matchFilterIds {
_, err = nodes.full.EthGetFilterChanges(ctx, fid)
req.ErrorContains(err, "filter not found")
}
}
func TestGatewayF3(t *testing.T) {
// Test that disabled & not-running F3 calls properly error
kit.QuietMiningLogs()
t.Run("not running", func(t *testing.T) {
ctx := context.Background()
nodes := startNodes(ctx, t)
cert, err := nodes.lite.F3GetLatestCertificate(ctx)
require.ErrorContains(t, err, f3.ErrF3NotRunning.Error())
require.Nil(t, cert)
cert, err = nodes.lite.F3GetCertificate(ctx, 2)
require.ErrorContains(t, err, f3.ErrF3NotRunning.Error())
require.Nil(t, cert)
})
t.Run("disabled", func(t *testing.T) {
ctx := context.Background()
nodes := startNodes(ctx, t, withNodeOpts(kit.F3Disabled()))
cert, err := nodes.lite.F3GetLatestCertificate(ctx)
require.ErrorIs(t, err, api.ErrF3Disabled)
require.Nil(t, cert)
cert, err = nodes.lite.F3GetCertificate(ctx, 2)
require.ErrorIs(t, err, api.ErrF3Disabled)
require.Nil(t, cert)
})
}