mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-29 17:36:38 +08:00
refactor(bitswap:notifications) move, rename
add interface
This commit is contained in:
@ -7,6 +7,7 @@ import (
|
|||||||
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
||||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
|
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
|
||||||
|
|
||||||
|
notifications "github.com/jbenet/go-ipfs/bitswap/notifications"
|
||||||
blocks "github.com/jbenet/go-ipfs/blocks"
|
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||||
swarm "github.com/jbenet/go-ipfs/net/swarm"
|
swarm "github.com/jbenet/go-ipfs/net/swarm"
|
||||||
peer "github.com/jbenet/go-ipfs/peer"
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
@ -39,7 +40,7 @@ type BitSwap struct {
|
|||||||
// routing interface for communication
|
// routing interface for communication
|
||||||
routing *dht.IpfsDHT
|
routing *dht.IpfsDHT
|
||||||
|
|
||||||
notifications *notifications
|
notifications notifications.PubSub
|
||||||
|
|
||||||
// partners is a map of currently active bitswap relationships.
|
// partners is a map of currently active bitswap relationships.
|
||||||
// The Ledger has the peer.ID, and the peer connection works through net.
|
// The Ledger has the peer.ID, and the peer connection works through net.
|
||||||
@ -69,7 +70,7 @@ func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsR
|
|||||||
routing: r.(*dht.IpfsDHT),
|
routing: r.(*dht.IpfsDHT),
|
||||||
meschan: net.GetChannel(swarm.PBWrapper_BITSWAP),
|
meschan: net.GetChannel(swarm.PBWrapper_BITSWAP),
|
||||||
haltChan: make(chan struct{}),
|
haltChan: make(chan struct{}),
|
||||||
notifications: newNotifications(),
|
notifications: notifications.New(),
|
||||||
}
|
}
|
||||||
|
|
||||||
go bs.handleMessages()
|
go bs.handleMessages()
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package bitswap
|
package notifications
|
||||||
|
|
||||||
import (
|
import (
|
||||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
@ -8,16 +8,22 @@ import (
|
|||||||
u "github.com/jbenet/go-ipfs/util"
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
type notifications struct {
|
type PubSub interface {
|
||||||
|
Publish(block *blocks.Block)
|
||||||
|
Subscribe(ctx context.Context, k u.Key) <-chan *blocks.Block
|
||||||
|
Shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
func New() PubSub {
|
||||||
|
const bufferSize = 16
|
||||||
|
return &impl{pubsub.New(bufferSize)}
|
||||||
|
}
|
||||||
|
|
||||||
|
type impl struct {
|
||||||
wrapped *pubsub.PubSub
|
wrapped *pubsub.PubSub
|
||||||
}
|
}
|
||||||
|
|
||||||
func newNotifications() *notifications {
|
func (ps *impl) Publish(block *blocks.Block) {
|
||||||
const bufferSize = 16
|
|
||||||
return ¬ifications{pubsub.New(bufferSize)}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ps *notifications) Publish(block *blocks.Block) {
|
|
||||||
topic := string(block.Key())
|
topic := string(block.Key())
|
||||||
ps.wrapped.Pub(block, topic)
|
ps.wrapped.Pub(block, topic)
|
||||||
}
|
}
|
||||||
@ -25,7 +31,7 @@ func (ps *notifications) Publish(block *blocks.Block) {
|
|||||||
// Subscribe returns a one-time use |blockChannel|. |blockChannel| returns nil
|
// Subscribe returns a one-time use |blockChannel|. |blockChannel| returns nil
|
||||||
// if the |ctx| times out or is cancelled. Then channel is closed after the
|
// if the |ctx| times out or is cancelled. Then channel is closed after the
|
||||||
// block given by |k| is sent.
|
// block given by |k| is sent.
|
||||||
func (ps *notifications) Subscribe(ctx context.Context, k u.Key) <-chan *blocks.Block {
|
func (ps *impl) Subscribe(ctx context.Context, k u.Key) <-chan *blocks.Block {
|
||||||
topic := string(k)
|
topic := string(k)
|
||||||
subChan := ps.wrapped.SubOnce(topic)
|
subChan := ps.wrapped.SubOnce(topic)
|
||||||
blockChannel := make(chan *blocks.Block)
|
blockChannel := make(chan *blocks.Block)
|
||||||
@ -44,6 +50,6 @@ func (ps *notifications) Subscribe(ctx context.Context, k u.Key) <-chan *blocks.
|
|||||||
return blockChannel
|
return blockChannel
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *notifications) Shutdown() {
|
func (ps *impl) Shutdown() {
|
||||||
ps.wrapped.Shutdown()
|
ps.wrapped.Shutdown()
|
||||||
}
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package bitswap
|
package notifications
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
@ -13,7 +13,7 @@ import (
|
|||||||
func TestPublishSubscribe(t *testing.T) {
|
func TestPublishSubscribe(t *testing.T) {
|
||||||
blockSent := getBlockOrFail(t, "Greetings from The Interval")
|
blockSent := getBlockOrFail(t, "Greetings from The Interval")
|
||||||
|
|
||||||
n := newNotifications()
|
n := New()
|
||||||
defer n.Shutdown()
|
defer n.Shutdown()
|
||||||
ch := n.Subscribe(context.Background(), blockSent.Key())
|
ch := n.Subscribe(context.Background(), blockSent.Key())
|
||||||
|
|
||||||
@ -28,7 +28,7 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) {
|
|||||||
impossibleDeadline := time.Nanosecond
|
impossibleDeadline := time.Nanosecond
|
||||||
fastExpiringCtx, _ := context.WithTimeout(context.Background(), impossibleDeadline)
|
fastExpiringCtx, _ := context.WithTimeout(context.Background(), impossibleDeadline)
|
||||||
|
|
||||||
n := newNotifications()
|
n := New()
|
||||||
defer n.Shutdown()
|
defer n.Shutdown()
|
||||||
blockChannel := n.Subscribe(fastExpiringCtx, getBlockOrFail(t, "A Missed Connection").Key())
|
blockChannel := n.Subscribe(fastExpiringCtx, getBlockOrFail(t, "A Missed Connection").Key())
|
||||||
|
|
Reference in New Issue
Block a user