From a159e6825c2c8c82b734e290e8fc2116d3afd429 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 8 May 2015 23:55:35 -0700 Subject: [PATCH 01/15] implement peermanager to control outgoing messages Also more refactoring of bitswap in general, including some perf improvements and eventlog removal. clean up, and buffer channels move some things around correctly buffer work messages more cleanup, and improve test perf remove unneccessary test revert changes to bitswap message, they werent necessary --- exchange/bitswap/bitswap.go | 88 ++------ exchange/bitswap/bitswap_test.go | 35 +-- exchange/bitswap/decision/engine.go | 22 +- exchange/bitswap/decision/engine_test.go | 2 +- .../bitswap/decision/peer_request_queue.go | 2 +- exchange/bitswap/message/message.go | 11 +- exchange/bitswap/network/interface.go | 2 + exchange/bitswap/network/ipfs_impl.go | 4 + exchange/bitswap/peermanager.go | 203 ++++++++++++++++++ exchange/bitswap/testnet/virtual.go | 9 + exchange/bitswap/testutils.go | 11 +- exchange/bitswap/workers.go | 6 +- 12 files changed, 275 insertions(+), 120 deletions(-) create mode 100644 exchange/bitswap/peermanager.go diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 757c9067e..b8dcdab1e 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -91,7 +91,9 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, process: px, newBlocks: make(chan *blocks.Block, HasBlockBufferSize), provideKeys: make(chan u.Key), + pm: NewPeerManager(network), } + go bs.pm.Run(ctx) network.SetDelegate(bs) // Start up bitswaps async worker routines @@ -108,6 +110,10 @@ type Bitswap struct { // network delivers messages on behalf of the session network bsnet.BitSwapNetwork + // the peermanager manages sending messages to peers in a way that + // wont block bitswap operation + pm *PeerManager + // blockstore is the local database // NB: ensure threadsafety blockstore blockstore.Blockstore @@ -217,7 +223,6 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks. // HasBlock announces the existance of a block to this bitswap service. The // service will potentially notify its peers. func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { - log.Event(ctx, "hasBlock", blk) select { case <-bs.process.Closing(): return errors.New("bitswap is closed") @@ -227,6 +232,7 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { if err := bs.blockstore.Put(blk); err != nil { return err } + bs.wantlist.Remove(blk.Key()) bs.notifications.Publish(blk) select { @@ -239,7 +245,6 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { func (bs *Bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error { set := pset.New() - wg := sync.WaitGroup{} loop: for { @@ -253,37 +258,22 @@ loop: continue } - wg.Add(1) - go func(p peer.ID) { - defer wg.Done() - if err := bs.send(ctx, p, m); err != nil { - log.Debug(err) // TODO remove if too verbose - } - }(peerToQuery) + bs.pm.Send(peerToQuery, m) case <-ctx.Done(): return nil } } - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - - select { - case <-done: - case <-ctx.Done(): - // NB: we may be abandoning goroutines here before they complete - // this shouldnt be an issue because they will complete soon anyways - // we just don't want their being slow to impact bitswap transfer speeds - } return nil } func (bs *Bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error { + entries := bs.wantlist.Entries() + if len(entries) == 0 { + return nil + } message := bsmsg.New() message.SetFull(true) - for _, wanted := range bs.wantlist.Entries() { + for _, wanted := range entries { message.AddEntry(wanted.Key, wanted.Priority) } return bs.sendWantlistMsgToPeers(ctx, message, peers) @@ -326,7 +316,7 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli // TODO(brian): handle errors func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error { - defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done() + //defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done() // This call records changes to wantlists, blocks received, // and number of bytes transfered. @@ -356,6 +346,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg // Connected/Disconnected warns bitswap about peer connections func (bs *Bitswap) PeerConnected(p peer.ID) { // TODO: add to clientWorker?? + bs.pm.Connected(p) peers := make(chan peer.ID, 1) peers <- p close(peers) @@ -367,6 +358,7 @@ func (bs *Bitswap) PeerConnected(p peer.ID) { // Connected/Disconnected warns bitswap about peer connections func (bs *Bitswap) PeerDisconnected(p peer.ID) { + bs.pm.Disconnected(p) bs.engine.PeerDisconnected(p) } @@ -381,19 +373,7 @@ func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) { message.Cancel(k) } - wg := sync.WaitGroup{} - for _, p := range bs.engine.Peers() { - wg.Add(1) - go func(p peer.ID) { - defer wg.Done() - err := bs.send(ctx, p, message) - if err != nil { - log.Warningf("Error sending message: %s", err) - return - } - }(p) - } - wg.Wait() + bs.pm.Broadcast(message) return } @@ -408,29 +388,7 @@ func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) { message.AddEntry(k, kMaxPriority-i) } - wg := sync.WaitGroup{} - for _, p := range bs.engine.Peers() { - wg.Add(1) - go func(p peer.ID) { - defer wg.Done() - err := bs.send(ctx, p, message) - if err != nil { - log.Debugf("Error sending message: %s", err) - } - }(p) - } - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - select { - case <-done: - case <-ctx.Done(): - // NB: we may be abandoning goroutines here before they complete - // this shouldnt be an issue because they will complete soon anyways - // we just don't want their being slow to impact bitswap transfer speeds - } + bs.pm.Broadcast(message) } func (bs *Bitswap) ReceiveError(err error) { @@ -439,16 +397,6 @@ func (bs *Bitswap) ReceiveError(err error) { // TODO bubble the network error up to the parent context/error logger } -// send strives to ensure that accounting is always performed when a message is -// sent -func (bs *Bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error { - defer log.EventBegin(ctx, "sendMessage", p, m).Done() - if err := bs.network.SendMessage(ctx, p, m); err != nil { - return err - } - return bs.engine.MessageSent(p, m) -} - func (bs *Bitswap) Close() error { return bs.process.Close() } diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 354eb73e5..c04946692 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -13,7 +13,6 @@ import ( blocks "github.com/ipfs/go-ipfs/blocks" blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil" tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet" - p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util" mockrouting "github.com/ipfs/go-ipfs/routing/mock" delay "github.com/ipfs/go-ipfs/thirdparty/delay" u "github.com/ipfs/go-ipfs/util" @@ -36,30 +35,6 @@ func TestClose(t *testing.T) { bitswap.Exchange.GetBlock(context.Background(), block.Key()) } -func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this - - rs := mockrouting.NewServer() - net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay)) - g := NewTestSessionGenerator(net) - defer g.Close() - - block := blocks.NewBlock([]byte("block")) - pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t) - rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network - - solo := g.Next() - defer solo.Exchange.Close() - - ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond) - _, err := solo.Exchange.GetBlock(ctx, block.Key()) - - if err != context.DeadlineExceeded { - t.Fatal("Expected DeadlineExceeded error") - } -} - -// TestGetBlockAfterRequesting... - func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) @@ -67,14 +42,15 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { g := NewTestSessionGenerator(net) defer g.Close() - hasBlock := g.Next() + peers := g.Instances(2) + hasBlock := peers[0] defer hasBlock.Exchange.Close() if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil { t.Fatal(err) } - wantsBlock := g.Next() + wantsBlock := peers[1] defer wantsBlock.Exchange.Close() ctx, _ := context.WithTimeout(context.Background(), time.Second) @@ -196,8 +172,9 @@ func TestSendToWantingPeer(t *testing.T) { prev := rebroadcastDelay.Set(time.Second / 2) defer func() { rebroadcastDelay.Set(prev) }() - peerA := sg.Next() - peerB := sg.Next() + peers := sg.Instances(2) + peerA := peers[0] + peerB := peers[1] t.Logf("Session %v\n", peerA.Peer) t.Logf("Session %v\n", peerB.Peer) diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 60b95e469..0b08a55fb 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -5,6 +5,7 @@ import ( "sync" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + blocks "github.com/ipfs/go-ipfs/blocks" bstore "github.com/ipfs/go-ipfs/blocks/blockstore" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" @@ -53,8 +54,9 @@ const ( type Envelope struct { // Peer is the intended recipient Peer peer.ID - // Message is the payload - Message bsmsg.BitSwapMessage + + // Block is the payload + Block *blocks.Block // A callback to notify the decision queue that the task is complete Sent func() @@ -151,12 +153,10 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { continue } - m := bsmsg.New() // TODO: maybe add keys from our wantlist? - m.AddBlock(block) return &Envelope{ - Peer: nextTask.Target, - Message: m, - Sent: nextTask.Done, + Peer: nextTask.Target, + Block: block, + Sent: nextTask.Done, }, nil } } @@ -185,7 +185,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { defer e.lock.Unlock() if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 { - log.Debug("received empty message from", p) + log.Debugf("received empty message from %s", p) } newWorkExists := false @@ -202,11 +202,11 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { for _, entry := range m.Wantlist() { if entry.Cancel { - log.Debug("cancel", entry.Key) + log.Debugf("cancel %s", entry.Key) l.CancelWant(entry.Key) e.peerRequestQueue.Remove(entry.Key, p) } else { - log.Debug("wants", entry.Key, entry.Priority) + log.Debugf("wants %s", entry.Key, entry.Priority) l.Wants(entry.Key, entry.Priority) if exists, err := e.bs.Has(entry.Key); err == nil && exists { e.peerRequestQueue.Push(entry.Entry, p) @@ -216,7 +216,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { } for _, block := range m.Blocks() { - log.Debug("got block %s %d bytes", block.Key(), len(block.Data)) + log.Debugf("got block %s %d bytes", block.Key(), len(block.Data)) l.ReceivedBytes(len(block.Data)) for _, l := range e.ledgerMap { if entry, ok := l.WantListContains(block.Key()); ok { diff --git a/exchange/bitswap/decision/engine_test.go b/exchange/bitswap/decision/engine_test.go index afe6ba9ad..31e46c776 100644 --- a/exchange/bitswap/decision/engine_test.go +++ b/exchange/bitswap/decision/engine_test.go @@ -185,7 +185,7 @@ func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error { for _, k := range keys { next := <-e.Outbox() envelope := <-next - received := envelope.Message.Blocks()[0] + received := envelope.Block expected := blocks.NewBlock([]byte(k)) if received.Key() != expected.Key() { return errors.New(fmt.Sprintln("received", string(received.Data), "expected", string(expected.Data))) diff --git a/exchange/bitswap/decision/peer_request_queue.go b/exchange/bitswap/decision/peer_request_queue.go index 42928487d..15f52da74 100644 --- a/exchange/bitswap/decision/peer_request_queue.go +++ b/exchange/bitswap/decision/peer_request_queue.go @@ -156,7 +156,7 @@ func (t *peerRequestTask) SetIndex(i int) { // taskKey returns a key that uniquely identifies a task. func taskKey(p peer.ID, k u.Key) string { - return string(p.String() + k.String()) + return string(p) + string(k) } // FIFO is a basic task comparator that returns tasks in the order created. diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go index 3a7d70aae..4e88e738c 100644 --- a/exchange/bitswap/message/message.go +++ b/exchange/bitswap/message/message.go @@ -29,6 +29,8 @@ type BitSwapMessage interface { Cancel(key u.Key) + Empty() bool + // Sets whether or not the contained wantlist represents the entire wantlist // true = full wantlist // false = wantlist 'patch' @@ -51,7 +53,7 @@ type Exportable interface { type impl struct { full bool wantlist map[u.Key]Entry - blocks map[u.Key]*blocks.Block // map to detect duplicates + blocks map[u.Key]*blocks.Block } func New() BitSwapMessage { @@ -92,6 +94,10 @@ func (m *impl) Full() bool { return m.full } +func (m *impl) Empty() bool { + return len(m.blocks) == 0 && len(m.wantlist) == 0 +} + func (m *impl) Wantlist() []Entry { var out []Entry for _, e := range m.wantlist { @@ -101,7 +107,7 @@ func (m *impl) Wantlist() []Entry { } func (m *impl) Blocks() []*blocks.Block { - bs := make([]*blocks.Block, 0) + bs := make([]*blocks.Block, 0, len(m.blocks)) for _, block := range m.blocks { bs = append(bs, block) } @@ -109,6 +115,7 @@ func (m *impl) Blocks() []*blocks.Block { } func (m *impl) Cancel(k u.Key) { + delete(m.wantlist, k) m.addEntry(k, 0, true) } diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index a6ed070c0..849a1c28e 100644 --- a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -23,6 +23,8 @@ type BitSwapNetwork interface { // network. SetDelegate(Receiver) + ConnectTo(context.Context, peer.ID) error + Routing } diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index 97745e32d..4e5a1317f 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -97,6 +97,10 @@ func (bsnet *impl) SetDelegate(r Receiver) { bsnet.receiver = r } +func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error { + return bsnet.host.Connect(ctx, peer.PeerInfo{ID: p}) +} + // FindProvidersAsync returns a channel of providers for the given key func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID { diff --git a/exchange/bitswap/peermanager.go b/exchange/bitswap/peermanager.go new file mode 100644 index 000000000..ff3d9ab31 --- /dev/null +++ b/exchange/bitswap/peermanager.go @@ -0,0 +1,203 @@ +package bitswap + +import ( + "sync" + + context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision" + bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" + bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" + peer "github.com/ipfs/go-ipfs/p2p/peer" + u "github.com/ipfs/go-ipfs/util" +) + +type PeerManager struct { + receiver bsnet.Receiver + + incoming chan *msgPair + connect chan peer.ID + disconnect chan peer.ID + + peers map[peer.ID]*msgQueue + + network bsnet.BitSwapNetwork +} + +func NewPeerManager(network bsnet.BitSwapNetwork) *PeerManager { + return &PeerManager{ + incoming: make(chan *msgPair, 10), + connect: make(chan peer.ID, 10), + disconnect: make(chan peer.ID, 10), + peers: make(map[peer.ID]*msgQueue), + network: network, + } +} + +type msgPair struct { + to peer.ID + msg bsmsg.BitSwapMessage +} + +type cancellation struct { + who peer.ID + blk u.Key +} + +type msgQueue struct { + p peer.ID + + lk sync.Mutex + wlmsg bsmsg.BitSwapMessage + + work chan struct{} + done chan struct{} +} + +func (pm *PeerManager) SendBlock(env *engine.Envelope) { + // Blocks need to be sent synchronously to maintain proper backpressure + // throughout the network stack + defer env.Sent() + + msg := bsmsg.New() + msg.AddBlock(env.Block) + err := pm.network.SendMessage(context.TODO(), env.Peer, msg) + if err != nil { + log.Error(err) + } +} + +func (pm *PeerManager) startPeerHandler(p peer.ID) { + _, ok := pm.peers[p] + if ok { + // TODO: log an error? + return + } + + mq := new(msgQueue) + mq.done = make(chan struct{}) + mq.work = make(chan struct{}, 1) + mq.p = p + + pm.peers[p] = mq + go pm.runQueue(mq) +} + +func (pm *PeerManager) stopPeerHandler(p peer.ID) { + pq, ok := pm.peers[p] + if !ok { + // TODO: log error? + return + } + + close(pq.done) + delete(pm.peers, p) +} + +func (pm *PeerManager) runQueue(mq *msgQueue) { + for { + select { + case <-mq.work: // there is work to be done + + // TODO: this might not need to be done every time, figure out + // a good heuristic + err := pm.network.ConnectTo(context.TODO(), mq.p) + if err != nil { + log.Error(err) + // TODO: cant connect, what now? + } + + // grab messages from queue + mq.lk.Lock() + wlm := mq.wlmsg + mq.wlmsg = nil + mq.lk.Unlock() + + if wlm != nil && !wlm.Empty() { + // send wantlist updates + err = pm.network.SendMessage(context.TODO(), mq.p, wlm) + if err != nil { + log.Error("bitswap send error: ", err) + // TODO: what do we do if this fails? + } + } + case <-mq.done: + return + } + } +} + +func (pm *PeerManager) Send(to peer.ID, msg bsmsg.BitSwapMessage) { + if len(msg.Blocks()) > 0 { + panic("no blocks here!") + } + pm.incoming <- &msgPair{to: to, msg: msg} +} + +func (pm *PeerManager) Broadcast(msg bsmsg.BitSwapMessage) { + pm.incoming <- &msgPair{msg: msg} +} + +func (pm *PeerManager) Connected(p peer.ID) { + pm.connect <- p +} + +func (pm *PeerManager) Disconnected(p peer.ID) { + pm.disconnect <- p +} + +// TODO: use goprocess here once i trust it +func (pm *PeerManager) Run(ctx context.Context) { + for { + select { + case msgp := <-pm.incoming: + + // Broadcast message to all if recipient not set + if msgp.to == "" { + for _, p := range pm.peers { + p.addMessage(msgp.msg) + } + continue + } + + p, ok := pm.peers[msgp.to] + if !ok { + //TODO: decide, drop message? or dial? + pm.startPeerHandler(msgp.to) + p = pm.peers[msgp.to] + } + + p.addMessage(msgp.msg) + case p := <-pm.connect: + pm.startPeerHandler(p) + case p := <-pm.disconnect: + pm.stopPeerHandler(p) + case <-ctx.Done(): + return + } + } +} + +func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) { + mq.lk.Lock() + defer func() { + mq.lk.Unlock() + select { + case mq.work <- struct{}{}: + default: + } + }() + + if mq.wlmsg == nil || msg.Full() { + mq.wlmsg = msg + return + } + + // TODO: add a msg.Combine(...) method + for _, e := range msg.Wantlist() { + if e.Cancel { + mq.wlmsg.Cancel(e.Key) + } else { + mq.wlmsg.AddEntry(e.Key, e.Priority) + } + } +} diff --git a/exchange/bitswap/testnet/virtual.go b/exchange/bitswap/testnet/virtual.go index feb5fd722..f2c814f81 100644 --- a/exchange/bitswap/testnet/virtual.go +++ b/exchange/bitswap/testnet/virtual.go @@ -119,3 +119,12 @@ func (nc *networkClient) Provide(ctx context.Context, k util.Key) error { func (nc *networkClient) SetDelegate(r bsnet.Receiver) { nc.Receiver = r } + +func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error { + if !nc.network.HasPeer(p) { + return errors.New("no such peer in network") + } + nc.network.clients[p].PeerConnected(nc.local) + nc.Receiver.PeerConnected(p) + return nil +} diff --git a/exchange/bitswap/testutils.go b/exchange/bitswap/testutils.go index 2ce035c3d..47930de69 100644 --- a/exchange/bitswap/testutils.go +++ b/exchange/bitswap/testutils.go @@ -7,7 +7,6 @@ import ( ds_sync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" blockstore "github.com/ipfs/go-ipfs/blocks/blockstore" - exchange "github.com/ipfs/go-ipfs/exchange" tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet" peer "github.com/ipfs/go-ipfs/p2p/peer" p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util" @@ -56,12 +55,18 @@ func (g *SessionGenerator) Instances(n int) []Instance { inst := g.Next() instances = append(instances, inst) } + for i, inst := range instances { + for j := i + 1; j < len(instances); j++ { + oinst := instances[j] + inst.Exchange.PeerConnected(oinst.Peer) + } + } return instances } type Instance struct { Peer peer.ID - Exchange exchange.Interface + Exchange *Bitswap blockstore blockstore.Blockstore blockstoreDelay delay.D @@ -94,7 +99,7 @@ func session(ctx context.Context, net tn.Network, p testutil.Identity) Instance const alwaysSendToPeer = true - bs := New(ctx, p.ID(), adapter, bstore, alwaysSendToPeer) + bs := New(ctx, p.ID(), adapter, bstore, alwaysSendToPeer).(*Bitswap) return Instance{ Peer: p.ID(), diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index dff3d911c..c6c2bbb25 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -70,9 +70,9 @@ func (bs *Bitswap) taskWorker(ctx context.Context) { if !ok { continue } - log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer) - bs.send(ctx, envelope.Peer, envelope.Message) - envelope.Sent() + + //log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer) + bs.pm.SendBlock(envelope) case <-ctx.Done(): return } From 8443b99c1d8060f626079114f0bea9f0e2784a2f Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 12 May 2015 23:50:57 -0700 Subject: [PATCH 02/15] update comments and reintroduce test --- exchange/bitswap/bitswap_test.go | 23 +++++++++++++++++++++++ exchange/bitswap/peermanager.go | 30 +++++++++++++++++------------- 2 files changed, 40 insertions(+), 13 deletions(-) diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index c04946692..9f9fbae25 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -13,6 +13,7 @@ import ( blocks "github.com/ipfs/go-ipfs/blocks" blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil" tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet" + p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util" mockrouting "github.com/ipfs/go-ipfs/routing/mock" delay "github.com/ipfs/go-ipfs/thirdparty/delay" u "github.com/ipfs/go-ipfs/util" @@ -35,6 +36,28 @@ func TestClose(t *testing.T) { bitswap.Exchange.GetBlock(context.Background(), block.Key()) } +func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this + + rs := mockrouting.NewServer() + net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay)) + g := NewTestSessionGenerator(net) + defer g.Close() + + block := blocks.NewBlock([]byte("block")) + pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t) + rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network + + solo := g.Next() + defer solo.Exchange.Close() + + ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond) + _, err := solo.Exchange.GetBlock(ctx, block.Key()) + + if err != context.DeadlineExceeded { + t.Fatal("Expected DeadlineExceeded error") + } +} + func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) diff --git a/exchange/bitswap/peermanager.go b/exchange/bitswap/peermanager.go index ff3d9ab31..a91acd45b 100644 --- a/exchange/bitswap/peermanager.go +++ b/exchange/bitswap/peermanager.go @@ -46,8 +46,8 @@ type cancellation struct { type msgQueue struct { p peer.ID - lk sync.Mutex - wlmsg bsmsg.BitSwapMessage + outlk sync.Mutex + out bsmsg.BitSwapMessage work chan struct{} done chan struct{} @@ -106,11 +106,11 @@ func (pm *PeerManager) runQueue(mq *msgQueue) { // TODO: cant connect, what now? } - // grab messages from queue - mq.lk.Lock() - wlm := mq.wlmsg - mq.wlmsg = nil - mq.lk.Unlock() + // grab outgoin message + mq.outlk.Lock() + wlm := mq.out + mq.out = nil + mq.outlk.Unlock() if wlm != nil && !wlm.Empty() { // send wantlist updates @@ -178,26 +178,30 @@ func (pm *PeerManager) Run(ctx context.Context) { } func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) { - mq.lk.Lock() + mq.outlk.Lock() defer func() { - mq.lk.Unlock() + mq.outlk.Unlock() select { case mq.work <- struct{}{}: default: } }() - if mq.wlmsg == nil || msg.Full() { - mq.wlmsg = msg + // if we have no message held, or the one we are given is full + // overwrite the one we are holding + if mq.out == nil || msg.Full() { + mq.out = msg return } // TODO: add a msg.Combine(...) method + // otherwise, combine the one we are holding with the + // one passed in for _, e := range msg.Wantlist() { if e.Cancel { - mq.wlmsg.Cancel(e.Key) + mq.out.Cancel(e.Key) } else { - mq.wlmsg.AddEntry(e.Key, e.Priority) + mq.out.AddEntry(e.Key, e.Priority) } } } From ef967ceeef50995877c0245ccbde1bd87a0fe9c9 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 13 May 2015 16:35:08 -0700 Subject: [PATCH 03/15] contextify peermanager --- exchange/bitswap/bitswap.go | 2 -- exchange/bitswap/decision/engine.go | 2 +- exchange/bitswap/peermanager.go | 22 +++++++++++----------- exchange/bitswap/workers.go | 4 ++-- 4 files changed, 14 insertions(+), 16 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index b8dcdab1e..a05ea8091 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -316,8 +316,6 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli // TODO(brian): handle errors func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error { - //defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done() - // This call records changes to wantlists, blocks received, // and number of bytes transfered. bs.engine.MessageReceived(p, incoming) diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 0b08a55fb..2644885d3 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -206,7 +206,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { l.CancelWant(entry.Key) e.peerRequestQueue.Remove(entry.Key, p) } else { - log.Debugf("wants %s", entry.Key, entry.Priority) + log.Debugf("wants %s - %d", entry.Key, entry.Priority) l.Wants(entry.Key, entry.Priority) if exists, err := e.bs.Has(entry.Key); err == nil && exists { e.peerRequestQueue.Push(entry.Entry, p) diff --git a/exchange/bitswap/peermanager.go b/exchange/bitswap/peermanager.go index a91acd45b..a1ce7c7a8 100644 --- a/exchange/bitswap/peermanager.go +++ b/exchange/bitswap/peermanager.go @@ -53,24 +53,24 @@ type msgQueue struct { done chan struct{} } -func (pm *PeerManager) SendBlock(env *engine.Envelope) { +func (pm *PeerManager) SendBlock(ctx context.Context, env *engine.Envelope) { // Blocks need to be sent synchronously to maintain proper backpressure // throughout the network stack defer env.Sent() msg := bsmsg.New() msg.AddBlock(env.Block) - err := pm.network.SendMessage(context.TODO(), env.Peer, msg) + err := pm.network.SendMessage(ctx, env.Peer, msg) if err != nil { log.Error(err) } } -func (pm *PeerManager) startPeerHandler(p peer.ID) { +func (pm *PeerManager) startPeerHandler(ctx context.Context, p peer.ID) *msgQueue { _, ok := pm.peers[p] if ok { // TODO: log an error? - return + return nil } mq := new(msgQueue) @@ -79,7 +79,8 @@ func (pm *PeerManager) startPeerHandler(p peer.ID) { mq.p = p pm.peers[p] = mq - go pm.runQueue(mq) + go pm.runQueue(ctx, mq) + return mq } func (pm *PeerManager) stopPeerHandler(p peer.ID) { @@ -93,14 +94,14 @@ func (pm *PeerManager) stopPeerHandler(p peer.ID) { delete(pm.peers, p) } -func (pm *PeerManager) runQueue(mq *msgQueue) { +func (pm *PeerManager) runQueue(ctx context.Context, mq *msgQueue) { for { select { case <-mq.work: // there is work to be done // TODO: this might not need to be done every time, figure out // a good heuristic - err := pm.network.ConnectTo(context.TODO(), mq.p) + err := pm.network.ConnectTo(ctx, mq.p) if err != nil { log.Error(err) // TODO: cant connect, what now? @@ -114,7 +115,7 @@ func (pm *PeerManager) runQueue(mq *msgQueue) { if wlm != nil && !wlm.Empty() { // send wantlist updates - err = pm.network.SendMessage(context.TODO(), mq.p, wlm) + err = pm.network.SendMessage(ctx, mq.p, wlm) if err != nil { log.Error("bitswap send error: ", err) // TODO: what do we do if this fails? @@ -162,13 +163,12 @@ func (pm *PeerManager) Run(ctx context.Context) { p, ok := pm.peers[msgp.to] if !ok { //TODO: decide, drop message? or dial? - pm.startPeerHandler(msgp.to) - p = pm.peers[msgp.to] + p = pm.startPeerHandler(ctx, msgp.to) } p.addMessage(msgp.msg) case p := <-pm.connect: - pm.startPeerHandler(p) + pm.startPeerHandler(ctx, p) case p := <-pm.disconnect: pm.stopPeerHandler(p) case <-ctx.Done(): diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index c6c2bbb25..ba9a77549 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -46,6 +46,7 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { bs.rebroadcastWorker(ctx) }) + // Start up a worker to manage sending out provides messages px.Go(func(px process.Process) { bs.provideCollector(ctx) }) @@ -71,8 +72,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context) { continue } - //log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer) - bs.pm.SendBlock(envelope) + bs.pm.SendBlock(ctx, envelope) case <-ctx.Done(): return } From 6bf33ad62fc4d209932fb8b81b4f2dc28ca6123a Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 16 May 2015 12:30:13 -0700 Subject: [PATCH 04/15] WIP: super awesome bitswap cleanup fixtime --- exchange/bitswap/bitswap.go | 134 +++------------- exchange/bitswap/bitswap_test.go | 14 +- exchange/bitswap/decision/engine.go | 16 +- .../bitswap/decision/peer_request_queue.go | 18 ++- exchange/bitswap/network/interface.go | 2 +- exchange/bitswap/peermanager.go | 150 ++++++++++++------ exchange/bitswap/testnet/network_test.go | 16 +- exchange/bitswap/testnet/virtual.go | 3 +- exchange/bitswap/workers.go | 45 +++--- 9 files changed, 190 insertions(+), 208 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index a05ea8091..881de1538 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -4,7 +4,6 @@ package bitswap import ( "errors" - "fmt" "math" "sync" "time" @@ -23,7 +22,6 @@ import ( "github.com/ipfs/go-ipfs/thirdparty/delay" eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog" u "github.com/ipfs/go-ipfs/util" - pset "github.com/ipfs/go-ipfs/util/peerset" // TODO move this to peerstore ) var log = eventlog.Logger("bitswap") @@ -45,9 +43,7 @@ const ( provideWorkers = 4 ) -var ( - rebroadcastDelay = delay.Fixed(time.Second * 10) -) +var rebroadcastDelay = delay.Fixed(time.Second * 10) // New initializes a BitSwap instance that communicates over the provided // BitSwapNetwork. This function registers the returned instance as the network @@ -86,14 +82,13 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, notifications: notif, engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method network: network, - wantlist: wantlist.NewThreadSafe(), batchRequests: make(chan *blockRequest, sizeBatchRequestChan), process: px, newBlocks: make(chan *blocks.Block, HasBlockBufferSize), provideKeys: make(chan u.Key), - pm: NewPeerManager(network), + wm: NewWantManager(network), } - go bs.pm.Run(ctx) + go bs.wm.Run(ctx) network.SetDelegate(bs) // Start up bitswaps async worker routines @@ -112,7 +107,7 @@ type Bitswap struct { // the peermanager manages sending messages to peers in a way that // wont block bitswap operation - pm *PeerManager + wm *WantManager // blockstore is the local database // NB: ensure threadsafety @@ -127,8 +122,6 @@ type Bitswap struct { engine *decision.Engine - wantlist *wantlist.ThreadSafe - process process.Process newBlocks chan *blocks.Block @@ -233,60 +226,21 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { return err } - bs.wantlist.Remove(blk.Key()) bs.notifications.Publish(blk) select { case bs.newBlocks <- blk: + // send block off to be reprovided case <-ctx.Done(): return ctx.Err() } return nil } -func (bs *Bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error { - set := pset.New() - -loop: - for { - select { - case peerToQuery, ok := <-peers: - if !ok { - break loop - } - - if !set.TryAdd(peerToQuery) { //Do once per peer - continue - } - - bs.pm.Send(peerToQuery, m) - case <-ctx.Done(): - return nil - } - } - return nil -} - -func (bs *Bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error { - entries := bs.wantlist.Entries() - if len(entries) == 0 { - return nil - } - message := bsmsg.New() - message.SetFull(true) - for _, wanted := range entries { - message.AddEntry(wanted.Key, wanted.Priority) - } - return bs.sendWantlistMsgToPeers(ctx, message, peers) -} - -func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantlist.Entry) { +func (bs *Bitswap) connectToProviders(ctx context.Context, entries []wantlist.Entry) { ctx, cancel := context.WithCancel(ctx) defer cancel() - // prepare a channel to hand off to sendWantlistToPeers - sendToPeers := make(chan peer.ID) - // Get providers for all entries in wantlist (could take a while) wg := sync.WaitGroup{} for _, e := range entries { @@ -298,97 +252,61 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli defer cancel() providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest) for prov := range providers { - sendToPeers <- prov + go func(p peer.ID) { + bs.network.ConnectTo(ctx, p) + }(prov) } }(e.Key) } - go func() { - wg.Wait() // make sure all our children do finish. - close(sendToPeers) - }() - - err := bs.sendWantlistToPeers(ctx, sendToPeers) - if err != nil { - log.Debugf("sendWantlistToPeers error: %s", err) - } + wg.Wait() // make sure all our children do finish. } -// TODO(brian): handle errors -func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error { +func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) { // This call records changes to wantlists, blocks received, // and number of bytes transfered. bs.engine.MessageReceived(p, incoming) // TODO: this is bad, and could be easily abused. // Should only track *useful* messages in ledger + if len(incoming.Blocks()) == 0 { + return + } + + // quickly send out cancels, reduces chances of duplicate block receives var keys []u.Key + for _, block := range incoming.Blocks() { + keys = append(keys, block.Key()) + } + bs.wm.CancelWants(keys) + for _, block := range incoming.Blocks() { bs.blocksRecvd++ if has, err := bs.blockstore.Has(block.Key()); err == nil && has { bs.dupBlocksRecvd++ } log.Debugf("got block %s from %s", block, p) + hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) if err := bs.HasBlock(hasBlockCtx, block); err != nil { - return fmt.Errorf("ReceiveMessage HasBlock error: %s", err) + log.Warningf("ReceiveMessage HasBlock error: %s", err) } cancel() - keys = append(keys, block.Key()) } - - bs.cancelBlocks(ctx, keys) - return nil } // Connected/Disconnected warns bitswap about peer connections func (bs *Bitswap) PeerConnected(p peer.ID) { // TODO: add to clientWorker?? - bs.pm.Connected(p) - peers := make(chan peer.ID, 1) - peers <- p - close(peers) - err := bs.sendWantlistToPeers(context.TODO(), peers) - if err != nil { - log.Debugf("error sending wantlist: %s", err) - } + bs.wm.Connected(p) } // Connected/Disconnected warns bitswap about peer connections func (bs *Bitswap) PeerDisconnected(p peer.ID) { - bs.pm.Disconnected(p) + bs.wm.Disconnected(p) bs.engine.PeerDisconnected(p) } -func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) { - if len(bkeys) < 1 { - return - } - message := bsmsg.New() - message.SetFull(false) - for _, k := range bkeys { - log.Debug("cancel block: %s", k) - message.Cancel(k) - } - - bs.pm.Broadcast(message) - return -} - -func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) { - if len(bkeys) < 1 { - return - } - - message := bsmsg.New() - message.SetFull(false) - for i, k := range bkeys { - message.AddEntry(k, kMaxPriority-i) - } - - bs.pm.Broadcast(message) -} - func (bs *Bitswap) ReceiveError(err error) { log.Debugf("Bitswap ReceiveError: %s", err) // TODO log the network error @@ -401,7 +319,7 @@ func (bs *Bitswap) Close() error { func (bs *Bitswap) GetWantlist() []u.Key { var out []u.Key - for _, e := range bs.wantlist.Entries() { + for _, e := range bs.wm.wl.Entries() { out = append(out, e.Key) } return out diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 9f9fbae25..fa5b3b97d 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -120,6 +120,16 @@ func TestLargeFile(t *testing.T) { PerformDistributionTest(t, numInstances, numBlocks) } +func TestLargeFileTwoPeers(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + t.Parallel() + numInstances := 2 + numBlocks := 100 + PerformDistributionTest(t, numInstances, numBlocks) +} + func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { if testing.Short() { t.SkipNow() @@ -129,8 +139,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { defer sg.Close() bg := blocksutil.NewBlockGenerator() - t.Log("Test a few nodes trying to get one file with a lot of blocks") - instances := sg.Instances(numInstances) blocks := bg.Blocks(numBlocks) @@ -238,7 +246,7 @@ func TestBasicBitswap(t *testing.T) { defer sg.Close() bg := blocksutil.NewBlockGenerator() - t.Log("Test a few nodes trying to get one file with a lot of blocks") + t.Log("Test a one node trying to get one block from another") instances := sg.Instances(2) blocks := bg.Blocks(1) diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 2644885d3..186c7ba1a 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -92,7 +92,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { bs: bs, peerRequestQueue: newPRQ(), outbox: make(chan (<-chan *Envelope), outboxChanBuffer), - workSignal: make(chan struct{}), + workSignal: make(chan struct{}, 1), } go e.taskWorker(ctx) return e @@ -156,7 +156,15 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { return &Envelope{ Peer: nextTask.Target, Block: block, - Sent: nextTask.Done, + Sent: func() { + nextTask.Done() + select { + case e.workSignal <- struct{}{}: + // work completing may mean that our queue will provide new + // work to be done. + default: + } + }, }, nil } } @@ -202,11 +210,11 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { for _, entry := range m.Wantlist() { if entry.Cancel { - log.Debugf("cancel %s", entry.Key) + log.Errorf("cancel %s", entry.Key) l.CancelWant(entry.Key) e.peerRequestQueue.Remove(entry.Key, p) } else { - log.Debugf("wants %s - %d", entry.Key, entry.Priority) + log.Errorf("wants %s - %d", entry.Key, entry.Priority) l.Wants(entry.Key, entry.Priority) if exists, err := e.bs.Has(entry.Key); err == nil && exists { e.peerRequestQueue.Push(entry.Entry, p) diff --git a/exchange/bitswap/decision/peer_request_queue.go b/exchange/bitswap/decision/peer_request_queue.go index 15f52da74..1d15578ed 100644 --- a/exchange/bitswap/decision/peer_request_queue.go +++ b/exchange/bitswap/decision/peer_request_queue.go @@ -51,12 +51,6 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) { tl.partners[to] = partner } - if task, ok := tl.taskMap[taskKey(to, entry.Key)]; ok { - task.Entry.Priority = entry.Priority - partner.taskQueue.Update(task.index) - return - } - partner.activelk.Lock() defer partner.activelk.Unlock() _, ok = partner.activeBlocks[entry.Key] @@ -64,6 +58,12 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) { return } + if task, ok := tl.taskMap[taskKey(to, entry.Key)]; ok { + task.Entry.Priority = entry.Priority + partner.taskQueue.Update(task.index) + return + } + task := &peerRequestTask{ Entry: entry, Target: to, @@ -220,6 +220,12 @@ func partnerCompare(a, b pq.Elem) bool { if pb.requests == 0 { return true } + if pa.active == pb.active { + // sorting by taskQueue.Len() aids in cleaning out trash entries faster + // if we sorted instead by requests, one peer could potentially build up + // a huge number of cancelled entries in the queue resulting in a memory leak + return pa.taskQueue.Len() > pb.taskQueue.Len() + } return pa.active < pb.active } diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index 849a1c28e..83fca0793 100644 --- a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -33,7 +33,7 @@ type Receiver interface { ReceiveMessage( ctx context.Context, sender peer.ID, - incoming bsmsg.BitSwapMessage) error + incoming bsmsg.BitSwapMessage) ReceiveError(error) diff --git a/exchange/bitswap/peermanager.go b/exchange/bitswap/peermanager.go index a1ce7c7a8..2eaf36fa5 100644 --- a/exchange/bitswap/peermanager.go +++ b/exchange/bitswap/peermanager.go @@ -7,28 +7,36 @@ import ( engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" + wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" peer "github.com/ipfs/go-ipfs/p2p/peer" u "github.com/ipfs/go-ipfs/util" ) -type PeerManager struct { +type WantManager struct { receiver bsnet.Receiver - incoming chan *msgPair - connect chan peer.ID + incoming chan []*bsmsg.Entry + + // notification channel for new peers connecting + connect chan peer.ID + + // notification channel for peers disconnecting disconnect chan peer.ID peers map[peer.ID]*msgQueue + wl *wantlist.Wantlist + network bsnet.BitSwapNetwork } -func NewPeerManager(network bsnet.BitSwapNetwork) *PeerManager { - return &PeerManager{ - incoming: make(chan *msgPair, 10), +func NewWantManager(network bsnet.BitSwapNetwork) *WantManager { + return &WantManager{ + incoming: make(chan []*bsmsg.Entry, 10), connect: make(chan peer.ID, 10), disconnect: make(chan peer.ID, 10), peers: make(map[peer.ID]*msgQueue), + wl: wantlist.New(), network: network, } } @@ -53,37 +61,68 @@ type msgQueue struct { done chan struct{} } -func (pm *PeerManager) SendBlock(ctx context.Context, env *engine.Envelope) { +func (pm *WantManager) WantBlocks(ks []u.Key) { + log.Error("WANT: ", ks) + pm.addEntries(ks, false) +} + +func (pm *WantManager) CancelWants(ks []u.Key) { + log.Error("CANCEL: ", ks) + pm.addEntries(ks, true) +} + +func (pm *WantManager) addEntries(ks []u.Key, cancel bool) { + var entries []*bsmsg.Entry + for i, k := range ks { + entries = append(entries, &bsmsg.Entry{ + Cancel: cancel, + Entry: wantlist.Entry{ + Key: k, + Priority: kMaxPriority - i, + }, + }) + } + pm.incoming <- entries +} + +func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) { // Blocks need to be sent synchronously to maintain proper backpressure // throughout the network stack defer env.Sent() msg := bsmsg.New() msg.AddBlock(env.Block) + msg.SetFull(false) err := pm.network.SendMessage(ctx, env.Peer, msg) if err != nil { log.Error(err) } } -func (pm *PeerManager) startPeerHandler(ctx context.Context, p peer.ID) *msgQueue { +func (pm *WantManager) startPeerHandler(ctx context.Context, p peer.ID) *msgQueue { _, ok := pm.peers[p] if ok { // TODO: log an error? return nil } - mq := new(msgQueue) - mq.done = make(chan struct{}) - mq.work = make(chan struct{}, 1) - mq.p = p + mq := newMsgQueue(p) + + // new peer, we will want to give them our full wantlist + fullwantlist := bsmsg.New() + for _, e := range pm.wl.Entries() { + fullwantlist.AddEntry(e.Key, e.Priority) + } + fullwantlist.SetFull(true) + mq.out = fullwantlist + mq.work <- struct{}{} pm.peers[p] = mq go pm.runQueue(ctx, mq) return mq } -func (pm *PeerManager) stopPeerHandler(p peer.ID) { +func (pm *WantManager) stopPeerHandler(p peer.ID) { pq, ok := pm.peers[p] if !ok { // TODO: log error? @@ -94,32 +133,38 @@ func (pm *PeerManager) stopPeerHandler(p peer.ID) { delete(pm.peers, p) } -func (pm *PeerManager) runQueue(ctx context.Context, mq *msgQueue) { +func (pm *WantManager) runQueue(ctx context.Context, mq *msgQueue) { for { select { case <-mq.work: // there is work to be done - // TODO: this might not need to be done every time, figure out - // a good heuristic err := pm.network.ConnectTo(ctx, mq.p) if err != nil { log.Error(err) // TODO: cant connect, what now? } - // grab outgoin message + // grab outgoing message mq.outlk.Lock() wlm := mq.out mq.out = nil mq.outlk.Unlock() - if wlm != nil && !wlm.Empty() { - // send wantlist updates - err = pm.network.SendMessage(ctx, mq.p, wlm) - if err != nil { - log.Error("bitswap send error: ", err) - // TODO: what do we do if this fails? - } + // no message or empty message, continue + if wlm == nil { + log.Error("nil wantlist") + continue + } + if wlm.Empty() { + log.Error("empty wantlist") + continue + } + + // send wantlist updates + err = pm.network.SendMessage(ctx, mq.p, wlm) + if err != nil { + log.Error("bitswap send error: ", err) + // TODO: what do we do if this fails? } case <-mq.done: return @@ -127,46 +172,38 @@ func (pm *PeerManager) runQueue(ctx context.Context, mq *msgQueue) { } } -func (pm *PeerManager) Send(to peer.ID, msg bsmsg.BitSwapMessage) { - if len(msg.Blocks()) > 0 { - panic("no blocks here!") - } - pm.incoming <- &msgPair{to: to, msg: msg} -} - -func (pm *PeerManager) Broadcast(msg bsmsg.BitSwapMessage) { - pm.incoming <- &msgPair{msg: msg} -} - -func (pm *PeerManager) Connected(p peer.ID) { +func (pm *WantManager) Connected(p peer.ID) { pm.connect <- p } -func (pm *PeerManager) Disconnected(p peer.ID) { +func (pm *WantManager) Disconnected(p peer.ID) { pm.disconnect <- p } // TODO: use goprocess here once i trust it -func (pm *PeerManager) Run(ctx context.Context) { +func (pm *WantManager) Run(ctx context.Context) { for { select { - case msgp := <-pm.incoming: + case entries := <-pm.incoming: - // Broadcast message to all if recipient not set - if msgp.to == "" { - for _, p := range pm.peers { - p.addMessage(msgp.msg) + msg := bsmsg.New() + msg.SetFull(false) + // add changes to our wantlist + for _, e := range entries { + if e.Cancel { + pm.wl.Remove(e.Key) + msg.Cancel(e.Key) + } else { + pm.wl.Add(e.Key, e.Priority) + msg.AddEntry(e.Key, e.Priority) } - continue } - p, ok := pm.peers[msgp.to] - if !ok { - //TODO: decide, drop message? or dial? - p = pm.startPeerHandler(ctx, msgp.to) + // broadcast those wantlist changes + for _, p := range pm.peers { + p.addMessage(msg) } - p.addMessage(msgp.msg) case p := <-pm.connect: pm.startPeerHandler(ctx, p) case p := <-pm.disconnect: @@ -177,6 +214,15 @@ func (pm *PeerManager) Run(ctx context.Context) { } } +func newMsgQueue(p peer.ID) *msgQueue { + mq := new(msgQueue) + mq.done = make(chan struct{}) + mq.work = make(chan struct{}, 1) + mq.p = p + + return mq +} + func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) { mq.outlk.Lock() defer func() { @@ -187,6 +233,10 @@ func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) { } }() + if msg.Full() { + log.Error("GOt FULL MESSAGE") + } + // if we have no message held, or the one we are given is full // overwrite the one we are holding if mq.out == nil || msg.Full() { @@ -199,8 +249,10 @@ func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) { // one passed in for _, e := range msg.Wantlist() { if e.Cancel { + log.Error("add message cancel: ", e.Key, mq.p) mq.out.Cancel(e.Key) } else { + log.Error("add message want: ", e.Key, mq.p) mq.out.AddEntry(e.Key, e.Priority) } } diff --git a/exchange/bitswap/testnet/network_test.go b/exchange/bitswap/testnet/network_test.go index 9091ff255..c963ae9ac 100644 --- a/exchange/bitswap/testnet/network_test.go +++ b/exchange/bitswap/testnet/network_test.go @@ -29,19 +29,17 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { responder.SetDelegate(lambda(func( ctx context.Context, fromWaiter peer.ID, - msgFromWaiter bsmsg.BitSwapMessage) error { + msgFromWaiter bsmsg.BitSwapMessage) { msgToWaiter := bsmsg.New() msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr))) waiter.SendMessage(ctx, fromWaiter, msgToWaiter) - - return nil })) waiter.SetDelegate(lambda(func( ctx context.Context, fromResponder peer.ID, - msgFromResponder bsmsg.BitSwapMessage) error { + msgFromResponder bsmsg.BitSwapMessage) { // TODO assert that this came from the correct peer and that the message contents are as expected ok := false @@ -54,9 +52,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { if !ok { t.Fatal("Message not received from the responder") - } - return nil })) messageSentAsync := bsmsg.New() @@ -71,7 +67,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { } type receiverFunc func(ctx context.Context, p peer.ID, - incoming bsmsg.BitSwapMessage) error + incoming bsmsg.BitSwapMessage) // lambda returns a Receiver instance given a receiver function func lambda(f receiverFunc) bsnet.Receiver { @@ -81,12 +77,12 @@ func lambda(f receiverFunc) bsnet.Receiver { } type lambdaImpl struct { - f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error + f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) } func (lam *lambdaImpl) ReceiveMessage(ctx context.Context, - p peer.ID, incoming bsmsg.BitSwapMessage) error { - return lam.f(ctx, p, incoming) + p peer.ID, incoming bsmsg.BitSwapMessage) { + lam.f(ctx, p, incoming) } func (lam *lambdaImpl) ReceiveError(err error) { diff --git a/exchange/bitswap/testnet/virtual.go b/exchange/bitswap/testnet/virtual.go index f2c814f81..f8ca0cd55 100644 --- a/exchange/bitswap/testnet/virtual.go +++ b/exchange/bitswap/testnet/virtual.go @@ -72,7 +72,8 @@ func (n *network) deliver( n.delay.Wait() - return r.ReceiveMessage(context.TODO(), from, message) + r.ReceiveMessage(context.TODO(), from, message) + return nil } type networkClient struct { diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index ba9a77549..82fb40de9 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -42,9 +42,11 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { } // Start up a worker to manage periodically resending our wantlist out to peers - px.Go(func(px process.Process) { - bs.rebroadcastWorker(ctx) - }) + /* + px.Go(func(px process.Process) { + bs.rebroadcastWorker(ctx) + }) + */ // Start up a worker to manage sending out provides messages px.Go(func(px process.Process) { @@ -72,7 +74,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context) { continue } - bs.pm.SendBlock(ctx, envelope) + bs.wm.SendBlock(ctx, envelope) case <-ctx.Done(): return } @@ -146,30 +148,19 @@ func (bs *Bitswap) clientWorker(parent context.Context) { log.Warning("Received batch request for zero blocks") continue } - for i, k := range keys { - bs.wantlist.Add(k, kMaxPriority-i) - } - done := make(chan struct{}) - go func() { - bs.wantNewBlocks(req.ctx, keys) - close(done) - }() + bs.wm.WantBlocks(keys) // NB: Optimization. Assumes that providers of key[0] are likely to // be able to provide for all keys. This currently holds true in most // every situation. Later, this assumption may not hold as true. child, cancel := context.WithTimeout(req.ctx, providerRequestTimeout) providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest) - err := bs.sendWantlistToPeers(req.ctx, providers) - if err != nil { - log.Debugf("error sending wantlist: %s", err) + for p := range providers { + go bs.network.ConnectTo(req.ctx, p) } cancel() - // Wait for wantNewBlocks to finish - <-done - case <-parent.Done(): return } @@ -180,22 +171,24 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) { ctx, cancel := context.WithCancel(parent) defer cancel() - broadcastSignal := time.After(rebroadcastDelay.Get()) - tick := time.Tick(10 * time.Second) + broadcastSignal := time.NewTicker(rebroadcastDelay.Get()) + defer broadcastSignal.Stop() + + tick := time.NewTicker(10 * time.Second) + defer tick.Stop() for { select { - case <-tick: - n := bs.wantlist.Len() + case <-tick.C: + n := bs.wm.wl.Len() if n > 0 { log.Debug(n, "keys in bitswap wantlist") } - case <-broadcastSignal: // resend unfulfilled wantlist keys - entries := bs.wantlist.Entries() + case <-broadcastSignal.C: // resend unfulfilled wantlist keys + entries := bs.wm.wl.Entries() if len(entries) > 0 { - bs.sendWantlistToProviders(ctx, entries) + bs.connectToProviders(ctx, entries) } - broadcastSignal = time.After(rebroadcastDelay.Get()) case <-parent.Done(): return } From 32da687774427bdd44efd1686218448e8c834fd0 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 16 May 2015 14:26:29 -0700 Subject: [PATCH 05/15] fix race bugs --- exchange/bitswap/bitswap.go | 3 +++ exchange/bitswap/decision/engine.go | 4 ++-- exchange/bitswap/message/message.go | 2 +- exchange/bitswap/peermanager.go | 37 +++++++---------------------- exchange/bitswap/stat.go | 2 ++ 5 files changed, 17 insertions(+), 31 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 881de1538..6a1e58ff4 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -128,6 +128,7 @@ type Bitswap struct { provideKeys chan u.Key + counterLk sync.Mutex blocksRecvd int dupBlocksRecvd int } @@ -281,10 +282,12 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg bs.wm.CancelWants(keys) for _, block := range incoming.Blocks() { + bs.counterLk.Lock() bs.blocksRecvd++ if has, err := bs.blockstore.Has(block.Key()); err == nil && has { bs.dupBlocksRecvd++ } + bs.counterLk.Unlock() log.Debugf("got block %s from %s", block, p) hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 186c7ba1a..d08636d80 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -210,11 +210,11 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { for _, entry := range m.Wantlist() { if entry.Cancel { - log.Errorf("cancel %s", entry.Key) + log.Debugf("cancel %s", entry.Key) l.CancelWant(entry.Key) e.peerRequestQueue.Remove(entry.Key, p) } else { - log.Errorf("wants %s - %d", entry.Key, entry.Priority) + log.Debugf("wants %s - %d", entry.Key, entry.Priority) l.Wants(entry.Key, entry.Priority) if exists, err := e.bs.Has(entry.Key); err == nil && exists { e.peerRequestQueue.Push(entry.Entry, p) diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go index 4e88e738c..63f7f28b5 100644 --- a/exchange/bitswap/message/message.go +++ b/exchange/bitswap/message/message.go @@ -162,7 +162,7 @@ func (m *impl) ToProto() *pb.Message { pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{ Block: proto.String(string(e.Key)), Priority: proto.Int32(int32(e.Priority)), - Cancel: &e.Cancel, + Cancel: proto.Bool(e.Cancel), }) } for _, b := range m.Blocks() { diff --git a/exchange/bitswap/peermanager.go b/exchange/bitswap/peermanager.go index 2eaf36fa5..8ec89c8e3 100644 --- a/exchange/bitswap/peermanager.go +++ b/exchange/bitswap/peermanager.go @@ -62,12 +62,10 @@ type msgQueue struct { } func (pm *WantManager) WantBlocks(ks []u.Key) { - log.Error("WANT: ", ks) pm.addEntries(ks, false) } func (pm *WantManager) CancelWants(ks []u.Key) { - log.Error("CANCEL: ", ks) pm.addEntries(ks, true) } @@ -147,19 +145,13 @@ func (pm *WantManager) runQueue(ctx context.Context, mq *msgQueue) { // grab outgoing message mq.outlk.Lock() wlm := mq.out + if wlm == nil || wlm.Empty() { + mq.outlk.Unlock() + continue + } mq.out = nil mq.outlk.Unlock() - // no message or empty message, continue - if wlm == nil { - log.Error("nil wantlist") - continue - } - if wlm.Empty() { - log.Error("empty wantlist") - continue - } - // send wantlist updates err = pm.network.SendMessage(ctx, mq.p, wlm) if err != nil { @@ -186,22 +178,18 @@ func (pm *WantManager) Run(ctx context.Context) { select { case entries := <-pm.incoming: - msg := bsmsg.New() - msg.SetFull(false) // add changes to our wantlist for _, e := range entries { if e.Cancel { pm.wl.Remove(e.Key) - msg.Cancel(e.Key) } else { pm.wl.Add(e.Key, e.Priority) - msg.AddEntry(e.Key, e.Priority) } } // broadcast those wantlist changes for _, p := range pm.peers { - p.addMessage(msg) + p.addMessage(entries) } case p := <-pm.connect: @@ -223,7 +211,7 @@ func newMsgQueue(p peer.ID) *msgQueue { return mq } -func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) { +func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) { mq.outlk.Lock() defer func() { mq.outlk.Unlock() @@ -233,26 +221,19 @@ func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) { } }() - if msg.Full() { - log.Error("GOt FULL MESSAGE") - } - // if we have no message held, or the one we are given is full // overwrite the one we are holding - if mq.out == nil || msg.Full() { - mq.out = msg - return + if mq.out == nil { + mq.out = bsmsg.New() } // TODO: add a msg.Combine(...) method // otherwise, combine the one we are holding with the // one passed in - for _, e := range msg.Wantlist() { + for _, e := range entries { if e.Cancel { - log.Error("add message cancel: ", e.Key, mq.p) mq.out.Cancel(e.Key) } else { - log.Error("add message want: ", e.Key, mq.p) mq.out.AddEntry(e.Key, e.Priority) } } diff --git a/exchange/bitswap/stat.go b/exchange/bitswap/stat.go index ceab4b2ee..a4db4c9c5 100644 --- a/exchange/bitswap/stat.go +++ b/exchange/bitswap/stat.go @@ -17,8 +17,10 @@ func (bs *Bitswap) Stat() (*Stat, error) { st := new(Stat) st.ProvideBufLen = len(bs.newBlocks) st.Wantlist = bs.GetWantlist() + bs.counterLk.Lock() st.BlocksReceived = bs.blocksRecvd st.DupBlksReceived = bs.dupBlocksRecvd + bs.counterLk.Unlock() for _, p := range bs.engine.Peers() { st.Peers = append(st.Peers, p.Pretty()) From 65f815a27b20c75e2e59043176f3452f642b53b5 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 16 May 2015 17:16:09 -0700 Subject: [PATCH 06/15] move taskdone inside lock boundaries --- exchange/bitswap/decision/peer_request_queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exchange/bitswap/decision/peer_request_queue.go b/exchange/bitswap/decision/peer_request_queue.go index 1d15578ed..397a16223 100644 --- a/exchange/bitswap/decision/peer_request_queue.go +++ b/exchange/bitswap/decision/peer_request_queue.go @@ -69,8 +69,8 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) { Target: to, created: time.Now(), Done: func() { - partner.TaskDone(entry.Key) tl.lock.Lock() + partner.TaskDone(entry.Key) tl.pQueue.Update(partner.Index()) tl.lock.Unlock() }, From 6ab4bfea95464a3e995425a716213ed342dbeac5 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 16 May 2015 17:46:26 -0700 Subject: [PATCH 07/15] turn tests down a bit and better context passing --- exchange/bitswap/bitswap.go | 4 +-- exchange/bitswap/bitswap_test.go | 4 +-- .../{peermanager.go => wantmanager.go} | 26 ++++++++++++------- 3 files changed, 20 insertions(+), 14 deletions(-) rename exchange/bitswap/{peermanager.go => wantmanager.go} (89%) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 6a1e58ff4..c6f3c74a9 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -86,9 +86,9 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, process: px, newBlocks: make(chan *blocks.Block, HasBlockBufferSize), provideKeys: make(chan u.Key), - wm: NewWantManager(network), + wm: NewWantManager(ctx, network), } - go bs.wm.Run(ctx) + go bs.wm.Run() network.SetDelegate(bs) // Start up bitswaps async worker routines diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index fa5b3b97d..86eb2d764 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -92,7 +92,7 @@ func TestLargeSwarm(t *testing.T) { if testing.Short() { t.SkipNow() } - numInstances := 500 + numInstances := 100 numBlocks := 2 if detectrace.WithRace() { // when running with the race detector, 500 instances launches @@ -124,7 +124,6 @@ func TestLargeFileTwoPeers(t *testing.T) { if testing.Short() { t.SkipNow() } - t.Parallel() numInstances := 2 numBlocks := 100 PerformDistributionTest(t, numInstances, numBlocks) @@ -164,6 +163,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { } for _ = range outch { } + log.Error("DONE") }(inst) } wg.Wait() diff --git a/exchange/bitswap/peermanager.go b/exchange/bitswap/wantmanager.go similarity index 89% rename from exchange/bitswap/peermanager.go rename to exchange/bitswap/wantmanager.go index 8ec89c8e3..3b2067914 100644 --- a/exchange/bitswap/peermanager.go +++ b/exchange/bitswap/wantmanager.go @@ -28,9 +28,11 @@ type WantManager struct { wl *wantlist.Wantlist network bsnet.BitSwapNetwork + + ctx context.Context } -func NewWantManager(network bsnet.BitSwapNetwork) *WantManager { +func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager { return &WantManager{ incoming: make(chan []*bsmsg.Entry, 10), connect: make(chan peer.ID, 10), @@ -38,6 +40,7 @@ func NewWantManager(network bsnet.BitSwapNetwork) *WantManager { peers: make(map[peer.ID]*msgQueue), wl: wantlist.New(), network: network, + ctx: ctx, } } @@ -80,7 +83,10 @@ func (pm *WantManager) addEntries(ks []u.Key, cancel bool) { }, }) } - pm.incoming <- entries + select { + case pm.incoming <- entries: + case <-pm.ctx.Done(): + } } func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) { @@ -97,7 +103,7 @@ func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) { } } -func (pm *WantManager) startPeerHandler(ctx context.Context, p peer.ID) *msgQueue { +func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue { _, ok := pm.peers[p] if ok { // TODO: log an error? @@ -116,7 +122,7 @@ func (pm *WantManager) startPeerHandler(ctx context.Context, p peer.ID) *msgQueu mq.work <- struct{}{} pm.peers[p] = mq - go pm.runQueue(ctx, mq) + go pm.runQueue(mq) return mq } @@ -131,12 +137,12 @@ func (pm *WantManager) stopPeerHandler(p peer.ID) { delete(pm.peers, p) } -func (pm *WantManager) runQueue(ctx context.Context, mq *msgQueue) { +func (pm *WantManager) runQueue(mq *msgQueue) { for { select { case <-mq.work: // there is work to be done - err := pm.network.ConnectTo(ctx, mq.p) + err := pm.network.ConnectTo(pm.ctx, mq.p) if err != nil { log.Error(err) // TODO: cant connect, what now? @@ -153,7 +159,7 @@ func (pm *WantManager) runQueue(ctx context.Context, mq *msgQueue) { mq.outlk.Unlock() // send wantlist updates - err = pm.network.SendMessage(ctx, mq.p, wlm) + err = pm.network.SendMessage(pm.ctx, mq.p, wlm) if err != nil { log.Error("bitswap send error: ", err) // TODO: what do we do if this fails? @@ -173,7 +179,7 @@ func (pm *WantManager) Disconnected(p peer.ID) { } // TODO: use goprocess here once i trust it -func (pm *WantManager) Run(ctx context.Context) { +func (pm *WantManager) Run() { for { select { case entries := <-pm.incoming: @@ -193,10 +199,10 @@ func (pm *WantManager) Run(ctx context.Context) { } case p := <-pm.connect: - pm.startPeerHandler(ctx, p) + pm.startPeerHandler(p) case p := <-pm.disconnect: pm.stopPeerHandler(p) - case <-ctx.Done(): + case <-pm.ctx.Done(): return } } From 594c7786c3349d7b83c11d677329e7ebd79451ae Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 16 May 2015 20:38:42 -0700 Subject: [PATCH 08/15] turn rebroadcast back on --- exchange/bitswap/workers.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index 82fb40de9..1083566a1 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -42,11 +42,9 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { } // Start up a worker to manage periodically resending our wantlist out to peers - /* - px.Go(func(px process.Process) { - bs.rebroadcastWorker(ctx) - }) - */ + px.Go(func(px process.Process) { + bs.rebroadcastWorker(ctx) + }) // Start up a worker to manage sending out provides messages px.Go(func(px process.Process) { From 829b88420e0a50c9d03f6ef5689123baec8487fb Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 16 May 2015 22:08:18 -0700 Subject: [PATCH 09/15] explicitly set bitswap message fullness --- exchange/bitswap/bitswap.go | 2 +- exchange/bitswap/bitswap_test.go | 1 - exchange/bitswap/decision/engine_test.go | 8 ++++---- exchange/bitswap/message/message.go | 22 ++++++---------------- exchange/bitswap/message/message_test.go | 14 +++++++------- exchange/bitswap/testnet/network_test.go | 4 ++-- exchange/bitswap/wantmanager.go | 23 ++++++++++++++++++----- 7 files changed, 38 insertions(+), 36 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index c6f3c74a9..57359c0ec 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -288,7 +288,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg bs.dupBlocksRecvd++ } bs.counterLk.Unlock() - log.Debugf("got block %s from %s", block, p) + log.Debugf("got block %s from %s (%d,%d)", block, p, bs.blocksRecvd, bs.dupBlocksRecvd) hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) if err := bs.HasBlock(hasBlockCtx, block); err != nil { diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 86eb2d764..6548472c9 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -163,7 +163,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { } for _ = range outch { } - log.Error("DONE") }(inst) } wg.Wait() diff --git a/exchange/bitswap/decision/engine_test.go b/exchange/bitswap/decision/engine_test.go index 31e46c776..8337c4800 100644 --- a/exchange/bitswap/decision/engine_test.go +++ b/exchange/bitswap/decision/engine_test.go @@ -41,7 +41,7 @@ func TestConsistentAccounting(t *testing.T) { // Send messages from Ernie to Bert for i := 0; i < 1000; i++ { - m := message.New() + m := message.New(false) content := []string{"this", "is", "message", "i"} m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " ")))) @@ -73,7 +73,7 @@ func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) { sanfrancisco := newEngine(ctx, "sf") seattle := newEngine(ctx, "sea") - m := message.New() + m := message.New(true) sanfrancisco.Engine.MessageSent(seattle.Peer, m) seattle.Engine.MessageReceived(sanfrancisco.Peer, m) @@ -164,7 +164,7 @@ func TestPartnerWantsThenCancels(t *testing.T) { } func partnerWants(e *Engine, keys []string, partner peer.ID) { - add := message.New() + add := message.New(false) for i, letter := range keys { block := blocks.NewBlock([]byte(letter)) add.AddEntry(block.Key(), math.MaxInt32-i) @@ -173,7 +173,7 @@ func partnerWants(e *Engine, keys []string, partner peer.ID) { } func partnerCancels(e *Engine, keys []string, partner peer.ID) { - cancels := message.New() + cancels := message.New(false) for _, k := range keys { block := blocks.NewBlock([]byte(k)) cancels.Cancel(block.Key()) diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go index 63f7f28b5..d885bb373 100644 --- a/exchange/bitswap/message/message.go +++ b/exchange/bitswap/message/message.go @@ -31,12 +31,7 @@ type BitSwapMessage interface { Empty() bool - // Sets whether or not the contained wantlist represents the entire wantlist - // true = full wantlist - // false = wantlist 'patch' - // default: true - SetFull(isFull bool) - + // A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set Full() bool AddBlock(*blocks.Block) @@ -56,15 +51,15 @@ type impl struct { blocks map[u.Key]*blocks.Block } -func New() BitSwapMessage { - return newMsg() +func New(full bool) BitSwapMessage { + return newMsg(full) } -func newMsg() *impl { +func newMsg(full bool) *impl { return &impl{ blocks: make(map[u.Key]*blocks.Block), wantlist: make(map[u.Key]Entry), - full: true, + full: full, } } @@ -74,8 +69,7 @@ type Entry struct { } func newMessageFromProto(pbm pb.Message) BitSwapMessage { - m := newMsg() - m.SetFull(pbm.GetWantlist().GetFull()) + m := newMsg(pbm.GetWantlist().GetFull()) for _, e := range pbm.GetWantlist().GetEntries() { m.addEntry(u.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel()) } @@ -86,10 +80,6 @@ func newMessageFromProto(pbm pb.Message) BitSwapMessage { return m } -func (m *impl) SetFull(full bool) { - m.full = full -} - func (m *impl) Full() bool { return m.full } diff --git a/exchange/bitswap/message/message_test.go b/exchange/bitswap/message/message_test.go index cbeed8892..7a6a28a04 100644 --- a/exchange/bitswap/message/message_test.go +++ b/exchange/bitswap/message/message_test.go @@ -13,7 +13,7 @@ import ( func TestAppendWanted(t *testing.T) { const str = "foo" - m := New() + m := New(true) m.AddEntry(u.Key(str), 1) if !wantlistContains(m.ToProto().GetWantlist(), str) { @@ -44,7 +44,7 @@ func TestAppendBlock(t *testing.T) { strs = append(strs, "Celeritas") strs = append(strs, "Incendia") - m := New() + m := New(true) for _, str := range strs { block := blocks.NewBlock([]byte(str)) m.AddBlock(block) @@ -61,7 +61,7 @@ func TestAppendBlock(t *testing.T) { func TestWantlist(t *testing.T) { keystrs := []string{"foo", "bar", "baz", "bat"} - m := New() + m := New(true) for _, s := range keystrs { m.AddEntry(u.Key(s), 1) } @@ -84,7 +84,7 @@ func TestWantlist(t *testing.T) { func TestCopyProtoByValue(t *testing.T) { const str = "foo" - m := New() + m := New(true) protoBeforeAppend := m.ToProto() m.AddEntry(u.Key(str), 1) if wantlistContains(protoBeforeAppend.GetWantlist(), str) { @@ -93,7 +93,7 @@ func TestCopyProtoByValue(t *testing.T) { } func TestToNetFromNetPreservesWantList(t *testing.T) { - original := New() + original := New(true) original.AddEntry(u.Key("M"), 1) original.AddEntry(u.Key("B"), 1) original.AddEntry(u.Key("D"), 1) @@ -124,7 +124,7 @@ func TestToNetFromNetPreservesWantList(t *testing.T) { func TestToAndFromNetMessage(t *testing.T) { - original := New() + original := New(true) original.AddBlock(blocks.NewBlock([]byte("W"))) original.AddBlock(blocks.NewBlock([]byte("E"))) original.AddBlock(blocks.NewBlock([]byte("F"))) @@ -172,7 +172,7 @@ func contains(strs []string, x string) bool { func TestDuplicates(t *testing.T) { b := blocks.NewBlock([]byte("foo")) - msg := New() + msg := New(true) msg.AddEntry(b.Key(), 1) msg.AddEntry(b.Key(), 1) diff --git a/exchange/bitswap/testnet/network_test.go b/exchange/bitswap/testnet/network_test.go index c963ae9ac..9624df5f8 100644 --- a/exchange/bitswap/testnet/network_test.go +++ b/exchange/bitswap/testnet/network_test.go @@ -31,7 +31,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { fromWaiter peer.ID, msgFromWaiter bsmsg.BitSwapMessage) { - msgToWaiter := bsmsg.New() + msgToWaiter := bsmsg.New(true) msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr))) waiter.SendMessage(ctx, fromWaiter, msgToWaiter) })) @@ -55,7 +55,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { } })) - messageSentAsync := bsmsg.New() + messageSentAsync := bsmsg.New(true) messageSentAsync.AddBlock(blocks.NewBlock([]byte("data"))) errSending := waiter.SendMessage( context.Background(), responderPeer.ID(), messageSentAsync) diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index 3b2067914..eb49201a6 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -2,6 +2,7 @@ package bitswap import ( "sync" + "time" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision" @@ -94,9 +95,8 @@ func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) { // throughout the network stack defer env.Sent() - msg := bsmsg.New() + msg := bsmsg.New(false) msg.AddBlock(env.Block) - msg.SetFull(false) err := pm.network.SendMessage(ctx, env.Peer, msg) if err != nil { log.Error(err) @@ -113,11 +113,10 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue { mq := newMsgQueue(p) // new peer, we will want to give them our full wantlist - fullwantlist := bsmsg.New() + fullwantlist := bsmsg.New(true) for _, e := range pm.wl.Entries() { fullwantlist.AddEntry(e.Key, e.Priority) } - fullwantlist.SetFull(true) mq.out = fullwantlist mq.work <- struct{}{} @@ -180,6 +179,7 @@ func (pm *WantManager) Disconnected(p peer.ID) { // TODO: use goprocess here once i trust it func (pm *WantManager) Run() { + tock := time.NewTicker(rebroadcastDelay.Get()) for { select { case entries := <-pm.incoming: @@ -198,6 +198,19 @@ func (pm *WantManager) Run() { p.addMessage(entries) } + case <-tock.C: + // resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY) + var es []*bsmsg.Entry + for _, e := range pm.wl.Entries() { + es = append(es, &bsmsg.Entry{Entry: e}) + } + for _, p := range pm.peers { + p.outlk.Lock() + p.out = bsmsg.New(true) + p.outlk.Unlock() + + p.addMessage(es) + } case p := <-pm.connect: pm.startPeerHandler(p) case p := <-pm.disconnect: @@ -230,7 +243,7 @@ func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) { // if we have no message held, or the one we are given is full // overwrite the one we are holding if mq.out == nil { - mq.out = bsmsg.New() + mq.out = bsmsg.New(false) } // TODO: add a msg.Combine(...) method From 2eac921e1d27403539589a8abe684a6e43fb2f35 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Thu, 21 May 2015 01:11:57 -0400 Subject: [PATCH 10/15] fixup the bitswap readme --- exchange/bitswap/README.md | 74 +++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 42 deletions(-) diff --git a/exchange/bitswap/README.md b/exchange/bitswap/README.md index bfa0aaa86..cfdbd27e0 100644 --- a/exchange/bitswap/README.md +++ b/exchange/bitswap/README.md @@ -1,47 +1,37 @@ -#Welcome to Bitswap -###(The data trading engine) +# Bitswap -Bitswap is the module that is responsible for requesting and providing data -blocks over the network to and from other ipfs peers. The role of bitswap is -to be a merchant in the large global marketplace of data. +## Protocol +Bitswap is the data trading module for ipfs, it manages requesting and sending +blocks to and from other peers in the network. Bitswap has two main jobs, the +first is to acquire blocks requested by the client from the network. The second +is to judiciously send blocks in its posession to other peers who want them. -##Main Operations -Bitswap has three high level operations: +Bitswap is a message based protocol, as opposed to response-reply. All messages +contain wantlists, or blocks. Upon receiving a wantlist, a node should consider +sending out wanted blocks if they have them. Upon receiving blocks, the node +should send out a notification called a 'Cancel' signifying that they no longer +want the block. At a protocol level, bitswap is very simple. -- **GetBlocks** - - `GetBlocks` is a bitswap method used to request multiple blocks that are likely -to all be provided by the same set of peers (part of a single file, for example). +## go-ipfs Implementation +Internally, when a message with a wantlist is received, it is sent to the +decision engine to be considered, and blocks that we have that are wanted are +placed into the peer request queue. Any block we possess that is wanted by +another peer has a task in the peer request queue created for it. The peer +request queue is a priority queue that sorts available tasks by some metric, +currently, that metric is very simple and aims to fairly address the tasks +of each other peer. More advanced decision logic will be implemented in the +future. Task workers pull tasks to be done off of the queue, retreive the block +to be sent, and send it off. The number of task workers is limited by a constant +factor. -- **GetBlock** - - `GetBlock` is a special case of `GetBlocks` that just requests a single block. +Client requests for new blocks are handled by the want manager, for every new +block (or set of blocks) wanted, the 'WantBlocks' method is invoked. The want +manager then ensures that connected peers are notified of the new block that we +want by sending the new entries to a message queue for each peer. The message +queue will loop while there is work available and do the following: 1) Ensure it +has a connection to its peer, 2) grab the message to be sent, and 3) send it. +If new messages are added while the loop is in steps 1 or 3, the messages are +combined into one to avoid having to keep an actual queue and send multiple +messages. The same process occurs when the client receives a block and sends a +cancel message for it. -- **HasBlock** - - `HasBlock` registers a local block with bitswap. Bitswap will then send that -block to any connected peers who want it (with the strategies approval), record -that transaction in the ledger and announce to the DHT that the block is being -provided. - -##Internal Details -All `GetBlock` requests are relayed into a single for-select loop via channels. -Calls to `GetBlocks` will have `FindProviders` called for only the first key in -the set initially, This is an optimization attempting to cut down on the number -of RPCs required. After a timeout (specified by the strategies -`GetRebroadcastDelay`) Bitswap will iterate through all keys still in the local -wantlist, perform a find providers call for each, and sent the wantlist out to -those providers. This is the fallback behaviour for cases where our initial -assumption about one peer potentially having multiple blocks in a set does not -hold true. - -When receiving messages, Bitswaps `ReceiveMessage` method is called. A bitswap -message may contain the wantlist of the peer who sent the message, and an array -of blocks that were on our local wantlist. Any blocks we receive in a bitswap -message will be passed to `HasBlock`, and the other peers wantlist gets updated -in the strategy by `bs.strategy.MessageReceived`. -If another peers wantlist is received, Bitswap will call its strategies -`ShouldSendBlockToPeer` method to determine whether or not the other peer will -be sent the block they are requesting (if we even have it). - -##Outstanding TODOs: -- [ ] Ensure only one request active per key -- [ ] More involved strategies -- [ ] Ensure only wanted blocks are counted in ledgers From 2882c793e23457bedf3b2955c1a57fc9fcdb086d Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 17 May 2015 14:08:05 -0700 Subject: [PATCH 11/15] add a distribution test with the rebroadcast delay disabled --- exchange/bitswap/bitswap_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 6548472c9..803bcd223 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -120,6 +120,18 @@ func TestLargeFile(t *testing.T) { PerformDistributionTest(t, numInstances, numBlocks) } +func TestLargeFileNoRebroadcast(t *testing.T) { + rbd := rebroadcastDelay.Get() + rebroadcastDelay.Set(time.Hour * 24 * 365 * 10) // ten years should be long enough + if testing.Short() { + t.SkipNow() + } + numInstances := 10 + numBlocks := 100 + PerformDistributionTest(t, numInstances, numBlocks) + rebroadcastDelay.Set(rbd) +} + func TestLargeFileTwoPeers(t *testing.T) { if testing.Short() { t.SkipNow() From c273a3bd4f616cb95cebb1cf0b4b665bac8a6d39 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 17 May 2015 17:09:53 -0700 Subject: [PATCH 12/15] better bitswap logging --- exchange/bitswap/bitswap.go | 2 +- exchange/bitswap/wantmanager.go | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 57359c0ec..db7bc033f 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -288,7 +288,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg bs.dupBlocksRecvd++ } bs.counterLk.Unlock() - log.Debugf("got block %s from %s (%d,%d)", block, p, bs.blocksRecvd, bs.dupBlocksRecvd) + log.Infof("got block %s from %s (%d,%d)", block, p, bs.blocksRecvd, bs.dupBlocksRecvd) hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) if err := bs.HasBlock(hasBlockCtx, block); err != nil { diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index eb49201a6..74372f7f0 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -66,6 +66,7 @@ type msgQueue struct { } func (pm *WantManager) WantBlocks(ks []u.Key) { + log.Infof("want blocks: %s", ks) pm.addEntries(ks, false) } @@ -97,6 +98,7 @@ func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) { msg := bsmsg.New(false) msg.AddBlock(env.Block) + log.Infof("Sending block %s to %s", env.Peer, env.Block) err := pm.network.SendMessage(ctx, env.Peer, msg) if err != nil { log.Error(err) @@ -143,8 +145,9 @@ func (pm *WantManager) runQueue(mq *msgQueue) { err := pm.network.ConnectTo(pm.ctx, mq.p) if err != nil { - log.Error(err) + log.Errorf("cant connect to peer %s: %s", mq.p, err) // TODO: cant connect, what now? + continue } // grab outgoing message From b71a0aced018909ca2d56797394e4ae3b02516b8 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 19 May 2015 11:26:50 -0700 Subject: [PATCH 13/15] clarify synhronization constructs --- exchange/bitswap/wantmanager.go | 38 +++++++++++++++------------------ 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index 74372f7f0..4efd120ef 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -14,23 +14,17 @@ import ( ) type WantManager struct { - receiver bsnet.Receiver - - incoming chan []*bsmsg.Entry - - // notification channel for new peers connecting - connect chan peer.ID - - // notification channel for peers disconnecting - disconnect chan peer.ID + // sync channels for Run loop + incoming chan []*bsmsg.Entry + connect chan peer.ID // notification channel for new peers connecting + disconnect chan peer.ID // notification channel for peers disconnecting + // synchronized by Run loop, only touch inside there peers map[peer.ID]*msgQueue - - wl *wantlist.Wantlist + wl *wantlist.Wantlist network bsnet.BitSwapNetwork - - ctx context.Context + ctx context.Context } func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager { @@ -58,8 +52,9 @@ type cancellation struct { type msgQueue struct { p peer.ID - outlk sync.Mutex - out bsmsg.BitSwapMessage + outlk sync.Mutex + out bsmsg.BitSwapMessage + network bsnet.BitSwapNetwork work chan struct{} done chan struct{} @@ -112,7 +107,7 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue { return nil } - mq := newMsgQueue(p) + mq := pm.newMsgQueue(p) // new peer, we will want to give them our full wantlist fullwantlist := bsmsg.New(true) @@ -123,7 +118,7 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue { mq.work <- struct{}{} pm.peers[p] = mq - go pm.runQueue(mq) + go mq.runQueue(pm.ctx) return mq } @@ -138,12 +133,12 @@ func (pm *WantManager) stopPeerHandler(p peer.ID) { delete(pm.peers, p) } -func (pm *WantManager) runQueue(mq *msgQueue) { +func (mq *msgQueue) runQueue(ctx context.Context) { for { select { case <-mq.work: // there is work to be done - err := pm.network.ConnectTo(pm.ctx, mq.p) + err := mq.network.ConnectTo(ctx, mq.p) if err != nil { log.Errorf("cant connect to peer %s: %s", mq.p, err) // TODO: cant connect, what now? @@ -161,7 +156,7 @@ func (pm *WantManager) runQueue(mq *msgQueue) { mq.outlk.Unlock() // send wantlist updates - err = pm.network.SendMessage(pm.ctx, mq.p, wlm) + err = mq.network.SendMessage(ctx, mq.p, wlm) if err != nil { log.Error("bitswap send error: ", err) // TODO: what do we do if this fails? @@ -224,10 +219,11 @@ func (pm *WantManager) Run() { } } -func newMsgQueue(p peer.ID) *msgQueue { +func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue { mq := new(msgQueue) mq.done = make(chan struct{}) mq.work = make(chan struct{}, 1) + mq.network = wm.network mq.p = p return mq From 2f934e8c58697d7b6fbc374d4903f85725cc1fc5 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 19 May 2015 13:13:38 -0700 Subject: [PATCH 14/15] warning -> notice --- exchange/bitswap/wantmanager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index 4efd120ef..a1ab8a022 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -96,7 +96,7 @@ func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) { log.Infof("Sending block %s to %s", env.Peer, env.Block) err := pm.network.SendMessage(ctx, env.Peer, msg) if err != nil { - log.Error(err) + log.Noticef("sendblock error: %s", err) } } @@ -158,7 +158,7 @@ func (mq *msgQueue) runQueue(ctx context.Context) { // send wantlist updates err = mq.network.SendMessage(ctx, mq.p, wlm) if err != nil { - log.Error("bitswap send error: ", err) + log.Noticef("bitswap send error: %s", err) // TODO: what do we do if this fails? } case <-mq.done: From ce0d2f46d6ff2c9f03c5cac8358812dd532afacf Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 19 May 2015 15:48:12 -0700 Subject: [PATCH 15/15] defer tock.Stop() --- exchange/bitswap/wantmanager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index a1ab8a022..29706710f 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -178,6 +178,7 @@ func (pm *WantManager) Disconnected(p peer.ID) { // TODO: use goprocess here once i trust it func (pm *WantManager) Run() { tock := time.NewTicker(rebroadcastDelay.Get()) + defer tock.Stop() for { select { case entries := <-pm.incoming: