mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-01 19:24:14 +08:00
refactor: change Tasks to Outbox
notice that moving the blockstore fetch into the manager removes the weird error handling case. License: MIT Signed-off-by: Brian Tiger Chow <brian@perfmode.com>
This commit is contained in:

committed by
Juan Batiz-Benet

parent
a495a014af
commit
2ea8ed81ac
@ -239,18 +239,8 @@ func (bs *bitswap) taskWorker(ctx context.Context) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case task := <-bs.ledgermanager.GetTaskChan():
|
||||
block, err := bs.blockstore.Get(task.Key)
|
||||
if err != nil {
|
||||
log.Errorf("Expected to have block %s, but it was not found!", task.Key)
|
||||
continue
|
||||
}
|
||||
|
||||
message := bsmsg.New()
|
||||
message.AddBlock(block)
|
||||
// TODO: maybe add keys from our wantlist?
|
||||
|
||||
bs.send(ctx, task.Target, message)
|
||||
case envelope := <-bs.ledgermanager.Outbox():
|
||||
bs.send(ctx, envelope.Peer, envelope.Message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -20,12 +20,17 @@ type ledgerMap map[peerKey]*ledger
|
||||
// FIXME share this externally
|
||||
type peerKey u.Key
|
||||
|
||||
type Envelope struct {
|
||||
Peer peer.Peer
|
||||
Message bsmsg.BitSwapMessage
|
||||
}
|
||||
|
||||
type LedgerManager struct {
|
||||
lock sync.RWMutex
|
||||
ledgerMap ledgerMap
|
||||
bs bstore.Blockstore
|
||||
tasklist *TaskList
|
||||
taskOut chan *Task
|
||||
outbox chan Envelope
|
||||
workSignal chan struct{}
|
||||
}
|
||||
|
||||
@ -34,7 +39,7 @@ func NewLedgerManager(bs bstore.Blockstore, ctx context.Context) *LedgerManager
|
||||
ledgerMap: make(ledgerMap),
|
||||
bs: bs,
|
||||
tasklist: NewTaskList(),
|
||||
taskOut: make(chan *Task, 4),
|
||||
outbox: make(chan Envelope, 4), // TODO extract constant
|
||||
workSignal: make(chan struct{}),
|
||||
}
|
||||
go lm.taskWorker(ctx)
|
||||
@ -54,17 +59,25 @@ func (lm *LedgerManager) taskWorker(ctx context.Context) {
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
block, err := lm.bs.Get(nextTask.Key)
|
||||
if err != nil {
|
||||
continue // TODO maybe return an error
|
||||
}
|
||||
// construct message here so we can make decisions about any additional
|
||||
// information we may want to include at this time.
|
||||
m := bsmsg.New()
|
||||
m.AddBlock(block)
|
||||
// TODO: maybe add keys from our wantlist?
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case lm.taskOut <- nextTask:
|
||||
case lm.outbox <- Envelope{Peer: nextTask.Target, Message: m}:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (lm *LedgerManager) GetTaskChan() <-chan *Task {
|
||||
return lm.taskOut
|
||||
func (lm *LedgerManager) Outbox() <-chan Envelope {
|
||||
return lm.outbox
|
||||
}
|
||||
|
||||
// Returns a slice of Peers with whom the local node has active sessions
|
||||
|
Reference in New Issue
Block a user