mirror of
https://github.com/filecoin-project/lotus.git
synced 2025-08-23 16:55:22 +08:00

The initial implementation of IPNI used GossipSub as a way to propagate IPNI advertisement chain. To do this the propagation had to be relayed through the Lotus node due to strict Filecoin GossipSub validation rules. Since then IPNI has moved on to roll out its own sync protocol that works over HTTP, and HTTP-over-libp2p. This move has been the official way of advertising content to IPNI federation over a year now. Therefore, remove the ad relay over pubsub via Lotus node as it is now considered to have reached its EOL as a mechanism for advertising to IPNI.
229 lines
6.5 KiB
Go
229 lines
6.5 KiB
Go
package modules
|
|
|
|
import (
|
|
"context"
|
|
"os"
|
|
"strconv"
|
|
"time"
|
|
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
"github.com/libp2p/go-libp2p/core/event"
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
"github.com/libp2p/go-libp2p/core/network"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
|
|
"go.uber.org/fx"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/lotus/build"
|
|
"github.com/filecoin-project/lotus/build/buildconstants"
|
|
"github.com/filecoin-project/lotus/chain"
|
|
"github.com/filecoin-project/lotus/chain/beacon"
|
|
"github.com/filecoin-project/lotus/chain/beacon/drand"
|
|
"github.com/filecoin-project/lotus/chain/consensus"
|
|
"github.com/filecoin-project/lotus/chain/exchange"
|
|
"github.com/filecoin-project/lotus/chain/messagepool"
|
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
|
"github.com/filecoin-project/lotus/chain/store"
|
|
"github.com/filecoin-project/lotus/chain/sub"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
"github.com/filecoin-project/lotus/journal"
|
|
"github.com/filecoin-project/lotus/journal/fsjournal"
|
|
"github.com/filecoin-project/lotus/lib/peermgr"
|
|
"github.com/filecoin-project/lotus/node/hello"
|
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
|
"github.com/filecoin-project/lotus/node/repo"
|
|
)
|
|
|
|
var pubsubMsgsSyncEpochs = 10
|
|
|
|
func init() {
|
|
if s := os.Getenv("LOTUS_MSGS_SYNC_EPOCHS"); s != "" {
|
|
val, err := strconv.Atoi(s)
|
|
if err != nil {
|
|
log.Errorf("failed to parse LOTUS_MSGS_SYNC_EPOCHS: %s", err)
|
|
return
|
|
}
|
|
pubsubMsgsSyncEpochs = val
|
|
}
|
|
}
|
|
|
|
func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) error {
|
|
h.SetStreamHandler(hello.ProtocolID, svc.HandleStream)
|
|
|
|
sub, err := h.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted), eventbus.BufSize(1024))
|
|
if err != nil {
|
|
return xerrors.Errorf("failed to subscribe to event bus: %w", err)
|
|
}
|
|
|
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
|
|
|
go func() {
|
|
// We want to get information on connected peers, we don't want to trigger new connections.
|
|
ctx := network.WithNoDial(ctx, "filecoin hello")
|
|
for evt := range sub.Out() {
|
|
pic := evt.(event.EvtPeerIdentificationCompleted)
|
|
// We just finished identifying the peer, that means we should know what
|
|
// protocols it speaks. Check if it speeks the Filecoin hello protocol
|
|
// before continuing.
|
|
if p, _ := h.Peerstore().FirstSupportedProtocol(pic.Peer, hello.ProtocolID); p != hello.ProtocolID {
|
|
continue
|
|
}
|
|
|
|
go func() {
|
|
if err := svc.SayHello(ctx, pic.Peer); err != nil {
|
|
protos, _ := h.Peerstore().GetProtocols(pic.Peer)
|
|
agent, _ := h.Peerstore().Get(pic.Peer, "AgentVersion")
|
|
log.Warnw("failed to say hello", "error", err, "peer", pic.Peer, "supported", protos, "agent", agent)
|
|
}
|
|
}()
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr) {
|
|
go pmgr.Run(helpers.LifecycleCtx(mctx, lc))
|
|
}
|
|
|
|
func RunChainExchange(h host.Host, svc exchange.Server) {
|
|
h.SetStreamHandler(exchange.ChainExchangeProtocolID, svc.HandleStream) // new
|
|
}
|
|
|
|
func waitForSync(stmgr *stmgr.StateManager, epochs int, subscribe func()) {
|
|
nearsync := time.Duration(epochs*int(buildconstants.BlockDelaySecs)) * time.Second
|
|
|
|
// early check, are we synced at start up?
|
|
ts := stmgr.ChainStore().GetHeaviestTipSet()
|
|
timestamp := ts.MinTimestamp()
|
|
timestampTime := time.Unix(int64(timestamp), 0)
|
|
if build.Clock.Since(timestampTime) < nearsync {
|
|
subscribe()
|
|
return
|
|
}
|
|
|
|
// we are not synced, subscribe to head changes and wait for sync
|
|
stmgr.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
|
|
if len(app) == 0 {
|
|
return nil
|
|
}
|
|
|
|
latest := app[0].MinTimestamp()
|
|
for _, ts := range app[1:] {
|
|
timestamp := ts.MinTimestamp()
|
|
if timestamp > latest {
|
|
latest = timestamp
|
|
}
|
|
}
|
|
|
|
latestTime := time.Unix(int64(latest), 0)
|
|
if build.Clock.Since(latestTime) < nearsync {
|
|
subscribe()
|
|
return store.ErrNotifeeDone
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func HandleIncomingBlocks(mctx helpers.MetricsCtx,
|
|
lc fx.Lifecycle,
|
|
ps *pubsub.PubSub,
|
|
s *chain.Syncer,
|
|
bserv dtypes.ChainBlockService,
|
|
chain *store.ChainStore,
|
|
cns consensus.Consensus,
|
|
h host.Host,
|
|
nn dtypes.NetworkName) {
|
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
|
|
|
v := sub.NewBlockValidator(
|
|
h.ID(), chain, cns,
|
|
func(p peer.ID) {
|
|
ps.BlacklistPeer(p)
|
|
h.ConnManager().TagPeer(p, "badblock", -1000)
|
|
})
|
|
|
|
if err := ps.RegisterTopicValidator(build.BlocksTopic(nn), v.Validate); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
log.Infof("subscribing to pubsub topic %s", build.BlocksTopic(nn))
|
|
|
|
blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
|
|
}
|
|
|
|
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) {
|
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
|
|
|
v := sub.NewMessageValidator(h.ID(), mpool)
|
|
|
|
if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
subscribe := func() {
|
|
log.Infof("subscribing to pubsub topic %s", build.MessagesTopic(nn))
|
|
|
|
msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
|
|
}
|
|
|
|
if bootstrapper {
|
|
subscribe()
|
|
return
|
|
}
|
|
|
|
// wait until we are synced within 10 epochs -- env var can override
|
|
waitForSync(stmgr, pubsubMsgsSyncEpochs, subscribe)
|
|
}
|
|
|
|
type RandomBeaconParams struct {
|
|
fx.In
|
|
|
|
PubSub *pubsub.PubSub `optional:"true"`
|
|
Cs *store.ChainStore
|
|
DrandConfig dtypes.DrandSchedule
|
|
}
|
|
|
|
func BuiltinDrandConfig() dtypes.DrandSchedule {
|
|
return buildconstants.DrandConfigSchedule()
|
|
}
|
|
|
|
func RandomSchedule(lc fx.Lifecycle, mctx helpers.MetricsCtx, p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.Schedule, error) {
|
|
gen, err := p.Cs.GetGenesis(helpers.LifecycleCtx(mctx, lc))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
shd, err := drand.BeaconScheduleFromDrandSchedule(p.DrandConfig, gen.Timestamp, p.PubSub)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("failed to create beacon schedule: %w", err)
|
|
}
|
|
|
|
return shd, nil
|
|
}
|
|
|
|
func OpenFilesystemJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled journal.DisabledEvents) (journal.Journal, error) {
|
|
jrnl, err := fsjournal.OpenFSJournal(lr, disabled)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(_ context.Context) error { return jrnl.Close() },
|
|
})
|
|
|
|
return jrnl, err
|
|
}
|