diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 90bb36739..629af22fd 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -71,6 +71,10 @@ { "ImportPath": "github.com/syndtr/goleveldb/leveldb", "Rev": "99056d50e56252fbe0021d5c893defca5a76baf8" + }, + { + "ImportPath": "github.com/tuxychandru/pubsub", + "Rev": "02de8aa2db3d570c5ab1be5ba67b456fd0fb7c4e" } ] } diff --git a/Godeps/_workspace/src/github.com/tuxychandru/pubsub/README.md b/Godeps/_workspace/src/github.com/tuxychandru/pubsub/README.md new file mode 100644 index 000000000..c1aab80b5 --- /dev/null +++ b/Godeps/_workspace/src/github.com/tuxychandru/pubsub/README.md @@ -0,0 +1,30 @@ +Install pubsub with, + + go get github.com/tuxychandru/pubsub + +View the [API Documentation](http://godoc.org/github.com/tuxychandru/pubsub). + +## License + +Copyright (c) 2013, Chandra Sekar S +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub.go b/Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub.go new file mode 100644 index 000000000..9cbf9cffa --- /dev/null +++ b/Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub.go @@ -0,0 +1,208 @@ +// Copyright 2013, Chandra Sekar S. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the README.md file. + +// Package pubsub implements a simple multi-topic pub-sub +// library. +// +// Topics must be strings and messages of any type can be +// published. A topic can have any number of subcribers and +// all of them receive messages published on the topic. +package pubsub + +type operation int + +const ( + sub operation = iota + subOnce + pub + unsub + unsubAll + closeTopic + shutdown +) + +// PubSub is a collection of topics. +type PubSub struct { + cmdChan chan cmd + capacity int +} + +type cmd struct { + op operation + topics []string + ch chan interface{} + msg interface{} +} + +// New creates a new PubSub and starts a goroutine for handling operations. +// The capacity of the channels created by Sub and SubOnce will be as specified. +func New(capacity int) *PubSub { + ps := &PubSub{make(chan cmd), capacity} + go ps.start() + return ps +} + +// Sub returns a channel on which messages published on any of +// the specified topics can be received. +func (ps *PubSub) Sub(topics ...string) chan interface{} { + return ps.sub(sub, topics...) +} + +// SubOnce is similar to Sub, but only the first message published, after subscription, +// on any of the specified topics can be received. +func (ps *PubSub) SubOnce(topics ...string) chan interface{} { + return ps.sub(subOnce, topics...) +} + +func (ps *PubSub) sub(op operation, topics ...string) chan interface{} { + ch := make(chan interface{}, ps.capacity) + ps.cmdChan <- cmd{op: op, topics: topics, ch: ch} + return ch +} + +// AddSub adds subscriptions to an existing channel. +func (ps *PubSub) AddSub(ch chan interface{}, topics ...string) { + ps.cmdChan <- cmd{op: sub, topics: topics, ch: ch} +} + +// Pub publishes the given message to all subscribers of +// the specified topics. +func (ps *PubSub) Pub(msg interface{}, topics ...string) { + ps.cmdChan <- cmd{op: pub, topics: topics, msg: msg} +} + +// Unsub unsubscribes the given channel from the specified +// topics. If no topic is specified, it is unsubscribed +// from all topics. +func (ps *PubSub) Unsub(ch chan interface{}, topics ...string) { + if len(topics) == 0 { + ps.cmdChan <- cmd{op: unsubAll, ch: ch} + return + } + + ps.cmdChan <- cmd{op: unsub, topics: topics, ch: ch} +} + +// Close closes all channels currently subscribed to the specified topics. +// If a channel is subscribed to multiple topics, some of which is +// not specified, it is not closed. +func (ps *PubSub) Close(topics ...string) { + ps.cmdChan <- cmd{op: closeTopic, topics: topics} +} + +// Shutdown closes all subscribed channels and terminates the goroutine. +func (ps *PubSub) Shutdown() { + ps.cmdChan <- cmd{op: shutdown} +} + +func (ps *PubSub) start() { + reg := registry{ + topics: make(map[string]map[chan interface{}]bool), + revTopics: make(map[chan interface{}]map[string]bool), + } + +loop: + for cmd := range ps.cmdChan { + if cmd.topics == nil { + switch cmd.op { + case unsubAll: + reg.removeChannel(cmd.ch) + + case shutdown: + break loop + } + + continue loop + } + + for _, topic := range cmd.topics { + switch cmd.op { + case sub: + reg.add(topic, cmd.ch, false) + + case subOnce: + reg.add(topic, cmd.ch, true) + + case pub: + reg.send(topic, cmd.msg) + + case unsub: + reg.remove(topic, cmd.ch) + + case closeTopic: + reg.removeTopic(topic) + } + } + } + + for topic, chans := range reg.topics { + for ch, _ := range chans { + reg.remove(topic, ch) + } + } +} + +// registry maintains the current subscription state. It's not +// safe to access a registry from multiple goroutines simultaneously. +type registry struct { + topics map[string]map[chan interface{}]bool + revTopics map[chan interface{}]map[string]bool +} + +func (reg *registry) add(topic string, ch chan interface{}, once bool) { + if reg.topics[topic] == nil { + reg.topics[topic] = make(map[chan interface{}]bool) + } + reg.topics[topic][ch] = once + + if reg.revTopics[ch] == nil { + reg.revTopics[ch] = make(map[string]bool) + } + reg.revTopics[ch][topic] = true +} + +func (reg *registry) send(topic string, msg interface{}) { + for ch, once := range reg.topics[topic] { + ch <- msg + if once { + for topic := range reg.revTopics[ch] { + reg.remove(topic, ch) + } + } + } +} + +func (reg *registry) removeTopic(topic string) { + for ch := range reg.topics[topic] { + reg.remove(topic, ch) + } +} + +func (reg *registry) removeChannel(ch chan interface{}) { + for topic := range reg.revTopics[ch] { + reg.remove(topic, ch) + } +} + +func (reg *registry) remove(topic string, ch chan interface{}) { + if _, ok := reg.topics[topic]; !ok { + return + } + + if _, ok := reg.topics[topic][ch]; !ok { + return + } + + delete(reg.topics[topic], ch) + delete(reg.revTopics[ch], topic) + + if len(reg.topics[topic]) == 0 { + delete(reg.topics, topic) + } + + if len(reg.revTopics[ch]) == 0 { + close(ch) + delete(reg.revTopics, ch) + } +} diff --git a/Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub_test.go b/Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub_test.go new file mode 100644 index 000000000..16392d33b --- /dev/null +++ b/Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub_test.go @@ -0,0 +1,230 @@ +// Copyright 2013, Chandra Sekar S. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the README.md file. + +package pubsub + +import ( + check "launchpad.net/gocheck" + "runtime" + "testing" + "time" +) + +var _ = check.Suite(new(Suite)) + +func Test(t *testing.T) { + check.TestingT(t) +} + +type Suite struct{} + +func (s *Suite) TestSub(c *check.C) { + ps := New(1) + ch1 := ps.Sub("t1") + ch2 := ps.Sub("t1") + ch3 := ps.Sub("t2") + + ps.Pub("hi", "t1") + c.Check(<-ch1, check.Equals, "hi") + c.Check(<-ch2, check.Equals, "hi") + + ps.Pub("hello", "t2") + c.Check(<-ch3, check.Equals, "hello") + + ps.Shutdown() + _, ok := <-ch1 + c.Check(ok, check.Equals, false) + _, ok = <-ch2 + c.Check(ok, check.Equals, false) + _, ok = <-ch3 + c.Check(ok, check.Equals, false) +} + +func (s *Suite) TestSubOnce(c *check.C) { + ps := New(1) + ch := ps.SubOnce("t1") + + ps.Pub("hi", "t1") + c.Check(<-ch, check.Equals, "hi") + + _, ok := <-ch + c.Check(ok, check.Equals, false) + ps.Shutdown() +} + +func (s *Suite) TestAddSub(c *check.C) { + ps := New(1) + ch1 := ps.Sub("t1") + ch2 := ps.Sub("t2") + + ps.Pub("hi1", "t1") + c.Check(<-ch1, check.Equals, "hi1") + + ps.Pub("hi2", "t2") + c.Check(<-ch2, check.Equals, "hi2") + + ps.AddSub(ch1, "t2", "t3") + ps.Pub("hi3", "t2") + c.Check(<-ch1, check.Equals, "hi3") + c.Check(<-ch2, check.Equals, "hi3") + + ps.Pub("hi4", "t3") + c.Check(<-ch1, check.Equals, "hi4") + + ps.Shutdown() +} + +func (s *Suite) TestUnsub(c *check.C) { + ps := New(1) + ch := ps.Sub("t1") + + ps.Pub("hi", "t1") + c.Check(<-ch, check.Equals, "hi") + + ps.Unsub(ch, "t1") + _, ok := <-ch + c.Check(ok, check.Equals, false) + ps.Shutdown() +} + +func (s *Suite) TestUnsubAll(c *check.C) { + ps := New(1) + ch1 := ps.Sub("t1", "t2", "t3") + ch2 := ps.Sub("t1", "t3") + + ps.Unsub(ch1) + + m, ok := <-ch1 + c.Check(ok, check.Equals, false) + + ps.Pub("hi", "t1") + m, ok = <-ch2 + c.Check(m, check.Equals, "hi") + + ps.Shutdown() +} + +func (s *Suite) TestClose(c *check.C) { + ps := New(1) + ch1 := ps.Sub("t1") + ch2 := ps.Sub("t1") + ch3 := ps.Sub("t2") + ch4 := ps.Sub("t3") + + ps.Pub("hi", "t1") + ps.Pub("hello", "t2") + c.Check(<-ch1, check.Equals, "hi") + c.Check(<-ch2, check.Equals, "hi") + c.Check(<-ch3, check.Equals, "hello") + + ps.Close("t1", "t2") + _, ok := <-ch1 + c.Check(ok, check.Equals, false) + _, ok = <-ch2 + c.Check(ok, check.Equals, false) + _, ok = <-ch3 + c.Check(ok, check.Equals, false) + + ps.Pub("welcome", "t3") + c.Check(<-ch4, check.Equals, "welcome") + + ps.Shutdown() +} + +func (s *Suite) TestUnsubAfterClose(c *check.C) { + ps := New(1) + ch := ps.Sub("t1") + defer func() { + ps.Unsub(ch, "t1") + ps.Shutdown() + }() + + ps.Close("t1") + _, ok := <-ch + c.Check(ok, check.Equals, false) +} + +func (s *Suite) TestShutdown(c *check.C) { + start := runtime.NumGoroutine() + New(10).Shutdown() + time.Sleep(1) + c.Check(runtime.NumGoroutine()-start, check.Equals, 1) +} + +func (s *Suite) TestMultiSub(c *check.C) { + ps := New(1) + ch := ps.Sub("t1", "t2") + + ps.Pub("hi", "t1") + c.Check(<-ch, check.Equals, "hi") + + ps.Pub("hello", "t2") + c.Check(<-ch, check.Equals, "hello") + + ps.Shutdown() + _, ok := <-ch + c.Check(ok, check.Equals, false) +} + +func (s *Suite) TestMultiSubOnce(c *check.C) { + ps := New(1) + ch := ps.SubOnce("t1", "t2") + + ps.Pub("hi", "t1") + c.Check(<-ch, check.Equals, "hi") + + ps.Pub("hello", "t2") + + _, ok := <-ch + c.Check(ok, check.Equals, false) + ps.Shutdown() +} + +func (s *Suite) TestMultiPub(c *check.C) { + ps := New(1) + ch1 := ps.Sub("t1") + ch2 := ps.Sub("t2") + + ps.Pub("hi", "t1", "t2") + c.Check(<-ch1, check.Equals, "hi") + c.Check(<-ch2, check.Equals, "hi") + + ps.Shutdown() +} + +func (s *Suite) TestMultiUnsub(c *check.C) { + ps := New(1) + ch := ps.Sub("t1", "t2", "t3") + + ps.Unsub(ch, "t1") + + ps.Pub("hi", "t1") + + ps.Pub("hello", "t2") + c.Check(<-ch, check.Equals, "hello") + + ps.Unsub(ch, "t2", "t3") + _, ok := <-ch + c.Check(ok, check.Equals, false) + + ps.Shutdown() +} + +func (s *Suite) TestMultiClose(c *check.C) { + ps := New(1) + ch := ps.Sub("t1", "t2") + + ps.Pub("hi", "t1") + c.Check(<-ch, check.Equals, "hi") + + ps.Close("t1") + ps.Pub("hello", "t2") + c.Check(<-ch, check.Equals, "hello") + + ps.Close("t2") + _, ok := <-ch + c.Check(ok, check.Equals, false) + + ps.Shutdown() +} diff --git a/bitswap/notifications.go b/bitswap/notifications.go new file mode 100644 index 000000000..49cdb982e --- /dev/null +++ b/bitswap/notifications.go @@ -0,0 +1,50 @@ +package bitswap + +import ( + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + pubsub "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/tuxychandru/pubsub" + + blocks "github.com/jbenet/go-ipfs/blocks" + u "github.com/jbenet/go-ipfs/util" +) + +type notifications struct { + wrapped *pubsub.PubSub +} + +func newNotifications() *notifications { + const bufferSize = 16 + return ¬ifications{pubsub.New(bufferSize)} +} + +func (ps *notifications) Publish(block *blocks.Block) { + topic := string(block.Key()) + ps.wrapped.Pub(block, topic) +} + +// Sub returns a one-time use |blockChannel|. |blockChannel| returns nil if the +// |ctx| times out or is cancelled +func (ps *notifications) Subscribe(ctx context.Context, k u.Key) <-chan *blocks.Block { + topic := string(k) + subChan := ps.wrapped.Sub(topic) + blockChannel := make(chan *blocks.Block) + go func() { + defer close(blockChannel) + select { + case val := <-subChan: + block, ok := val.(*blocks.Block) + if !ok { + return + } + blockChannel <- block + case <-ctx.Done(): + ps.wrapped.Unsub(subChan, topic) + return + } + }() + return blockChannel +} + +func (ps *notifications) Shutdown() { + ps.wrapped.Shutdown() +} diff --git a/bitswap/notifications_test.go b/bitswap/notifications_test.go new file mode 100644 index 000000000..07006fb36 --- /dev/null +++ b/bitswap/notifications_test.go @@ -0,0 +1,60 @@ +package bitswap + +import ( + "bytes" + "testing" + "time" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + + blocks "github.com/jbenet/go-ipfs/blocks" +) + +func TestPublishSubscribe(t *testing.T) { + blockSent := getBlockOrFail(t, "Greetings from The Interval") + + n := newNotifications() + defer n.Shutdown() + ch := n.Subscribe(context.Background(), blockSent.Key()) + + n.Publish(blockSent) + blockRecvd := <-ch + + assertBlocksEqual(t, blockRecvd, blockSent) +} + +func TestCarryOnWhenDeadlineExpires(t *testing.T) { + + impossibleDeadline := time.Nanosecond + fastExpiringCtx, _ := context.WithTimeout(context.Background(), impossibleDeadline) + + n := newNotifications() + defer n.Shutdown() + blockChannel := n.Subscribe(fastExpiringCtx, getBlockOrFail(t, "A Missed Connection").Key()) + + assertBlockChannelNil(t, blockChannel) +} + +func assertBlockChannelNil(t *testing.T, blockChannel <-chan *blocks.Block) { + blockReceived := <-blockChannel + if blockReceived != nil { + t.Fail() + } +} + +func assertBlocksEqual(t *testing.T, a, b *blocks.Block) { + if !bytes.Equal(a.Data, b.Data) { + t.Fail() + } + if a.Key() != b.Key() { + t.Fail() + } +} + +func getBlockOrFail(t *testing.T, msg string) *blocks.Block { + block, blockCreationErr := blocks.NewBlock([]byte(msg)) + if blockCreationErr != nil { + t.Fail() + } + return block +}