1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-29 01:12:24 +08:00

updated datastore (Query)

This commit is contained in:
Juan Batiz-Benet
2015-01-09 16:37:20 -08:00
parent dac2e449a2
commit f9ca67ef04
25 changed files with 837 additions and 66 deletions

2
Godeps/Godeps.json generated
View File

@ -110,7 +110,7 @@
},
{
"ImportPath": "github.com/jbenet/go-datastore",
"Rev": "6a1c83bda2a71a9bdc936749fdb507df958ed949"
"Rev": "8a8988d1a4e174274bd4a9dd55c4837f46fdf323"
},
{
"ImportPath": "github.com/jbenet/go-fuse-version",

View File

@ -0,0 +1,11 @@
language: go
go:
- 1.3
- release
- tip
script:
- make test
env: TEST_NO_FUSE=1 TEST_VERBOSE=1

View File

@ -1,6 +1,9 @@
build:
go build
test:
go test ./...
# saves/vendors third-party dependencies to Godeps/_workspace
# -r flag rewrites import paths to use the vendored path
# ./... performs operation on all packages in tree

View File

@ -1,6 +1,10 @@
package datastore
import "log"
import (
"log"
query "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
// Here are some basic datastore implementations.
@ -45,13 +49,15 @@ func (d *MapDatastore) Delete(key Key) (err error) {
return nil
}
// KeyList implements Datastore.KeyList
func (d *MapDatastore) KeyList() ([]Key, error) {
var keys []Key
for k := range d.values {
keys = append(keys, k)
// Query implements Datastore.Query
func (d *MapDatastore) Query(q query.Query) (*query.Results, error) {
re := make([]query.Entry, 0, len(d.values))
for k, v := range d.values {
re = append(re, query.Entry{Key: k.String(), Value: v})
}
return keys, nil
r := query.ResultsWithEntries(q, re)
r = q.ApplyTo(r)
return r, nil
}
// NullDatastore stores nothing, but conforms to the API.
@ -84,9 +90,9 @@ func (d *NullDatastore) Delete(key Key) (err error) {
return nil
}
// KeyList implements Datastore.KeyList
func (d *NullDatastore) KeyList() ([]Key, error) {
return nil, nil
// Query implements Datastore.Query
func (d *NullDatastore) Query(q query.Query) (*query.Results, error) {
return query.ResultsWithEntries(q, nil), nil
}
// LogDatastore logs all accesses through the datastore.
@ -140,8 +146,8 @@ func (d *LogDatastore) Delete(key Key) (err error) {
return d.child.Delete(key)
}
// KeyList implements Datastore.KeyList
func (d *LogDatastore) KeyList() ([]Key, error) {
log.Printf("%s: Get KeyList\n", d.Name)
return d.child.KeyList()
// Query implements Datastore.Query
func (d *LogDatastore) Query(q query.Query) (*query.Results, error) {
log.Printf("%s: Query\n", d.Name)
return d.child.Query(q)
}

View File

@ -2,6 +2,8 @@ package datastore
import (
"errors"
query "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
/*
@ -52,8 +54,19 @@ type Datastore interface {
// Delete removes the value for given `key`.
Delete(key Key) (err error)
// KeyList returns a list of keys in the datastore
KeyList() ([]Key, error)
// Query searches the datastore and returns a query result. This function
// may return before the query actually runs. To wait for the query:
//
// result, _ := ds.Query(q)
//
// // use the channel interface; result may come in at different times
// for entry := range result.Entries() { ... }
//
// // or wait for the query to be completely done
// result.Wait()
// result.AllEntries()
//
Query(q query.Query) (*query.Results, error)
}
// ThreadSafeDatastore is an interface that all threadsafe datastore should

View File

@ -8,6 +8,7 @@ import (
"github.com/codahale/blake2"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
query "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
"github.com/mattbaird/elastigo/api"
"github.com/mattbaird/elastigo/core"
)
@ -112,7 +113,7 @@ func (d *Datastore) Delete(key ds.Key) (err error) {
return nil
}
func (d *Datastore) KeyList() ([]ds.Key, error) {
func (d *Datastore) Query(query.Query) (*query.Results, error) {
return nil, errors.New("Not yet implemented!")
}

View File

@ -8,8 +8,11 @@ import (
"strings"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
query "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
var ObjectKeySuffix = ".dsobject"
// Datastore uses a standard Go map for internal storage.
type Datastore struct {
path string
@ -26,7 +29,7 @@ func NewDatastore(path string) (ds.Datastore, error) {
// KeyFilename returns the filename associated with `key`
func (d *Datastore) KeyFilename(key ds.Key) string {
return filepath.Join(d.path, key.String(), ".dsobject")
return filepath.Join(d.path, key.String(), ObjectKeySuffix)
}
// Put stores the given value.
@ -79,10 +82,10 @@ func (d *Datastore) Delete(key ds.Key) (err error) {
return os.Remove(fn)
}
// KeyList returns a list of all keys in the datastore
func (d *Datastore) KeyList() ([]ds.Key, error) {
// Query implements Datastore.Query
func (d *Datastore) Query(q query.Query) (*query.Results, error) {
keys := []ds.Key{}
entries := make(chan query.Entry)
walkFn := func(path string, info os.FileInfo, err error) error {
// remove ds path prefix
@ -91,14 +94,22 @@ func (d *Datastore) KeyList() ([]ds.Key, error) {
}
if !info.IsDir() {
if strings.HasSuffix(path, ObjectKeySuffix) {
path = path[:len(path)-len(ObjectKeySuffix)]
}
key := ds.NewKey(path)
keys = append(keys, key)
entries <- query.Entry{Key: key.String(), Value: query.NotFetched}
}
return nil
}
filepath.Walk(d.path, walkFn)
return keys, nil
go func() {
filepath.Walk(d.path, walkFn)
close(entries)
}()
r := query.ResultsWithEntriesChan(q, entries)
r = q.ApplyTo(r)
return r, nil
}
// isDir returns whether given path is a directory

View File

@ -4,9 +4,11 @@ import (
"bytes"
"testing"
. "launchpad.net/gocheck"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
fs "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs"
. "launchpad.net/gocheck"
query "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
// Hook up gocheck into the "go test" runner.
@ -54,6 +56,32 @@ func (ks *DSSuite) TestBasic(c *C) {
c.Check(err, Equals, nil)
c.Check(bytes.Equal(v.([]byte), []byte(k.String())), Equals, true)
}
r, err := ks.ds.Query(query.Query{Prefix: "/foo/bar/"})
if err != nil {
c.Check(err, Equals, nil)
}
expect := []string{
"/foo/bar/baz",
"/foo/bar/bazb",
"/foo/bar/baz/barb",
}
all := r.AllEntries()
c.Check(len(all), Equals, len(expect))
for _, k := range expect {
found := false
for _, e := range all {
if e.Key == k {
found = true
}
}
if !found {
c.Error("did not find expected key: ", k)
}
}
}
func strsToKeys(strs []string) []ds.Key {

View File

@ -5,6 +5,8 @@ import (
"strings"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go-uuid/uuid"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
/*
@ -239,3 +241,12 @@ type KeySlice []Key
func (p KeySlice) Len() int { return len(p) }
func (p KeySlice) Less(i, j int) bool { return p[i].Less(p[j]) }
func (p KeySlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// EntryKeys
func EntryKeys(e []dsq.Entry) []Key {
ks := make([]Key, len(e))
for i, e := range e {
ks[i] = NewKey(e.Key)
}
return ks
}

View File

@ -1,6 +1,9 @@
package keytransform
import ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
import (
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
type Pair struct {
Convert KeyMapping
@ -48,16 +51,25 @@ func (d *ktds) Delete(key ds.Key) (err error) {
return d.child.Delete(d.ConvertKey(key))
}
// KeyList returns a list of all keys in the datastore, transforming keys out.
func (d *ktds) KeyList() ([]ds.Key, error) {
// Query implements Query, inverting keys on the way back out.
func (d *ktds) Query(q dsq.Query) (*dsq.Results, error) {
keys, err := d.child.KeyList()
q2 := q
q2.Prefix = d.ConvertKey(ds.NewKey(q2.Prefix)).String()
r, err := d.child.Query(q2)
if err != nil {
return nil, err
}
for i, k := range keys {
keys[i] = d.InvertKey(k)
}
return keys, nil
ch := make(chan dsq.Entry)
go func() {
for e := range r.Entries() {
e.Key = d.InvertKey(ds.NewKey(e.Key)).String()
ch <- e
}
close(ch)
}()
r2 := dsq.ResultsWithEntriesChan(q, ch)
return r2, nil
}

View File

@ -5,9 +5,11 @@ import (
"sort"
"testing"
. "launchpad.net/gocheck"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
kt "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform"
. "launchpad.net/gocheck"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
// Hook up gocheck into the "go test" runner.
@ -60,10 +62,13 @@ func (ks *DSSuite) TestBasic(c *C) {
c.Check(bytes.Equal(v2.([]byte), []byte(k.String())), Equals, true)
}
listA, errA := mpds.KeyList()
listB, errB := ktds.KeyList()
listAr, errA := mpds.Query(dsq.Query{})
listBr, errB := ktds.Query(dsq.Query{})
c.Check(errA, Equals, nil)
c.Check(errB, Equals, nil)
listA := ds.EntryKeys(listAr.AllEntries())
listB := ds.EntryKeys(listBr.AllEntries())
c.Check(len(listA), Equals, len(listB))
// sort them cause yeah.
@ -75,6 +80,9 @@ func (ks *DSSuite) TestBasic(c *C) {
c.Check(pair.Invert(kA), Equals, kB)
c.Check(kA, Equals, pair.Convert(kB))
}
c.Log("listA: ", listA)
c.Log("listB: ", listB)
}
func strsToKeys(strs []string) []ds.Key {

View File

@ -3,9 +3,12 @@ package leveldb
import (
"io"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
type Datastore interface {
@ -69,13 +72,54 @@ func (d *datastore) Delete(key ds.Key) (err error) {
return err
}
func (d *datastore) KeyList() ([]ds.Key, error) {
i := d.DB.NewIterator(nil, nil)
var keys []ds.Key
for i.Next() {
keys = append(keys, ds.NewKey(string(i.Key())))
func (d *datastore) Query(q dsq.Query) (*dsq.Results, error) {
var rnge *util.Range
if q.Prefix != "" {
rnge = util.BytesPrefix([]byte(q.Prefix))
}
return keys, nil
i := d.DB.NewIterator(rnge, nil)
// offset
if q.Offset > 0 {
for j := 0; j < q.Offset; j++ {
i.Next()
}
}
var es []dsq.Entry
for i.Next() {
// limit
if q.Limit > 0 && len(es) >= q.Limit {
break
}
k := ds.NewKey(string(i.Key())).String()
e := dsq.Entry{Key: k}
if !q.KeysOnly {
buf := make([]byte, len(i.Value()))
copy(buf, i.Value())
e.Value = buf
}
es = append(es, e)
}
i.Release()
if err := i.Error(); err != nil {
return nil, err
}
// Now, apply remaining pieces.
q2 := q
q2.Offset = 0 // already applied
q2.Limit = 0 // already applied
// TODO: make this async with:
// qr := dsq.ResultsWithEntriesChan(q, ch)
qr := dsq.ResultsWithEntries(q, es)
qr = q2.ApplyTo(qr)
qr.Query = q // set it back
return qr, nil
}
// LevelDB needs to be closed.

View File

@ -0,0 +1,99 @@
package leveldb
import (
"io/ioutil"
"os"
"testing"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
var testcases = map[string]string{
"/a": "a",
"/a/b": "ab",
"/a/b/c": "abc",
"/a/b/d": "a/b/d",
"/a/c": "ac",
"/a/d": "ad",
"/e": "e",
"/f": "f",
}
func TestQuery(t *testing.T) {
path, err := ioutil.TempDir("/tmp", "testing_leveldb_")
if err != nil {
t.Fatal(err)
}
defer func() {
os.RemoveAll(path)
}()
d, err := NewDatastore(path, nil)
if err != nil {
t.Fatal(err)
}
defer d.Close()
for k, v := range testcases {
dsk := ds.NewKey(k)
if err := d.Put(dsk, []byte(v)); err != nil {
t.Fatal(err)
}
}
for k, v := range testcases {
dsk := ds.NewKey(k)
v2, err := d.Get(dsk)
if err != nil {
t.Fatal(err)
}
v2b := v2.([]byte)
if string(v2b) != v {
t.Errorf("%s values differ: %s != %s", k, v, v2)
}
}
rs, err := d.Query(dsq.Query{Prefix: "/a/"})
if err != nil {
t.Fatal(err)
}
expectMatches(t, []string{
"/a/b",
"/a/b/c",
"/a/b/d",
"/a/c",
"/a/d",
}, rs.AllEntries())
// test offset and limit
rs, err = d.Query(dsq.Query{Prefix: "/a/", Offset: 2, Limit: 2})
if err != nil {
t.Fatal(err)
}
expectMatches(t, []string{
"/a/b/d",
"/a/c",
}, rs.AllEntries())
}
func expectMatches(t *testing.T, expect []string, actual []dsq.Entry) {
if len(actual) != len(expect) {
t.Error("not enough", expect, actual)
}
for _, k := range expect {
found := false
for _, e := range actual {
if e.Key == k {
found = true
}
}
if !found {
t.Error(k, "not found")
}
}
}

View File

@ -4,7 +4,9 @@ import (
"errors"
lru "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/hashicorp/golang-lru"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
// Datastore uses golang-lru for internal storage.
@ -49,6 +51,6 @@ func (d *Datastore) Delete(key ds.Key) (err error) {
}
// KeyList returns a list of keys in the datastore
func (d *Datastore) KeyList() ([]ds.Key, error) {
func (d *Datastore) Query(q dsq.Query) (*dsq.Results, error) {
return nil, errors.New("KeyList not implemented.")
}

View File

@ -5,9 +5,11 @@ import (
"sort"
"testing"
. "launchpad.net/gocheck"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ns "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace"
. "launchpad.net/gocheck"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
// Hook up gocheck into the "go test" runner.
@ -46,10 +48,13 @@ func (ks *DSSuite) TestBasic(c *C) {
c.Check(bytes.Equal(v2.([]byte), []byte(k.String())), Equals, true)
}
listA, errA := mpds.KeyList()
listB, errB := nsds.KeyList()
listAr, errA := mpds.Query(dsq.Query{})
listBr, errB := nsds.Query(dsq.Query{})
c.Check(errA, Equals, nil)
c.Check(errB, Equals, nil)
listA := ds.EntryKeys(listAr.AllEntries())
listB := ds.EntryKeys(listBr.AllEntries())
c.Check(len(listA), Equals, len(listB))
// sort them cause yeah.

View File

@ -5,6 +5,7 @@ import (
"os"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
type datastore struct {
@ -57,11 +58,11 @@ func (d *datastore) Delete(key ds.Key) error {
return nil
}
func (d *datastore) KeyList() ([]ds.Key, error) {
kl, err := d.child.KeyList()
func (d *datastore) Query(q dsq.Query) (*dsq.Results, error) {
r, err := d.child.Query(q)
if err != nil {
fmt.Fprintf(os.Stdout, "panic datastore: %s", err)
panic("panic datastore: KeyList failed")
panic("panic datastore: Query failed")
}
return kl, nil
return r, nil
}

View File

@ -0,0 +1,86 @@
package query
import (
"fmt"
"reflect"
"strings"
)
// Filter is an object that tests ResultEntries
type Filter interface {
// Filter returns whether an entry passes the filter
Filter(e Entry) bool
}
// Op is a comparison operator
type Op string
var (
Equal = Op("==")
NotEqual = Op("!=")
GreaterThan = Op(">")
GreaterThanOrEqual = Op(">=")
LessThan = Op("<")
LessThanOrEqual = Op("<=")
)
// FilterValueCompare is used to signal to datastores they
// should apply internal comparisons. unfortunately, there
// is no way to apply comparisons* to interface{} types in
// Go, so if the datastore doesnt have a special way to
// handle these comparisons, you must provided the
// TypedFilter to actually do filtering.
//
// [*] other than == and !=, which use reflect.DeepEqual.
type FilterValueCompare struct {
Op Op
Value interface{}
TypedFilter Filter
}
func (f FilterValueCompare) Filter(e Entry) bool {
if f.TypedFilter != nil {
return f.TypedFilter.Filter(e)
}
switch f.Op {
case Equal:
return reflect.DeepEqual(f.Value, e.Value)
case NotEqual:
return !reflect.DeepEqual(f.Value, e.Value)
default:
panic(fmt.Errorf("cannot apply op '%s' to interface{}.", f.Op))
}
}
type FilterKeyCompare struct {
Op Op
Key string
}
func (f FilterKeyCompare) Filter(e Entry) bool {
switch f.Op {
case Equal:
return e.Key == f.Key
case NotEqual:
return e.Key != f.Key
case GreaterThan:
return e.Key > f.Key
case GreaterThanOrEqual:
return e.Key >= f.Key
case LessThan:
return e.Key < f.Key
case LessThanOrEqual:
return e.Key <= f.Key
default:
panic(fmt.Errorf("unknown op '%s'", f.Op))
}
}
type FilterKeyPrefix struct {
Prefix string
}
func (f FilterKeyPrefix) Filter(e Entry) bool {
return strings.HasPrefix(e.Key, f.Prefix)
}

View File

@ -0,0 +1,75 @@
package query
import (
"strings"
"testing"
)
var sampleKeys = []string{
"/ab/c",
"/ab/cd",
"/a",
"/abce",
"/abcf",
"/ab",
}
type filterTestCase struct {
filter Filter
keys []string
expect []string
}
func testKeyFilter(t *testing.T, f Filter, keys []string, expect []string) {
e := make([]Entry, len(keys))
for i, k := range keys {
e[i] = Entry{Key: k}
}
res := ResultsWithEntries(Query{}, e)
res = NaiveFilter(res, f)
actualE := res.AllEntries()
actual := make([]string, len(actualE))
for i, e := range actualE {
actual[i] = e.Key
}
if len(actual) != len(expect) {
t.Error("expect != actual.", expect, actual)
}
if strings.Join(actual, "") != strings.Join(expect, "") {
t.Error("expect != actual.", expect, actual)
}
}
func TestFilterKeyCompare(t *testing.T) {
testKeyFilter(t, FilterKeyCompare{Equal, "/ab"}, sampleKeys, []string{"/ab"})
testKeyFilter(t, FilterKeyCompare{GreaterThan, "/ab"}, sampleKeys, []string{
"/ab/c",
"/ab/cd",
"/abce",
"/abcf",
})
testKeyFilter(t, FilterKeyCompare{LessThanOrEqual, "/ab"}, sampleKeys, []string{
"/a",
"/ab",
})
}
func TestFilterKeyPrefix(t *testing.T) {
testKeyFilter(t, FilterKeyPrefix{"/a"}, sampleKeys, []string{
"/ab/c",
"/ab/cd",
"/a",
"/abce",
"/abcf",
"/ab",
})
testKeyFilter(t, FilterKeyPrefix{"/ab/"}, sampleKeys, []string{
"/ab/c",
"/ab/cd",
})
}

View File

@ -0,0 +1,66 @@
package query
import (
"sort"
)
// Order is an object used to order objects
type Order interface {
// Sort sorts the Entry slice according to
// the Order criteria.
Sort([]Entry)
}
// OrderByValue is used to signal to datastores they
// should apply internal orderings. unfortunately, there
// is no way to apply order comparisons to interface{} types
// in Go, so if the datastore doesnt have a special way to
// handle these comparisons, you must provide an Order
// implementation that casts to the correct type.
type OrderByValue struct {
TypedOrder Order
}
func (o OrderByValue) Sort(res []Entry) {
if o.TypedOrder == nil {
panic("cannot order interface{} by value. see query docs.")
}
o.TypedOrder.Sort(res)
}
// OrderByValueDescending is used to signal to datastores they
// should apply internal orderings. unfortunately, there
// is no way to apply order comparisons to interface{} types
// in Go, so if the datastore doesnt have a special way to
// handle these comparisons, you are SOL.
type OrderByValueDescending struct {
TypedOrder Order
}
func (o OrderByValueDescending) Sort(res []Entry) {
if o.TypedOrder == nil {
panic("cannot order interface{} by value. see query docs.")
}
o.TypedOrder.Sort(res)
}
// OrderByKey
type OrderByKey struct{}
func (o OrderByKey) Sort(res []Entry) {
sort.Stable(reByKey(res))
}
// OrderByKeyDescending
type OrderByKeyDescending struct{}
func (o OrderByKeyDescending) Sort(res []Entry) {
sort.Stable(sort.Reverse(reByKey(res)))
}
type reByKey []Entry
func (s reByKey) Len() int { return len(s) }
func (s reByKey) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s reByKey) Less(i, j int) bool { return s[i].Key < s[j].Key }

View File

@ -0,0 +1,55 @@
package query
import (
"strings"
"testing"
)
type orderTestCase struct {
order Order
keys []string
expect []string
}
func testKeyOrder(t *testing.T, f Order, keys []string, expect []string) {
e := make([]Entry, len(keys))
for i, k := range keys {
e[i] = Entry{Key: k}
}
res := ResultsWithEntries(Query{}, e)
res = NaiveOrder(res, f)
actualE := res.AllEntries()
actual := make([]string, len(actualE))
for i, e := range actualE {
actual[i] = e.Key
}
if len(actual) != len(expect) {
t.Error("expect != actual.", expect, actual)
}
if strings.Join(actual, "") != strings.Join(expect, "") {
t.Error("expect != actual.", expect, actual)
}
}
func TestOrderByKey(t *testing.T) {
testKeyOrder(t, OrderByKey{}, sampleKeys, []string{
"/a",
"/ab",
"/ab/c",
"/ab/cd",
"/abce",
"/abcf",
})
testKeyOrder(t, OrderByKeyDescending{}, sampleKeys, []string{
"/abcf",
"/abce",
"/ab/cd",
"/ab/c",
"/ab",
"/a",
})
}

View File

@ -0,0 +1,145 @@
package query
/*
Query represents storage for any key-value pair.
tl;dr:
queries are supported across datastores.
Cheap on top of relational dbs, and expensive otherwise.
Pick the right tool for the job!
In addition to the key-value store get and set semantics, datastore
provides an interface to retrieve multiple records at a time through
the use of queries. The datastore Query model gleans a common set of
operations performed when querying. To avoid pasting here years of
database research, lets summarize the operations datastore supports.
Query Operations:
* namespace - scope the query, usually by object type
* filters - select a subset of values by applying constraints
* orders - sort the results by applying sort conditions
* limit - impose a numeric limit on the number of results
* offset - skip a number of results (for efficient pagination)
datastore combines these operations into a simple Query class that allows
applications to define their constraints in a simple, generic, way without
introducing datastore specific calls, languages, etc.
Of course, different datastores provide relational query support across a
wide spectrum, from full support in traditional databases to none at all in
most key-value stores. Datastore aims to provide a common, simple interface
for the sake of application evolution over time and keeping large code bases
free of tool-specific code. It would be ridiculous to claim to support high-
performance queries on architectures that obviously do not. Instead, datastore
provides the interface, ideally translating queries to their native form
(e.g. into SQL for MySQL).
However, on the wrong datastore, queries can potentially incur the high cost
of performing the aforemantioned query operations on the data set directly in
Go. It is the clients responsibility to select the right tool for the job:
pick a data storage solution that fits the applications needs now, and wrap
it with a datastore implementation. As the needs change, swap out datastore
implementations to support your new use cases. Some applications, particularly
in early development stages, can afford to incurr the cost of queries on non-
relational databases (e.g. using a FSDatastore and not worry about a database
at all). When it comes time to switch the tool for performance, updating the
application code can be as simple as swapping the datastore in one place, not
all over the application code base. This gain in engineering time, both at
initial development and during later iterations, can significantly offset the
cost of the layer of abstraction.
*/
type Query struct {
Prefix string // namespaces the query to results whose keys have Prefix
Filters []Filter // filter results. apply sequentially
Orders []Order // order results. apply sequentially
Limit int // maximum number of results
Offset int // skip given number of results
KeysOnly bool // return only keys.
}
// NotFetched is a special type that signals whether or not the value
// of an Entry has been fetched or not. This is needed because
// datastore implementations get to decide whether Query returns values
// or only keys. nil is not a good signal, as real values may be nil.
var NotFetched = struct{}{}
// Entry is a query result entry.
type Entry struct {
Key string // cant be ds.Key because circular imports ...!!!
Value interface{}
}
// Results is a set of Query results
type Results struct {
Query Query // the query these Results correspond to
done chan struct{}
res chan Entry
all []Entry
}
// ResultsWithEntriesChan returns a Results object from a
// channel of ResultEntries. It's merely an encapsulation
// that provides for AllEntries() functionality.
func ResultsWithEntriesChan(q Query, res <-chan Entry) *Results {
r := &Results{
Query: q,
done: make(chan struct{}),
res: make(chan Entry),
all: []Entry{},
}
// go consume all the results and add them to the results.
go func() {
for e := range res {
r.all = append(r.all, e)
r.res <- e
}
close(r.res)
close(r.done)
}()
return r
}
// ResultsWithEntries returns a Results object from a
// channel of ResultEntries. It's merely an encapsulation
// that provides for AllEntries() functionality.
func ResultsWithEntries(q Query, res []Entry) *Results {
r := &Results{
Query: q,
done: make(chan struct{}),
res: make(chan Entry),
all: res,
}
// go add all the results
go func() {
for _, e := range res {
r.res <- e
}
close(r.res)
close(r.done)
}()
return r
}
// Entries() returns results through a channel.
// Results may arrive at any time.
// The channel may or may not be buffered.
// The channel may or may not rate limit the query processing.
func (r *Results) Entries() <-chan Entry {
return r.res
}
// AllEntries returns all the entries in Results.
// It blocks until all the results have come in.
func (r *Results) AllEntries() []Entry {
for e := range r.res {
_ = e
}
<-r.done
return r.all
}

View File

@ -0,0 +1,85 @@
package query
// NaiveFilter applies a filter to the results
func NaiveFilter(qr *Results, filter Filter) *Results {
ch := make(chan Entry)
go func() {
defer close(ch)
for e := range qr.Entries() {
if filter.Filter(e) {
ch <- e
}
}
}()
return ResultsWithEntriesChan(qr.Query, ch)
}
// NaiveLimit truncates the results to a given int limit
func NaiveLimit(qr *Results, limit int) *Results {
ch := make(chan Entry)
go func() {
defer close(ch)
for l := 0; l < limit; l++ {
e, more := <-qr.Entries()
if !more {
return
}
ch <- e
}
}()
return ResultsWithEntriesChan(qr.Query, ch)
}
// NaiveOffset skips a given number of results
func NaiveOffset(qr *Results, offset int) *Results {
ch := make(chan Entry)
go func() {
defer close(ch)
for l := 0; l < offset; l++ {
<-qr.Entries() // discard
}
for e := range qr.Entries() {
ch <- e
}
}()
return ResultsWithEntriesChan(qr.Query, ch)
}
// NaiveOrder reorders results according to given Order.
// WARNING: this is the only non-stream friendly operation!
func NaiveOrder(qr *Results, o Order) *Results {
e := qr.AllEntries()
o.Sort(e)
return ResultsWithEntries(qr.Query, e)
}
func (q Query) ApplyTo(qr *Results) *Results {
if q.Prefix != "" {
qr = NaiveFilter(qr, FilterKeyPrefix{q.Prefix})
}
for _, f := range q.Filters {
qr = NaiveFilter(qr, f)
}
for _, o := range q.Orders {
qr = NaiveOrder(qr, o)
}
if q.Offset != 0 {
qr = NaiveOffset(qr, q.Offset)
}
if q.Limit != 0 {
qr = NaiveLimit(qr, q.Offset)
}
return qr
}
func ResultEntriesFrom(keys []string, vals []interface{}) []Entry {
re := make([]Entry, len(keys))
for i, k := range keys {
re[i] = Entry{Key: k, Value: vals[i]}
}
return re
}

View File

@ -4,6 +4,7 @@ import (
"sync"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
// MutexDatastore contains a child datastire and a mutex.
@ -57,8 +58,8 @@ func (d *MutexDatastore) Delete(key ds.Key) (err error) {
}
// KeyList implements Datastore.KeyList
func (d *MutexDatastore) KeyList() ([]ds.Key, error) {
func (d *MutexDatastore) Query(q dsq.Query) (*dsq.Results, error) {
d.RLock()
defer d.RUnlock()
return d.child.KeyList()
return d.child.Query(q)
}

View File

@ -4,6 +4,7 @@ import (
"testing"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
"github.com/jbenet/go-ipfs/blocks"
)
@ -83,7 +84,7 @@ func (c *callbackDatastore) Delete(key ds.Key) (err error) {
return c.ds.Delete(key)
}
func (c *callbackDatastore) KeyList() ([]ds.Key, error) {
func (c *callbackDatastore) Query(q dsq.Query) (*dsq.Results, error) {
c.f()
return c.ds.KeyList()
return c.ds.Query(q)
}

View File

@ -1,42 +1,44 @@
package datastore2
import (
datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
delay "github.com/jbenet/go-ipfs/util/delay"
)
func WithDelay(ds datastore.Datastore, delay delay.D) datastore.Datastore {
func WithDelay(ds ds.Datastore, delay delay.D) ds.Datastore {
return &delayed{ds: ds, delay: delay}
}
type delayed struct {
ds datastore.Datastore
ds ds.Datastore
delay delay.D
}
func (dds *delayed) Put(key datastore.Key, value interface{}) (err error) {
func (dds *delayed) Put(key ds.Key, value interface{}) (err error) {
dds.delay.Wait()
return dds.ds.Put(key, value)
}
func (dds *delayed) Get(key datastore.Key) (value interface{}, err error) {
func (dds *delayed) Get(key ds.Key) (value interface{}, err error) {
dds.delay.Wait()
return dds.ds.Get(key)
}
func (dds *delayed) Has(key datastore.Key) (exists bool, err error) {
func (dds *delayed) Has(key ds.Key) (exists bool, err error) {
dds.delay.Wait()
return dds.ds.Has(key)
}
func (dds *delayed) Delete(key datastore.Key) (err error) {
func (dds *delayed) Delete(key ds.Key) (err error) {
dds.delay.Wait()
return dds.ds.Delete(key)
}
func (dds *delayed) KeyList() ([]datastore.Key, error) {
func (dds *delayed) Query(q dsq.Query) (*dsq.Results, error) {
dds.delay.Wait()
return dds.ds.KeyList()
return dds.ds.Query(q)
}
var _ datastore.Datastore = &delayed{}
var _ ds.Datastore = &delayed{}