1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-07-02 03:28:25 +08:00

Merge pull request #2804 from ipfs/fix/provs-cleanup

fix cleanup of empty provider sets
This commit is contained in:
Jeromy Johnson
2016-06-04 11:31:03 -07:00
2 changed files with 53 additions and 4 deletions

View File

@ -11,6 +11,9 @@ import (
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
) )
var ProvideValidity = time.Hour * 24
var defaultCleanupInterval = time.Hour
type ProviderManager struct { type ProviderManager struct {
// all non channel fields are meant to be accessed only within // all non channel fields are meant to be accessed only within
// the run method // the run method
@ -23,6 +26,8 @@ type ProviderManager struct {
getprovs chan *getProv getprovs chan *getProv
period time.Duration period time.Duration
proc goprocess.Process proc goprocess.Process
cleanupInterval time.Duration
} }
type providerSet struct { type providerSet struct {
@ -48,13 +53,14 @@ func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager {
pm.getlocal = make(chan chan []key.Key) pm.getlocal = make(chan chan []key.Key)
pm.local = make(map[key.Key]struct{}) pm.local = make(map[key.Key]struct{})
pm.proc = goprocessctx.WithContext(ctx) pm.proc = goprocessctx.WithContext(ctx)
pm.cleanupInterval = defaultCleanupInterval
pm.proc.Go(func(p goprocess.Process) { pm.run() }) pm.proc.Go(func(p goprocess.Process) { pm.run() })
return pm return pm
} }
func (pm *ProviderManager) run() { func (pm *ProviderManager) run() {
tick := time.NewTicker(time.Hour) tick := time.NewTicker(pm.cleanupInterval)
for { for {
select { select {
case np := <-pm.newprovs: case np := <-pm.newprovs:
@ -85,16 +91,21 @@ func (pm *ProviderManager) run() {
lc <- keys lc <- keys
case <-tick.C: case <-tick.C:
for _, provs := range pm.providers { for k, provs := range pm.providers {
var filtered []peer.ID var filtered []peer.ID
for p, t := range provs.set { for p, t := range provs.set {
if time.Now().Sub(t) > time.Hour*24 { if time.Now().Sub(t) > ProvideValidity {
delete(provs.set, p) delete(provs.set, p)
} else { } else {
filtered = append(filtered, p) filtered = append(filtered, p)
} }
} }
provs.providers = filtered
if len(filtered) > 0 {
provs.providers = filtered
} else {
delete(pm.providers, k)
}
} }
case <-pm.proc.Closing(): case <-pm.proc.Closing():

View File

@ -2,6 +2,7 @@ package dht
import ( import (
"testing" "testing"
"time"
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
peer "gx/ipfs/QmQGwpJy9P4yXZySmqkZEXCmbBpJUb8xntCv8Ca4taZwDC/go-libp2p-peer" peer "gx/ipfs/QmQGwpJy9P4yXZySmqkZEXCmbBpJUb8xntCv8Ca4taZwDC/go-libp2p-peer"
@ -21,3 +22,40 @@ func TestProviderManager(t *testing.T) {
} }
p.proc.Close() p.proc.Close()
} }
func TestProvidesExpire(t *testing.T) {
ProvideValidity = time.Second
defaultCleanupInterval = time.Second
ctx := context.Background()
mid := peer.ID("testing")
p := NewProviderManager(ctx, mid)
peers := []peer.ID{"a", "b"}
var keys []key.Key
for i := 0; i < 10; i++ {
k := key.Key(i)
keys = append(keys, k)
p.AddProvider(ctx, k, peers[0])
p.AddProvider(ctx, k, peers[1])
}
for i := 0; i < 10; i++ {
out := p.GetProviders(ctx, keys[i])
if len(out) != 2 {
t.Fatal("expected providers to still be there")
}
}
time.Sleep(time.Second * 3)
for i := 0; i < 10; i++ {
out := p.GetProviders(ctx, keys[i])
if len(out) > 2 {
t.Fatal("expected providers to be cleaned up")
}
}
if len(p.providers) != 0 {
t.Fatal("providers map not cleaned up")
}
}