mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-01 10:49:24 +08:00
p2p/net/swarm: dial once at a time
This commit is contained in:
@ -11,6 +11,49 @@ import (
|
|||||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestSimultDials(t *testing.T) {
|
||||||
|
// t.Skip("skipping for another test")
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
swarms := makeSwarms(ctx, t, 2)
|
||||||
|
|
||||||
|
// connect everyone
|
||||||
|
{
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
|
||||||
|
// copy for other peer
|
||||||
|
log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.local, dst, addr)
|
||||||
|
s.peers.AddAddress(dst, addr)
|
||||||
|
if _, err := s.Dial(ctx, dst); err != nil {
|
||||||
|
t.Fatal("error swarm dialing to peer", err)
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Connecting swarms simultaneously.")
|
||||||
|
for i := 0; i < 10; i++ { // connect 10x for each.
|
||||||
|
wg.Add(2)
|
||||||
|
go connect(swarms[0], swarms[1].local, swarms[1].ListenAddresses()[0])
|
||||||
|
go connect(swarms[1], swarms[0].local, swarms[0].ListenAddresses()[0])
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// should still just have 1, at most 2 connections :)
|
||||||
|
c01l := len(swarms[0].ConnectionsToPeer(swarms[1].local))
|
||||||
|
if c01l > 2 {
|
||||||
|
t.Error("0->1 has", c01l)
|
||||||
|
}
|
||||||
|
c10l := len(swarms[1].ConnectionsToPeer(swarms[0].local))
|
||||||
|
if c10l > 2 {
|
||||||
|
t.Error("1->0 has", c10l)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, s := range swarms {
|
||||||
|
s.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestSimultOpen(t *testing.T) {
|
func TestSimultOpen(t *testing.T) {
|
||||||
// t.Skip("skipping for another test")
|
// t.Skip("skipping for another test")
|
||||||
|
|
||||||
|
@ -4,6 +4,7 @@ package swarm
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
inet "github.com/jbenet/go-ipfs/p2p/net"
|
inet "github.com/jbenet/go-ipfs/p2p/net"
|
||||||
addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
|
addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
|
||||||
@ -33,6 +34,11 @@ type Swarm struct {
|
|||||||
peers peer.Peerstore
|
peers peer.Peerstore
|
||||||
connh ConnHandler
|
connh ConnHandler
|
||||||
|
|
||||||
|
// dialing is a channel for the current peers being dialed.
|
||||||
|
// this way, we dont kick off N dials simultaneously.
|
||||||
|
dialing map[peer.ID]chan struct{}
|
||||||
|
dialingmu sync.Mutex
|
||||||
|
|
||||||
cg ctxgroup.ContextGroup
|
cg ctxgroup.ContextGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -49,10 +55,11 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
|
|||||||
}
|
}
|
||||||
|
|
||||||
s := &Swarm{
|
s := &Swarm{
|
||||||
swarm: ps.NewSwarm(PSTransport),
|
swarm: ps.NewSwarm(PSTransport),
|
||||||
local: local,
|
local: local,
|
||||||
peers: peers,
|
peers: peers,
|
||||||
cg: ctxgroup.WithContext(ctx),
|
cg: ctxgroup.WithContext(ctx),
|
||||||
|
dialing: map[peer.ID]chan struct{}{},
|
||||||
}
|
}
|
||||||
|
|
||||||
// configure Swarm
|
// configure Swarm
|
||||||
|
@ -25,12 +25,41 @@ func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
|
|||||||
return nil, errors.New("Attempted connection to self!")
|
return nil, errors.New("Attempted connection to self!")
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if we already have an open connection first
|
for {
|
||||||
cs := s.ConnectionsToPeer(p)
|
// check if we already have an open connection first
|
||||||
for _, c := range cs {
|
cs := s.ConnectionsToPeer(p)
|
||||||
if c != nil { // dump out the first one we find
|
for _, c := range cs {
|
||||||
return c, nil
|
if c != nil { // dump out the first one we find
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check if there's an ongoing dial to this peer
|
||||||
|
s.dialingmu.Lock()
|
||||||
|
dialDone, found := s.dialing[p]
|
||||||
|
if !found { // if not, set one up.
|
||||||
|
dialDone = make(chan struct{})
|
||||||
|
s.dialing[p] = dialDone
|
||||||
|
}
|
||||||
|
s.dialingmu.Unlock()
|
||||||
|
|
||||||
|
if found {
|
||||||
|
select {
|
||||||
|
case <-dialDone: // wait for that dial to finish.
|
||||||
|
continue // and see if it worked (loop). it may not have.
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// else, we're the ones dialing for others.
|
||||||
|
defer func() {
|
||||||
|
s.dialingmu.Lock()
|
||||||
|
delete(s.dialing, p)
|
||||||
|
close(dialDone)
|
||||||
|
s.dialingmu.Unlock()
|
||||||
|
}()
|
||||||
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
sk := s.peers.PrivKey(s.local)
|
sk := s.peers.PrivKey(s.local)
|
||||||
|
Reference in New Issue
Block a user