lotus/node/modules/services.go
Masih H. Derkani 9bfd0b4295 chore: remove IPNI advertisement relay over pubsub via Lotus node (#12768)
The initial implementation of IPNI used GossipSub to propagate the IPNI
advertisement chain. To do this, the propagation had to be relayed through
the Lotus node because of strict Filecoin GossipSub validation rules.

Since then, IPNI has rolled out its own sync protocol that works over HTTP
and HTTP-over-libp2p. This has been the official way of advertising content
to the IPNI federation for over a year now.

Therefore, remove the advertisement relay over pubsub via the Lotus node, as
it has reached its EOL as a mechanism for advertising to IPNI.
2024-12-10 12:56:04 +00:00

package modules

import (
	"context"
	"os"
	"strconv"
	"time"

	pubsub "github.com/libp2p/go-libp2p-pubsub"
	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/host/eventbus"
	"go.uber.org/fx"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/lotus/build"
	"github.com/filecoin-project/lotus/build/buildconstants"
	"github.com/filecoin-project/lotus/chain"
	"github.com/filecoin-project/lotus/chain/beacon"
	"github.com/filecoin-project/lotus/chain/beacon/drand"
	"github.com/filecoin-project/lotus/chain/consensus"
	"github.com/filecoin-project/lotus/chain/exchange"
	"github.com/filecoin-project/lotus/chain/messagepool"
	"github.com/filecoin-project/lotus/chain/stmgr"
	"github.com/filecoin-project/lotus/chain/store"
	"github.com/filecoin-project/lotus/chain/sub"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/journal"
	"github.com/filecoin-project/lotus/journal/fsjournal"
	"github.com/filecoin-project/lotus/lib/peermgr"
	"github.com/filecoin-project/lotus/node/hello"
	"github.com/filecoin-project/lotus/node/modules/dtypes"
	"github.com/filecoin-project/lotus/node/modules/helpers"
	"github.com/filecoin-project/lotus/node/repo"
)
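
// pubsubMsgsSyncEpochs is how close to the current chain head, in epochs, the
// node must be before it subscribes to the messages pubsub topic. It can be
// overridden with the LOTUS_MSGS_SYNC_EPOCHS environment variable.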
var pubsubMsgsSyncEpochs = 10
func init() {
	if s := os.Getenv("LOTUS_MSGS_SYNC_EPOCHS"); s != "" {
		val, err := strconv.Atoi(s)
		if err != nil {
			log.Errorf("failed to parse LOTUS_MSGS_SYNC_EPOCHS: %s", err)
			return
		}
		pubsubMsgsSyncEpochs = val
	}
}
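
// RunHello registers the hello protocol stream handler and, whenever peer
// identification completes for a peer that supports the protocol, says hello
// to that peer.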
func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) error {
	h.SetStreamHandler(hello.ProtocolID, svc.HandleStream)

	sub, err := h.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted), eventbus.BufSize(1024))
	if err != nil {
		return xerrors.Errorf("failed to subscribe to event bus: %w", err)
	}

	ctx := helpers.LifecycleCtx(mctx, lc)
	go func() {
		// We want to get information on connected peers, we don't want to trigger new connections.
		ctx := network.WithNoDial(ctx, "filecoin hello")
		for evt := range sub.Out() {
			pic := evt.(event.EvtPeerIdentificationCompleted)
			// We just finished identifying the peer, which means we should know what
			// protocols it speaks. Check if it speaks the Filecoin hello protocol
			// before continuing.
			if p, _ := h.Peerstore().FirstSupportedProtocol(pic.Peer, hello.ProtocolID); p != hello.ProtocolID {
				continue
			}

			go func() {
				if err := svc.SayHello(ctx, pic.Peer); err != nil {
					protos, _ := h.Peerstore().GetProtocols(pic.Peer)
					agent, _ := h.Peerstore().Get(pic.Peer, "AgentVersion")
					log.Warnw("failed to say hello", "error", err, "peer", pic.Peer, "supported", protos, "agent", agent)
				}
			}()
		}
	}()

	return nil
}
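
// RunPeerMgr starts the peer manager loop, bound to the fx lifecycle context.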
func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr) {
	go pmgr.Run(helpers.LifecycleCtx(mctx, lc))
}
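
// RunChainExchange registers the chain exchange protocol stream handler.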
func RunChainExchange(h host.Host, svc exchange.Server) {
	h.SetStreamHandler(exchange.ChainExchangeProtocolID, svc.HandleStream) // new
}
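
// waitForSync calls subscribe once the heaviest tipset is within the given
// number of epochs of the current time, either immediately at startup or
// after observing applied head changes.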
func waitForSync(stmgr *stmgr.StateManager, epochs int, subscribe func()) {
	nearsync := time.Duration(epochs*int(buildconstants.BlockDelaySecs)) * time.Second

	// early check, are we synced at start up?
	ts := stmgr.ChainStore().GetHeaviestTipSet()
	timestamp := ts.MinTimestamp()
	timestampTime := time.Unix(int64(timestamp), 0)
	if build.Clock.Since(timestampTime) < nearsync {
		subscribe()
		return
	}

	// we are not synced, subscribe to head changes and wait for sync
	stmgr.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
		if len(app) == 0 {
			return nil
		}

		latest := app[0].MinTimestamp()
		for _, ts := range app[1:] {
			timestamp := ts.MinTimestamp()
			if timestamp > latest {
				latest = timestamp
			}
		}

		latestTime := time.Unix(int64(latest), 0)
		if build.Clock.Since(latestTime) < nearsync {
			subscribe()
			return store.ErrNotifeeDone
		}

		return nil
	})
}
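
// HandleIncomingBlocks registers the block topic validator, subscribes to the
// blocks pubsub topic and hands incoming blocks to the syncer.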
func HandleIncomingBlocks(mctx helpers.MetricsCtx,
	lc fx.Lifecycle,
	ps *pubsub.PubSub,
	s *chain.Syncer,
	bserv dtypes.ChainBlockService,
	chain *store.ChainStore,
	cns consensus.Consensus,
	h host.Host,
	nn dtypes.NetworkName) {
	ctx := helpers.LifecycleCtx(mctx, lc)

	v := sub.NewBlockValidator(
		h.ID(), chain, cns,
		func(p peer.ID) {
			ps.BlacklistPeer(p)
			h.ConnManager().TagPeer(p, "badblock", -1000)
		})

	if err := ps.RegisterTopicValidator(build.BlocksTopic(nn), v.Validate); err != nil {
		panic(err)
	}

	log.Infof("subscribing to pubsub topic %s", build.BlocksTopic(nn))

	blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
	if err != nil {
		panic(err)
	}

	go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
}
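
// HandleIncomingMessages registers the message topic validator and subscribes
// to the messages pubsub topic: immediately on bootstrap nodes, and otherwise
// only once the node is near the chain head.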
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) {
	ctx := helpers.LifecycleCtx(mctx, lc)

	v := sub.NewMessageValidator(h.ID(), mpool)

	if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
		panic(err)
	}

	subscribe := func() {
		log.Infof("subscribing to pubsub topic %s", build.MessagesTopic(nn))

		msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint
		if err != nil {
			panic(err)
		}

		go sub.HandleIncomingMessages(ctx, mpool, msgsub)
	}

	if bootstrapper {
		subscribe()
		return
	}

	// wait until we are synced within 10 epochs -- env var can override
	waitForSync(stmgr, pubsubMsgsSyncEpochs, subscribe)
}
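
// RandomBeaconParams bundles the fx-injected dependencies used to build the
// random beacon schedule.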
type RandomBeaconParams struct {
	fx.In

	PubSub      *pubsub.PubSub `optional:"true"`
	Cs          *store.ChainStore
	DrandConfig dtypes.DrandSchedule
}
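
// BuiltinDrandConfig returns the drand schedule baked into this build.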
func BuiltinDrandConfig() dtypes.DrandSchedule {
	return buildconstants.DrandConfigSchedule()
}
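
// RandomSchedule constructs the drand beacon schedule from the configured
// drand schedule and the genesis timestamp.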
func RandomSchedule(lc fx.Lifecycle, mctx helpers.MetricsCtx, p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.Schedule, error) {
	gen, err := p.Cs.GetGenesis(helpers.LifecycleCtx(mctx, lc))
	if err != nil {
		return nil, err
	}

	shd, err := drand.BeaconScheduleFromDrandSchedule(p.DrandConfig, gen.Timestamp, p.PubSub)
	if err != nil {
		return nil, xerrors.Errorf("failed to create beacon schedule: %w", err)
	}

	return shd, nil
}
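
// OpenFilesystemJournal opens the filesystem journal in the locked repo and
// closes it when the fx lifecycle stops.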
func OpenFilesystemJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled journal.DisabledEvents) (journal.Journal, error) {
	jrnl, err := fsjournal.OpenFSJournal(lr, disabled)
	if err != nil {
		return nil, err
	}

	lc.Append(fx.Hook{
		OnStop: func(_ context.Context) error { return jrnl.Close() },
	})

	return jrnl, err
}