From 8f91069e50c1c755d43a905bb086d332fea01000 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 16 May 2016 17:01:00 -0700 Subject: [PATCH 1/3] Write providers to disk to avoid memory leaks License: MIT Signed-off-by: Jeromy --- routing/dht/dht.go | 7 +- routing/dht/providers.go | 165 ----------- routing/dht/providers/providers.go | 363 ++++++++++++++++++++++++ routing/dht/providers/providers_test.go | 134 +++++++++ routing/dht/providers_test.go | 61 ---- 5 files changed, 501 insertions(+), 229 deletions(-) delete mode 100644 routing/dht/providers.go create mode 100644 routing/dht/providers/providers.go create mode 100644 routing/dht/providers/providers_test.go delete mode 100644 routing/dht/providers_test.go diff --git a/routing/dht/dht.go b/routing/dht/dht.go index b914e9b86..ecee8c820 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -12,6 +12,7 @@ import ( key "github.com/ipfs/go-ipfs/blocks/key" routing "github.com/ipfs/go-ipfs/routing" pb "github.com/ipfs/go-ipfs/routing/dht/pb" + providers "github.com/ipfs/go-ipfs/routing/dht/providers" kb "github.com/ipfs/go-ipfs/routing/kbucket" record "github.com/ipfs/go-ipfs/routing/record" @@ -48,7 +49,7 @@ type IpfsDHT struct { datastore ds.Datastore // Local data routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes - providers *ProviderManager + providers *providers.ProviderManager birth time.Time // When this peer started up diaglock sync.Mutex // lock to make diagnostics work better @@ -84,8 +85,8 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.Datastore) *IpfsDHT { dht.ctx = ctx h.SetStreamHandler(ProtocolDHT, dht.handleNewStream) - dht.providers = NewProviderManager(dht.ctx, dht.self) - dht.proc.AddChild(dht.providers.proc) + dht.providers = providers.NewProviderManager(dht.ctx, dht.self, dstore) + dht.proc.AddChild(dht.providers.Process()) goprocessctx.CloseAfterContext(dht.proc, ctx) dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(dht.self), time.Minute, dht.peerstore) diff --git a/routing/dht/providers.go b/routing/dht/providers.go deleted file mode 100644 index f1bdd088e..000000000 --- a/routing/dht/providers.go +++ /dev/null @@ -1,165 +0,0 @@ -package dht - -import ( - "time" - - key "github.com/ipfs/go-ipfs/blocks/key" - goprocess "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess" - goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context" - peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer" - - context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" -) - -var ProvideValidity = time.Hour * 24 -var defaultCleanupInterval = time.Hour - -type ProviderManager struct { - // all non channel fields are meant to be accessed only within - // the run method - providers map[key.Key]*providerSet - local map[key.Key]struct{} - lpeer peer.ID - - getlocal chan chan []key.Key - newprovs chan *addProv - getprovs chan *getProv - period time.Duration - proc goprocess.Process - - cleanupInterval time.Duration -} - -type providerSet struct { - providers []peer.ID - set map[peer.ID]time.Time -} - -type addProv struct { - k key.Key - val peer.ID -} - -type getProv struct { - k key.Key - resp chan []peer.ID -} - -func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager { - pm := new(ProviderManager) - pm.getprovs = make(chan *getProv) - pm.newprovs = make(chan *addProv) - pm.providers = make(map[key.Key]*providerSet) - pm.getlocal = make(chan chan []key.Key) - pm.local = 
make(map[key.Key]struct{}) - pm.proc = goprocessctx.WithContext(ctx) - pm.cleanupInterval = defaultCleanupInterval - pm.proc.Go(func(p goprocess.Process) { pm.run() }) - - return pm -} - -func (pm *ProviderManager) run() { - tick := time.NewTicker(pm.cleanupInterval) - for { - select { - case np := <-pm.newprovs: - if np.val == pm.lpeer { - pm.local[np.k] = struct{}{} - } - provs, ok := pm.providers[np.k] - if !ok { - provs = newProviderSet() - pm.providers[np.k] = provs - } - provs.Add(np.val) - - case gp := <-pm.getprovs: - var parr []peer.ID - provs, ok := pm.providers[gp.k] - if ok { - parr = provs.providers - } - - gp.resp <- parr - - case lc := <-pm.getlocal: - var keys []key.Key - for k := range pm.local { - keys = append(keys, k) - } - lc <- keys - - case <-tick.C: - for k, provs := range pm.providers { - var filtered []peer.ID - for p, t := range provs.set { - if time.Now().Sub(t) > ProvideValidity { - delete(provs.set, p) - } else { - filtered = append(filtered, p) - } - } - - if len(filtered) > 0 { - provs.providers = filtered - } else { - delete(pm.providers, k) - } - } - - case <-pm.proc.Closing(): - return - } - } -} - -func (pm *ProviderManager) AddProvider(ctx context.Context, k key.Key, val peer.ID) { - prov := &addProv{ - k: k, - val: val, - } - select { - case pm.newprovs <- prov: - case <-ctx.Done(): - } -} - -func (pm *ProviderManager) GetProviders(ctx context.Context, k key.Key) []peer.ID { - gp := &getProv{ - k: k, - resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking - } - select { - case <-ctx.Done(): - return nil - case pm.getprovs <- gp: - } - select { - case <-ctx.Done(): - return nil - case peers := <-gp.resp: - return peers - } -} - -func (pm *ProviderManager) GetLocal() []key.Key { - resp := make(chan []key.Key) - pm.getlocal <- resp - return <-resp -} - -func newProviderSet() *providerSet { - return &providerSet{ - set: make(map[peer.ID]time.Time), - } -} - -func (ps *providerSet) Add(p peer.ID) { - _, found := ps.set[p] - if !found { - ps.providers = append(ps.providers, p) - } - - ps.set[p] = time.Now() -} diff --git a/routing/dht/providers/providers.go b/routing/dht/providers/providers.go new file mode 100644 index 000000000..beb53d4ae --- /dev/null +++ b/routing/dht/providers/providers.go @@ -0,0 +1,363 @@ +package providers + +import ( + "encoding/base32" + "encoding/binary" + "fmt" + "strings" + "time" + + logging "gx/ipfs/QmNQynaz7qfriSUJkiEZUrm2Wen1u3Kj9goZzWtrPyu7XR/go-log" + goprocess "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess" + goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context" + peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer" + lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru" + ds "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore" + dsq "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore/query" + + key "github.com/ipfs/go-ipfs/blocks/key" + + context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" +) + +var log = logging.Logger("providers") + +var lruCacheSize = 256 +var ProvideValidity = time.Hour * 24 +var defaultCleanupInterval = time.Hour + +type ProviderManager struct { + // all non channel fields are meant to be accessed only within + // the run method + providers *lru.Cache + local map[key.Key]struct{} + lpeer peer.ID + dstore ds.Datastore + + getlocal chan chan []key.Key + newprovs chan *addProv + getprovs chan *getProv + period time.Duration + 
proc goprocess.Process
+
+	cleanupInterval time.Duration
+}
+
+type providerSet struct {
+	providers []peer.ID
+	set       map[peer.ID]time.Time
+}
+
+type addProv struct {
+	k   key.Key
+	val peer.ID
+}
+
+type getProv struct {
+	k    key.Key
+	resp chan []peer.ID
+}
+
+func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Datastore) *ProviderManager {
+	pm := new(ProviderManager)
+	pm.getprovs = make(chan *getProv)
+	pm.newprovs = make(chan *addProv)
+	pm.dstore = dstore
+	cache, err := lru.New(lruCacheSize)
+	if err != nil {
+		panic(err) // only happens if a negative size is passed to the lru constructor
+	}
+	pm.providers = cache
+
+	pm.getlocal = make(chan chan []key.Key)
+	pm.local = make(map[key.Key]struct{})
+	pm.proc = goprocessctx.WithContext(ctx)
+	pm.cleanupInterval = defaultCleanupInterval
+	pm.proc.Go(func(p goprocess.Process) { pm.run() })
+
+	return pm
+}
+
+const providersKeyPrefix = "/providers/"
+
+func mkProvKey(k key.Key) ds.Key {
+	return ds.NewKey(providersKeyPrefix + base32.StdEncoding.EncodeToString([]byte(k)))
+}
+
+func (pm *ProviderManager) Process() goprocess.Process {
+	return pm.proc
+}
+
+func (pm *ProviderManager) providersForKey(k key.Key) ([]peer.ID, error) {
+	pset, err := pm.getProvSet(k)
+	if err != nil {
+		return nil, err
+	}
+	return pset.providers, nil
+}
+
+func (pm *ProviderManager) getProvSet(k key.Key) (*providerSet, error) {
+	cached, ok := pm.providers.Get(k)
+	if ok {
+		return cached.(*providerSet), nil
+	}
+
+	pset, err := loadProvSet(pm.dstore, k)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(pset.providers) > 0 {
+		pm.providers.Add(k, pset)
+	}
+
+	return pset, nil
+}
+
+func loadProvSet(dstore ds.Datastore, k key.Key) (*providerSet, error) {
+	res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k).String()})
+	if err != nil {
+		return nil, err
+	}
+
+	out := newProviderSet()
+	for e := range res.Next() {
+		if e.Error != nil {
+			log.Error("got an error: ", e.Error)
+			continue
+		}
+		parts := strings.Split(e.Key, "/")
+		if len(parts) != 4 {
+			log.Warning("incorrectly formatted key: ", e.Key)
+			continue
+		}
+
+		decstr, err := base32.StdEncoding.DecodeString(parts[len(parts)-1])
+		if err != nil {
+			log.Error("base32 decoding error: ", err)
+			continue
+		}
+
+		pid := peer.ID(decstr)
+
+		t, err := readTimeValue(e.Value)
+		if err != nil {
+			log.Warning("parsing providers record from disk: ", err)
+			continue
+		}
+
+		out.setVal(pid, t)
+	}
+
+	return out, nil
+}
+
+func readTimeValue(i interface{}) (time.Time, error) {
+	data, ok := i.([]byte)
+	if !ok {
+		return time.Time{}, fmt.Errorf("data was not a []byte")
+	}
+
+	nsec, _ := binary.Varint(data)
+
+	return time.Unix(0, nsec), nil
+}
+
+func (pm *ProviderManager) addProv(k key.Key, p peer.ID) error {
+	iprovs, ok := pm.providers.Get(k)
+	if !ok {
+		iprovs = newProviderSet()
+		pm.providers.Add(k, iprovs)
+	}
+	provs := iprovs.(*providerSet)
+	now := time.Now()
+	provs.setVal(p, now)
+
+	return writeProviderEntry(pm.dstore, k, p, now)
+}
+
+func writeProviderEntry(dstore ds.Datastore, k key.Key, p peer.ID, t time.Time) error {
+	dsk := mkProvKey(k).ChildString(base32.StdEncoding.EncodeToString([]byte(p)))
+
+	buf := make([]byte, 16)
+	n := binary.PutVarint(buf, t.UnixNano())
+
+	return dstore.Put(dsk, buf[:n])
+}
+
+func (pm *ProviderManager) deleteProvSet(k key.Key) error {
+	pm.providers.Remove(k)
+
+	res, err := pm.dstore.Query(dsq.Query{
+		KeysOnly: true,
+		Prefix:   mkProvKey(k).String(),
+	})
+	if err != nil {
+		return err
+	}
+
+	entries, err := res.Rest()
+	if err != nil {
+		return err
+	}
+
+	for _, e := range entries {
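+		// log and continue on individual delete failures so the
+		// remaining provider records for this key still get removed
+		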
err := pm.dstore.Delete(ds.NewKey(e.Key)) + if err != nil { + log.Error("deleting provider set: ", err) + } + } + return nil +} + +func (pm *ProviderManager) getAllProvKeys() ([]key.Key, error) { + res, err := pm.dstore.Query(dsq.Query{ + KeysOnly: true, + Prefix: providersKeyPrefix, + }) + + if err != nil { + return nil, err + } + + entries, err := res.Rest() + if err != nil { + return nil, err + } + + out := make([]key.Key, 0, len(entries)) + seen := make(map[key.Key]struct{}) + for _, e := range entries { + parts := strings.Split(e.Key, "/") + if len(parts) != 4 { + log.Warning("incorrectly formatted provider entry in datastore") + continue + } + decoded, err := base32.StdEncoding.DecodeString(parts[2]) + if err != nil { + log.Warning("error decoding base32 provider key") + continue + } + + k := key.Key(decoded) + if _, ok := seen[k]; !ok { + out = append(out, key.Key(decoded)) + seen[k] = struct{}{} + } + } + + return out, nil +} + +func (pm *ProviderManager) run() { + tick := time.NewTicker(pm.cleanupInterval) + for { + select { + case np := <-pm.newprovs: + if np.val == pm.lpeer { + pm.local[np.k] = struct{}{} + } + err := pm.addProv(np.k, np.val) + if err != nil { + log.Error("error adding new providers: ", err) + } + case gp := <-pm.getprovs: + provs, err := pm.providersForKey(gp.k) + if err != nil && err != ds.ErrNotFound { + log.Error("error reading providers: ", err) + } + + gp.resp <- provs + case lc := <-pm.getlocal: + var keys []key.Key + for k := range pm.local { + keys = append(keys, k) + } + lc <- keys + + case <-tick.C: + keys, err := pm.getAllProvKeys() + if err != nil { + log.Error("Error loading provider keys: ", err) + continue + } + for _, k := range keys { + provs, err := pm.getProvSet(k) + if err != nil { + log.Error("error loading known provset: ", err) + continue + } + var filtered []peer.ID + for p, t := range provs.set { + if time.Now().Sub(t) > ProvideValidity { + delete(provs.set, p) + } else { + filtered = append(filtered, p) + } + } + + if len(filtered) > 0 { + provs.providers = filtered + } else { + err := pm.deleteProvSet(k) + if err != nil { + log.Error("error deleting provider set: ", err) + } + } + } + case <-pm.proc.Closing(): + return + } + } +} + +func (pm *ProviderManager) AddProvider(ctx context.Context, k key.Key, val peer.ID) { + prov := &addProv{ + k: k, + val: val, + } + select { + case pm.newprovs <- prov: + case <-ctx.Done(): + } +} + +func (pm *ProviderManager) GetProviders(ctx context.Context, k key.Key) []peer.ID { + gp := &getProv{ + k: k, + resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking + } + select { + case <-ctx.Done(): + return nil + case pm.getprovs <- gp: + } + select { + case <-ctx.Done(): + return nil + case peers := <-gp.resp: + return peers + } +} + +func (pm *ProviderManager) GetLocal() []key.Key { + resp := make(chan []key.Key) + pm.getlocal <- resp + return <-resp +} + +func newProviderSet() *providerSet { + return &providerSet{ + set: make(map[peer.ID]time.Time), + } +} + +func (ps *providerSet) Add(p peer.ID) { + ps.setVal(p, time.Now()) +} + +func (ps *providerSet) setVal(p peer.ID, t time.Time) { + _, found := ps.set[p] + if !found { + ps.providers = append(ps.providers, p) + } + + ps.set[p] = t +} diff --git a/routing/dht/providers/providers_test.go b/routing/dht/providers/providers_test.go new file mode 100644 index 000000000..2943e5ccb --- /dev/null +++ b/routing/dht/providers/providers_test.go @@ -0,0 +1,134 @@ +package providers + +import ( + "fmt" + "testing" + "time" + + key 
"github.com/ipfs/go-ipfs/blocks/key" + peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer" + ds "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore" + + context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" +) + +func TestProviderManager(t *testing.T) { + ctx := context.Background() + mid := peer.ID("testing") + p := NewProviderManager(ctx, mid, ds.NewMapDatastore()) + a := key.Key("test") + p.AddProvider(ctx, a, peer.ID("testingprovider")) + resp := p.GetProviders(ctx, a) + if len(resp) != 1 { + t.Fatal("Could not retrieve provider.") + } + p.proc.Close() +} + +func TestProvidersDatastore(t *testing.T) { + old := lruCacheSize + lruCacheSize = 10 + defer func() { lruCacheSize = old }() + + ctx := context.Background() + mid := peer.ID("testing") + p := NewProviderManager(ctx, mid, ds.NewMapDatastore()) + defer p.proc.Close() + + friend := peer.ID("friend") + var keys []key.Key + for i := 0; i < 100; i++ { + k := key.Key(fmt.Sprint(i)) + keys = append(keys, k) + p.AddProvider(ctx, k, friend) + } + + for _, k := range keys { + resp := p.GetProviders(ctx, k) + if len(resp) != 1 { + t.Fatal("Could not retrieve provider.") + } + if resp[0] != friend { + t.Fatal("expected provider to be 'friend'") + } + } +} + +func TestProvidersSerialization(t *testing.T) { + dstore := ds.NewMapDatastore() + + k := key.Key("my key!") + p := peer.ID("my peer") + pt := time.Now() + + err := writeProviderEntry(dstore, k, p, pt) + if err != nil { + t.Fatal(err) + } + + pset, err := loadProvSet(dstore, k) + if err != nil { + t.Fatal(err) + } + + lt, ok := pset.set[p] + if !ok { + t.Fatal("failed to load set correctly") + } + + if pt != lt { + t.Fatal("time wasnt serialized correctly") + } +} + +func TestProvidesExpire(t *testing.T) { + pval := ProvideValidity + cleanup := defaultCleanupInterval + ProvideValidity = time.Second / 2 + defaultCleanupInterval = time.Second / 2 + defer func() { + ProvideValidity = pval + defaultCleanupInterval = cleanup + }() + + ctx := context.Background() + mid := peer.ID("testing") + p := NewProviderManager(ctx, mid, ds.NewMapDatastore()) + + peers := []peer.ID{"a", "b"} + var keys []key.Key + for i := 0; i < 10; i++ { + k := key.Key(i) + keys = append(keys, k) + p.AddProvider(ctx, k, peers[0]) + p.AddProvider(ctx, k, peers[1]) + } + + for i := 0; i < 10; i++ { + out := p.GetProviders(ctx, keys[i]) + if len(out) != 2 { + t.Fatal("expected providers to still be there") + } + } + + time.Sleep(time.Second) + for i := 0; i < 10; i++ { + out := p.GetProviders(ctx, keys[i]) + if len(out) > 2 { + t.Fatal("expected providers to be cleaned up") + } + } + + if p.providers.Len() != 0 { + t.Fatal("providers map not cleaned up") + } + + allprovs, err := p.getAllProvKeys() + if err != nil { + t.Fatal(err) + } + + if len(allprovs) != 0 { + t.Fatal("expected everything to be cleaned out of the datastore") + } +} diff --git a/routing/dht/providers_test.go b/routing/dht/providers_test.go deleted file mode 100644 index 362a83cb6..000000000 --- a/routing/dht/providers_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package dht - -import ( - "testing" - "time" - - key "github.com/ipfs/go-ipfs/blocks/key" - peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer" - - context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" -) - -func TestProviderManager(t *testing.T) { - ctx := context.Background() - mid := peer.ID("testing") - p := NewProviderManager(ctx, mid) - a := key.Key("test") - p.AddProvider(ctx, a, 
peer.ID("testingprovider")) - resp := p.GetProviders(ctx, a) - if len(resp) != 1 { - t.Fatal("Could not retrieve provider.") - } - p.proc.Close() -} - -func TestProvidesExpire(t *testing.T) { - ProvideValidity = time.Second - defaultCleanupInterval = time.Second - - ctx := context.Background() - mid := peer.ID("testing") - p := NewProviderManager(ctx, mid) - - peers := []peer.ID{"a", "b"} - var keys []key.Key - for i := 0; i < 10; i++ { - k := key.Key(i) - keys = append(keys, k) - p.AddProvider(ctx, k, peers[0]) - p.AddProvider(ctx, k, peers[1]) - } - - for i := 0; i < 10; i++ { - out := p.GetProviders(ctx, keys[i]) - if len(out) != 2 { - t.Fatal("expected providers to still be there") - } - } - - time.Sleep(time.Second * 3) - for i := 0; i < 10; i++ { - out := p.GetProviders(ctx, keys[i]) - if len(out) > 2 { - t.Fatal("expected providers to be cleaned up") - } - } - - if len(p.providers) != 0 { - t.Fatal("providers map not cleaned up") - } -} From 959fe64dd3ec72f9ec968052555963bc27e71cdb Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 25 Jun 2016 22:59:57 -0700 Subject: [PATCH 2/3] providers test with multiple peers License: MIT Signed-off-by: Jeromy --- routing/dht/providers/providers_test.go | 26 ++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/routing/dht/providers/providers_test.go b/routing/dht/providers/providers_test.go index 2943e5ccb..660640267 100644 --- a/routing/dht/providers/providers_test.go +++ b/routing/dht/providers/providers_test.go @@ -58,10 +58,17 @@ func TestProvidersSerialization(t *testing.T) { dstore := ds.NewMapDatastore() k := key.Key("my key!") - p := peer.ID("my peer") - pt := time.Now() + p1 := peer.ID("peer one") + p2 := peer.ID("peer two") + pt1 := time.Now() + pt2 := pt1.Add(time.Hour) - err := writeProviderEntry(dstore, k, p, pt) + err := writeProviderEntry(dstore, k, p1, pt1) + if err != nil { + t.Fatal(err) + } + + err = writeProviderEntry(dstore, k, p2, pt2) if err != nil { t.Fatal(err) } @@ -71,12 +78,21 @@ func TestProvidersSerialization(t *testing.T) { t.Fatal(err) } - lt, ok := pset.set[p] + lt1, ok := pset.set[p1] if !ok { t.Fatal("failed to load set correctly") } - if pt != lt { + if pt1 != lt1 { + t.Fatal("time wasnt serialized correctly") + } + + lt2, ok := pset.set[p2] + if !ok { + t.Fatal("failed to load set correctly") + } + + if pt2 != lt2 { t.Fatal("time wasnt serialized correctly") } } From d489f82b590e00257296bce45daf1671770d892e Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 27 Jun 2016 14:43:01 -0700 Subject: [PATCH 3/3] use no padding encoding License: MIT Signed-off-by: Jeromy --- package.json | 6 ++++++ routing/dht/providers/providers.go | 10 +++++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/package.json b/package.json index 7a37a3c4f..4100728eb 100644 --- a/package.json +++ b/package.json @@ -171,6 +171,12 @@ "hash": "QmQdnfvZQuhdT93LNc5bos52wAmdr3G2p6G8teLJMEN32P", "name": "go-libp2p-peerstore", "version": "1.1.2" + }, + { + "author": "whyrusleeping", + "hash": "Qmb1DA2A9LS2wR4FFweB4uEDomFsdmnw1VLawLE1yQzudj", + "name": "base32", + "version": "0.0.0" } ], "gxVersion": "0.4.0", diff --git a/routing/dht/providers/providers.go b/routing/dht/providers/providers.go index beb53d4ae..95d9b70d2 100644 --- a/routing/dht/providers/providers.go +++ b/routing/dht/providers/providers.go @@ -1,7 +1,6 @@ package providers import ( - "encoding/base32" "encoding/binary" "fmt" "strings" @@ -14,6 +13,7 @@ import ( lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru" 
ds "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore" dsq "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore/query" + base32 "gx/ipfs/Qmb1DA2A9LS2wR4FFweB4uEDomFsdmnw1VLawLE1yQzudj/base32" key "github.com/ipfs/go-ipfs/blocks/key" @@ -81,7 +81,7 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Datastore) const providersKeyPrefix = "/providers/" func mkProvKey(k key.Key) ds.Key { - return ds.NewKey(providersKeyPrefix + base32.StdEncoding.EncodeToString([]byte(k))) + return ds.NewKey(providersKeyPrefix + base32.RawStdEncoding.EncodeToString([]byte(k))) } func (pm *ProviderManager) Process() goprocess.Process { @@ -132,7 +132,7 @@ func loadProvSet(dstore ds.Datastore, k key.Key) (*providerSet, error) { continue } - decstr, err := base32.StdEncoding.DecodeString(parts[len(parts)-1]) + decstr, err := base32.RawStdEncoding.DecodeString(parts[len(parts)-1]) if err != nil { log.Error("base32 decoding error: ", err) continue @@ -177,7 +177,7 @@ func (pm *ProviderManager) addProv(k key.Key, p peer.ID) error { } func writeProviderEntry(dstore ds.Datastore, k key.Key, p peer.ID, t time.Time) error { - dsk := mkProvKey(k).ChildString(base32.StdEncoding.EncodeToString([]byte(p))) + dsk := mkProvKey(k).ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))) buf := make([]byte, 16) n := binary.PutVarint(buf, t.UnixNano()) @@ -230,7 +230,7 @@ func (pm *ProviderManager) getAllProvKeys() ([]key.Key, error) { log.Warning("incorrectly formatted provider entry in datastore") continue } - decoded, err := base32.StdEncoding.DecodeString(parts[2]) + decoded, err := base32.RawStdEncoding.DecodeString(parts[2]) if err != nil { log.Warning("error decoding base32 provider key") continue