mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-29 17:36:38 +08:00
Merge pull request #2860 from ipfs/feat/provide-storage
Write providers to disk to avoid memory leaks
This commit is contained in:
@ -171,6 +171,12 @@
|
|||||||
"hash": "QmQdnfvZQuhdT93LNc5bos52wAmdr3G2p6G8teLJMEN32P",
|
"hash": "QmQdnfvZQuhdT93LNc5bos52wAmdr3G2p6G8teLJMEN32P",
|
||||||
"name": "go-libp2p-peerstore",
|
"name": "go-libp2p-peerstore",
|
||||||
"version": "1.1.2"
|
"version": "1.1.2"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"author": "whyrusleeping",
|
||||||
|
"hash": "Qmb1DA2A9LS2wR4FFweB4uEDomFsdmnw1VLawLE1yQzudj",
|
||||||
|
"name": "base32",
|
||||||
|
"version": "0.0.0"
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"gxVersion": "0.4.0",
|
"gxVersion": "0.4.0",
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||||
routing "github.com/ipfs/go-ipfs/routing"
|
routing "github.com/ipfs/go-ipfs/routing"
|
||||||
pb "github.com/ipfs/go-ipfs/routing/dht/pb"
|
pb "github.com/ipfs/go-ipfs/routing/dht/pb"
|
||||||
|
providers "github.com/ipfs/go-ipfs/routing/dht/providers"
|
||||||
kb "github.com/ipfs/go-ipfs/routing/kbucket"
|
kb "github.com/ipfs/go-ipfs/routing/kbucket"
|
||||||
record "github.com/ipfs/go-ipfs/routing/record"
|
record "github.com/ipfs/go-ipfs/routing/record"
|
||||||
|
|
||||||
@ -48,7 +49,7 @@ type IpfsDHT struct {
|
|||||||
datastore ds.Datastore // Local data
|
datastore ds.Datastore // Local data
|
||||||
|
|
||||||
routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
|
routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
|
||||||
providers *ProviderManager
|
providers *providers.ProviderManager
|
||||||
|
|
||||||
birth time.Time // When this peer started up
|
birth time.Time // When this peer started up
|
||||||
diaglock sync.Mutex // lock to make diagnostics work better
|
diaglock sync.Mutex // lock to make diagnostics work better
|
||||||
@ -84,8 +85,8 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.Datastore) *IpfsDHT {
|
|||||||
dht.ctx = ctx
|
dht.ctx = ctx
|
||||||
|
|
||||||
h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
|
h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
|
||||||
dht.providers = NewProviderManager(dht.ctx, dht.self)
|
dht.providers = providers.NewProviderManager(dht.ctx, dht.self, dstore)
|
||||||
dht.proc.AddChild(dht.providers.proc)
|
dht.proc.AddChild(dht.providers.Process())
|
||||||
goprocessctx.CloseAfterContext(dht.proc, ctx)
|
goprocessctx.CloseAfterContext(dht.proc, ctx)
|
||||||
|
|
||||||
dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(dht.self), time.Minute, dht.peerstore)
|
dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(dht.self), time.Minute, dht.peerstore)
|
||||||
|
@ -1,165 +0,0 @@
|
|||||||
package dht
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
|
||||||
goprocess "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
|
|
||||||
goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context"
|
|
||||||
peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer"
|
|
||||||
|
|
||||||
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
|
|
||||||
)
|
|
||||||
|
|
||||||
var ProvideValidity = time.Hour * 24
|
|
||||||
var defaultCleanupInterval = time.Hour
|
|
||||||
|
|
||||||
type ProviderManager struct {
|
|
||||||
// all non channel fields are meant to be accessed only within
|
|
||||||
// the run method
|
|
||||||
providers map[key.Key]*providerSet
|
|
||||||
local map[key.Key]struct{}
|
|
||||||
lpeer peer.ID
|
|
||||||
|
|
||||||
getlocal chan chan []key.Key
|
|
||||||
newprovs chan *addProv
|
|
||||||
getprovs chan *getProv
|
|
||||||
period time.Duration
|
|
||||||
proc goprocess.Process
|
|
||||||
|
|
||||||
cleanupInterval time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
type providerSet struct {
|
|
||||||
providers []peer.ID
|
|
||||||
set map[peer.ID]time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
type addProv struct {
|
|
||||||
k key.Key
|
|
||||||
val peer.ID
|
|
||||||
}
|
|
||||||
|
|
||||||
type getProv struct {
|
|
||||||
k key.Key
|
|
||||||
resp chan []peer.ID
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager {
|
|
||||||
pm := new(ProviderManager)
|
|
||||||
pm.getprovs = make(chan *getProv)
|
|
||||||
pm.newprovs = make(chan *addProv)
|
|
||||||
pm.providers = make(map[key.Key]*providerSet)
|
|
||||||
pm.getlocal = make(chan chan []key.Key)
|
|
||||||
pm.local = make(map[key.Key]struct{})
|
|
||||||
pm.proc = goprocessctx.WithContext(ctx)
|
|
||||||
pm.cleanupInterval = defaultCleanupInterval
|
|
||||||
pm.proc.Go(func(p goprocess.Process) { pm.run() })
|
|
||||||
|
|
||||||
return pm
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pm *ProviderManager) run() {
|
|
||||||
tick := time.NewTicker(pm.cleanupInterval)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case np := <-pm.newprovs:
|
|
||||||
if np.val == pm.lpeer {
|
|
||||||
pm.local[np.k] = struct{}{}
|
|
||||||
}
|
|
||||||
provs, ok := pm.providers[np.k]
|
|
||||||
if !ok {
|
|
||||||
provs = newProviderSet()
|
|
||||||
pm.providers[np.k] = provs
|
|
||||||
}
|
|
||||||
provs.Add(np.val)
|
|
||||||
|
|
||||||
case gp := <-pm.getprovs:
|
|
||||||
var parr []peer.ID
|
|
||||||
provs, ok := pm.providers[gp.k]
|
|
||||||
if ok {
|
|
||||||
parr = provs.providers
|
|
||||||
}
|
|
||||||
|
|
||||||
gp.resp <- parr
|
|
||||||
|
|
||||||
case lc := <-pm.getlocal:
|
|
||||||
var keys []key.Key
|
|
||||||
for k := range pm.local {
|
|
||||||
keys = append(keys, k)
|
|
||||||
}
|
|
||||||
lc <- keys
|
|
||||||
|
|
||||||
case <-tick.C:
|
|
||||||
for k, provs := range pm.providers {
|
|
||||||
var filtered []peer.ID
|
|
||||||
for p, t := range provs.set {
|
|
||||||
if time.Now().Sub(t) > ProvideValidity {
|
|
||||||
delete(provs.set, p)
|
|
||||||
} else {
|
|
||||||
filtered = append(filtered, p)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(filtered) > 0 {
|
|
||||||
provs.providers = filtered
|
|
||||||
} else {
|
|
||||||
delete(pm.providers, k)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
case <-pm.proc.Closing():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pm *ProviderManager) AddProvider(ctx context.Context, k key.Key, val peer.ID) {
|
|
||||||
prov := &addProv{
|
|
||||||
k: k,
|
|
||||||
val: val,
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case pm.newprovs <- prov:
|
|
||||||
case <-ctx.Done():
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pm *ProviderManager) GetProviders(ctx context.Context, k key.Key) []peer.ID {
|
|
||||||
gp := &getProv{
|
|
||||||
k: k,
|
|
||||||
resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil
|
|
||||||
case pm.getprovs <- gp:
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil
|
|
||||||
case peers := <-gp.resp:
|
|
||||||
return peers
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pm *ProviderManager) GetLocal() []key.Key {
|
|
||||||
resp := make(chan []key.Key)
|
|
||||||
pm.getlocal <- resp
|
|
||||||
return <-resp
|
|
||||||
}
|
|
||||||
|
|
||||||
func newProviderSet() *providerSet {
|
|
||||||
return &providerSet{
|
|
||||||
set: make(map[peer.ID]time.Time),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ps *providerSet) Add(p peer.ID) {
|
|
||||||
_, found := ps.set[p]
|
|
||||||
if !found {
|
|
||||||
ps.providers = append(ps.providers, p)
|
|
||||||
}
|
|
||||||
|
|
||||||
ps.set[p] = time.Now()
|
|
||||||
}
|
|
363
routing/dht/providers/providers.go
Normal file
363
routing/dht/providers/providers.go
Normal file
@ -0,0 +1,363 @@
|
|||||||
|
package providers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
logging "gx/ipfs/QmNQynaz7qfriSUJkiEZUrm2Wen1u3Kj9goZzWtrPyu7XR/go-log"
|
||||||
|
goprocess "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
|
||||||
|
goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context"
|
||||||
|
peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer"
|
||||||
|
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
|
||||||
|
ds "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore"
|
||||||
|
dsq "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore/query"
|
||||||
|
base32 "gx/ipfs/Qmb1DA2A9LS2wR4FFweB4uEDomFsdmnw1VLawLE1yQzudj/base32"
|
||||||
|
|
||||||
|
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||||
|
|
||||||
|
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
|
||||||
|
)
|
||||||
|
|
||||||
|
var log = logging.Logger("providers")
|
||||||
|
|
||||||
|
var lruCacheSize = 256
|
||||||
|
var ProvideValidity = time.Hour * 24
|
||||||
|
var defaultCleanupInterval = time.Hour
|
||||||
|
|
||||||
|
type ProviderManager struct {
|
||||||
|
// all non channel fields are meant to be accessed only within
|
||||||
|
// the run method
|
||||||
|
providers *lru.Cache
|
||||||
|
local map[key.Key]struct{}
|
||||||
|
lpeer peer.ID
|
||||||
|
dstore ds.Datastore
|
||||||
|
|
||||||
|
getlocal chan chan []key.Key
|
||||||
|
newprovs chan *addProv
|
||||||
|
getprovs chan *getProv
|
||||||
|
period time.Duration
|
||||||
|
proc goprocess.Process
|
||||||
|
|
||||||
|
cleanupInterval time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
type providerSet struct {
|
||||||
|
providers []peer.ID
|
||||||
|
set map[peer.ID]time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
type addProv struct {
|
||||||
|
k key.Key
|
||||||
|
val peer.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
type getProv struct {
|
||||||
|
k key.Key
|
||||||
|
resp chan []peer.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Datastore) *ProviderManager {
|
||||||
|
pm := new(ProviderManager)
|
||||||
|
pm.getprovs = make(chan *getProv)
|
||||||
|
pm.newprovs = make(chan *addProv)
|
||||||
|
pm.dstore = dstore
|
||||||
|
cache, err := lru.New(lruCacheSize)
|
||||||
|
if err != nil {
|
||||||
|
panic(err) //only happens if negative value is passed to lru constructor
|
||||||
|
}
|
||||||
|
pm.providers = cache
|
||||||
|
|
||||||
|
pm.getlocal = make(chan chan []key.Key)
|
||||||
|
pm.local = make(map[key.Key]struct{})
|
||||||
|
pm.proc = goprocessctx.WithContext(ctx)
|
||||||
|
pm.cleanupInterval = defaultCleanupInterval
|
||||||
|
pm.proc.Go(func(p goprocess.Process) { pm.run() })
|
||||||
|
|
||||||
|
return pm
|
||||||
|
}
|
||||||
|
|
||||||
|
const providersKeyPrefix = "/providers/"
|
||||||
|
|
||||||
|
func mkProvKey(k key.Key) ds.Key {
|
||||||
|
return ds.NewKey(providersKeyPrefix + base32.RawStdEncoding.EncodeToString([]byte(k)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pm *ProviderManager) Process() goprocess.Process {
|
||||||
|
return pm.proc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pm *ProviderManager) providersForKey(k key.Key) ([]peer.ID, error) {
|
||||||
|
pset, err := pm.getProvSet(k)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return pset.providers, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pm *ProviderManager) getProvSet(k key.Key) (*providerSet, error) {
|
||||||
|
cached, ok := pm.providers.Get(k)
|
||||||
|
if ok {
|
||||||
|
return cached.(*providerSet), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
pset, err := loadProvSet(pm.dstore, k)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(pset.providers) > 0 {
|
||||||
|
pm.providers.Add(k, pset)
|
||||||
|
}
|
||||||
|
|
||||||
|
return pset, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func loadProvSet(dstore ds.Datastore, k key.Key) (*providerSet, error) {
|
||||||
|
res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k).String()})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
out := newProviderSet()
|
||||||
|
for e := range res.Next() {
|
||||||
|
if e.Error != nil {
|
||||||
|
log.Error("got an error: ", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
parts := strings.Split(e.Key, "/")
|
||||||
|
if len(parts) != 4 {
|
||||||
|
log.Warning("incorrectly formatted key: ", e.Key)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
decstr, err := base32.RawStdEncoding.DecodeString(parts[len(parts)-1])
|
||||||
|
if err != nil {
|
||||||
|
log.Error("base32 decoding error: ", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
pid := peer.ID(decstr)
|
||||||
|
|
||||||
|
t, err := readTimeValue(e.Value)
|
||||||
|
if err != nil {
|
||||||
|
log.Warning("parsing providers record from disk: ", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
out.setVal(pid, t)
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readTimeValue(i interface{}) (time.Time, error) {
|
||||||
|
data, ok := i.([]byte)
|
||||||
|
if !ok {
|
||||||
|
return time.Time{}, fmt.Errorf("data was not a []byte")
|
||||||
|
}
|
||||||
|
|
||||||
|
nsec, _ := binary.Varint(data)
|
||||||
|
|
||||||
|
return time.Unix(0, nsec), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pm *ProviderManager) addProv(k key.Key, p peer.ID) error {
|
||||||
|
iprovs, ok := pm.providers.Get(k)
|
||||||
|
if !ok {
|
||||||
|
iprovs = newProviderSet()
|
||||||
|
pm.providers.Add(k, iprovs)
|
||||||
|
}
|
||||||
|
provs := iprovs.(*providerSet)
|
||||||
|
now := time.Now()
|
||||||
|
provs.setVal(p, now)
|
||||||
|
|
||||||
|
return writeProviderEntry(pm.dstore, k, p, now)
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeProviderEntry(dstore ds.Datastore, k key.Key, p peer.ID, t time.Time) error {
|
||||||
|
dsk := mkProvKey(k).ChildString(base32.RawStdEncoding.EncodeToString([]byte(p)))
|
||||||
|
|
||||||
|
buf := make([]byte, 16)
|
||||||
|
n := binary.PutVarint(buf, t.UnixNano())
|
||||||
|
|
||||||
|
return dstore.Put(dsk, buf[:n])
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pm *ProviderManager) deleteProvSet(k key.Key) error {
|
||||||
|
pm.providers.Remove(k)
|
||||||
|
|
||||||
|
res, err := pm.dstore.Query(dsq.Query{
|
||||||
|
KeysOnly: true,
|
||||||
|
Prefix: mkProvKey(k).String(),
|
||||||
|
})
|
||||||
|
|
||||||
|
entries, err := res.Rest()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, e := range entries {
|
||||||
|
err := pm.dstore.Delete(ds.NewKey(e.Key))
|
||||||
|
if err != nil {
|
||||||
|
log.Error("deleting provider set: ", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pm *ProviderManager) getAllProvKeys() ([]key.Key, error) {
|
||||||
|
res, err := pm.dstore.Query(dsq.Query{
|
||||||
|
KeysOnly: true,
|
||||||
|
Prefix: providersKeyPrefix,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
entries, err := res.Rest()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
out := make([]key.Key, 0, len(entries))
|
||||||
|
seen := make(map[key.Key]struct{})
|
||||||
|
for _, e := range entries {
|
||||||
|
parts := strings.Split(e.Key, "/")
|
||||||
|
if len(parts) != 4 {
|
||||||
|
log.Warning("incorrectly formatted provider entry in datastore")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
decoded, err := base32.RawStdEncoding.DecodeString(parts[2])
|
||||||
|
if err != nil {
|
||||||
|
log.Warning("error decoding base32 provider key")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
k := key.Key(decoded)
|
||||||
|
if _, ok := seen[k]; !ok {
|
||||||
|
out = append(out, key.Key(decoded))
|
||||||
|
seen[k] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pm *ProviderManager) run() {
|
||||||
|
tick := time.NewTicker(pm.cleanupInterval)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case np := <-pm.newprovs:
|
||||||
|
if np.val == pm.lpeer {
|
||||||
|
pm.local[np.k] = struct{}{}
|
||||||
|
}
|
||||||
|
err := pm.addProv(np.k, np.val)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("error adding new providers: ", err)
|
||||||
|
}
|
||||||
|
case gp := <-pm.getprovs:
|
||||||
|
provs, err := pm.providersForKey(gp.k)
|
||||||
|
if err != nil && err != ds.ErrNotFound {
|
||||||
|
log.Error("error reading providers: ", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
gp.resp <- provs
|
||||||
|
case lc := <-pm.getlocal:
|
||||||
|
var keys []key.Key
|
||||||
|
for k := range pm.local {
|
||||||
|
keys = append(keys, k)
|
||||||
|
}
|
||||||
|
lc <- keys
|
||||||
|
|
||||||
|
case <-tick.C:
|
||||||
|
keys, err := pm.getAllProvKeys()
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Error loading provider keys: ", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, k := range keys {
|
||||||
|
provs, err := pm.getProvSet(k)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("error loading known provset: ", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var filtered []peer.ID
|
||||||
|
for p, t := range provs.set {
|
||||||
|
if time.Now().Sub(t) > ProvideValidity {
|
||||||
|
delete(provs.set, p)
|
||||||
|
} else {
|
||||||
|
filtered = append(filtered, p)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(filtered) > 0 {
|
||||||
|
provs.providers = filtered
|
||||||
|
} else {
|
||||||
|
err := pm.deleteProvSet(k)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("error deleting provider set: ", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case <-pm.proc.Closing():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pm *ProviderManager) AddProvider(ctx context.Context, k key.Key, val peer.ID) {
|
||||||
|
prov := &addProv{
|
||||||
|
k: k,
|
||||||
|
val: val,
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case pm.newprovs <- prov:
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pm *ProviderManager) GetProviders(ctx context.Context, k key.Key) []peer.ID {
|
||||||
|
gp := &getProv{
|
||||||
|
k: k,
|
||||||
|
resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
case pm.getprovs <- gp:
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
case peers := <-gp.resp:
|
||||||
|
return peers
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pm *ProviderManager) GetLocal() []key.Key {
|
||||||
|
resp := make(chan []key.Key)
|
||||||
|
pm.getlocal <- resp
|
||||||
|
return <-resp
|
||||||
|
}
|
||||||
|
|
||||||
|
func newProviderSet() *providerSet {
|
||||||
|
return &providerSet{
|
||||||
|
set: make(map[peer.ID]time.Time),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *providerSet) Add(p peer.ID) {
|
||||||
|
ps.setVal(p, time.Now())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *providerSet) setVal(p peer.ID, t time.Time) {
|
||||||
|
_, found := ps.set[p]
|
||||||
|
if !found {
|
||||||
|
ps.providers = append(ps.providers, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
ps.set[p] = t
|
||||||
|
}
|
150
routing/dht/providers/providers_test.go
Normal file
150
routing/dht/providers/providers_test.go
Normal file
@ -0,0 +1,150 @@
|
|||||||
|
package providers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||||
|
peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer"
|
||||||
|
ds "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore"
|
||||||
|
|
||||||
|
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestProviderManager(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
mid := peer.ID("testing")
|
||||||
|
p := NewProviderManager(ctx, mid, ds.NewMapDatastore())
|
||||||
|
a := key.Key("test")
|
||||||
|
p.AddProvider(ctx, a, peer.ID("testingprovider"))
|
||||||
|
resp := p.GetProviders(ctx, a)
|
||||||
|
if len(resp) != 1 {
|
||||||
|
t.Fatal("Could not retrieve provider.")
|
||||||
|
}
|
||||||
|
p.proc.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProvidersDatastore(t *testing.T) {
|
||||||
|
old := lruCacheSize
|
||||||
|
lruCacheSize = 10
|
||||||
|
defer func() { lruCacheSize = old }()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
mid := peer.ID("testing")
|
||||||
|
p := NewProviderManager(ctx, mid, ds.NewMapDatastore())
|
||||||
|
defer p.proc.Close()
|
||||||
|
|
||||||
|
friend := peer.ID("friend")
|
||||||
|
var keys []key.Key
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
k := key.Key(fmt.Sprint(i))
|
||||||
|
keys = append(keys, k)
|
||||||
|
p.AddProvider(ctx, k, friend)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, k := range keys {
|
||||||
|
resp := p.GetProviders(ctx, k)
|
||||||
|
if len(resp) != 1 {
|
||||||
|
t.Fatal("Could not retrieve provider.")
|
||||||
|
}
|
||||||
|
if resp[0] != friend {
|
||||||
|
t.Fatal("expected provider to be 'friend'")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProvidersSerialization(t *testing.T) {
|
||||||
|
dstore := ds.NewMapDatastore()
|
||||||
|
|
||||||
|
k := key.Key("my key!")
|
||||||
|
p1 := peer.ID("peer one")
|
||||||
|
p2 := peer.ID("peer two")
|
||||||
|
pt1 := time.Now()
|
||||||
|
pt2 := pt1.Add(time.Hour)
|
||||||
|
|
||||||
|
err := writeProviderEntry(dstore, k, p1, pt1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = writeProviderEntry(dstore, k, p2, pt2)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pset, err := loadProvSet(dstore, k)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
lt1, ok := pset.set[p1]
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("failed to load set correctly")
|
||||||
|
}
|
||||||
|
|
||||||
|
if pt1 != lt1 {
|
||||||
|
t.Fatal("time wasnt serialized correctly")
|
||||||
|
}
|
||||||
|
|
||||||
|
lt2, ok := pset.set[p2]
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("failed to load set correctly")
|
||||||
|
}
|
||||||
|
|
||||||
|
if pt2 != lt2 {
|
||||||
|
t.Fatal("time wasnt serialized correctly")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProvidesExpire(t *testing.T) {
|
||||||
|
pval := ProvideValidity
|
||||||
|
cleanup := defaultCleanupInterval
|
||||||
|
ProvideValidity = time.Second / 2
|
||||||
|
defaultCleanupInterval = time.Second / 2
|
||||||
|
defer func() {
|
||||||
|
ProvideValidity = pval
|
||||||
|
defaultCleanupInterval = cleanup
|
||||||
|
}()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
mid := peer.ID("testing")
|
||||||
|
p := NewProviderManager(ctx, mid, ds.NewMapDatastore())
|
||||||
|
|
||||||
|
peers := []peer.ID{"a", "b"}
|
||||||
|
var keys []key.Key
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
k := key.Key(i)
|
||||||
|
keys = append(keys, k)
|
||||||
|
p.AddProvider(ctx, k, peers[0])
|
||||||
|
p.AddProvider(ctx, k, peers[1])
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
out := p.GetProviders(ctx, keys[i])
|
||||||
|
if len(out) != 2 {
|
||||||
|
t.Fatal("expected providers to still be there")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
out := p.GetProviders(ctx, keys[i])
|
||||||
|
if len(out) > 2 {
|
||||||
|
t.Fatal("expected providers to be cleaned up")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.providers.Len() != 0 {
|
||||||
|
t.Fatal("providers map not cleaned up")
|
||||||
|
}
|
||||||
|
|
||||||
|
allprovs, err := p.getAllProvKeys()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(allprovs) != 0 {
|
||||||
|
t.Fatal("expected everything to be cleaned out of the datastore")
|
||||||
|
}
|
||||||
|
}
|
@ -1,61 +0,0 @@
|
|||||||
package dht
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
|
||||||
peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer"
|
|
||||||
|
|
||||||
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestProviderManager(t *testing.T) {
|
|
||||||
ctx := context.Background()
|
|
||||||
mid := peer.ID("testing")
|
|
||||||
p := NewProviderManager(ctx, mid)
|
|
||||||
a := key.Key("test")
|
|
||||||
p.AddProvider(ctx, a, peer.ID("testingprovider"))
|
|
||||||
resp := p.GetProviders(ctx, a)
|
|
||||||
if len(resp) != 1 {
|
|
||||||
t.Fatal("Could not retrieve provider.")
|
|
||||||
}
|
|
||||||
p.proc.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestProvidesExpire(t *testing.T) {
|
|
||||||
ProvideValidity = time.Second
|
|
||||||
defaultCleanupInterval = time.Second
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
mid := peer.ID("testing")
|
|
||||||
p := NewProviderManager(ctx, mid)
|
|
||||||
|
|
||||||
peers := []peer.ID{"a", "b"}
|
|
||||||
var keys []key.Key
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
k := key.Key(i)
|
|
||||||
keys = append(keys, k)
|
|
||||||
p.AddProvider(ctx, k, peers[0])
|
|
||||||
p.AddProvider(ctx, k, peers[1])
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
out := p.GetProviders(ctx, keys[i])
|
|
||||||
if len(out) != 2 {
|
|
||||||
t.Fatal("expected providers to still be there")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
time.Sleep(time.Second * 3)
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
out := p.GetProviders(ctx, keys[i])
|
|
||||||
if len(out) > 2 {
|
|
||||||
t.Fatal("expected providers to be cleaned up")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(p.providers) != 0 {
|
|
||||||
t.Fatal("providers map not cleaned up")
|
|
||||||
}
|
|
||||||
}
|
|
Reference in New Issue
Block a user