mirror of
https://github.com/ipfs/kubo.git
synced 2025-09-10 05:52:20 +08:00
dht_test large providers test
This commit is contained in:
@ -2,7 +2,9 @@ package dht
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -19,6 +21,17 @@ import (
|
|||||||
testutil "github.com/jbenet/go-ipfs/util/testutil"
|
testutil "github.com/jbenet/go-ipfs/util/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var testCaseValues = map[u.Key][]byte{}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
testCaseValues["hello"] = []byte("world")
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
k := fmt.Sprintf("%d -- key", i)
|
||||||
|
v := fmt.Sprintf("%d -- value", i)
|
||||||
|
testCaseValues[u.Key(k)] = []byte(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr) *IpfsDHT {
|
func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr) *IpfsDHT {
|
||||||
|
|
||||||
sk, pk, err := testutil.RandKeyPair(512)
|
sk, pk, err := testutil.RandKeyPair(512)
|
||||||
@ -174,37 +187,144 @@ func TestProvides(t *testing.T) {
|
|||||||
connect(t, ctx, dhts[1], dhts[2])
|
connect(t, ctx, dhts[1], dhts[2])
|
||||||
connect(t, ctx, dhts[1], dhts[3])
|
connect(t, ctx, dhts[1], dhts[3])
|
||||||
|
|
||||||
err := dhts[3].putLocal(u.Key("hello"), []byte("world"))
|
for k, v := range testCaseValues {
|
||||||
if err != nil {
|
t.Logf("adding local values for %s = %s", k, v)
|
||||||
t.Fatal(err)
|
err := dhts[3].putLocal(k, v)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
bits, err := dhts[3].getLocal(k)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(bits, v) {
|
||||||
|
t.Fatal("didn't store the right bits (%s, %s)", k, v)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bits, err := dhts[3].getLocal(u.Key("hello"))
|
for k, _ := range testCaseValues {
|
||||||
if err != nil && bytes.Equal(bits, []byte("world")) {
|
t.Logf("announcing provider for %s", k)
|
||||||
t.Fatal(err)
|
if err := dhts[3].Provide(ctx, k); err != nil {
|
||||||
}
|
t.Fatal(err)
|
||||||
|
}
|
||||||
err = dhts[3].Provide(ctx, u.Key("hello"))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// what is this timeout for? was 60ms before.
|
// what is this timeout for? was 60ms before.
|
||||||
time.Sleep(time.Millisecond * 6)
|
time.Sleep(time.Millisecond * 6)
|
||||||
|
|
||||||
ctxT, _ := context.WithTimeout(ctx, time.Second)
|
n := 0
|
||||||
provchan := dhts[0].FindProvidersAsync(ctxT, u.Key("hello"), 1)
|
for k, _ := range testCaseValues {
|
||||||
|
n = (n + 1) % 3
|
||||||
|
|
||||||
select {
|
t.Logf("getting providers for %s from %d", k, n)
|
||||||
case prov := <-provchan:
|
ctxT, _ := context.WithTimeout(ctx, time.Second)
|
||||||
if prov.ID == "" {
|
provchan := dhts[n].FindProvidersAsync(ctxT, k, 1)
|
||||||
t.Fatal("Got back nil provider")
|
|
||||||
|
select {
|
||||||
|
case prov := <-provchan:
|
||||||
|
if prov.ID == "" {
|
||||||
|
t.Fatal("Got back nil provider")
|
||||||
|
}
|
||||||
|
if prov.ID != dhts[3].self {
|
||||||
|
t.Fatal("Got back wrong provider")
|
||||||
|
}
|
||||||
|
case <-ctxT.Done():
|
||||||
|
t.Fatal("Did not get a provider back.")
|
||||||
}
|
}
|
||||||
if prov.ID != dhts[3].self {
|
}
|
||||||
t.Fatal("Got back nil provider")
|
}
|
||||||
|
|
||||||
|
func TestProvidesMany(t *testing.T) {
|
||||||
|
t.Skip("this test doesn't work")
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
nDHTs := 40
|
||||||
|
_, _, dhts := setupDHTS(ctx, nDHTs, t)
|
||||||
|
defer func() {
|
||||||
|
for i := 0; i < nDHTs; i++ {
|
||||||
|
dhts[i].Close()
|
||||||
|
defer dhts[i].network.Close()
|
||||||
}
|
}
|
||||||
case <-ctxT.Done():
|
}()
|
||||||
t.Fatal("Did not get a provider back.")
|
|
||||||
|
t.Logf("connecting %d dhts in a ring", nDHTs)
|
||||||
|
for i := 0; i < nDHTs; i++ {
|
||||||
|
connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
|
||||||
|
}
|
||||||
|
|
||||||
|
// t.Logf("bootstrapping them so they find each other", nDHTs)
|
||||||
|
// for _, dht := range dhts {
|
||||||
|
// bootstrap(t, ctx, dht)
|
||||||
|
// }
|
||||||
|
|
||||||
|
d := 0
|
||||||
|
for k, v := range testCaseValues {
|
||||||
|
d = (d + 1) % len(dhts)
|
||||||
|
dht := dhts[d]
|
||||||
|
|
||||||
|
t.Logf("adding local values for %s = %s (on %s)", k, v, dht.self)
|
||||||
|
err := dht.putLocal(k, v)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
bits, err := dht.getLocal(k)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(bits, v) {
|
||||||
|
t.Fatal("didn't store the right bits (%s, %s)", k, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("announcing provider for %s", k)
|
||||||
|
if err := dht.Provide(ctx, k); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// what is this timeout for? was 60ms before.
|
||||||
|
time.Sleep(time.Millisecond * 6)
|
||||||
|
|
||||||
|
errchan := make(chan error)
|
||||||
|
|
||||||
|
ctxT, _ := context.WithTimeout(ctx, 5*time.Second)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
getProvider := func(dht *IpfsDHT, k u.Key) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
provchan := dht.FindProvidersAsync(ctxT, k, 1)
|
||||||
|
select {
|
||||||
|
case prov := <-provchan:
|
||||||
|
if prov.ID == "" {
|
||||||
|
errchan <- fmt.Errorf("Got back nil provider (%s at %s)", k, dht.self)
|
||||||
|
} else if prov.ID != dhts[3].self {
|
||||||
|
errchan <- fmt.Errorf("Got back wrong provider (%s at %s)", k, dht.self)
|
||||||
|
}
|
||||||
|
case <-ctxT.Done():
|
||||||
|
errchan <- fmt.Errorf("Did not get a provider back (%s at %s)", k, dht.self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, _ := range testCaseValues {
|
||||||
|
// everyone should be able to find it...
|
||||||
|
for _, dht := range dhts {
|
||||||
|
t.Logf("getting providers for %s at %s", k, dht.self)
|
||||||
|
wg.Add(1)
|
||||||
|
go getProvider(dht, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// we need this because of printing errors
|
||||||
|
go func() {
|
||||||
|
wg.Wait()
|
||||||
|
close(errchan)
|
||||||
|
}()
|
||||||
|
|
||||||
|
t.Logf("looking through errors")
|
||||||
|
for err := range errchan {
|
||||||
|
t.Error(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user