dht ctxcloserify
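In short: this commit replaces the DHT's hand-rolled shutdown machinery (a halt chan struct{} field plus Halt() methods on IpfsDHT and ProviderManager) with the ctxc.ContextCloser pattern from util/ctxcloser. Each type embeds ctxc.ContextCloser, registers worker goroutines with Children().Add(1) before spawning them and defer Children().Done() inside them, selects on Closing() instead of a halt channel, and cascades shutdown to owned components via AddCloserChild. The sketch below is a minimal, self-contained approximation of that pattern, not the real util/ctxcloser package: contextCloser and newContextCloser are illustrative stand-ins, and it uses the standard library context package in place of the vendored go.net/context.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// contextCloser approximates the interface this commit adopts: a context
// plus a WaitGroup of child goroutines, so Close can block until they exit.
type contextCloser struct {
	ctx      context.Context
	cancel   context.CancelFunc
	children sync.WaitGroup
}

func newContextCloser(parent context.Context) *contextCloser {
	ctx, cancel := context.WithCancel(parent)
	return &contextCloser{ctx: ctx, cancel: cancel}
}

// Context returns the closer's context, for deriving timeouts
// (cf. context.WithTimeout(dht.Context(), ...) in PingRoutine).
func (c *contextCloser) Context() context.Context { return c.ctx }

// Closing returns a channel closed on shutdown, replacing the old
// per-type halt channel.
func (c *contextCloser) Closing() <-chan struct{} { return c.ctx.Done() }

// Children exposes the WaitGroup that tracks worker goroutines.
func (c *contextCloser) Children() *sync.WaitGroup { return &c.children }

// Close cancels the context and waits for all children, replacing Halt().
func (c *contextCloser) Close() {
	c.cancel()
	c.children.Wait()
}

func main() {
	cc := newContextCloser(context.Background())

	cc.Children().Add(1) // register the worker before it starts
	go func() {
		defer cc.Children().Done()
		tick := time.NewTicker(50 * time.Millisecond)
		defer tick.Stop()
		for {
			select {
			case <-tick.C:
				fmt.Println("ping round")
			case <-cc.Closing():
				return // same shape as PingRoutine's shutdown case
			}
		}
	}()

	time.Sleep(120 * time.Millisecond)
	cc.Close() // blocks until the worker has exited
	fmt.Println("closed cleanly")
}

Because Close() waits on the children WaitGroup, a caller knows that once dht.Close() or pm.Close() returns, background goroutines such as PingRoutine and ProviderManager.run have exited; chaining AddCloserChild then lets one Close() tear down the whole tree, which is what the hunks below set up.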
--- a/core/core.go
+++ b/core/core.go
@@ -138,6 +138,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) {
 	// TODO(brian): perform this inside NewDHT factory method
 	dhtService.SetHandler(dhtRouting) // wire the handler to the service.
 	n.Routing = dhtRouting
+	n.AddCloserChild(dhtRouting)
 
 	// setup exchange service
 	const alwaysSendToPeer = true // use YesManStrategy
--- a/routing/dht/dht.go
+++ b/routing/dht/dht.go
@@ -14,6 +14,7 @@ import (
 	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
 	kb "github.com/jbenet/go-ipfs/routing/kbucket"
 	u "github.com/jbenet/go-ipfs/util"
+	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
 
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
@@ -56,7 +57,7 @@ type IpfsDHT struct {
 	//lock to make diagnostics work better
 	diaglock sync.Mutex
 
-	ctx context.Context
+	ctxc.ContextCloser
 }
 
 // NewDHT creates a new DHT object with the given peer as the 'local' host
@@ -67,9 +68,10 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dia
 	dht.datastore = dstore
 	dht.self = p
 	dht.peerstore = ps
-	dht.ctx = ctx
+	dht.ContextCloser = ctxc.NewContextCloser(ctx, nil)
 
-	dht.providers = NewProviderManager(p.ID())
+	dht.providers = NewProviderManager(dht.Context(), p.ID())
+	dht.AddCloserChild(dht.providers)
 
 	dht.routingTables = make([]*kb.RoutingTable, 3)
 	dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000)
@@ -78,6 +80,7 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dia
 	dht.birth = time.Now()
 
 	if doPinging {
+		dht.Children().Add(1)
 		go dht.PingRoutine(time.Second * 10)
 	}
 	return dht
@@ -516,6 +519,8 @@ func (dht *IpfsDHT) loadProvidableKeys() error {
 
 // PingRoutine periodically pings nearest neighbors.
 func (dht *IpfsDHT) PingRoutine(t time.Duration) {
+	defer dht.Children().Done()
+
 	tick := time.Tick(t)
 	for {
 		select {
@@ -524,13 +529,13 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
 			rand.Read(id)
 			peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(u.Key(id)), 5)
 			for _, p := range peers {
-				ctx, _ := context.WithTimeout(dht.ctx, time.Second*5)
+				ctx, _ := context.WithTimeout(dht.Context(), time.Second*5)
 				err := dht.Ping(ctx, p)
 				if err != nil {
 					log.Errorf("Ping error: %s", err)
 				}
 			}
-		case <-dht.ctx.Done():
+		case <-dht.Closing():
 			return
 		}
 	}
--- a/routing/dht/dht_test.go
+++ b/routing/dht/dht_test.go
@@ -92,8 +92,8 @@ func TestPing(t *testing.T) {
 	dhtA := setupDHT(ctx, t, peerA)
 	dhtB := setupDHT(ctx, t, peerB)
 
-	defer dhtA.Halt()
-	defer dhtB.Halt()
+	defer dhtA.Close()
+	defer dhtB.Close()
 	defer dhtA.dialer.(inet.Network).Close()
 	defer dhtB.dialer.(inet.Network).Close()
 
@@ -136,8 +136,8 @@ func TestValueGetSet(t *testing.T) {
 	dhtA := setupDHT(ctx, t, peerA)
 	dhtB := setupDHT(ctx, t, peerB)
 
-	defer dhtA.Halt()
-	defer dhtB.Halt()
+	defer dhtA.Close()
+	defer dhtB.Close()
 	defer dhtA.dialer.(inet.Network).Close()
 	defer dhtB.dialer.(inet.Network).Close()
 
@@ -179,7 +179,7 @@ func TestProvides(t *testing.T) {
 	_, peers, dhts := setupDHTS(ctx, 4, t)
 	defer func() {
 		for i := 0; i < 4; i++ {
-			dhts[i].Halt()
+			dhts[i].Close()
 			defer dhts[i].dialer.(inet.Network).Close()
 		}
 	}()
@@ -239,7 +239,7 @@ func TestProvidesAsync(t *testing.T) {
 	_, peers, dhts := setupDHTS(ctx, 4, t)
 	defer func() {
 		for i := 0; i < 4; i++ {
-			dhts[i].Halt()
+			dhts[i].Close()
 			defer dhts[i].dialer.(inet.Network).Close()
 		}
 	}()
@@ -302,7 +302,7 @@ func TestLayeredGet(t *testing.T) {
 	_, peers, dhts := setupDHTS(ctx, 4, t)
 	defer func() {
 		for i := 0; i < 4; i++ {
-			dhts[i].Halt()
+			dhts[i].Close()
 			defer dhts[i].dialer.(inet.Network).Close()
 		}
 	}()
@@ -355,7 +355,7 @@ func TestFindPeer(t *testing.T) {
 	_, peers, dhts := setupDHTS(ctx, 4, t)
 	defer func() {
 		for i := 0; i < 4; i++ {
-			dhts[i].Halt()
+			dhts[i].Close()
 			dhts[i].dialer.(inet.Network).Close()
 		}
 	}()
@@ -443,8 +443,8 @@ func TestConnectCollision(t *testing.T) {
 		t.Fatal("Timeout received!")
 	}
 
-	dhtA.Halt()
-	dhtB.Halt()
+	dhtA.Close()
+	dhtB.Close()
 	dhtA.dialer.(inet.Network).Close()
 	dhtB.dialer.(inet.Network).Close()
 
--- a/routing/dht/handlers.go
+++ b/routing/dht/handlers.go
@@ -205,9 +205,3 @@ func (dht *IpfsDHT) handleAddProvider(p peer.Peer, pmes *pb.Message) (*pb.Messag
 
 	return pmes, nil // send back same msg as confirmation.
 }
-
-// Halt stops all communications from this peer and shut down
-// TODO -- remove this in favor of context
-func (dht *IpfsDHT) Halt() {
-	dht.providers.Halt()
-}
--- a/routing/dht/providers.go
+++ b/routing/dht/providers.go
@@ -5,6 +5,9 @@ import (
 
 	peer "github.com/jbenet/go-ipfs/peer"
 	u "github.com/jbenet/go-ipfs/util"
+	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
+
+	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
 )
 
 type ProviderManager struct {
@@ -14,8 +17,8 @@ type ProviderManager struct {
 	getlocal chan chan []u.Key
 	newprovs chan *addProv
 	getprovs chan *getProv
-	halt     chan struct{}
 	period   time.Duration
+	ctxc.ContextCloser
 }
 
 type addProv struct {
@@ -28,19 +31,24 @@ type getProv struct {
 	resp chan []peer.Peer
 }
 
-func NewProviderManager(local peer.ID) *ProviderManager {
+func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager {
 	pm := new(ProviderManager)
 	pm.getprovs = make(chan *getProv)
 	pm.newprovs = make(chan *addProv)
 	pm.providers = make(map[u.Key][]*providerInfo)
 	pm.getlocal = make(chan chan []u.Key)
 	pm.local = make(map[u.Key]struct{})
-	pm.halt = make(chan struct{})
+	pm.ContextCloser = ctxc.NewContextCloser(ctx, nil)
+
+	pm.Children().Add(1)
 	go pm.run()
+
 	return pm
 }
 
 func (pm *ProviderManager) run() {
+	defer pm.Children().Done()
+
 	tick := time.NewTicker(time.Hour)
 	for {
 		select {
@@ -53,6 +61,7 @@ func (pm *ProviderManager) run() {
 			pi.Value = np.val
 			arr := pm.providers[np.k]
 			pm.providers[np.k] = append(arr, pi)
+
 		case gp := <-pm.getprovs:
 			var parr []peer.Peer
 			provs := pm.providers[gp.k]
@@ -60,12 +69,14 @@ func (pm *ProviderManager) run() {
 				parr = append(parr, p.Value)
 			}
 			gp.resp <- parr
+
 		case lc := <-pm.getlocal:
 			var keys []u.Key
 			for k, _ := range pm.local {
 				keys = append(keys, k)
 			}
 			lc <- keys
+
 		case <-tick.C:
 			for k, provs := range pm.providers {
 				var filtered []*providerInfo
@@ -76,7 +87,8 @@ func (pm *ProviderManager) run() {
 				}
 				pm.providers[k] = filtered
 			}
-		case <-pm.halt:
+
+		case <-pm.Closing():
 			return
 		}
 	}
@@ -102,7 +114,3 @@ func (pm *ProviderManager) GetLocal() []u.Key {
 	pm.getlocal <- resp
 	return <-resp
 }
-
-func (pm *ProviderManager) Halt() {
-	pm.halt <- struct{}{}
-}
--- a/routing/dht/providers_test.go
+++ b/routing/dht/providers_test.go
@@ -5,16 +5,19 @@ import (
 
 	"github.com/jbenet/go-ipfs/peer"
 	u "github.com/jbenet/go-ipfs/util"
+
+	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
 )
 
 func TestProviderManager(t *testing.T) {
+	ctx := context.Background()
 	mid := peer.ID("testing")
-	p := NewProviderManager(mid)
+	p := NewProviderManager(ctx, mid)
 	a := u.Key("test")
 	p.AddProvider(a, peer.WithIDString("testingprovider"))
 	resp := p.GetProviders(a)
 	if len(resp) != 1 {
 		t.Fatal("Could not retrieve provider.")
 	}
-	p.Halt()
+	p.Close()
 }