1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-28 08:47:42 +08:00

Merge pull request #1466 from ipfs/export-modules

moved util/ctx to github.com/jbenet/go-context
This commit is contained in:
Juan Batiz-Benet
2015-07-10 18:11:56 -07:00
9 changed files with 103 additions and 53 deletions

13
Godeps/Godeps.json generated
View File

@ -14,11 +14,6 @@
"Comment": "null-5", "Comment": "null-5",
"Rev": "75cd24fc2f2c2a2088577d12123ddee5f54e0675" "Rev": "75cd24fc2f2c2a2088577d12123ddee5f54e0675"
}, },
{
"ImportPath": "code.google.com/p/goprotobuf/proto",
"Comment": "go.r60-152",
"Rev": "36be16571e14f67e114bb0af619e5de2c1591679"
},
{ {
"ImportPath": "github.com/ActiveState/tail", "ImportPath": "github.com/ActiveState/tail",
"Rev": "068b72961a6bc5b4a82cf4fc14ccc724c0cfa73a" "Rev": "068b72961a6bc5b4a82cf4fc14ccc724c0cfa73a"
@ -137,6 +132,14 @@
"ImportPath": "github.com/jbenet/go-base58", "ImportPath": "github.com/jbenet/go-base58",
"Rev": "6237cf65f3a6f7111cd8a42be3590df99a66bc7d" "Rev": "6237cf65f3a6f7111cd8a42be3590df99a66bc7d"
}, },
{
"ImportPath": "github.com/jbenet/go-context/frac",
"Rev": "d14ea06fba99483203c19d92cfcd13ebe73135f4"
},
{
"ImportPath": "github.com/jbenet/go-context/io",
"Rev": "d14ea06fba99483203c19d92cfcd13ebe73135f4"
},
{ {
"ImportPath": "github.com/jbenet/go-datastore", "ImportPath": "github.com/jbenet/go-datastore",
"Rev": "7d6acaf7c0164c335f2ca4100f8fe30a7e2943dd" "Rev": "7d6acaf7c0164c335f2ca4100f8fe30a7e2943dd"

View File

@ -0,0 +1,62 @@
// Package ctxext provides multiple useful context constructors.
package ctxext
import (
"time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
// WithDeadlineFraction returns a Context with a fraction of the
// original context's timeout. This is useful in sequential pipelines
// of work, where one might try options and fall back to others
// depending on the time available, or failure to respond. For example:
//
// // getPicture returns a picture from our encrypted database
// // we have a pipeline of multiple steps. we need to:
// // - get the data from a database
// // - decrypt it
// // - apply many transforms
// //
// // we **know** that each step takes increasingly more time.
// // The transforms are much more expensive than decryption, and
// // decryption is more expensive than the database lookup.
// // If our database takes too long (i.e. >0.2 of available time),
// // there's no use in continuing.
// func getPicture(ctx context.Context, key string) ([]byte, error) {
// // fractional timeout contexts to the rescue!
//
// // try the database with 0.2 of remaining time.
// ctx1, _ := ctxext.WithDeadlineFraction(ctx, 0.2)
// val, err := db.Get(ctx1, key)
// if err != nil {
// return nil, err
// }
//
// // try decryption with 0.3 of remaining time.
// ctx2, _ := ctxext.WithDeadlineFraction(ctx, 0.3)
// if val, err = decryptor.Decrypt(ctx2, val); err != nil {
// return nil, err
// }
//
// // try transforms with all remaining time. hopefully it's enough!
// return transformer.Transform(ctx, val)
// }
//
//
func WithDeadlineFraction(ctx context.Context, fraction float64) (
context.Context, context.CancelFunc) {
d, found := ctx.Deadline()
if !found { // no deadline
return context.WithCancel(ctx)
}
left := d.Sub(time.Now())
if left < 0 { // already passed...
return context.WithCancel(ctx)
}
left = time.Duration(float64(left) * fraction)
return context.WithTimeout(ctx, left)
}

View File

@ -1,17 +1,16 @@
package ctxutil package ctxext
import ( import (
"os"
"testing" "testing"
"time" "time"
travis "github.com/ipfs/go-ipfs/util/testutil/ci/travis"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
) )
// this test is on the context tool itself, not our stuff. it's for sanity on ours. // this test is on the context tool itself, not our stuff. it's for sanity on ours.
func TestDeadline(t *testing.T) { func TestDeadline(t *testing.T) {
if travis.IsRunning() { if os.Getenv("TRAVIS") == "true" {
t.Skip("timeouts don't work reliably on travis") t.Skip("timeouts don't work reliably on travis")
} }
@ -43,7 +42,7 @@ func TestDeadlineFractionForever(t *testing.T) {
} }
func TestDeadlineFractionHalf(t *testing.T) { func TestDeadlineFractionHalf(t *testing.T) {
if travis.IsRunning() { if os.Getenv("TRAVIS") == "true" {
t.Skip("timeouts don't work reliably on travis") t.Skip("timeouts don't work reliably on travis")
} }

View File

@ -1,4 +1,14 @@
package ctxutil // Package ctxio provides io.Reader and io.Writer wrappers that
// respect context.Contexts. Use these at the interface between
// your context code and your io.
//
// WARNING: read the code. see how writes and reads will continue
// until you cancel the io. Maybe this package should provide
// versions of io.ReadCloser and io.WriteCloser that automatically
// call .Close when the context expires. But for now -- since in my
// use cases I have long-lived connections with ephemeral io wrappers
// -- this has yet to be a need.
package ctxio
import ( import (
"io" "io"

View File

@ -1,4 +1,4 @@
package ctxutil package ctxio
import ( import (
"bytes" "bytes"
@ -49,8 +49,8 @@ func TestReader(t *testing.T) {
} }
func TestWriter(t *testing.T) { func TestWriter(t *testing.T) {
buf := new(bytes.Buffer) var buf bytes.Buffer
w := NewWriter(context.Background(), buf) w := NewWriter(context.Background(), &buf)
// write three // write three
n, err := w.Write([]byte("abc")) n, err := w.Write([]byte("abc"))

View File

@ -13,15 +13,14 @@ import (
ggio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/io" ggio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/io"
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto" proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
ctxio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-context/io"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
pb "github.com/ipfs/go-ipfs/diagnostics/pb" pb "github.com/ipfs/go-ipfs/diagnostics/pb"
host "github.com/ipfs/go-ipfs/p2p/host" host "github.com/ipfs/go-ipfs/p2p/host"
inet "github.com/ipfs/go-ipfs/p2p/net" inet "github.com/ipfs/go-ipfs/p2p/net"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
protocol "github.com/ipfs/go-ipfs/p2p/protocol" protocol "github.com/ipfs/go-ipfs/p2p/protocol"
util "github.com/ipfs/go-ipfs/util" util "github.com/ipfs/go-ipfs/util"
ctxutil "github.com/ipfs/go-ipfs/util/ctx"
) )
var log = util.Logger("diagnostics") var log = util.Logger("diagnostics")
@ -209,8 +208,8 @@ func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p peer.ID, pmes
return nil, err return nil, err
} }
cr := ctxutil.NewReader(ctx, s) // ok to use. we defer close stream in this func cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax) r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(cw) w := ggio.NewDelimitedWriter(cw)
@ -267,8 +266,8 @@ func newMessage(diagID string) *pb.Message {
func (d *Diagnostics) HandleMessage(ctx context.Context, s inet.Stream) error { func (d *Diagnostics) HandleMessage(ctx context.Context, s inet.Stream) error {
cr := ctxutil.NewReader(ctx, s) cr := ctxio.NewReader(ctx, s)
cw := ctxutil.NewWriter(ctx, s) cw := ctxio.NewWriter(ctx, s)
r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax) // maxsize r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax) // maxsize
w := ggio.NewDelimitedWriter(cw) w := ggio.NewDelimitedWriter(cw)

View File

@ -4,13 +4,12 @@ import (
"errors" "errors"
"time" "time"
ggio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/io"
ctxio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-context/io"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
inet "github.com/ipfs/go-ipfs/p2p/net" inet "github.com/ipfs/go-ipfs/p2p/net"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
pb "github.com/ipfs/go-ipfs/routing/dht/pb" pb "github.com/ipfs/go-ipfs/routing/dht/pb"
ctxutil "github.com/ipfs/go-ipfs/util/ctx"
ggio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/io"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
) )
// handleNewStream implements the inet.StreamHandler // handleNewStream implements the inet.StreamHandler
@ -22,8 +21,8 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
defer s.Close() defer s.Close()
ctx := dht.Context() ctx := dht.Context()
cr := ctxutil.NewReader(ctx, s) // ok to use. we defer close stream in this func cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax) r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(cw) w := ggio.NewDelimitedWriter(cw)
mPeer := s.Conn().RemotePeer() mPeer := s.Conn().RemotePeer()
@ -78,8 +77,8 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
} }
defer s.Close() defer s.Close()
cr := ctxutil.NewReader(ctx, s) // ok to use. we defer close stream in this func cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax) r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(cw) w := ggio.NewDelimitedWriter(cw)
@ -116,7 +115,7 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
} }
defer s.Close() defer s.Close()
cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
w := ggio.NewDelimitedWriter(cw) w := ggio.NewDelimitedWriter(cw)
if err := w.WriteMsg(pmes); err != nil { if err := w.WriteMsg(pmes); err != nil {

View File

@ -3,13 +3,13 @@ package dht
import ( import (
"fmt" "fmt"
ctxfrac "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-context/frac"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
ci "github.com/ipfs/go-ipfs/p2p/crypto" ci "github.com/ipfs/go-ipfs/p2p/crypto"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
routing "github.com/ipfs/go-ipfs/routing" routing "github.com/ipfs/go-ipfs/routing"
pb "github.com/ipfs/go-ipfs/routing/dht/pb" pb "github.com/ipfs/go-ipfs/routing/dht/pb"
record "github.com/ipfs/go-ipfs/routing/record" record "github.com/ipfs/go-ipfs/routing/record"
ctxutil "github.com/ipfs/go-ipfs/util/ctx"
) )
func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) { func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) {
@ -22,7 +22,7 @@ func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, err
} }
// ok, try the node itself. if they're overwhelmed or slow we can move on. // ok, try the node itself. if they're overwhelmed or slow we can move on.
ctxT, cancelFunc := ctxutil.WithDeadlineFraction(ctx, 0.3) ctxT, cancelFunc := ctxfrac.WithDeadlineFraction(ctx, 0.3)
defer cancelFunc() defer cancelFunc()
if pk, err := dht.getPublicKeyFromNode(ctx, p); err == nil { if pk, err := dht.getPublicKeyFromNode(ctx, p); err == nil {
err := dht.peerstore.AddPubKey(p, pk) err := dht.peerstore.AddPubKey(p, pk)

View File

@ -1,22 +0,0 @@
package ctxutil
import (
"time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
func WithDeadlineFraction(ctx context.Context, fraction float64) (context.Context, context.CancelFunc) {
d, found := ctx.Deadline()
if !found { // no deadline
return context.WithCancel(ctx)
}
left := d.Sub(time.Now())
if left < 0 { // already passed...
return context.WithCancel(ctx)
}
left = time.Duration(float64(left) * fraction)
return context.WithTimeout(ctx, left)
}