diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index e95ffbc4f..4458db946 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -239,18 +239,8 @@ func (bs *bitswap) taskWorker(ctx context.Context) { select { case <-ctx.Done(): return - case task := <-bs.ledgermanager.GetTaskChan(): - block, err := bs.blockstore.Get(task.Key) - if err != nil { - log.Errorf("Expected to have block %s, but it was not found!", task.Key) - continue - } - - message := bsmsg.New() - message.AddBlock(block) - // TODO: maybe add keys from our wantlist? - - bs.send(ctx, task.Target, message) + case envelope := <-bs.ledgermanager.Outbox(): + bs.send(ctx, envelope.Peer, envelope.Message) } } } diff --git a/exchange/bitswap/strategy/ledgermanager.go b/exchange/bitswap/strategy/ledgermanager.go index df10072eb..3c79c855c 100644 --- a/exchange/bitswap/strategy/ledgermanager.go +++ b/exchange/bitswap/strategy/ledgermanager.go @@ -20,12 +20,17 @@ type ledgerMap map[peerKey]*ledger // FIXME share this externally type peerKey u.Key +type Envelope struct { + Peer peer.Peer + Message bsmsg.BitSwapMessage +} + type LedgerManager struct { lock sync.RWMutex ledgerMap ledgerMap bs bstore.Blockstore tasklist *TaskList - taskOut chan *Task + outbox chan Envelope workSignal chan struct{} } @@ -34,7 +39,7 @@ func NewLedgerManager(bs bstore.Blockstore, ctx context.Context) *LedgerManager ledgerMap: make(ledgerMap), bs: bs, tasklist: NewTaskList(), - taskOut: make(chan *Task, 4), + outbox: make(chan Envelope, 4), // TODO extract constant workSignal: make(chan struct{}), } go lm.taskWorker(ctx) @@ -54,17 +59,25 @@ func (lm *LedgerManager) taskWorker(ctx context.Context) { } continue } - + block, err := lm.bs.Get(nextTask.Key) + if err != nil { + continue // TODO maybe return an error + } + // construct message here so we can make decisions about any additional + // information we may want to include at this time. + m := bsmsg.New() + m.AddBlock(block) + // TODO: maybe add keys from our wantlist? select { case <-ctx.Done(): return - case lm.taskOut <- nextTask: + case lm.outbox <- Envelope{Peer: nextTask.Target, Message: m}: } } } -func (lm *LedgerManager) GetTaskChan() <-chan *Task { - return lm.taskOut +func (lm *LedgerManager) Outbox() <-chan Envelope { + return lm.outbox } // Returns a slice of Peers with whom the local node has active sessions