From 0abc72c0625374c48db5ee7d2e97a4d120fddaff Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 19 Nov 2014 23:32:51 +0000 Subject: [PATCH] move some variables into strategy --- blockservice/blockservice.go | 5 +++++ exchange/bitswap/bitswap.go | 18 ++++++++++-------- exchange/bitswap/strategy/interface.go | 7 +++++++ exchange/bitswap/strategy/strategy.go | 13 +++++++++++++ net/interface.go | 4 ++++ net/mux/mux.go | 16 ++++++++++++++++ net/net.go | 5 +++++ 7 files changed, 60 insertions(+), 8 deletions(-) diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 2eb3d695d..4413eee16 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -96,6 +96,11 @@ func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, er } } +func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) (<-chan blocks.Block, error) { + // TODO: + return nil, nil +} + // DeleteBlock deletes a block in the blockservice from the datastore func (s *BlockService) DeleteBlock(k u.Key) error { return s.Datastore.Delete(k.DsKey()) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 1539b5fc8..7ad9afb6e 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -157,9 +157,10 @@ func (bs *bitswap) run(ctx context.Context) { rebroadcastTime := time.Second * 5 var providers <-chan peer.Peer // NB: must be initialized to zero value - broadcastSignal := time.After(rebroadcastPeriod) - unsentKeys := 0 + broadcastSignal := time.After(bs.strategy.GetRebroadcastDelay()) + // Number of unsent keys for the current batch + unsentKeys := 0 for { select { case <-broadcastSignal: @@ -170,14 +171,14 @@ func (bs *bitswap) run(ctx context.Context) { if providers == nil { // rely on semi randomness of maps firstKey := wantlist[0] - providers = bs.routing.FindProvidersAsync(ctx, firstKey, 6) + providers = bs.routing.FindProvidersAsync(ctx, firstKey, maxProvidersPerRequest) } err := bs.sendWantListTo(ctx, providers) if err != nil { log.Errorf("error sending wantlist: %s", err) } providers = nil - broadcastSignal = time.After(rebroadcastPeriod) + broadcastSignal = time.After(bs.strategy.GetRebroadcastDelay()) case k := <-bs.blockRequests: if unsentKeys == 0 { @@ -185,19 +186,19 @@ func (bs *bitswap) run(ctx context.Context) { } unsentKeys++ - if unsentKeys >= numKeysPerBatch { + if unsentKeys >= bs.strategy.GetBatchSize() { // send wantlist to providers err := bs.sendWantListTo(ctx, providers) if err != nil { log.Errorf("error sending wantlist: %s", err) } unsentKeys = 0 - broadcastSignal = time.After(rebroadcastPeriod) + broadcastSignal = time.After(bs.strategy.GetRebroadcastDelay()) providers = nil } else { // set a timeout to wait for more blocks or send current wantlist - broadcastSignal = time.After(batchDelay) + broadcastSignal = time.After(bs.strategy.GetBatchDelay()) } case <-ctx.Done(): return @@ -217,7 +218,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error { // TODO(brian): handle errors func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) ( peer.Peer, bsmsg.BitSwapMessage) { - log.Debugf("ReceiveMessage from %v", p.Key()) + log.Debugf("ReceiveMessage from %s", p) log.Debugf("Message wantlist: %v", incoming.Wantlist()) if p == nil { @@ -239,6 +240,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm for _, block := range incoming.Blocks() { // TODO verify blocks? if err := bs.blockstore.Put(&block); err != nil { + log.Criticalf("error putting block: %s", err) continue // FIXME(brian): err ignored } bs.notifications.Publish(block) diff --git a/exchange/bitswap/strategy/interface.go b/exchange/bitswap/strategy/interface.go index ac1f09a1f..9ac601d70 100644 --- a/exchange/bitswap/strategy/interface.go +++ b/exchange/bitswap/strategy/interface.go @@ -1,6 +1,8 @@ package strategy import ( + "time" + bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" @@ -29,4 +31,9 @@ type Strategy interface { NumBytesSentTo(peer.Peer) uint64 NumBytesReceivedFrom(peer.Peer) uint64 + + // Values determining bitswap behavioural patterns + GetBatchSize() int + GetBatchDelay() time.Duration + GetRebroadcastDelay() time.Duration } diff --git a/exchange/bitswap/strategy/strategy.go b/exchange/bitswap/strategy/strategy.go index 78209c38e..d58894b05 100644 --- a/exchange/bitswap/strategy/strategy.go +++ b/exchange/bitswap/strategy/strategy.go @@ -3,6 +3,7 @@ package strategy import ( "errors" "sync" + "time" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" peer "github.com/jbenet/go-ipfs/peer" @@ -139,3 +140,15 @@ func (s *strategist) ledger(p peer.Peer) *ledger { } return l } + +func (s *strategist) GetBatchSize() int { + return 10 +} + +func (s *strategist) GetBatchDelay() time.Duration { + return time.Millisecond * 3 +} + +func (s *strategist) GetRebroadcastDelay() time.Duration { + return time.Second * 2 +} diff --git a/net/interface.go b/net/interface.go index 3c0e8d204..60e650c2f 100644 --- a/net/interface.go +++ b/net/interface.go @@ -42,6 +42,10 @@ type Network interface { // the network since it was instantiated GetBandwidthTotals() (uint64, uint64) + // GetMessageCounts returns the total number of messages passed through + // the network since it was instantiated + GetMessageCounts() (uint64, uint64) + // SendMessage sends given Message out SendMessage(msg.NetMessage) error diff --git a/net/mux/mux.go b/net/mux/mux.go index 1b4c06344..d971e9054 100644 --- a/net/mux/mux.go +++ b/net/mux/mux.go @@ -45,9 +45,11 @@ type Muxer struct { bwiLock sync.Mutex bwIn uint64 + msgIn uint64 bwoLock sync.Mutex bwOut uint64 + msgOut uint64 *msg.Pipe ctxc.ContextCloser @@ -76,6 +78,18 @@ func (m *Muxer) GetPipe() *msg.Pipe { return m.Pipe } +// GetMessageCounts return the in/out message count measured over this muxer. +func (m *Muxer) GetMessageCounts() (in uint64, out uint64) { + m.bwiLock.Lock() + in = m.msgIn + m.bwiLock.Unlock() + + m.bwoLock.Lock() + out = m.msgOut + m.bwoLock.Unlock() + return +} + // GetBandwidthTotals return the in/out bandwidth measured over this muxer. func (m *Muxer) GetBandwidthTotals() (in uint64, out uint64) { m.bwiLock.Lock() @@ -125,6 +139,7 @@ func (m *Muxer) handleIncomingMessage(m1 msg.NetMessage) { m.bwiLock.Lock() // TODO: compensate for overhead m.bwIn += uint64(len(m1.Data())) + m.msgIn++ m.bwiLock.Unlock() data, pid, err := unwrapData(m1.Data()) @@ -182,6 +197,7 @@ func (m *Muxer) handleOutgoingMessage(pid pb.ProtocolID, m1 msg.NetMessage) { // TODO: compensate for overhead // TODO(jbenet): switch this to a goroutine to prevent sync waiting. m.bwOut += uint64(len(data)) + m.msgOut++ m.bwoLock.Unlock() m2 := msg.New(m1.Peer(), data) diff --git a/net/net.go b/net/net.go index 0ee1e76b5..3778d839a 100644 --- a/net/net.go +++ b/net/net.go @@ -110,6 +110,11 @@ func (n *IpfsNetwork) GetBandwidthTotals() (in uint64, out uint64) { return n.muxer.GetBandwidthTotals() } +// GetBandwidthTotals returns the total amount of messages transferred +func (n *IpfsNetwork) GetMessageCounts() (in uint64, out uint64) { + return n.muxer.GetMessageCounts() +} + // ListenAddresses returns a list of addresses at which this network listens. func (n *IpfsNetwork) ListenAddresses() []ma.Multiaddr { return n.swarm.ListenAddresses()