mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-28 00:39:31 +08:00
added swarm (fanout/in with peers)
This commit is contained in:
@ -2,10 +2,11 @@ package swarm
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
ma "github.com/jbenet/go-multiaddr"
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
msgio "github.com/jbenet/go-msgio"
|
||||
ma "github.com/jbenet/go-multiaddr"
|
||||
"net"
|
||||
)
|
||||
|
||||
const ChanBuffer = 10
|
||||
@ -20,6 +21,7 @@ type Conn struct {
|
||||
Incoming *msgio.Chan
|
||||
}
|
||||
|
||||
type ConnMap map[u.Key]*Conn
|
||||
|
||||
func Dial(network string, peer *peer.Peer) (*Conn, error) {
|
||||
addr := peer.NetAddress(network)
|
||||
|
@ -2,11 +2,11 @@ package swarm
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"testing"
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
ma "github.com/jbenet/go-multiaddr"
|
||||
mh "github.com/jbenet/go-multihash"
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
"net"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func setupPeer(id string, addr string) (*peer.Peer, error) {
|
||||
@ -70,7 +70,6 @@ func TestDial(t *testing.T) {
|
||||
t.Fatal("error dialing peer", err)
|
||||
}
|
||||
|
||||
|
||||
fmt.Println("sending")
|
||||
c.Outgoing.MsgChan <- []byte("beep")
|
||||
c.Outgoing.MsgChan <- []byte("boop")
|
||||
@ -89,5 +88,3 @@ func TestDial(t *testing.T) {
|
||||
c.Close()
|
||||
listener.Close()
|
||||
}
|
||||
|
||||
|
||||
|
138
swarm/swarm.go
138
swarm/swarm.go
@ -1,9 +1,143 @@
|
||||
package swarm
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Swarm struct {
|
||||
Conns map[string]*Conn
|
||||
type Message struct {
|
||||
// To or from, depending on direction.
|
||||
Peer *peer.Peer
|
||||
|
||||
// Opaque data
|
||||
Data []byte
|
||||
}
|
||||
|
||||
type Chan struct {
|
||||
Outgoing chan Message
|
||||
Incoming chan Message
|
||||
Errors chan error
|
||||
Close chan bool
|
||||
}
|
||||
|
||||
func NewChan(bufsize int) *Chan {
|
||||
return &Chan{
|
||||
Outgoing: make(chan Message, bufsize),
|
||||
Incoming: make(chan Message, bufsize),
|
||||
Errors: make(chan error),
|
||||
Close: make(chan bool, bufsize),
|
||||
}
|
||||
}
|
||||
|
||||
type Swarm struct {
|
||||
Chan *Chan
|
||||
conns ConnMap
|
||||
connsLock sync.RWMutex
|
||||
}
|
||||
|
||||
func NewSwarm() *Swarm {
|
||||
s := &Swarm{
|
||||
Chan: NewChan(10),
|
||||
conns: ConnMap{},
|
||||
}
|
||||
go s.fanOut()
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *Swarm) Close() {
|
||||
s.connsLock.RLock()
|
||||
l := len(s.conns)
|
||||
s.connsLock.RUnlock()
|
||||
|
||||
for i := 0; i < l; i++ {
|
||||
s.Chan.Close <- true // fan ins
|
||||
}
|
||||
s.Chan.Close <- true // fan out
|
||||
s.Chan.Close <- true // listener
|
||||
}
|
||||
|
||||
func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) {
|
||||
k := peer.Key()
|
||||
|
||||
// check if we already have an open connection first
|
||||
s.connsLock.RLock()
|
||||
conn, found := s.conns[k]
|
||||
s.connsLock.RUnlock()
|
||||
if found {
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
// open connection to peer
|
||||
conn, err := Dial("tcp", peer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// add to conns
|
||||
s.connsLock.Lock()
|
||||
s.conns[k] = conn
|
||||
s.connsLock.Unlock()
|
||||
|
||||
// kick off reader goroutine
|
||||
go s.fanIn(conn)
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
// Handles the unwrapping + sending of messages to the right connection.
|
||||
func (s *Swarm) fanOut() {
|
||||
for {
|
||||
select {
|
||||
case <-s.Chan.Close:
|
||||
return // told to close.
|
||||
case msg, ok := <-s.Chan.Outgoing:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
s.connsLock.RLock()
|
||||
conn, found := s.conns[msg.Peer.Key()]
|
||||
s.connsLock.RUnlock()
|
||||
if !found {
|
||||
e := fmt.Errorf("Sent msg to peer without open conn: %v", msg.Peer)
|
||||
s.Chan.Errors <- e
|
||||
}
|
||||
|
||||
// queue it in the connection's buffer
|
||||
conn.Outgoing.MsgChan <- msg.Data
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handles the receiving + wrapping of messages, per conn.
|
||||
// Consider using reflect.Select with one goroutine instead of n.
|
||||
func (s *Swarm) fanIn(conn *Conn) {
|
||||
Loop:
|
||||
for {
|
||||
select {
|
||||
case <-s.Chan.Close:
|
||||
// close Conn.
|
||||
conn.Close()
|
||||
break Loop
|
||||
|
||||
case <-conn.Closed:
|
||||
break Loop
|
||||
|
||||
case data, ok := <-conn.Incoming.MsgChan:
|
||||
fmt.Println("got back data", data)
|
||||
if !ok {
|
||||
e := fmt.Errorf("Error retrieving from conn: %v", conn)
|
||||
s.Chan.Errors <- e
|
||||
break Loop
|
||||
}
|
||||
|
||||
// wrap it for consumers.
|
||||
msg := Message{Peer: conn.Peer, Data: data}
|
||||
s.Chan.Incoming <- msg
|
||||
}
|
||||
}
|
||||
|
||||
s.connsLock.Lock()
|
||||
delete(s.conns, conn.Peer.Key())
|
||||
s.connsLock.Unlock()
|
||||
}
|
||||
|
109
swarm/swarm_test.go
Normal file
109
swarm/swarm_test.go
Normal file
@ -0,0 +1,109 @@
|
||||
package swarm
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
msgio "github.com/jbenet/go-msgio"
|
||||
"net"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func pingListen(listener *net.TCPListener, peer *peer.Peer) {
|
||||
for {
|
||||
c, err := listener.Accept()
|
||||
if err == nil {
|
||||
fmt.Println("accepeted")
|
||||
go pong(c, peer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func pong(c net.Conn, peer *peer.Peer) {
|
||||
mrw := msgio.NewReadWriter(c)
|
||||
for {
|
||||
data := make([]byte, 1024)
|
||||
n, err := mrw.ReadMsg(data)
|
||||
if err != nil {
|
||||
fmt.Printf("error %v\n", err)
|
||||
return
|
||||
}
|
||||
if string(data[:n]) != "ping" {
|
||||
fmt.Printf("error: didn't receive ping: '%v'\n", data[:n])
|
||||
return
|
||||
}
|
||||
err = mrw.WriteMsg([]byte("pong"))
|
||||
if err != nil {
|
||||
fmt.Printf("error %v\n", err)
|
||||
return
|
||||
}
|
||||
fmt.Println("pong")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSwarm(t *testing.T) {
|
||||
|
||||
swarm := NewSwarm()
|
||||
peers := []*peer.Peer{}
|
||||
listeners := []*net.Listener{}
|
||||
peerNames := map[string]string{
|
||||
"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a30": "/ip4/127.0.0.1/tcp/1234",
|
||||
"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31": "/ip4/127.0.0.1/tcp/2345",
|
||||
"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32": "/ip4/127.0.0.1/tcp/3456",
|
||||
"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33": "/ip4/127.0.0.1/tcp/4567",
|
||||
}
|
||||
|
||||
for k, n := range peerNames {
|
||||
peer, err := setupPeer(k, n)
|
||||
if err != nil {
|
||||
t.Fatal("error setting up peer", err)
|
||||
}
|
||||
a := peer.NetAddress("tcp")
|
||||
if a == nil {
|
||||
t.Fatal("error setting up peer (addr is nil)", peer)
|
||||
}
|
||||
n, h, err := a.DialArgs()
|
||||
if err != nil {
|
||||
t.Fatal("error getting dial args from addr")
|
||||
}
|
||||
listener, err := net.Listen(n, h)
|
||||
if err != nil {
|
||||
t.Fatal("error setting up listener", err)
|
||||
}
|
||||
go pingListen(listener.(*net.TCPListener), peer)
|
||||
|
||||
_, err = swarm.Dial(peer)
|
||||
if err != nil {
|
||||
t.Fatal("error swarm dialing to peer", err)
|
||||
}
|
||||
|
||||
// ok done, add it.
|
||||
peers = append(peers, peer)
|
||||
listeners = append(listeners, &listener)
|
||||
}
|
||||
|
||||
for i, p := range peers {
|
||||
swarm.Chan.Outgoing <- Message{Peer: p, Data: []byte("ping")}
|
||||
fmt.Println("ping", i)
|
||||
}
|
||||
|
||||
got := map[u.Key]bool{}
|
||||
for _, _ = range peers {
|
||||
msg := <-swarm.Chan.Incoming
|
||||
fmt.Println("recving", string(msg.Data))
|
||||
if string(msg.Data) != "pong" {
|
||||
t.Error("unexpected conn output", msg.Data)
|
||||
}
|
||||
got[msg.Peer.Key()] = true
|
||||
}
|
||||
|
||||
if len(peers) != len(got) {
|
||||
t.Error("got less messages than sent")
|
||||
}
|
||||
|
||||
fmt.Println("closing")
|
||||
swarm.Close()
|
||||
for _, listener := range listeners {
|
||||
(*listener).(*net.TCPListener).Close()
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user