Files
lotus/storage/pipeline/states_proving.go
Peter Rabbitson 1bc8a8b32c chore: remove (deprecated) deps on build/ proxy-constants
This is a large diff, yet should have exactly zero functional changes

Ideally as a result of this some parts of the depchain will become lighter,
with downstream reaping the same benefits as the team that initiated this split.

P.S. work was done while forming better intuition of current dependency graph
2024-07-22 17:36:12 +04:00

167 lines
5.6 KiB
Go

package sealing
import (
"time"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build/buildconstants"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
)
func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) error {
// TODO: noop because this is now handled by the PoSt scheduler. We can reuse
// this state for tracking faulty sectors, or remove it when that won't be
// a breaking change
return nil
}
func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInfo) error {
if sector.FaultReportMsg == nil {
return xerrors.Errorf("entered fault reported state without a FaultReportMsg cid")
}
mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.FaultReportMsg, buildconstants.MessageConfidence, api.LookbackNoLimit, true)
if err != nil {
return xerrors.Errorf("failed to wait for fault declaration: %w", err)
}
if mw.Receipt.ExitCode != 0 {
log.Errorf("UNHANDLED: declaring sector fault failed (exit=%d, msg=%s) (id: %d)", mw.Receipt.ExitCode, *sector.FaultReportMsg, sector.SectorNumber)
return xerrors.Errorf("UNHANDLED: submitting fault declaration failed (exit %d)", mw.Receipt.ExitCode)
}
return ctx.Send(SectorFaultedFinal{})
}
func (m *Sealing) handleTerminating(ctx statemachine.Context, sector SectorInfo) error {
// First step of sector termination
// * See if sector is live
// * If not, goto removing
// * Add to termination queue
// * Wait for message to land on-chain
// * Check for correct termination
// * wait for expiration (+winning lookback?)
si, err := m.Api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, types.EmptyTSK)
if err != nil {
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting sector info: %w", err)})
}
if si == nil {
// either already terminated or not committed yet
pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, types.EmptyTSK)
if err != nil {
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("checking precommit presence: %w", err)})
}
if pci != nil {
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("sector was precommitted but not proven, remove instead of terminating")})
}
return ctx.Send(SectorRemove{})
}
termCid, terminated, err := m.terminator.AddTermination(ctx.Context(), m.minerSectorID(sector.SectorNumber))
if err != nil {
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("queueing termination: %w", err)})
}
if terminated {
return ctx.Send(SectorTerminating{Message: nil})
}
return ctx.Send(SectorTerminating{Message: &termCid})
}
func (m *Sealing) handleTerminateWait(ctx statemachine.Context, sector SectorInfo) error {
if sector.TerminateMessage == nil {
ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting chain head: %w", err)})
}
return ctx.Send(SectorTerminated{TerminatedAt: ts.Height()})
}
mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.TerminateMessage, buildconstants.MessageConfidence, api.LookbackNoLimit, true)
if err != nil {
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("waiting for terminate message to land on chain: %w", err)})
}
if mw.Receipt.ExitCode != exitcode.Ok {
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("terminate message failed to execute: exit %d: %w", mw.Receipt.ExitCode, err)})
}
return ctx.Send(SectorTerminated{TerminatedAt: mw.Height})
}
func (m *Sealing) handleTerminateFinality(ctx statemachine.Context, sector SectorInfo) error {
for {
ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting chain head: %w", err)})
}
nv, err := m.Api.StateNetworkVersion(ctx.Context(), ts.Key())
if err != nil {
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting network version: %w", err)})
}
if ts.Height() >= sector.TerminatedAt+policy.GetWinningPoStSectorSetLookback(nv) {
return ctx.Send(SectorRemove{})
}
toWait := time.Duration(ts.Height()-sector.TerminatedAt+policy.GetWinningPoStSectorSetLookback(nv)) * time.Duration(buildconstants.BlockDelaySecs) * time.Second
select {
case <-time.After(toWait):
continue
case <-ctx.Context().Done():
return ctx.Context().Err()
}
}
}
func (m *Sealing) handleRemoving(ctx statemachine.Context, sector SectorInfo) error {
if err := m.sealer.Remove(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil {
return ctx.Send(SectorRemoveFailed{err})
}
return ctx.Send(SectorRemoved{})
}
func (m *Sealing) handleProvingSector(ctx statemachine.Context, sector SectorInfo) error {
// TODO: track sector health / expiration
m.inputLk.Lock()
// in case we revert into Proving without going into Available
delete(m.available, m.minerSectorID(sector.SectorNumber))
m.inputLk.Unlock()
// guard against manual state updates from snap-deals states into Proving
// note: normally snap deals should be aborted through the abort command, but
// apparently sometimes some SPs would use update-state to force the sector back
// into the Proving state, breaking the deal input pipeline in the process.
m.cleanupAssignedDeals(sector)
// TODO: Watch termination
// TODO: Auto-extend if set
return nil
}
func (m *Sealing) handleAvailableSector(ctx statemachine.Context, sector SectorInfo) error {
m.inputLk.Lock()
m.available[m.minerSectorID(sector.SectorNumber)] = struct{}{}
m.inputLk.Unlock()
// TODO: Watch termination
// TODO: Auto-extend if set
return nil
}