Files
lotus/paychmgr/simple.go
Peter Rabbitson 1bc8a8b32c chore: remove (deprecated) deps on build/ proxy-constants
This is a large diff, yet should have exactly zero functional changes

Ideally as a result of this some parts of the depchain will become lighter,
with downstream reaping the same benefits as the team that initiated this split.

P.S. work was done while forming better intuition of current dependency graph
2024-07-22 17:36:12 +04:00

844 lines
24 KiB
Go

package paychmgr
import (
"bytes"
"context"
"errors"
"fmt"
"sort"
"sync"
"github.com/ipfs/go-cid"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/big"
init2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/init"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build/buildconstants"
"github.com/filecoin-project/lotus/chain/types"
)
// paychFundsRes is the response to a create channel or add funds request
type paychFundsRes struct {
channel address.Address
mcid cid.Cid
err error
}
// fundsReq is a request to create a channel or add funds to a channel
type fundsReq struct {
ctx context.Context
promise chan *paychFundsRes
amt types.BigInt
opts GetOpts
lk sync.Mutex
// merge parent, if this req is part of a merge
merge *mergedFundsReq
}
func newFundsReq(ctx context.Context, amt types.BigInt, opts GetOpts) *fundsReq {
promise := make(chan *paychFundsRes, 1)
return &fundsReq{
ctx: ctx,
promise: promise,
amt: amt,
opts: opts,
}
}
// onComplete is called when the funds request has been executed
func (r *fundsReq) onComplete(res *paychFundsRes) {
select {
case <-r.ctx.Done():
case r.promise <- res:
}
}
// cancel is called when the req's context is cancelled
func (r *fundsReq) cancel() {
r.lk.Lock()
defer r.lk.Unlock()
// If there's a merge parent, tell the merge parent to check if it has any
// active reqs left
if r.merge != nil {
r.merge.checkActive()
}
}
// isActive indicates whether the req's context has been cancelled
func (r *fundsReq) isActive() bool {
return r.ctx.Err() == nil
}
// setMergeParent sets the merge that this req is part of
func (r *fundsReq) setMergeParent(m *mergedFundsReq) {
r.lk.Lock()
defer r.lk.Unlock()
r.merge = m
}
// mergedFundsReq merges together multiple add funds requests that are queued
// up, so that only one message is sent for all the requests (instead of one
// message for each request)
type mergedFundsReq struct {
ctx context.Context
cancel context.CancelFunc
reqs []*fundsReq
}
func newMergedFundsReq(reqs []*fundsReq) *mergedFundsReq {
ctx, cancel := context.WithCancel(context.Background())
rqs := make([]*fundsReq, len(reqs))
copy(rqs, reqs)
m := &mergedFundsReq{
ctx: ctx,
cancel: cancel,
reqs: rqs,
}
for _, r := range m.reqs {
r.setMergeParent(m)
}
sort.Slice(m.reqs, func(i, j int) bool {
if m.reqs[i].opts.OffChain != m.reqs[j].opts.OffChain { // off-chain first
return m.reqs[i].opts.OffChain
}
if m.reqs[i].opts.Reserve != m.reqs[j].opts.Reserve { // non-reserve after off-chain
return m.reqs[i].opts.Reserve
}
// sort by amount asc (reducing latency for smaller requests)
return m.reqs[i].amt.LessThan(m.reqs[j].amt)
})
// If the requests were all cancelled while being added, cancel the context
// immediately
m.checkActive()
return m
}
// Called when a fundsReq is cancelled
func (m *mergedFundsReq) checkActive() {
// Check if there are any active fundsReqs
for _, r := range m.reqs {
if r.isActive() {
return
}
}
// If all fundsReqs have been cancelled, cancel the context
m.cancel()
}
// onComplete is called when the queue has executed the mergeFundsReq.
// Calls onComplete on each fundsReq in the mergeFundsReq.
func (m *mergedFundsReq) onComplete(res *paychFundsRes) {
for _, r := range m.reqs {
if r.isActive() {
r.onComplete(res)
}
}
}
// sum is the sum of the amounts in all requests in the merge
func (m *mergedFundsReq) sum() (types.BigInt, types.BigInt) {
sum := types.NewInt(0)
avail := types.NewInt(0)
for _, r := range m.reqs {
if r.isActive() {
sum = types.BigAdd(sum, r.amt)
if !r.opts.Reserve {
avail = types.BigAdd(avail, r.amt)
}
}
}
return sum, avail
}
// completeAmount completes first non-reserving requests up to the available amount
func (m *mergedFundsReq) completeAmount(avail types.BigInt, channelInfo *ChannelInfo) (*paychFundsRes, types.BigInt, types.BigInt) {
used, failed := types.NewInt(0), types.NewInt(0)
next := 0
// order: [offchain+reserve, !offchain+reserve, !offchain+!reserve]
for i, r := range m.reqs {
if !r.opts.Reserve {
// non-reserving request are put after reserving requests, so we are done here
break
}
// don't try to fill inactive requests
if !r.isActive() {
continue
}
if r.amt.GreaterThan(types.BigSub(avail, used)) {
// requests are sorted by amount ascending, so if we hit this, there aren't any more requests we can fill
if r.opts.OffChain {
// can't fill, so OffChain want an error
if r.isActive() {
failed = types.BigAdd(failed, r.amt)
r.onComplete(&paychFundsRes{
channel: *channelInfo.Channel,
err: xerrors.Errorf("not enough funds available in the payment channel %s; add funds with 'lotus paych add-funds %s %s %s'", channelInfo.Channel, channelInfo.from(), channelInfo.to(), types.FIL(r.amt).Unitless()),
})
}
next = i + 1
continue
}
break
}
used = types.BigAdd(used, r.amt)
r.onComplete(&paychFundsRes{channel: *channelInfo.Channel})
next = i + 1
}
m.reqs = m.reqs[next:]
if len(m.reqs) == 0 {
return &paychFundsRes{channel: *channelInfo.Channel}, used, failed
}
return nil, used, failed
}
func (m *mergedFundsReq) failOffChainNoChannel(from, to address.Address) (*paychFundsRes, types.BigInt) {
next := 0
freed := types.NewInt(0)
for i, r := range m.reqs {
if !r.opts.OffChain {
break
}
freed = types.BigAdd(freed, r.amt)
if !r.isActive() {
continue
}
r.onComplete(&paychFundsRes{err: xerrors.Errorf("payment channel doesn't exist, create with 'lotus paych add-funds %s %s %s'", from, to, types.FIL(r.amt).Unitless())})
next = i + 1
}
m.reqs = m.reqs[next:]
if len(m.reqs) == 0 {
return &paychFundsRes{err: xerrors.Errorf("payment channel doesn't exist, create with 'lotus paych add-funds %s %s 0'", from, to)}, freed
}
return nil, freed
}
// getPaych ensures that a channel exists between the from and to addresses,
// and reserves (or adds as available) the given amount of funds.
// If the channel does not exist a create channel message is sent and the
// message CID is returned.
// If the channel does exist an add funds message is sent and both the channel
// address and message CID are returned.
// If there is an in progress operation (create channel / add funds), getPaych
// blocks until the previous operation completes, then returns both the channel
// address and the CID of the new add funds message.
// If an operation returns an error, subsequent waiting operations will still
// be attempted.
func (ca *channelAccessor) getPaych(ctx context.Context, amt types.BigInt, opts GetOpts) (address.Address, cid.Cid, error) {
// Add the request to add funds to a queue and wait for the result
freq := newFundsReq(ctx, amt, opts)
ca.enqueue(ctx, freq)
select {
case res := <-freq.promise:
return res.channel, res.mcid, res.err
case <-ctx.Done():
freq.cancel()
return address.Undef, cid.Undef, ctx.Err()
}
}
// Queue up an add funds operation
func (ca *channelAccessor) enqueue(ctx context.Context, task *fundsReq) {
ca.lk.Lock()
defer ca.lk.Unlock()
ca.fundsReqQueue = append(ca.fundsReqQueue, task)
go ca.processQueue(ctx, "") // nolint: errcheck
}
// Run the operations in the queue
func (ca *channelAccessor) processQueue(ctx context.Context, channelID string) (*api.ChannelAvailableFunds, error) {
ca.lk.Lock()
defer ca.lk.Unlock()
// Remove cancelled requests
ca.filterQueue()
// If there's nothing in the queue, bail out
if len(ca.fundsReqQueue) == 0 {
return ca.currentAvailableFunds(ctx, channelID, types.NewInt(0))
}
// Merge all pending requests into one.
// For example if there are pending requests for 3, 2, 4 then
// amt = 3 + 2 + 4 = 9
merged := newMergedFundsReq(ca.fundsReqQueue)
amt, avail := merged.sum()
if amt.IsZero() {
// Note: The amount can be zero if requests are cancelled as we're
// building the mergedFundsReq
return ca.currentAvailableFunds(ctx, channelID, amt)
}
res := ca.processTask(merged, amt, avail)
// If the task is waiting on an external event (eg something to appear on
// chain) it will return nil
if res == nil {
// Stop processing the fundsReqQueue and wait. When the event occurs it will
// call processQueue() again
return ca.currentAvailableFunds(ctx, channelID, amt)
}
// Finished processing so clear the queue
ca.fundsReqQueue = nil
// Call the task callback with its results
merged.onComplete(res)
return ca.currentAvailableFunds(ctx, channelID, types.NewInt(0))
}
// filterQueue filters cancelled requests out of the queue
func (ca *channelAccessor) filterQueue() {
if len(ca.fundsReqQueue) == 0 {
return
}
// Remove cancelled requests
i := 0
for _, r := range ca.fundsReqQueue {
if r.isActive() {
ca.fundsReqQueue[i] = r
i++
}
}
// Allow GC of remaining slice elements
for rem := i; rem < len(ca.fundsReqQueue); rem++ {
ca.fundsReqQueue[i] = nil
}
// Resize slice
ca.fundsReqQueue = ca.fundsReqQueue[:i]
}
// queueSize is the size of the funds request queue (used by tests)
func (ca *channelAccessor) queueSize() int {
ca.lk.Lock()
defer ca.lk.Unlock()
return len(ca.fundsReqQueue)
}
// msgWaitComplete is called when the message for a previous task is confirmed
// or there is an error.
func (ca *channelAccessor) msgWaitComplete(ctx context.Context, mcid cid.Cid, err error) {
// if context is canceled, should Not mark message to 'bad', just return.
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
ca.lk.Lock()
defer ca.lk.Unlock()
// Save the message result to the store
dserr := ca.store.SaveMessageResult(ctx, mcid, err)
if dserr != nil {
log.Errorf("saving message result: %s", dserr)
}
// Inform listeners that the message has completed
ca.msgListeners.fireMsgComplete(mcid, err)
// The queue may have been waiting for msg completion to proceed, so
// process the next queue item
if len(ca.fundsReqQueue) > 0 {
go ca.processQueue(ctx, "") // nolint: errcheck
}
}
func (ca *channelAccessor) currentAvailableFunds(ctx context.Context, channelID string, queuedAmt types.BigInt) (*api.ChannelAvailableFunds, error) {
if len(channelID) == 0 {
return nil, nil
}
channelInfo, err := ca.store.ByChannelID(ctx, channelID)
if err != nil {
return nil, err
}
// The channel may have a pending create or add funds message
waitSentinel := channelInfo.CreateMsg
if waitSentinel == nil {
waitSentinel = channelInfo.AddFundsMsg
}
// Get the total amount redeemed by vouchers.
// This includes vouchers that have been submitted, and vouchers that are
// in the datastore but haven't yet been submitted.
totalRedeemed := types.NewInt(0)
if channelInfo.Channel != nil {
ch := *channelInfo.Channel
_, pchState, err := ca.sa.loadPaychActorState(ca.chctx, ch)
if err != nil {
return nil, err
}
laneStates, err := ca.laneState(ctx, pchState, ch)
if err != nil {
return nil, err
}
for _, ls := range laneStates {
r, err := ls.Redeemed()
if err != nil {
return nil, err
}
totalRedeemed = types.BigAdd(totalRedeemed, r)
}
}
return &api.ChannelAvailableFunds{
Channel: channelInfo.Channel,
From: channelInfo.from(),
To: channelInfo.to(),
ConfirmedAmt: channelInfo.Amount,
PendingAmt: channelInfo.PendingAmount,
NonReservedAmt: channelInfo.AvailableAmount,
PendingAvailableAmt: channelInfo.PendingAvailableAmount,
PendingWaitSentinel: waitSentinel,
QueuedAmt: queuedAmt,
VoucherReedeemedAmt: totalRedeemed,
}, nil
}
// processTask checks the state of the channel and takes appropriate action
// (see description of getPaych).
// Note that processTask may be called repeatedly in the same state, and should
// return nil if there is no state change to be made (eg when waiting for a
// message to be confirmed on chain)
func (ca *channelAccessor) processTask(merged *mergedFundsReq, amt, avail types.BigInt) *paychFundsRes {
ctx := merged.ctx
// Get the payment channel for the from/to addresses.
// Note: It's ok if we get ErrChannelNotTracked. It just means we need to
// create a channel.
channelInfo, err := ca.store.OutboundActiveByFromTo(ctx, ca.api, ca.from, ca.to)
if err != nil && err != ErrChannelNotTracked {
return &paychFundsRes{err: err}
}
// If a channel has not yet been created, create one.
if channelInfo == nil {
res, freed := merged.failOffChainNoChannel(ca.from, ca.to)
if res != nil {
return res
}
amt = types.BigSub(amt, freed)
mcid, err := ca.createPaych(ctx, amt, avail)
if err != nil {
return &paychFundsRes{err: err}
}
return &paychFundsRes{mcid: mcid}
}
// If the create channel message has been sent but the channel hasn't
// been created on chain yet
if channelInfo.CreateMsg != nil {
// Wait for the channel to be created before trying again
return nil
}
// If an add funds message was sent to the chain but hasn't been confirmed
// on chain yet
if channelInfo.AddFundsMsg != nil {
// Wait for the add funds message to be confirmed before trying again
return nil
}
// Try to fill requests using available funds, without going to the chain
res, amt := ca.completeAvailable(ctx, merged, channelInfo, amt, avail)
if res != nil || amt.LessThanEqual(types.NewInt(0)) {
return res
}
// We need to add more funds, so send an add funds message to
// cover the amount for this request
mcid, err := ca.addFunds(ctx, channelInfo, amt, avail)
if err != nil {
return &paychFundsRes{err: err}
}
return &paychFundsRes{channel: *channelInfo.Channel, mcid: *mcid}
}
// createPaych sends a message to create the channel and returns the message cid
func (ca *channelAccessor) createPaych(ctx context.Context, amt, avail types.BigInt) (cid.Cid, error) {
mb, err := ca.messageBuilder(ctx, ca.from)
if err != nil {
return cid.Undef, err
}
msg, err := mb.Create(ca.to, amt)
if err != nil {
return cid.Undef, err
}
smsg, err := ca.api.MpoolPushMessage(ctx, msg, nil)
if err != nil {
return cid.Undef, xerrors.Errorf("initializing paych actor: %w", err)
}
mcid := smsg.Cid()
// Create a new channel in the store
ci, err := ca.store.CreateChannel(ctx, ca.from, ca.to, mcid, amt, avail)
if err != nil {
log.Errorf("creating channel: %s", err)
return cid.Undef, err
}
// Wait for the channel to be created on chain
go ca.waitForPaychCreateMsg(ctx, ci.ChannelID, mcid)
return mcid, nil
}
// waitForPaychCreateMsg waits for mcid to appear on chain and stores the robust address of the
// created payment channel
func (ca *channelAccessor) waitForPaychCreateMsg(ctx context.Context, channelID string, mcid cid.Cid) {
err := ca.waitPaychCreateMsg(ctx, channelID, mcid)
ca.msgWaitComplete(ctx, mcid, err)
}
func (ca *channelAccessor) waitPaychCreateMsg(ctx context.Context, channelID string, mcid cid.Cid) error {
mwait, err := ca.api.StateWaitMsg(ca.chctx, mcid, buildconstants.MessageConfidence, api.LookbackNoLimit, true)
if err != nil {
log.Errorf("wait msg: %v", err)
return err
}
// If channel creation failed
if mwait.Receipt.ExitCode.IsError() {
ca.lk.Lock()
defer ca.lk.Unlock()
// Channel creation failed, so remove the channel from the datastore
dserr := ca.store.RemoveChannel(ctx, channelID)
if dserr != nil {
log.Errorf("failed to remove channel %s: %s", channelID, dserr)
}
err := xerrors.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode)
log.Error(err)
return err
}
// TODO: ActorUpgrade abstract over this.
// This "works" because it hasn't changed from v0 to v2, but we still
// need an abstraction here.
var decodedReturn init2.ExecReturn
err = decodedReturn.UnmarshalCBOR(bytes.NewReader(mwait.Receipt.Return))
if err != nil {
log.Error(err)
return err
}
ca.lk.Lock()
defer ca.lk.Unlock()
// Store robust address of channel
ca.mutateChannelInfo(ctx, channelID, func(channelInfo *ChannelInfo) {
channelInfo.Channel = &decodedReturn.RobustAddress
channelInfo.Amount = channelInfo.PendingAmount
channelInfo.AvailableAmount = channelInfo.PendingAvailableAmount
channelInfo.PendingAmount = big.NewInt(0)
channelInfo.PendingAvailableAmount = big.NewInt(0)
channelInfo.CreateMsg = nil
})
return nil
}
// completeAvailable fills reserving fund requests using already available funds, without interacting with the chain
func (ca *channelAccessor) completeAvailable(ctx context.Context, merged *mergedFundsReq, channelInfo *ChannelInfo, amt, av types.BigInt) (*paychFundsRes, types.BigInt) {
toReserve := types.BigSub(amt, av)
avail := types.NewInt(0)
// reserve at most what we need
ca.mutateChannelInfo(ctx, channelInfo.ChannelID, func(ci *ChannelInfo) {
avail = ci.AvailableAmount
if avail.GreaterThan(toReserve) {
avail = toReserve
}
ci.AvailableAmount = big.Sub(ci.AvailableAmount, avail)
})
res, used, failed := merged.completeAmount(avail, channelInfo)
// return any unused reserved funds (e.g. from cancelled requests)
ca.mutateChannelInfo(ctx, channelInfo.ChannelID, func(ci *ChannelInfo) {
ci.AvailableAmount = types.BigAdd(ci.AvailableAmount, types.BigSub(avail, used))
})
return res, types.BigSub(amt, types.BigAdd(used, failed))
}
// addFunds sends a message to add funds to the channel and returns the message cid
func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInfo, amt, avail types.BigInt) (*cid.Cid, error) {
msg := &types.Message{
To: *channelInfo.Channel,
From: channelInfo.Control,
Value: amt,
Method: 0,
}
smsg, err := ca.api.MpoolPushMessage(ctx, msg, nil)
if err != nil {
return nil, err
}
mcid := smsg.Cid()
// Store the add funds message CID on the channel
ca.mutateChannelInfo(ctx, channelInfo.ChannelID, func(ci *ChannelInfo) {
ci.PendingAmount = amt
ci.PendingAvailableAmount = avail
ci.AddFundsMsg = &mcid
})
// Store a reference from the message CID to the channel, so that we can
// look up the channel from the message CID
err = ca.store.SaveNewMessage(ctx, channelInfo.ChannelID, mcid)
if err != nil {
log.Errorf("saving add funds message CID %s: %s", mcid, err)
}
go ca.waitForAddFundsMsg(ctx, channelInfo.ChannelID, mcid)
return &mcid, nil
}
// TODO func (ca *channelAccessor) freeFunds(ctx context.Context, channelInfo *ChannelInfo, amt, avail types.BigInt) (*cid.Cid, error) {
// waitForAddFundsMsg waits for mcid to appear on chain and returns error, if any
func (ca *channelAccessor) waitForAddFundsMsg(ctx context.Context, channelID string, mcid cid.Cid) {
err := ca.waitAddFundsMsg(ctx, channelID, mcid)
ca.msgWaitComplete(ctx, mcid, err)
}
func (ca *channelAccessor) waitAddFundsMsg(ctx context.Context, channelID string, mcid cid.Cid) error {
mwait, err := ca.api.StateWaitMsg(ca.chctx, mcid, buildconstants.MessageConfidence, api.LookbackNoLimit, true)
if err != nil {
log.Error(err)
return err
}
if mwait.Receipt.ExitCode.IsError() {
err := xerrors.Errorf("voucher channel creation failed: adding funds (exit code %d)", mwait.Receipt.ExitCode)
log.Error(err)
ca.lk.Lock()
defer ca.lk.Unlock()
ca.mutateChannelInfo(ctx, channelID, func(channelInfo *ChannelInfo) {
channelInfo.PendingAmount = big.NewInt(0)
channelInfo.PendingAvailableAmount = big.NewInt(0)
channelInfo.AddFundsMsg = nil
})
return err
}
ca.lk.Lock()
defer ca.lk.Unlock()
// Store updated amount
ca.mutateChannelInfo(ctx, channelID, func(channelInfo *ChannelInfo) {
channelInfo.Amount = types.BigAdd(channelInfo.Amount, channelInfo.PendingAmount)
channelInfo.AvailableAmount = types.BigAdd(channelInfo.AvailableAmount, channelInfo.PendingAvailableAmount)
channelInfo.PendingAmount = big.NewInt(0)
channelInfo.PendingAvailableAmount = big.NewInt(0)
channelInfo.AddFundsMsg = nil
})
return nil
}
// Change the state of the channel in the store
func (ca *channelAccessor) mutateChannelInfo(ctx context.Context, channelID string, mutate func(*ChannelInfo)) {
channelInfo, err := ca.store.ByChannelID(ctx, channelID)
// If there's an error reading or writing to the store just log an error.
// For now we're assuming it's unlikely to happen in practice.
// Later we may want to implement a transactional approach, whereby
// we record to the store that we're going to send a message, send
// the message, and then record that the message was sent.
if err != nil {
log.Errorf("Error reading channel info from store: %s", err)
return
}
mutate(channelInfo)
err = ca.store.putChannelInfo(ctx, channelInfo)
if err != nil {
log.Errorf("Error writing channel info to store: %s", err)
}
}
// restartPending checks the datastore to see if there are any channels that
// have outstanding create / add funds messages, and if so, waits on the
// messages.
// Outstanding messages can occur if a create / add funds message was sent and
// then the system was shut down or crashed before the result was received.
func (pm *Manager) restartPending(ctx context.Context) error {
cis, err := pm.store.WithPendingAddFunds(ctx)
if err != nil {
return err
}
group := errgroup.Group{}
for _, chanInfo := range cis {
ci := chanInfo
if ci.CreateMsg != nil {
group.Go(func() error {
ca, err := pm.accessorByFromTo(ci.Control, ci.Target)
if err != nil {
return xerrors.Errorf("error initializing payment channel manager %s -> %s: %s", ci.Control, ci.Target, err)
}
go ca.waitForPaychCreateMsg(ctx, ci.ChannelID, *ci.CreateMsg)
return nil
})
} else if ci.AddFundsMsg != nil {
group.Go(func() error {
ca, err := pm.accessorByAddress(ctx, *ci.Channel)
if err != nil {
return xerrors.Errorf("error initializing payment channel manager %s: %s", ci.Channel, err)
}
go ca.waitForAddFundsMsg(ctx, ci.ChannelID, *ci.AddFundsMsg)
return nil
})
}
}
return group.Wait()
}
// getPaychWaitReady waits for a the response to the message with the given cid
func (ca *channelAccessor) getPaychWaitReady(ctx context.Context, mcid cid.Cid) (address.Address, error) {
ca.lk.Lock()
// First check if the message has completed
msgInfo, err := ca.store.GetMessage(ctx, mcid)
if err != nil {
ca.lk.Unlock()
return address.Undef, err
}
// If the create channel / add funds message failed, return an error
if len(msgInfo.Err) > 0 {
ca.lk.Unlock()
return address.Undef, xerrors.New(msgInfo.Err)
}
// If the message has completed successfully
if msgInfo.Received {
ca.lk.Unlock()
// Get the channel address
ci, err := ca.store.ByMessageCid(ctx, mcid)
if err != nil {
return address.Undef, err
}
if ci.Channel == nil {
panic(fmt.Sprintf("create / add funds message %s succeeded but channelInfo.Channel is nil", mcid))
}
return *ci.Channel, nil
}
// The message hasn't completed yet so wait for it to complete
promise := ca.msgPromise(ctx, mcid)
// Unlock while waiting
ca.lk.Unlock()
select {
case res := <-promise:
return res.channel, res.err
case <-ctx.Done():
return address.Undef, ctx.Err()
}
}
type onMsgRes struct {
channel address.Address
err error
}
// msgPromise returns a channel that receives the result of the message with
// the given CID
func (ca *channelAccessor) msgPromise(ctx context.Context, mcid cid.Cid) chan onMsgRes {
promise := make(chan onMsgRes)
triggerUnsub := make(chan struct{})
unsub := ca.msgListeners.onMsgComplete(mcid, func(err error) {
close(triggerUnsub)
// Use a go-routine so as not to block the event handler loop
go func() {
res := onMsgRes{err: err}
if res.err == nil {
// Get the channel associated with the message cid
ci, err := ca.store.ByMessageCid(ctx, mcid)
if err != nil {
res.err = err
} else {
res.channel = *ci.Channel
}
}
// Pass the result to the caller
select {
case promise <- res:
case <-ctx.Done():
}
}()
})
// Unsubscribe when the message is received or the context is done
go func() {
select {
case <-ctx.Done():
case <-triggerUnsub:
}
unsub()
}()
return promise
}
func (ca *channelAccessor) availableFunds(ctx context.Context, channelID string) (*api.ChannelAvailableFunds, error) {
return ca.processQueue(ctx, channelID)
}