mirror of https://github.com/ipfs/kubo.git synced 2025-09-10 11:52:21 +08:00

updated datastore for proper query handling

Queries can now be cancelled and their resources collected
Juan Batiz-Benet
2015-01-10 14:31:40 -08:00
parent c0cc951118
commit 92e8a7bcd5
27 changed files with 791 additions and 207 deletions
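
For reference, a minimal sketch of the client-side pattern this commit enables, assuming the standalone jbenet/go-datastore import paths (in-tree code uses the vendored Godeps paths shown in the diffs below):

	package main

	import (
		"fmt"
		"log"

		ds "github.com/jbenet/go-datastore"
		dsq "github.com/jbenet/go-datastore/query"
	)

	func main() {
		d := ds.NewMapDatastore()
		if err := d.Put(ds.NewKey("/a/b"), []byte("hello")); err != nil {
			log.Fatal(err)
		}

		res, err := d.Query(dsq.Query{Prefix: "/a"})
		if err != nil {
			log.Fatal(err)
		}
		defer res.Close() // cancelling early lets the datastore reclaim its resources

		for r := range res.Next() {
			if r.Error != nil { // internal errors now arrive in-band as Results
				log.Fatal(r.Error)
			}
			fmt.Println(r.Entry.Key, r.Entry.Value)
		}
	}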

Godeps/Godeps.json generated
View File

@ -110,7 +110,7 @@
},
{
"ImportPath": "github.com/jbenet/go-datastore",
"Rev": "8a8988d1a4e174274bd4a9dd55c4837f46fdf323"
"Rev": "35738aceb35505bd3c77c2a618fb1947ca3f72da"
},
{
"ImportPath": "github.com/jbenet/go-fuse-version",

View File

@ -18,6 +18,10 @@
"ImportPath": "github.com/hashicorp/golang-lru",
"Rev": "4dfff096c4973178c8f35cf6dd1a732a0a139370"
},
{
"ImportPath": "github.com/jbenet/goprocess",
"Rev": "b4b4178efcf2404ce9db72438c9c49db2fb399d8"
},
{
"ImportPath": "github.com/mattbaird/elastigo/api",
"Rev": "041b88c1fcf6489a5721ede24378ce1253b9159d"

View File

@ -3,7 +3,7 @@ package datastore
import (
"log"
query "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
// Here are some basic datastore implementations.
@ -50,13 +50,13 @@ func (d *MapDatastore) Delete(key Key) (err error) {
}
// Query implements Datastore.Query
func (d *MapDatastore) Query(q query.Query) (*query.Results, error) {
re := make([]query.Entry, 0, len(d.values))
func (d *MapDatastore) Query(q dsq.Query) (dsq.Results, error) {
re := make([]dsq.Entry, 0, len(d.values))
for k, v := range d.values {
re = append(re, query.Entry{Key: k.String(), Value: v})
re = append(re, dsq.Entry{Key: k.String(), Value: v})
}
r := query.ResultsWithEntries(q, re)
r = q.ApplyTo(r)
r := dsq.ResultsWithEntries(q, re)
r = dsq.NaiveQueryApply(q, r)
return r, nil
}
@ -91,8 +91,8 @@ func (d *NullDatastore) Delete(key Key) (err error) {
}
// Query implements Datastore.Query
func (d *NullDatastore) Query(q query.Query) (*query.Results, error) {
return query.ResultsWithEntries(q, nil), nil
func (d *NullDatastore) Query(q dsq.Query) (dsq.Results, error) {
return dsq.ResultsWithEntries(q, nil), nil
}
// LogDatastore logs all accesses through the datastore.
@ -147,7 +147,7 @@ func (d *LogDatastore) Delete(key Key) (err error) {
}
// Query implements Datastore.Query
func (d *LogDatastore) Query(q query.Query) (*query.Results, error) {
func (d *LogDatastore) Query(q dsq.Query) (dsq.Results, error) {
log.Printf("%s: Query\n", d.Name)
return d.child.Query(q)
}

View File

@ -66,7 +66,7 @@ type Datastore interface {
// result.Wait()
// result.AllEntries()
//
Query(q query.Query) (*query.Results, error)
Query(q query.Query) (query.Results, error)
}
// ThreadSafeDatastore is an interface that all threadsafe datastore should
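
The example in the doc comment above still shows the older result.Wait()/result.AllEntries() style; under the interface introduced here, the blocking equivalent is roughly this hedged sketch (allEntries is an invented helper):

	func allEntries(d Datastore, q query.Query) ([]query.Entry, error) {
		res, err := d.Query(q)
		if err != nil {
			return nil, err
		}
		return res.Rest() // blocks until the query finishes, like Wait()+AllEntries() did
	}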

View File

@ -113,7 +113,7 @@ func (d *Datastore) Delete(key ds.Key) (err error) {
return nil
}
func (d *Datastore) Query(query.Query) (*query.Results, error) {
func (d *Datastore) Query(query.Query) (query.Results, error) {
return nil, errors.New("Not yet implemented!")
}

View File

@ -83,9 +83,9 @@ func (d *Datastore) Delete(key ds.Key) (err error) {
}
// Query implements Datastore.Query
func (d *Datastore) Query(q query.Query) (*query.Results, error) {
func (d *Datastore) Query(q query.Query) (query.Results, error) {
entries := make(chan query.Entry)
results := make(chan query.Result)
walkFn := func(path string, info os.FileInfo, err error) error {
// remove ds path prefix
@ -98,17 +98,18 @@ func (d *Datastore) Query(q query.Query) (*query.Results, error) {
path = path[:len(path)-len(ObjectKeySuffix)]
}
key := ds.NewKey(path)
entries <- query.Entry{Key: key.String(), Value: query.NotFetched}
entry := query.Entry{Key: key.String(), Value: query.NotFetched}
results <- query.Result{Entry: entry}
}
return nil
}
go func() {
filepath.Walk(d.path, walkFn)
close(entries)
close(results)
}()
r := query.ResultsWithEntriesChan(q, entries)
r = q.ApplyTo(r)
r := query.ResultsWithChan(q, results)
r = query.NaiveQueryApply(q, r)
return r, nil
}

View File

@ -67,7 +67,10 @@ func (ks *DSSuite) TestBasic(c *C) {
"/foo/bar/bazb",
"/foo/bar/baz/barb",
}
all := r.AllEntries()
all, err := r.Rest()
if err != nil {
c.Fatal(err)
}
c.Check(len(all), Equals, len(expect))
for _, k := range expect {

View File

@ -52,24 +52,24 @@ func (d *ktds) Delete(key ds.Key) (err error) {
}
// Query implements Query, inverting keys on the way back out.
func (d *ktds) Query(q dsq.Query) (*dsq.Results, error) {
q2 := q
q2.Prefix = d.ConvertKey(ds.NewKey(q2.Prefix)).String()
r, err := d.child.Query(q2)
func (d *ktds) Query(q dsq.Query) (dsq.Results, error) {
qr, err := d.child.Query(q)
if err != nil {
return nil, err
}
ch := make(chan dsq.Entry)
ch := make(chan dsq.Result)
go func() {
for e := range r.Entries() {
e.Key = d.InvertKey(ds.NewKey(e.Key)).String()
ch <- e
defer close(ch)
defer qr.Close()
for r := range qr.Next() {
if r.Error == nil {
r.Entry.Key = d.InvertKey(ds.NewKey(r.Entry.Key)).String()
}
ch <- r
}
close(ch)
}()
r2 := dsq.ResultsWithEntriesChan(q, ch)
return r2, nil
return dsq.DerivedResults(qr, ch), nil
}
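
The rewrite above is one instance of a general shape: drain qr.Next(), transform each Result, and re-wrap with DerivedResults so the derived Results shares the source's Process. A hedged generic sketch (mapResults is an invented helper, not part of the package):

	func mapResults(qr dsq.Results, f func(dsq.Result) dsq.Result) dsq.Results {
		ch := make(chan dsq.Result)
		go func() {
			defer close(ch)
			defer qr.Close() // propagate early exit back to the source
			for r := range qr.Next() {
				ch <- f(r)
			}
		}()
		return dsq.DerivedResults(qr, ch)
	}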

View File

@ -62,13 +62,18 @@ func (ks *DSSuite) TestBasic(c *C) {
c.Check(bytes.Equal(v2.([]byte), []byte(k.String())), Equals, true)
}
listAr, errA := mpds.Query(dsq.Query{})
listBr, errB := ktds.Query(dsq.Query{})
c.Check(errA, Equals, nil)
c.Check(errB, Equals, nil)
run := func(d ds.Datastore, q dsq.Query) []ds.Key {
r, err := d.Query(q)
c.Check(err, Equals, nil)
listA := ds.EntryKeys(listAr.AllEntries())
listB := ds.EntryKeys(listBr.AllEntries())
e, err := r.Rest()
c.Check(err, Equals, nil)
return ds.EntryKeys(e)
}
listA := run(mpds, dsq.Query{})
listB := run(ktds, dsq.Query{})
c.Check(len(listA), Equals, len(listB))
// sort them cause yeah.

View File

@ -3,12 +3,12 @@ package leveldb
import (
"io"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
type Datastore interface {
@ -72,54 +72,80 @@ func (d *datastore) Delete(key ds.Key) (err error) {
return err
}
func (d *datastore) Query(q dsq.Query) (*dsq.Results, error) {
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
// we can use multiple iterators concurrently. see:
// https://godoc.org/github.com/syndtr/goleveldb/leveldb#DB.NewIterator
// advance the iterator only if the reader reads
//
// run query in own sub-process tied to Results.Process(), so that
// it waits for us to finish AND so that clients can signal to us
// that resources should be reclaimed.
qrb := dsq.NewResultBuilder(q)
qrb.Process.Go(func(worker goprocess.Process) {
d.runQuery(worker, qrb)
})
// go wait on the worker (without signaling close)
go qrb.Process.CloseAfterChildren()
// Now, apply remaining things (filters, order)
qr := qrb.Results()
for _, f := range q.Filters {
qr = dsq.NaiveFilter(qr, f)
}
for _, o := range q.Orders {
qr = dsq.NaiveOrder(qr, o)
}
return qr, nil
}
func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {
var rnge *util.Range
if q.Prefix != "" {
rnge = util.BytesPrefix([]byte(q.Prefix))
if qrb.Query.Prefix != "" {
rnge = util.BytesPrefix([]byte(qrb.Query.Prefix))
}
i := d.DB.NewIterator(rnge, nil)
defer i.Release()
// offset
if q.Offset > 0 {
for j := 0; j < q.Offset; j++ {
// advance iterator for offset
if qrb.Query.Offset > 0 {
for j := 0; j < qrb.Query.Offset; j++ {
i.Next()
}
}
var es []dsq.Entry
for i.Next() {
// limit
if q.Limit > 0 && len(es) >= q.Limit {
// iterate, and handle limit, too
for sent := 0; i.Next(); sent++ {
// end early if we hit the limit
if qrb.Query.Limit > 0 && sent >= qrb.Query.Limit {
break
}
k := ds.NewKey(string(i.Key())).String()
e := dsq.Entry{Key: k}
if !q.KeysOnly {
if !qrb.Query.KeysOnly {
buf := make([]byte, len(i.Value()))
copy(buf, i.Value())
e.Value = buf
}
es = append(es, e)
select {
case qrb.Output <- dsq.Result{Entry: e}: // we sent it out
case <-worker.Closing(): // client told us to end early.
break
}
i.Release()
if err := i.Error(); err != nil {
return nil, err
}
// Now, apply remaining pieces.
q2 := q
q2.Offset = 0 // already applied
q2.Limit = 0 // already applied
// TODO: make this async with:
// qr := dsq.ResultsWithEntriesChan(q, ch)
qr := dsq.ResultsWithEntries(q, es)
qr = q2.ApplyTo(qr)
qr.Query = q // set it back
return qr, nil
if err := i.Error(); err != nil {
select {
case qrb.Output <- dsq.Result{Error: err}: // client read our error
case <-worker.Closing(): // client told us to end.
return
}
}
}
// LevelDB needs to be closed.
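
Because the iterator now lives in a worker tied to the Results' Process, a client can abandon a large scan cheaply. A hedged sketch against the Datastore interface above (firstKey is invented):

	// firstKey reads a single key and then abandons the query early.
	func firstKey(d Datastore) (string, error) {
		res, err := d.Query(dsq.Query{KeysOnly: true})
		if err != nil {
			return "", err
		}
		defer res.Close() // signals <-worker.Closing(); the iterator gets released
		r, ok := <-res.Next()
		if !ok {
			return "", nil // empty store
		}
		if r.Error != nil {
			return "", r.Error
		}
		return r.Entry.Key, nil
	}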

View File

@ -20,21 +20,28 @@ var testcases = map[string]string{
"/f": "f",
}
func TestQuery(t *testing.T) {
// returns datastore, and a function to call on exit.
// (this garbage collects). So:
//
// d, close := newDS(t)
// defer close()
func newDS(t *testing.T) (Datastore, func()) {
path, err := ioutil.TempDir("/tmp", "testing_leveldb_")
if err != nil {
t.Fatal(err)
}
defer func() {
os.RemoveAll(path)
}()
d, err := NewDatastore(path, nil)
if err != nil {
t.Fatal(err)
}
defer d.Close()
return d, func() {
os.RemoveAll(path)
d.Close()
}
}
func addTestCases(t *testing.T, d Datastore, testcases map[string]string) {
for k, v := range testcases {
dsk := ds.NewKey(k)
if err := d.Put(dsk, []byte(v)); err != nil {
@ -54,6 +61,13 @@ func TestQuery(t *testing.T) {
}
}
}
func TestQuery(t *testing.T) {
d, close := newDS(t)
defer close()
addTestCases(t, d, testcases)
rs, err := d.Query(dsq.Query{Prefix: "/a/"})
if err != nil {
t.Fatal(err)
@ -65,7 +79,7 @@ func TestQuery(t *testing.T) {
"/a/b/d",
"/a/c",
"/a/d",
}, rs.AllEntries())
}, rs)
// test offset and limit
@ -77,11 +91,22 @@ func TestQuery(t *testing.T) {
expectMatches(t, []string{
"/a/b/d",
"/a/c",
}, rs.AllEntries())
}, rs)
}
func expectMatches(t *testing.T, expect []string, actual []dsq.Entry) {
func TestQueryRespectsProcess(t *testing.T) {
d, close := newDS(t)
defer close()
addTestCases(t, d, testcases)
}
func expectMatches(t *testing.T, expect []string, actualR dsq.Results) {
actual, err := actualR.Rest()
if err != nil {
t.Error(err)
}
if len(actual) != len(expect) {
t.Error("not enough", expect, actual)
}

View File

@ -51,6 +51,6 @@ func (d *Datastore) Delete(key ds.Key) (err error) {
}
// KeyList returns a list of keys in the datastore
func (d *Datastore) Query(q dsq.Query) (*dsq.Results, error) {
func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
return nil, errors.New("KeyList not implemented.")
}

View File

@ -6,6 +6,7 @@ import (
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ktds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
// PrefixTransform constructs a KeyTransform with a pair of functions that
@ -40,5 +41,43 @@ func Wrap(child ds.Datastore, prefix ds.Key) ktds.Datastore {
panic("child (ds.Datastore) is nil")
}
return ktds.Wrap(child, PrefixTransform(prefix))
d := ktds.Wrap(child, PrefixTransform(prefix))
return &datastore{Datastore: d, raw: child, prefix: prefix}
}
type datastore struct {
prefix ds.Key
raw ds.Datastore
ktds.Datastore
}
// Query implements Query, inverting keys on the way back out.
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
qr, err := d.raw.Query(q)
if err != nil {
return nil, err
}
ch := make(chan dsq.Result)
go func() {
defer close(ch)
defer qr.Close()
for r := range qr.Next() {
if r.Error != nil {
ch <- r
continue
}
k := ds.NewKey(r.Entry.Key)
if !d.prefix.IsAncestorOf(k) {
continue
}
r.Entry.Key = d.Datastore.InvertKey(k).String()
ch <- r
}
}()
return dsq.DerivedResults(qr, ch), nil
}
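
A hedged usage sketch of the wrapper above, written as if from inside this package (keys and values invented):

	func exampleQuery() ([]dsq.Entry, error) {
		child := ds.NewMapDatastore()
		nsds := Wrap(child, ds.NewKey("/blocks"))
		// stored as /blocks/foo in the child datastore
		if err := nsds.Put(ds.NewKey("/foo"), []byte("x")); err != nil {
			return nil, err
		}
		res, err := nsds.Query(dsq.Query{})
		if err != nil {
			return nil, err
		}
		return res.Rest() // keys come back with the /blocks prefix stripped, e.g. "/foo"
	}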

View File

@ -48,13 +48,18 @@ func (ks *DSSuite) TestBasic(c *C) {
c.Check(bytes.Equal(v2.([]byte), []byte(k.String())), Equals, true)
}
listAr, errA := mpds.Query(dsq.Query{})
listBr, errB := nsds.Query(dsq.Query{})
c.Check(errA, Equals, nil)
c.Check(errB, Equals, nil)
run := func(d ds.Datastore, q dsq.Query) []ds.Key {
r, err := d.Query(q)
c.Check(err, Equals, nil)
listA := ds.EntryKeys(listAr.AllEntries())
listB := ds.EntryKeys(listBr.AllEntries())
e, err := r.Rest()
c.Check(err, Equals, nil)
return ds.EntryKeys(e)
}
listA := run(mpds, dsq.Query{})
listB := run(nsds, dsq.Query{})
c.Check(len(listA), Equals, len(listB))
// sort them cause yeah.

View File

@ -58,7 +58,7 @@ func (d *datastore) Delete(key ds.Key) error {
return nil
}
func (d *datastore) Query(q dsq.Query) (*dsq.Results, error) {
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
r, err := d.child.Query(q)
if err != nil {
fmt.Fprintf(os.Stdout, "panic datastore: %s", err)

View File

@ -5,15 +5,6 @@ import (
"testing"
)
var sampleKeys = []string{
"/ab/c",
"/ab/cd",
"/a",
"/abce",
"/abcf",
"/ab",
}
type filterTestCase struct {
filter Filter
keys []string
@ -28,7 +19,10 @@ func testKeyFilter(t *testing.T, f Filter, keys []string, expect []string) {
res := ResultsWithEntries(Query{}, e)
res = NaiveFilter(res, f)
actualE := res.AllEntries()
actualE, err := res.Rest()
if err != nil {
t.Fatal(err)
}
actual := make([]string, len(actualE))
for i, e := range actualE {
actual[i] = e.Key

View File

@ -19,7 +19,11 @@ func testKeyOrder(t *testing.T, f Order, keys []string, expect []string) {
res := ResultsWithEntries(Query{}, e)
res = NaiveOrder(res, f)
actualE := res.AllEntries()
actualE, err := res.Rest()
if err != nil {
t.Fatal(err)
}
actual := make([]string, len(actualE))
for i, e := range actualE {
actual[i] = e.Key

View File

@ -1,5 +1,9 @@
package query
import (
goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
)
/*
Query represents storage for any key-value pair.
@ -64,7 +68,7 @@ type Query struct {
// of an Entry has been fetched or not. This is needed because
// datastore implementations get to decide whether Query returns values
// or only keys. nil is not a good signal, as real values may be nil.
var NotFetched = struct{}{}
const NotFetched int = iota
// Entry is a query result entry.
type Entry struct {
@ -72,74 +76,175 @@ type Entry struct {
Value interface{}
}
// Results is a set of Query results
type Results struct {
Query Query // the query these Results correspond to
// Result is a special entry that includes an error, so that the client
// may be warned about internal errors.
type Result struct {
Entry
done chan struct{}
res chan Entry
all []Entry
Error error
}
// ResultsWithEntriesChan returns a Results object from a
// channel of ResultEntries. It's merely an encapsulation
// that provides for AllEntries() functionality.
func ResultsWithEntriesChan(q Query, res <-chan Entry) *Results {
r := &Results{
Query: q,
done: make(chan struct{}),
res: make(chan Entry),
all: []Entry{},
// Results is a set of Query results. This is the interface for clients.
// Example:
//
// qr, _ := myds.Query(q)
// for r := range qr.Next() {
// if r.Error != nil {
// // handle.
// break
// }
//
// fmt.Println(r.Entry.Key, r.Entry.Value)
// }
//
// or, wait on all results at once:
//
// qr, _ := myds.Query(q)
// es, _ := qr.Rest()
// for _, e := range es {
// fmt.Println(e.Key, e.Value)
// }
//
type Results interface {
Query() Query // the query these Results correspond to
Next() <-chan Result // returns a channel to wait for the next result
Rest() ([]Entry, error) // waits till processing finishes, returns all entries at once.
Close() error // client may call Close to signal early exit
// Process returns a goprocess.Process associated with these results.
// most users will not need this function (Close is all they want),
// but it's here in case you want to connect the results to other
// goprocess-friendly things.
Process() goprocess.Process
}
// go consume all the results and add them to the results.
go func() {
for e := range res {
r.all = append(r.all, e)
r.res <- e
}
close(r.res)
close(r.done)
}()
return r
// results implements Results
type results struct {
query Query
proc goprocess.Process
res <-chan Result
}
// ResultsWithEntries returns a Results object from a
// channel of ResultEntries. It's merely an encapsulation
// that provides for AllEntries() functionality.
func ResultsWithEntries(q Query, res []Entry) *Results {
r := &Results{
Query: q,
done: make(chan struct{}),
res: make(chan Entry),
all: res,
}
// go add all the results
go func() {
for _, e := range res {
r.res <- e
}
close(r.res)
close(r.done)
}()
return r
}
// Entries() returns results through a channel.
// Results may arrive at any time.
// The channel may or may not be buffered.
// The channel may or may not rate limit the query processing.
func (r *Results) Entries() <-chan Entry {
func (r *results) Next() <-chan Result {
return r.res
}
// AllEntries returns all the entries in Results.
// It blocks until all the results have come in.
func (r *Results) AllEntries() []Entry {
func (r *results) Rest() ([]Entry, error) {
var es []Entry
for e := range r.res {
_ = e
if e.Error != nil {
return es, e.Error
}
es = append(es, e.Entry)
}
<-r.proc.Closed() // wait till the processing finishes.
return es, nil
}
func (r *results) Process() goprocess.Process {
return r.proc
}
func (r *results) Close() error {
return r.proc.Close()
}
func (r *results) Query() Query {
return r.query
}
// ResultBuilder is what implementors use to construct results
// Implementors of datastores and their clients must respect the
// Process of the Request:
//
// * clients must call r.Process().Close() on an early exit, so
// implementations can reclaim resources.
// * if the Entries are read to completion (channel closed), Process
// should be closed automatically.
// * datastores must respect <-Process.Closing(), which intermediates
// an early close signal from the client.
//
type ResultBuilder struct {
Query Query
Process goprocess.Process
Output chan Result
}
// Results returns a Results tied to this builder.
func (rb *ResultBuilder) Results() Results {
return &results{
query: rb.Query,
proc: rb.Process,
res: rb.Output,
}
}
func NewResultBuilder(q Query) *ResultBuilder {
b := &ResultBuilder{
Query: q,
Output: make(chan Result),
}
b.Process = goprocess.WithTeardown(func() error {
close(b.Output)
return nil
})
return b
}
// ResultsWithChan returns a Results object from a channel
// of Result entries. Respects its own Close()
func ResultsWithChan(q Query, res <-chan Result) Results {
b := NewResultBuilder(q)
// go consume all the entries and add them to the results.
b.Process.Go(func(worker goprocess.Process) {
for {
select {
case <-worker.Closing(): // client told us to close early
return
case e, more := <-res:
if !more {
return
}
select {
case b.Output <- e:
case <-worker.Closing(): // client told us to close early
return
}
}
}
return
})
go b.Process.CloseAfterChildren()
return b.Results()
}
// ResultsWithEntries returns a Results object from a list of entries
func ResultsWithEntries(q Query, res []Entry) Results {
b := NewResultBuilder(q)
// go consume all the entries and add them to the results.
b.Process.Go(func(worker goprocess.Process) {
for _, e := range res {
select {
case b.Output <- Result{Entry: e}:
case <-worker.Closing(): // client told us to close early
return
}
}
return
})
go b.Process.CloseAfterChildren()
return b.Results()
}
func ResultsReplaceQuery(r Results, q Query) Results {
return &results{
query: q,
proc: r.Process(),
res: r.Next(),
}
<-r.done
return r.all
}
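
For implementors, the ResultBuilder contract above boils down to the following shape; a hedged in-package sketch (myDS and its values map are invented):

	type myDS struct{ values map[string]interface{} }

	func (d *myDS) Query(q Query) (Results, error) {
		b := NewResultBuilder(q)
		b.Process.Go(func(worker goprocess.Process) {
			for k, v := range d.values {
				select {
				case b.Output <- Result{Entry: Entry{Key: k, Value: v}}:
				case <-worker.Closing(): // client closed early; stop producing
					return
				}
			}
		})
		go b.Process.CloseAfterChildren() // closes Output once the worker finishes
		return NaiveQueryApply(q, b.Results()), nil
	}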

View File

@ -1,63 +1,105 @@
package query
// NaiveFilter applies a filter to the results
func NaiveFilter(qr *Results, filter Filter) *Results {
ch := make(chan Entry)
func DerivedResults(qr Results, ch <-chan Result) Results {
return &results{
query: qr.Query(),
proc: qr.Process(),
res: ch,
}
}
// NaiveFilter applies a filter to the results.
func NaiveFilter(qr Results, filter Filter) Results {
ch := make(chan Result)
go func() {
defer close(ch)
defer qr.Close()
for e := range qr.Entries() {
if filter.Filter(e) {
for e := range qr.Next() {
if e.Error != nil || filter.Filter(e.Entry) {
ch <- e
}
}
}()
return ResultsWithEntriesChan(qr.Query, ch)
return DerivedResults(qr, ch)
}
// NaiveLimit truncates the results to a given int limit
func NaiveLimit(qr *Results, limit int) *Results {
ch := make(chan Entry)
func NaiveLimit(qr Results, limit int) Results {
ch := make(chan Result)
go func() {
defer close(ch)
defer qr.Close()
for l := 0; l < limit; l++ {
e, more := <-qr.Entries()
if !more {
return
l := 0
for e := range qr.Next() {
if e.Error != nil {
ch <- e
continue
}
ch <- e
l++
if limit > 0 && l >= limit {
break
}
}
}()
return ResultsWithEntriesChan(qr.Query, ch)
return DerivedResults(qr, ch)
}
// NaiveOffset skips a given number of results
func NaiveOffset(qr *Results, offset int) *Results {
ch := make(chan Entry)
func NaiveOffset(qr Results, offset int) Results {
ch := make(chan Result)
go func() {
defer close(ch)
defer qr.Close()
for l := 0; l < offset; l++ {
<-qr.Entries() // discard
sent := 0
for e := range qr.Next() {
if e.Error != nil {
ch <- e
}
for e := range qr.Entries() {
if sent < offset {
sent++
continue
}
ch <- e
}
}()
return ResultsWithEntriesChan(qr.Query, ch)
return DerivedResults(qr, ch)
}
// NaiveOrder reorders results according to given Order.
// WARNING: this is the only non-stream friendly operation!
func NaiveOrder(qr *Results, o Order) *Results {
e := qr.AllEntries()
o.Sort(e)
return ResultsWithEntries(qr.Query, e)
func NaiveOrder(qr Results, o Order) Results {
ch := make(chan Result)
var entries []Entry
go func() {
defer close(ch)
defer qr.Close()
for e := range qr.Next() {
if e.Error != nil {
ch <- e
}
func (q Query) ApplyTo(qr *Results) *Results {
entries = append(entries, e.Entry)
}
o.Sort(entries)
for _, e := range entries {
ch <- Result{Entry: e}
}
}()
return DerivedResults(qr, ch)
}
func NaiveQueryApply(q Query, qr Results) Results {
if q.Prefix != "" {
qr = NaiveFilter(qr, FilterKeyPrefix{q.Prefix})
}
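
A hedged usage sketch chaining the naive combinators above (sample entries invented):

	func exampleNaive() ([]Entry, error) {
		e := []Entry{{Key: "/a/1"}, {Key: "/a/2"}, {Key: "/b/1"}}
		res := ResultsWithEntries(Query{}, e)
		res = NaiveFilter(res, FilterKeyPrefix{"/a"})
		res = NaiveOffset(res, 1)
		res = NaiveLimit(res, 1)
		return res.Rest() // [{Key: "/a/2"}]: /b/1 is filtered out, /a/1 skipped by the offset
	}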

View File

@ -0,0 +1,109 @@
package query
import (
"strings"
"testing"
)
var sampleKeys = []string{
"/ab/c",
"/ab/cd",
"/a",
"/abce",
"/abcf",
"/ab",
}
type testCase struct {
keys []string
expect []string
}
func testResults(t *testing.T, res Results, expect []string) {
actualE, err := res.Rest()
if err != nil {
t.Fatal(err)
}
actual := make([]string, len(actualE))
for i, e := range actualE {
actual[i] = e.Key
}
if len(actual) != len(expect) {
t.Error("expect != actual.", expect, actual)
}
if strings.Join(actual, "") != strings.Join(expect, "") {
t.Error("expect != actual.", expect, actual)
}
}
func TestLimit(t *testing.T) {
testKeyLimit := func(t *testing.T, limit int, keys []string, expect []string) {
e := make([]Entry, len(keys))
for i, k := range keys {
e[i] = Entry{Key: k}
}
res := ResultsWithEntries(Query{}, e)
res = NaiveLimit(res, limit)
testResults(t, res, expect)
}
testKeyLimit(t, 0, sampleKeys, []string{ // none
"/ab/c",
"/ab/cd",
"/a",
"/abce",
"/abcf",
"/ab",
})
testKeyLimit(t, 10, sampleKeys, []string{ // large
"/ab/c",
"/ab/cd",
"/a",
"/abce",
"/abcf",
"/ab",
})
testKeyLimit(t, 2, sampleKeys, []string{
"/ab/c",
"/ab/cd",
})
}
func TestOffset(t *testing.T) {
testOffset := func(t *testing.T, offset int, keys []string, expect []string) {
e := make([]Entry, len(keys))
for i, k := range keys {
e[i] = Entry{Key: k}
}
res := ResultsWithEntries(Query{}, e)
res = NaiveOffset(res, offset)
testResults(t, res, expect)
}
testOffset(t, 0, sampleKeys, []string{ // none
"/ab/c",
"/ab/cd",
"/a",
"/abce",
"/abcf",
"/ab",
})
testOffset(t, 10, sampleKeys, []string{ // large
})
testOffset(t, 2, sampleKeys, []string{
"/a",
"/abce",
"/abcf",
"/ab",
})
}

View File

@ -58,7 +58,7 @@ func (d *MutexDatastore) Delete(key ds.Key) (err error) {
}
// KeyList implements Datastore.KeyList
func (d *MutexDatastore) Query(q dsq.Query) (*dsq.Results, error) {
func (d *MutexDatastore) Query(q dsq.Query) (dsq.Results, error) {
d.RLock()
defer d.RUnlock()
return d.child.Query(q)

View File

@ -5,6 +5,7 @@ package blockstore
import (
"errors"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsns "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
@ -12,8 +13,11 @@ import (
blocks "github.com/jbenet/go-ipfs/blocks"
u "github.com/jbenet/go-ipfs/util"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
)
var log = eventlog.Logger("blockstore")
// BlockPrefix namespaces blockstore datastores
var BlockPrefix = ds.NewKey("blocks")
@ -27,7 +31,9 @@ type Blockstore interface {
Has(u.Key) (bool, error)
Get(u.Key) (*blocks.Block, error)
Put(*blocks.Block) error
AllKeys(offset int, limit int) ([]u.Key, error)
AllKeys(ctx context.Context, offset int, limit int) ([]u.Key, error)
AllKeysChan(ctx context.Context, offset int, limit int) (<-chan u.Key, error)
}
func NewBlockstore(d ds.ThreadSafeDatastore) Blockstore {
@ -80,10 +86,29 @@ func (s *blockstore) DeleteBlock(k u.Key) error {
// AllKeys runs a query for keys from the blockstore.
// this is very simplistic, in the future, take dsq.Query as a param?
// if offset and limit are 0, they are ignored.
func (bs *blockstore) AllKeys(offset int, limit int) ([]u.Key, error) {
var keys []u.Key
//
// AllKeys respects context
func (bs *blockstore) AllKeys(ctx context.Context, offset int, limit int) ([]u.Key, error) {
ch, err := bs.AllKeysChan(ctx, offset, limit)
if err != nil {
return nil, err
}
var keys []u.Key
for k := range ch {
keys = append(keys, k)
}
return keys, nil
}
// AllKeys runs a query for keys from the blockstore.
// this is very simplistic, in the future, take dsq.Query as a param?
// if offset and limit are 0, they are ignored.
//
// AllKeys respects context
func (bs *blockstore) AllKeysChan(ctx context.Context, offset int, limit int) (<-chan u.Key, error) {
// TODO make async inside ds/leveldb.Query
// KeysOnly, because that would be _a lot_ of data.
q := dsq.Query{KeysOnly: true, Offset: offset, Limit: limit}
res, err := bs.datastore.Query(q)
@ -91,10 +116,46 @@ func (bs *blockstore) AllKeys(offset int, limit int) ([]u.Key, error) {
return nil, err
}
for e := range res.Entries() {
// this function is here to compartmentalize
get := func() (k u.Key, ok bool) {
select {
case <-ctx.Done():
return k, false
case e, more := <-res.Next():
if !more {
return k, false
}
if e.Error != nil {
log.Debug("blockstore.AllKeysChan got err:", e.Error)
return k, false
}
// need to convert to u.Key using u.KeyFromDsKey.
k := u.KeyFromDsKey(ds.NewKey(e.Key))
keys = append(keys, k)
k = u.KeyFromDsKey(ds.NewKey(e.Key))
return k, true
}
return keys, nil
}
output := make(chan u.Key)
go func() {
defer func() {
res.Process().Close() // ensure exit (signals early exit, too)
close(output)
}()
for {
k, ok := get()
if !ok {
return
}
select {
case <-ctx.Done():
return
case output <- k:
}
}
}()
return output, nil
}
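
A hedged sketch of consuming the new channel API with cancellation (bs is any Blockstore; the fmt import is assumed):

	func printSomeKeys(ctx context.Context, bs Blockstore) error {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel() // unblocks the producer goroutine if we return early
		ch, err := bs.AllKeysChan(ctx, 0, 0)
		if err != nil {
			return err
		}
		n := 0
		for k := range ch {
			fmt.Println(k)
			n++
			if n >= 10 {
				cancel() // the producer notices ctx.Done() and closes ch
			}
		}
		return nil
	}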

View File

@ -5,8 +5,11 @@ import (
"fmt"
"testing"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blocks "github.com/jbenet/go-ipfs/blocks"
u "github.com/jbenet/go-ipfs/util"
)
@ -42,9 +45,11 @@ func TestPutThenGetBlock(t *testing.T) {
}
}
func TestAllKeys(t *testing.T) {
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
N := 100
func newBlockStoreWithKeys(t *testing.T, d ds.Datastore, N int) (Blockstore, []u.Key) {
if d == nil {
d = ds.NewMapDatastore()
}
bs := NewBlockstore(ds_sync.MutexWrap(d))
keys := make([]u.Key, N)
for i := 0; i < N; i++ {
@ -55,8 +60,14 @@ func TestAllKeys(t *testing.T) {
}
keys[i] = block.Key()
}
return bs, keys
}
keys2, err := bs.AllKeys(0, 0)
func TestAllKeysSimple(t *testing.T) {
bs, keys := newBlockStoreWithKeys(t, nil, 100)
ctx := context.Background()
keys2, err := bs.AllKeys(ctx, 0, 0)
if err != nil {
t.Fatal(err)
}
@ -65,8 +76,14 @@ func TestAllKeys(t *testing.T) {
// }
expectMatches(t, keys, keys2)
}
keys3, err := bs.AllKeys(N/3, N/3)
func TestAllKeysOffsetAndLimit(t *testing.T) {
N := 30
bs, _ := newBlockStoreWithKeys(t, nil, N)
ctx := context.Background()
keys3, err := bs.AllKeys(ctx, N/3, N/3)
if err != nil {
t.Fatal(err)
}
@ -76,6 +93,114 @@ func TestAllKeys(t *testing.T) {
if len(keys3) != N/3 {
t.Errorf("keys3 should be: %d != %d", N/3, len(keys3))
}
}
func TestAllKeysRespectsContext(t *testing.T) {
N := 100
d := &queryTestDS{ds: ds.NewMapDatastore()}
bs, _ := newBlockStoreWithKeys(t, d, N)
started := make(chan struct{}, 1)
done := make(chan struct{}, 1)
errors := make(chan error, 100)
getKeys := func(ctx context.Context) {
started <- struct{}{}
_, err := bs.AllKeys(ctx, 0, 0) // once without cancelling
if err != nil {
errors <- err
}
done <- struct{}{}
errors <- nil // a nil one to signal break
}
// Once without context, to make sure it all works
{
var results dsq.Results
resultChan := make(chan dsq.Result)
d.SetFunc(func(q dsq.Query) (dsq.Results, error) {
results = dsq.ResultsWithChan(q, resultChan)
return results, nil
})
go getKeys(context.Background())
// make sure it's waiting.
<-started
select {
case <-done:
t.Fatal("sync is wrong")
case <-results.Process().Closing():
t.Fatal("should not be closing")
case <-results.Process().Closed():
t.Fatal("should not be closed")
default:
}
e := dsq.Entry{Key: BlockPrefix.ChildString("foo").String()}
resultChan <- dsq.Result{Entry: e} // let it go.
close(resultChan)
<-done // should be done now.
<-results.Process().Closed() // should be closed now
// print any errors
for err := range errors {
if err == nil {
break
}
t.Error(err)
}
}
// Once with
{
var results dsq.Results
resultChan := make(chan dsq.Result)
d.SetFunc(func(q dsq.Query) (dsq.Results, error) {
results = dsq.ResultsWithChan(q, resultChan)
return results, nil
})
ctx, cancel := context.WithCancel(context.Background())
go getKeys(ctx)
// make sure it's waiting.
<-started
select {
case <-done:
t.Fatal("sync is wrong")
case <-results.Process().Closing():
t.Fatal("should not be closing")
case <-results.Process().Closed():
t.Fatal("should not be closed")
default:
}
cancel() // let it go.
select {
case <-done:
t.Fatal("sync is wrong")
case <-results.Process().Closed():
t.Fatal("should not be closed") // should not be closed yet.
case <-results.Process().Closing():
// should be closing now!
t.Log("closing correctly at this point.")
}
close(resultChan)
<-done // should be done now.
<-results.Process().Closed() // should be closed now
// print any errors
for err := range errors {
if err == nil {
break
}
t.Error(err)
}
}
}
@ -111,3 +236,33 @@ func expectMatches(t *testing.T, expect, actual []u.Key) {
}
}
}
type queryTestDS struct {
cb func(q dsq.Query) (dsq.Results, error)
ds ds.Datastore
}
func (c *queryTestDS) SetFunc(f func(dsq.Query) (dsq.Results, error)) { c.cb = f }
func (c *queryTestDS) Put(key ds.Key, value interface{}) (err error) {
return c.ds.Put(key, value)
}
func (c *queryTestDS) Get(key ds.Key) (value interface{}, err error) {
return c.ds.Get(key)
}
func (c *queryTestDS) Has(key ds.Key) (exists bool, err error) {
return c.ds.Has(key)
}
func (c *queryTestDS) Delete(key ds.Key) (err error) {
return c.ds.Delete(key)
}
func (c *queryTestDS) Query(q dsq.Query) (dsq.Results, error) {
if c.cb != nil {
return c.cb(q)
}
return c.ds.Query(q)
}

View File

@ -1,7 +1,9 @@
package blockstore
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/hashicorp/golang-lru"
"github.com/jbenet/go-ipfs/blocks"
u "github.com/jbenet/go-ipfs/util"
)
@ -44,6 +46,10 @@ func (w *writecache) Put(b *blocks.Block) error {
return w.blockstore.Put(b)
}
func (w *writecache) AllKeys(offset int, limit int) ([]u.Key, error) {
return w.blockstore.AllKeys(offset, limit)
func (w *writecache) AllKeys(ctx context.Context, offset int, limit int) ([]u.Key, error) {
return w.blockstore.AllKeys(ctx, offset, limit)
}
func (w *writecache) AllKeysChan(ctx context.Context, offset int, limit int) (<-chan u.Key, error) {
return w.blockstore.AllKeysChan(ctx, offset, limit)
}

View File

@ -84,7 +84,7 @@ func (c *callbackDatastore) Delete(key ds.Key) (err error) {
return c.ds.Delete(key)
}
func (c *callbackDatastore) Query(q dsq.Query) (*dsq.Results, error) {
func (c *callbackDatastore) Query(q dsq.Query) (dsq.Results, error) {
c.f()
return c.ds.Query(q)
}

View File

@ -128,7 +128,7 @@ Displays the hashes of all local objects.
}
// todo: make async
allKeys, err := n.Blockstore.AllKeys(0, 0)
allKeys, err := n.Blockstore.AllKeys(context.TODO(), 0, 0)
if err != nil {
return nil, err
}

View File

@ -36,7 +36,7 @@ func (dds *delayed) Delete(key ds.Key) (err error) {
return dds.ds.Delete(key)
}
func (dds *delayed) Query(q dsq.Query) (*dsq.Results, error) {
func (dds *delayed) Query(q dsq.Query) (dsq.Results, error) {
dds.delay.Wait()
return dds.ds.Query(q)
}