mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-29 17:36:38 +08:00
refactor(bitswap) leverage third-party pubsub lib
use a third-party pubsub library for internal communications Insights: * Within bitswap, the actors don't need anything more than simple pubsub behavior. Wrapping and unwrapping messages proves unneccessary. Changes: * Simplifies the interface for both actors calling GetBlock and actors receiving blocks on the network * Leverages a well-tested third-party pubsub library Design Goals: * reduce complexity * extract implementation details (wrapping and unwrapping data, etc) from bitswap and let bitswap focus on composition of core algorithms operations
This commit is contained in:
4
Godeps/Godeps.json
generated
4
Godeps/Godeps.json
generated
@ -71,6 +71,10 @@
|
|||||||
{
|
{
|
||||||
"ImportPath": "github.com/syndtr/goleveldb/leveldb",
|
"ImportPath": "github.com/syndtr/goleveldb/leveldb",
|
||||||
"Rev": "99056d50e56252fbe0021d5c893defca5a76baf8"
|
"Rev": "99056d50e56252fbe0021d5c893defca5a76baf8"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"ImportPath": "github.com/tuxychandru/pubsub",
|
||||||
|
"Rev": "02de8aa2db3d570c5ab1be5ba67b456fd0fb7c4e"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
30
Godeps/_workspace/src/github.com/tuxychandru/pubsub/README.md
generated
vendored
Normal file
30
Godeps/_workspace/src/github.com/tuxychandru/pubsub/README.md
generated
vendored
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
Install pubsub with,
|
||||||
|
|
||||||
|
go get github.com/tuxychandru/pubsub
|
||||||
|
|
||||||
|
View the [API Documentation](http://godoc.org/github.com/tuxychandru/pubsub).
|
||||||
|
|
||||||
|
## License
|
||||||
|
|
||||||
|
Copyright (c) 2013, Chandra Sekar S
|
||||||
|
All rights reserved.
|
||||||
|
|
||||||
|
Redistribution and use in source and binary forms, with or without
|
||||||
|
modification, are permitted provided that the following conditions are met:
|
||||||
|
|
||||||
|
1. Redistributions of source code must retain the above copyright notice, this
|
||||||
|
list of conditions and the following disclaimer.
|
||||||
|
2. Redistributions in binary form must reproduce the above copyright notice,
|
||||||
|
this list of conditions and the following disclaimer in the documentation
|
||||||
|
and/or other materials provided with the distribution.
|
||||||
|
|
||||||
|
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
|
||||||
|
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||||
|
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||||
|
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
|
||||||
|
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
||||||
|
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
||||||
|
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
|
||||||
|
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||||
|
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
||||||
|
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
208
Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub.go
generated
vendored
Normal file
208
Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub.go
generated
vendored
Normal file
@ -0,0 +1,208 @@
|
|||||||
|
// Copyright 2013, Chandra Sekar S. All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style
|
||||||
|
// license that can be found in the README.md file.
|
||||||
|
|
||||||
|
// Package pubsub implements a simple multi-topic pub-sub
|
||||||
|
// library.
|
||||||
|
//
|
||||||
|
// Topics must be strings and messages of any type can be
|
||||||
|
// published. A topic can have any number of subcribers and
|
||||||
|
// all of them receive messages published on the topic.
|
||||||
|
package pubsub
|
||||||
|
|
||||||
|
type operation int
|
||||||
|
|
||||||
|
const (
|
||||||
|
sub operation = iota
|
||||||
|
subOnce
|
||||||
|
pub
|
||||||
|
unsub
|
||||||
|
unsubAll
|
||||||
|
closeTopic
|
||||||
|
shutdown
|
||||||
|
)
|
||||||
|
|
||||||
|
// PubSub is a collection of topics.
|
||||||
|
type PubSub struct {
|
||||||
|
cmdChan chan cmd
|
||||||
|
capacity int
|
||||||
|
}
|
||||||
|
|
||||||
|
type cmd struct {
|
||||||
|
op operation
|
||||||
|
topics []string
|
||||||
|
ch chan interface{}
|
||||||
|
msg interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates a new PubSub and starts a goroutine for handling operations.
|
||||||
|
// The capacity of the channels created by Sub and SubOnce will be as specified.
|
||||||
|
func New(capacity int) *PubSub {
|
||||||
|
ps := &PubSub{make(chan cmd), capacity}
|
||||||
|
go ps.start()
|
||||||
|
return ps
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sub returns a channel on which messages published on any of
|
||||||
|
// the specified topics can be received.
|
||||||
|
func (ps *PubSub) Sub(topics ...string) chan interface{} {
|
||||||
|
return ps.sub(sub, topics...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SubOnce is similar to Sub, but only the first message published, after subscription,
|
||||||
|
// on any of the specified topics can be received.
|
||||||
|
func (ps *PubSub) SubOnce(topics ...string) chan interface{} {
|
||||||
|
return ps.sub(subOnce, topics...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *PubSub) sub(op operation, topics ...string) chan interface{} {
|
||||||
|
ch := make(chan interface{}, ps.capacity)
|
||||||
|
ps.cmdChan <- cmd{op: op, topics: topics, ch: ch}
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddSub adds subscriptions to an existing channel.
|
||||||
|
func (ps *PubSub) AddSub(ch chan interface{}, topics ...string) {
|
||||||
|
ps.cmdChan <- cmd{op: sub, topics: topics, ch: ch}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pub publishes the given message to all subscribers of
|
||||||
|
// the specified topics.
|
||||||
|
func (ps *PubSub) Pub(msg interface{}, topics ...string) {
|
||||||
|
ps.cmdChan <- cmd{op: pub, topics: topics, msg: msg}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unsub unsubscribes the given channel from the specified
|
||||||
|
// topics. If no topic is specified, it is unsubscribed
|
||||||
|
// from all topics.
|
||||||
|
func (ps *PubSub) Unsub(ch chan interface{}, topics ...string) {
|
||||||
|
if len(topics) == 0 {
|
||||||
|
ps.cmdChan <- cmd{op: unsubAll, ch: ch}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ps.cmdChan <- cmd{op: unsub, topics: topics, ch: ch}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes all channels currently subscribed to the specified topics.
|
||||||
|
// If a channel is subscribed to multiple topics, some of which is
|
||||||
|
// not specified, it is not closed.
|
||||||
|
func (ps *PubSub) Close(topics ...string) {
|
||||||
|
ps.cmdChan <- cmd{op: closeTopic, topics: topics}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shutdown closes all subscribed channels and terminates the goroutine.
|
||||||
|
func (ps *PubSub) Shutdown() {
|
||||||
|
ps.cmdChan <- cmd{op: shutdown}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *PubSub) start() {
|
||||||
|
reg := registry{
|
||||||
|
topics: make(map[string]map[chan interface{}]bool),
|
||||||
|
revTopics: make(map[chan interface{}]map[string]bool),
|
||||||
|
}
|
||||||
|
|
||||||
|
loop:
|
||||||
|
for cmd := range ps.cmdChan {
|
||||||
|
if cmd.topics == nil {
|
||||||
|
switch cmd.op {
|
||||||
|
case unsubAll:
|
||||||
|
reg.removeChannel(cmd.ch)
|
||||||
|
|
||||||
|
case shutdown:
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
|
||||||
|
continue loop
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, topic := range cmd.topics {
|
||||||
|
switch cmd.op {
|
||||||
|
case sub:
|
||||||
|
reg.add(topic, cmd.ch, false)
|
||||||
|
|
||||||
|
case subOnce:
|
||||||
|
reg.add(topic, cmd.ch, true)
|
||||||
|
|
||||||
|
case pub:
|
||||||
|
reg.send(topic, cmd.msg)
|
||||||
|
|
||||||
|
case unsub:
|
||||||
|
reg.remove(topic, cmd.ch)
|
||||||
|
|
||||||
|
case closeTopic:
|
||||||
|
reg.removeTopic(topic)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for topic, chans := range reg.topics {
|
||||||
|
for ch, _ := range chans {
|
||||||
|
reg.remove(topic, ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// registry maintains the current subscription state. It's not
|
||||||
|
// safe to access a registry from multiple goroutines simultaneously.
|
||||||
|
type registry struct {
|
||||||
|
topics map[string]map[chan interface{}]bool
|
||||||
|
revTopics map[chan interface{}]map[string]bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (reg *registry) add(topic string, ch chan interface{}, once bool) {
|
||||||
|
if reg.topics[topic] == nil {
|
||||||
|
reg.topics[topic] = make(map[chan interface{}]bool)
|
||||||
|
}
|
||||||
|
reg.topics[topic][ch] = once
|
||||||
|
|
||||||
|
if reg.revTopics[ch] == nil {
|
||||||
|
reg.revTopics[ch] = make(map[string]bool)
|
||||||
|
}
|
||||||
|
reg.revTopics[ch][topic] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (reg *registry) send(topic string, msg interface{}) {
|
||||||
|
for ch, once := range reg.topics[topic] {
|
||||||
|
ch <- msg
|
||||||
|
if once {
|
||||||
|
for topic := range reg.revTopics[ch] {
|
||||||
|
reg.remove(topic, ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (reg *registry) removeTopic(topic string) {
|
||||||
|
for ch := range reg.topics[topic] {
|
||||||
|
reg.remove(topic, ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (reg *registry) removeChannel(ch chan interface{}) {
|
||||||
|
for topic := range reg.revTopics[ch] {
|
||||||
|
reg.remove(topic, ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (reg *registry) remove(topic string, ch chan interface{}) {
|
||||||
|
if _, ok := reg.topics[topic]; !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := reg.topics[topic][ch]; !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(reg.topics[topic], ch)
|
||||||
|
delete(reg.revTopics[ch], topic)
|
||||||
|
|
||||||
|
if len(reg.topics[topic]) == 0 {
|
||||||
|
delete(reg.topics, topic)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(reg.revTopics[ch]) == 0 {
|
||||||
|
close(ch)
|
||||||
|
delete(reg.revTopics, ch)
|
||||||
|
}
|
||||||
|
}
|
230
Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub_test.go
generated
vendored
Normal file
230
Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub_test.go
generated
vendored
Normal file
@ -0,0 +1,230 @@
|
|||||||
|
// Copyright 2013, Chandra Sekar S. All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style
|
||||||
|
// license that can be found in the README.md file.
|
||||||
|
|
||||||
|
package pubsub
|
||||||
|
|
||||||
|
import (
|
||||||
|
check "launchpad.net/gocheck"
|
||||||
|
"runtime"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = check.Suite(new(Suite))
|
||||||
|
|
||||||
|
func Test(t *testing.T) {
|
||||||
|
check.TestingT(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Suite struct{}
|
||||||
|
|
||||||
|
func (s *Suite) TestSub(c *check.C) {
|
||||||
|
ps := New(1)
|
||||||
|
ch1 := ps.Sub("t1")
|
||||||
|
ch2 := ps.Sub("t1")
|
||||||
|
ch3 := ps.Sub("t2")
|
||||||
|
|
||||||
|
ps.Pub("hi", "t1")
|
||||||
|
c.Check(<-ch1, check.Equals, "hi")
|
||||||
|
c.Check(<-ch2, check.Equals, "hi")
|
||||||
|
|
||||||
|
ps.Pub("hello", "t2")
|
||||||
|
c.Check(<-ch3, check.Equals, "hello")
|
||||||
|
|
||||||
|
ps.Shutdown()
|
||||||
|
_, ok := <-ch1
|
||||||
|
c.Check(ok, check.Equals, false)
|
||||||
|
_, ok = <-ch2
|
||||||
|
c.Check(ok, check.Equals, false)
|
||||||
|
_, ok = <-ch3
|
||||||
|
c.Check(ok, check.Equals, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Suite) TestSubOnce(c *check.C) {
|
||||||
|
ps := New(1)
|
||||||
|
ch := ps.SubOnce("t1")
|
||||||
|
|
||||||
|
ps.Pub("hi", "t1")
|
||||||
|
c.Check(<-ch, check.Equals, "hi")
|
||||||
|
|
||||||
|
_, ok := <-ch
|
||||||
|
c.Check(ok, check.Equals, false)
|
||||||
|
ps.Shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Suite) TestAddSub(c *check.C) {
|
||||||
|
ps := New(1)
|
||||||
|
ch1 := ps.Sub("t1")
|
||||||
|
ch2 := ps.Sub("t2")
|
||||||
|
|
||||||
|
ps.Pub("hi1", "t1")
|
||||||
|
c.Check(<-ch1, check.Equals, "hi1")
|
||||||
|
|
||||||
|
ps.Pub("hi2", "t2")
|
||||||
|
c.Check(<-ch2, check.Equals, "hi2")
|
||||||
|
|
||||||
|
ps.AddSub(ch1, "t2", "t3")
|
||||||
|
ps.Pub("hi3", "t2")
|
||||||
|
c.Check(<-ch1, check.Equals, "hi3")
|
||||||
|
c.Check(<-ch2, check.Equals, "hi3")
|
||||||
|
|
||||||
|
ps.Pub("hi4", "t3")
|
||||||
|
c.Check(<-ch1, check.Equals, "hi4")
|
||||||
|
|
||||||
|
ps.Shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Suite) TestUnsub(c *check.C) {
|
||||||
|
ps := New(1)
|
||||||
|
ch := ps.Sub("t1")
|
||||||
|
|
||||||
|
ps.Pub("hi", "t1")
|
||||||
|
c.Check(<-ch, check.Equals, "hi")
|
||||||
|
|
||||||
|
ps.Unsub(ch, "t1")
|
||||||
|
_, ok := <-ch
|
||||||
|
c.Check(ok, check.Equals, false)
|
||||||
|
ps.Shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Suite) TestUnsubAll(c *check.C) {
|
||||||
|
ps := New(1)
|
||||||
|
ch1 := ps.Sub("t1", "t2", "t3")
|
||||||
|
ch2 := ps.Sub("t1", "t3")
|
||||||
|
|
||||||
|
ps.Unsub(ch1)
|
||||||
|
|
||||||
|
m, ok := <-ch1
|
||||||
|
c.Check(ok, check.Equals, false)
|
||||||
|
|
||||||
|
ps.Pub("hi", "t1")
|
||||||
|
m, ok = <-ch2
|
||||||
|
c.Check(m, check.Equals, "hi")
|
||||||
|
|
||||||
|
ps.Shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Suite) TestClose(c *check.C) {
|
||||||
|
ps := New(1)
|
||||||
|
ch1 := ps.Sub("t1")
|
||||||
|
ch2 := ps.Sub("t1")
|
||||||
|
ch3 := ps.Sub("t2")
|
||||||
|
ch4 := ps.Sub("t3")
|
||||||
|
|
||||||
|
ps.Pub("hi", "t1")
|
||||||
|
ps.Pub("hello", "t2")
|
||||||
|
c.Check(<-ch1, check.Equals, "hi")
|
||||||
|
c.Check(<-ch2, check.Equals, "hi")
|
||||||
|
c.Check(<-ch3, check.Equals, "hello")
|
||||||
|
|
||||||
|
ps.Close("t1", "t2")
|
||||||
|
_, ok := <-ch1
|
||||||
|
c.Check(ok, check.Equals, false)
|
||||||
|
_, ok = <-ch2
|
||||||
|
c.Check(ok, check.Equals, false)
|
||||||
|
_, ok = <-ch3
|
||||||
|
c.Check(ok, check.Equals, false)
|
||||||
|
|
||||||
|
ps.Pub("welcome", "t3")
|
||||||
|
c.Check(<-ch4, check.Equals, "welcome")
|
||||||
|
|
||||||
|
ps.Shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Suite) TestUnsubAfterClose(c *check.C) {
|
||||||
|
ps := New(1)
|
||||||
|
ch := ps.Sub("t1")
|
||||||
|
defer func() {
|
||||||
|
ps.Unsub(ch, "t1")
|
||||||
|
ps.Shutdown()
|
||||||
|
}()
|
||||||
|
|
||||||
|
ps.Close("t1")
|
||||||
|
_, ok := <-ch
|
||||||
|
c.Check(ok, check.Equals, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Suite) TestShutdown(c *check.C) {
|
||||||
|
start := runtime.NumGoroutine()
|
||||||
|
New(10).Shutdown()
|
||||||
|
time.Sleep(1)
|
||||||
|
c.Check(runtime.NumGoroutine()-start, check.Equals, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Suite) TestMultiSub(c *check.C) {
|
||||||
|
ps := New(1)
|
||||||
|
ch := ps.Sub("t1", "t2")
|
||||||
|
|
||||||
|
ps.Pub("hi", "t1")
|
||||||
|
c.Check(<-ch, check.Equals, "hi")
|
||||||
|
|
||||||
|
ps.Pub("hello", "t2")
|
||||||
|
c.Check(<-ch, check.Equals, "hello")
|
||||||
|
|
||||||
|
ps.Shutdown()
|
||||||
|
_, ok := <-ch
|
||||||
|
c.Check(ok, check.Equals, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Suite) TestMultiSubOnce(c *check.C) {
|
||||||
|
ps := New(1)
|
||||||
|
ch := ps.SubOnce("t1", "t2")
|
||||||
|
|
||||||
|
ps.Pub("hi", "t1")
|
||||||
|
c.Check(<-ch, check.Equals, "hi")
|
||||||
|
|
||||||
|
ps.Pub("hello", "t2")
|
||||||
|
|
||||||
|
_, ok := <-ch
|
||||||
|
c.Check(ok, check.Equals, false)
|
||||||
|
ps.Shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Suite) TestMultiPub(c *check.C) {
|
||||||
|
ps := New(1)
|
||||||
|
ch1 := ps.Sub("t1")
|
||||||
|
ch2 := ps.Sub("t2")
|
||||||
|
|
||||||
|
ps.Pub("hi", "t1", "t2")
|
||||||
|
c.Check(<-ch1, check.Equals, "hi")
|
||||||
|
c.Check(<-ch2, check.Equals, "hi")
|
||||||
|
|
||||||
|
ps.Shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Suite) TestMultiUnsub(c *check.C) {
|
||||||
|
ps := New(1)
|
||||||
|
ch := ps.Sub("t1", "t2", "t3")
|
||||||
|
|
||||||
|
ps.Unsub(ch, "t1")
|
||||||
|
|
||||||
|
ps.Pub("hi", "t1")
|
||||||
|
|
||||||
|
ps.Pub("hello", "t2")
|
||||||
|
c.Check(<-ch, check.Equals, "hello")
|
||||||
|
|
||||||
|
ps.Unsub(ch, "t2", "t3")
|
||||||
|
_, ok := <-ch
|
||||||
|
c.Check(ok, check.Equals, false)
|
||||||
|
|
||||||
|
ps.Shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Suite) TestMultiClose(c *check.C) {
|
||||||
|
ps := New(1)
|
||||||
|
ch := ps.Sub("t1", "t2")
|
||||||
|
|
||||||
|
ps.Pub("hi", "t1")
|
||||||
|
c.Check(<-ch, check.Equals, "hi")
|
||||||
|
|
||||||
|
ps.Close("t1")
|
||||||
|
ps.Pub("hello", "t2")
|
||||||
|
c.Check(<-ch, check.Equals, "hello")
|
||||||
|
|
||||||
|
ps.Close("t2")
|
||||||
|
_, ok := <-ch
|
||||||
|
c.Check(ok, check.Equals, false)
|
||||||
|
|
||||||
|
ps.Shutdown()
|
||||||
|
}
|
50
bitswap/notifications.go
Normal file
50
bitswap/notifications.go
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
package bitswap
|
||||||
|
|
||||||
|
import (
|
||||||
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
|
pubsub "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/tuxychandru/pubsub"
|
||||||
|
|
||||||
|
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||||
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
type notifications struct {
|
||||||
|
wrapped *pubsub.PubSub
|
||||||
|
}
|
||||||
|
|
||||||
|
func newNotifications() *notifications {
|
||||||
|
const bufferSize = 16
|
||||||
|
return ¬ifications{pubsub.New(bufferSize)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *notifications) Publish(block *blocks.Block) {
|
||||||
|
topic := string(block.Key())
|
||||||
|
ps.wrapped.Pub(block, topic)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sub returns a one-time use |blockChannel|. |blockChannel| returns nil if the
|
||||||
|
// |ctx| times out or is cancelled
|
||||||
|
func (ps *notifications) Subscribe(ctx context.Context, k u.Key) <-chan *blocks.Block {
|
||||||
|
topic := string(k)
|
||||||
|
subChan := ps.wrapped.Sub(topic)
|
||||||
|
blockChannel := make(chan *blocks.Block)
|
||||||
|
go func() {
|
||||||
|
defer close(blockChannel)
|
||||||
|
select {
|
||||||
|
case val := <-subChan:
|
||||||
|
block, ok := val.(*blocks.Block)
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
blockChannel <- block
|
||||||
|
case <-ctx.Done():
|
||||||
|
ps.wrapped.Unsub(subChan, topic)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return blockChannel
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *notifications) Shutdown() {
|
||||||
|
ps.wrapped.Shutdown()
|
||||||
|
}
|
60
bitswap/notifications_test.go
Normal file
60
bitswap/notifications_test.go
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
package bitswap
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
|
|
||||||
|
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPublishSubscribe(t *testing.T) {
|
||||||
|
blockSent := getBlockOrFail(t, "Greetings from The Interval")
|
||||||
|
|
||||||
|
n := newNotifications()
|
||||||
|
defer n.Shutdown()
|
||||||
|
ch := n.Subscribe(context.Background(), blockSent.Key())
|
||||||
|
|
||||||
|
n.Publish(blockSent)
|
||||||
|
blockRecvd := <-ch
|
||||||
|
|
||||||
|
assertBlocksEqual(t, blockRecvd, blockSent)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCarryOnWhenDeadlineExpires(t *testing.T) {
|
||||||
|
|
||||||
|
impossibleDeadline := time.Nanosecond
|
||||||
|
fastExpiringCtx, _ := context.WithTimeout(context.Background(), impossibleDeadline)
|
||||||
|
|
||||||
|
n := newNotifications()
|
||||||
|
defer n.Shutdown()
|
||||||
|
blockChannel := n.Subscribe(fastExpiringCtx, getBlockOrFail(t, "A Missed Connection").Key())
|
||||||
|
|
||||||
|
assertBlockChannelNil(t, blockChannel)
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertBlockChannelNil(t *testing.T, blockChannel <-chan *blocks.Block) {
|
||||||
|
blockReceived := <-blockChannel
|
||||||
|
if blockReceived != nil {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertBlocksEqual(t *testing.T, a, b *blocks.Block) {
|
||||||
|
if !bytes.Equal(a.Data, b.Data) {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
if a.Key() != b.Key() {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getBlockOrFail(t *testing.T, msg string) *blocks.Block {
|
||||||
|
block, blockCreationErr := blocks.NewBlock([]byte(msg))
|
||||||
|
if blockCreationErr != nil {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
return block
|
||||||
|
}
|
Reference in New Issue
Block a user