From a7ef09554f92e43e8875337b8040d667b1658a3e Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Fri, 12 Sep 2014 03:08:32 -0700 Subject: [PATCH] refactor(bitswap:notifications) move, rename add interface --- bitswap/bitswap.go | 5 ++-- bitswap/{ => notifications}/notifications.go | 26 ++++++++++++------- .../{ => notifications}/notifications_test.go | 6 ++--- 3 files changed, 22 insertions(+), 15 deletions(-) rename bitswap/{ => notifications}/notifications.go (71%) rename bitswap/{ => notifications}/notifications_test.go (95%) diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index cb1d0b9cb..135739672 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -7,6 +7,7 @@ import ( proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" + notifications "github.com/jbenet/go-ipfs/bitswap/notifications" blocks "github.com/jbenet/go-ipfs/blocks" swarm "github.com/jbenet/go-ipfs/net/swarm" peer "github.com/jbenet/go-ipfs/peer" @@ -39,7 +40,7 @@ type BitSwap struct { // routing interface for communication routing *dht.IpfsDHT - notifications *notifications + notifications notifications.PubSub // partners is a map of currently active bitswap relationships. // The Ledger has the peer.ID, and the peer connection works through net. @@ -69,7 +70,7 @@ func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsR routing: r.(*dht.IpfsDHT), meschan: net.GetChannel(swarm.PBWrapper_BITSWAP), haltChan: make(chan struct{}), - notifications: newNotifications(), + notifications: notifications.New(), } go bs.handleMessages() diff --git a/bitswap/notifications.go b/bitswap/notifications/notifications.go similarity index 71% rename from bitswap/notifications.go rename to bitswap/notifications/notifications.go index 39fd42bb7..dad2c9ea3 100644 --- a/bitswap/notifications.go +++ b/bitswap/notifications/notifications.go @@ -1,4 +1,4 @@ -package bitswap +package notifications import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" @@ -8,16 +8,22 @@ import ( u "github.com/jbenet/go-ipfs/util" ) -type notifications struct { +type PubSub interface { + Publish(block *blocks.Block) + Subscribe(ctx context.Context, k u.Key) <-chan *blocks.Block + Shutdown() +} + +func New() PubSub { + const bufferSize = 16 + return &impl{pubsub.New(bufferSize)} +} + +type impl struct { wrapped *pubsub.PubSub } -func newNotifications() *notifications { - const bufferSize = 16 - return ¬ifications{pubsub.New(bufferSize)} -} - -func (ps *notifications) Publish(block *blocks.Block) { +func (ps *impl) Publish(block *blocks.Block) { topic := string(block.Key()) ps.wrapped.Pub(block, topic) } @@ -25,7 +31,7 @@ func (ps *notifications) Publish(block *blocks.Block) { // Subscribe returns a one-time use |blockChannel|. |blockChannel| returns nil // if the |ctx| times out or is cancelled. Then channel is closed after the // block given by |k| is sent. -func (ps *notifications) Subscribe(ctx context.Context, k u.Key) <-chan *blocks.Block { +func (ps *impl) Subscribe(ctx context.Context, k u.Key) <-chan *blocks.Block { topic := string(k) subChan := ps.wrapped.SubOnce(topic) blockChannel := make(chan *blocks.Block) @@ -44,6 +50,6 @@ func (ps *notifications) Subscribe(ctx context.Context, k u.Key) <-chan *blocks. return blockChannel } -func (ps *notifications) Shutdown() { +func (ps *impl) Shutdown() { ps.wrapped.Shutdown() } diff --git a/bitswap/notifications_test.go b/bitswap/notifications/notifications_test.go similarity index 95% rename from bitswap/notifications_test.go rename to bitswap/notifications/notifications_test.go index 07006fb36..9430b8ec2 100644 --- a/bitswap/notifications_test.go +++ b/bitswap/notifications/notifications_test.go @@ -1,4 +1,4 @@ -package bitswap +package notifications import ( "bytes" @@ -13,7 +13,7 @@ import ( func TestPublishSubscribe(t *testing.T) { blockSent := getBlockOrFail(t, "Greetings from The Interval") - n := newNotifications() + n := New() defer n.Shutdown() ch := n.Subscribe(context.Background(), blockSent.Key()) @@ -28,7 +28,7 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) { impossibleDeadline := time.Nanosecond fastExpiringCtx, _ := context.WithTimeout(context.Background(), impossibleDeadline) - n := newNotifications() + n := New() defer n.Shutdown() blockChannel := n.Subscribe(fastExpiringCtx, getBlockOrFail(t, "A Missed Connection").Key())