From e6498b37338d6b0653a1c80e8aa4580e1aadb49c Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 5 Sep 2014 02:58:31 +0000 Subject: [PATCH] fix issue with blocks not being trimmed properly and being too large to be sent over the network --- bitswap/bitswap.go | 4 +++- core/core.go | 1 + importer/splitting.go | 2 +- swarm/conn.go | 5 ++++- swarm/swarm.go | 5 +++++ 5 files changed, 14 insertions(+), 3 deletions(-) diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 99096f2f7..b8ff1584f 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -220,6 +220,8 @@ func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) { } bs.SendBlock(p, bblk) ledg.SentBytes(len(blk)) + } else { + u.DOut("Decided not to send block.") } } @@ -248,7 +250,7 @@ func (bs *BitSwap) GetLedger(p *peer.Peer) *Ledger { } l = new(Ledger) - l.Strategy = StandardStrategy + l.Strategy = bs.strategy l.Partner = p bs.partners[p.Key()] = l return l diff --git a/core/core.go b/core/core.go index 882d445eb..20d21188a 100644 --- a/core/core.go +++ b/core/core.go @@ -75,6 +75,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) { if err != nil { return nil, err } + swap.SetStrategy(bitswap.YesManStrategy) } bs, err := bserv.NewBlockService(d, swap) diff --git a/importer/splitting.go b/importer/splitting.go index 6ffdeaf12..1832743d8 100644 --- a/importer/splitting.go +++ b/importer/splitting.go @@ -24,7 +24,7 @@ func SplitterBySize(n int) BlockSplitter { return } if nread < n { - chunk = chunk[:n] + chunk = chunk[:nread] } out <- chunk } diff --git a/swarm/conn.go b/swarm/conn.go index 072b53437..98070ab17 100644 --- a/swarm/conn.go +++ b/swarm/conn.go @@ -13,6 +13,8 @@ import ( // ChanBuffer is the size of the buffer in the Conn Chan const ChanBuffer = 10 +const MaxMessageSize = 1 << 19 + // Conn represents a connection to another Peer (IPFS Node). type Conn struct { Peer *peer.Peer @@ -66,13 +68,14 @@ func newConnChans(c *Conn) error { c.Closed = make(chan bool, 1) go c.Outgoing.WriteTo(c.Conn) - go c.Incoming.ReadFrom(c.Conn, 1<<12) + go c.Incoming.ReadFrom(c.Conn, MaxMessageSize) return nil } // Close closes the connection, and associated channels. func (s *Conn) Close() error { + u.DOut("Closing Conn.\n") if s.Conn == nil { return fmt.Errorf("Already closed") // already closed } diff --git a/swarm/swarm.go b/swarm/swarm.go index fc32ff249..ea9aaf9cd 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -279,6 +279,10 @@ func (s *Swarm) fanOut() { return } + if len(msg.Data) > MaxMessageSize { + s.Error(fmt.Errorf("Exceeded max message size! (tried to send len = %d)", len(msg.Data))) + } + s.connsLock.RLock() conn, found := s.conns[msg.Peer.Key()] s.connsLock.RUnlock() @@ -459,6 +463,7 @@ func (s *Swarm) ConnectNew(addr *ma.Multiaddr) (*peer.Peer, error) { // Removes a given peer from the swarm and closes connections to it func (s *Swarm) Drop(p *peer.Peer) error { + u.DOut("Dropping peer: [%s]\n", p.ID.Pretty()) s.connsLock.RLock() conn, found := s.conns[u.Key(p.ID)] s.connsLock.RUnlock()