1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-10-26 19:05:05 +08:00

added cancel func calls previously ignored

This commit is contained in:
Juan Batiz-Benet
2015-03-07 09:31:46 -08:00
parent dfe2b16763
commit 1d5b903611
10 changed files with 27 additions and 13 deletions

View File

@ -126,7 +126,8 @@ func pingPeer(ctx context.Context, n *core.IpfsNode, pid peer.ID, numPings int)
Text: fmt.Sprintf("Looking up peer %s", pid.Pretty()), Text: fmt.Sprintf("Looking up peer %s", pid.Pretty()),
} }
ctx, _ := context.WithTimeout(ctx, kPingTimeout) ctx, cancel := context.WithTimeout(ctx, kPingTimeout)
defer cancel()
p, err := n.Routing.FindPeer(ctx, pid) p, err := n.Routing.FindPeer(ctx, pid)
if err != nil { if err != nil {
outChan <- &PingResult{Text: fmt.Sprintf("Peer lookup error: %s", err)} outChan <- &PingResult{Text: fmt.Sprintf("Peer lookup error: %s", err)}
@ -147,7 +148,8 @@ func pingPeer(ctx context.Context, n *core.IpfsNode, pid peer.ID, numPings int)
default: default:
} }
ctx, _ := context.WithTimeout(ctx, kPingTimeout) ctx, cancel := context.WithTimeout(ctx, kPingTimeout)
defer cancel()
took, err := n.Routing.Ping(ctx, pid) took, err := n.Routing.Ping(ctx, pid)
if err != nil { if err != nil {
log.Debugf("Ping error: %s", err) log.Debugf("Ping error: %s", err)

View File

@ -54,7 +54,8 @@ func Listen(nd *core.IpfsNode, protocol string) (*ipfsListener, error) {
} }
func Dial(nd *core.IpfsNode, p peer.ID, protocol string) (net.Stream, error) { func Dial(nd *core.IpfsNode, p peer.ID, protocol string) (net.Stream, error) {
ctx, _ := context.WithTimeout(nd.Context(), time.Second*30) ctx, cancel := context.WithTimeout(nd.Context(), time.Second*30)
defer cancel()
err := nd.PeerHost.Connect(ctx, peer.PeerInfo{ID: p}) err := nd.PeerHost.Connect(ctx, peer.PeerInfo{ID: p})
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -14,12 +14,12 @@ import (
ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io" ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
ctxutil "github.com/jbenet/go-ipfs/util/ctx"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
host "github.com/jbenet/go-ipfs/p2p/host" host "github.com/jbenet/go-ipfs/p2p/host"
inet "github.com/jbenet/go-ipfs/p2p/net" inet "github.com/jbenet/go-ipfs/p2p/net"
peer "github.com/jbenet/go-ipfs/p2p/peer" peer "github.com/jbenet/go-ipfs/p2p/peer"
protocol "github.com/jbenet/go-ipfs/p2p/protocol" protocol "github.com/jbenet/go-ipfs/p2p/protocol"
ctxutil "github.com/jbenet/go-ipfs/util/ctx"
pb "github.com/jbenet/go-ipfs/diagnostics/internal/pb" pb "github.com/jbenet/go-ipfs/diagnostics/internal/pb"
util "github.com/jbenet/go-ipfs/util" util "github.com/jbenet/go-ipfs/util"
@ -138,7 +138,8 @@ func newID() string {
// GetDiagnostic runs a diagnostics request across the entire network // GetDiagnostic runs a diagnostics request across the entire network
func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*DiagInfo, error) { func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*DiagInfo, error) {
log.Debug("Getting diagnostic.") log.Debug("Getting diagnostic.")
ctx, _ := context.WithTimeout(context.TODO(), timeout) ctx, cancel := context.WithTimeout(context.TODO(), timeout)
defer cancel()
diagID := newID() diagID := newID()
d.diagLock.Lock() d.diagLock.Lock()

View File

@ -269,7 +269,8 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
go func(k u.Key) { go func(k u.Key) {
defer wg.Done() defer wg.Done()
child, _ := context.WithTimeout(ctx, providerRequestTimeout) child, cancel := context.WithTimeout(ctx, providerRequestTimeout)
defer cancel()
providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest) providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest)
for prov := range providers { for prov := range providers {
sendToPeers <- prov sendToPeers <- prov
@ -311,10 +312,11 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
// Should only track *useful* messages in ledger // Should only track *useful* messages in ledger
for _, block := range incoming.Blocks() { for _, block := range incoming.Blocks() {
hasBlockCtx, _ := context.WithTimeout(ctx, hasBlockTimeout) hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout)
if err := bs.HasBlock(hasBlockCtx, block); err != nil { if err := bs.HasBlock(hasBlockCtx, block); err != nil {
log.Debug(err) log.Debug(err)
} }
cancel()
} }
var keys []u.Key var keys []u.Key

View File

@ -88,7 +88,8 @@ func (n *dagService) Get(k u.Key) (*Node, error) {
return nil, fmt.Errorf("dagService is nil") return nil, fmt.Errorf("dagService is nil")
} }
ctx, _ := context.WithTimeout(context.TODO(), time.Minute) ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
// we shouldn't use an arbitrary timeout here. // we shouldn't use an arbitrary timeout here.
// since Get doesnt take in a context yet, we give a large upper bound. // since Get doesnt take in a context yet, we give a large upper bound.
// think of an http request. we want it to go on as long as the client requests it. // think of an http request. we want it to go on as long as the client requests it.

View File

@ -60,9 +60,11 @@ func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value u.Key)
nameb := u.Hash(pkbytes) nameb := u.Hash(pkbytes)
namekey := u.Key("/pk/" + string(nameb)) namekey := u.Key("/pk/" + string(nameb))
timectx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second*10))
defer cancel()
log.Debugf("Storing pubkey at: %s", namekey) log.Debugf("Storing pubkey at: %s", namekey)
// Store associated public key // Store associated public key
timectx, _ := context.WithDeadline(ctx, time.Now().Add(time.Second*10))
err = p.routing.PutValue(timectx, namekey, pkbytes) err = p.routing.PutValue(timectx, namekey, pkbytes)
if err != nil { if err != nil {
return err return err

View File

@ -227,8 +227,9 @@ func (s *Swarm) gatedDialAttempt(ctx context.Context, p peer.ID) (*Conn, error)
// if it succeeds, dial will add the conn to the swarm itself. // if it succeeds, dial will add the conn to the swarm itself.
defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done() defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done()
ctxT, _ := context.WithTimeout(ctx, s.dialT) ctxT, cancel := context.WithTimeout(ctx, s.dialT)
conn, err := s.dial(ctxT, p) conn, err := s.dial(ctxT, p)
cancel()
s.dsync.Unlock(p) s.dsync.Unlock(p)
log.Debugf("dial end %s", conn) log.Debugf("dial end %s", conn)
if err != nil { if err != nil {

View File

@ -172,7 +172,9 @@ func (p *pinner) pinIndirectRecurse(node *mdag.Node) error {
} }
func (p *pinner) pinLinks(node *mdag.Node) error { func (p *pinner) pinLinks(node *mdag.Node) error {
ctx, _ := context.WithTimeout(context.Background(), time.Second*60) ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()
for _, ng := range p.dserv.GetDAG(ctx, node) { for _, ng := range p.dserv.GetDAG(ctx, node) {
subnode, err := ng.Get() subnode, err := ng.Get()
if err != nil { if err != nil {

View File

@ -357,11 +357,12 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
rand.Read(id) rand.Read(id)
peers := dht.routingTable.NearestPeers(kb.ConvertKey(u.Key(id)), 5) peers := dht.routingTable.NearestPeers(kb.ConvertKey(u.Key(id)), 5)
for _, p := range peers { for _, p := range peers {
ctx, _ := context.WithTimeout(dht.Context(), time.Second*5) ctx, cancel := context.WithTimeout(dht.Context(), time.Second*5)
_, err := dht.Ping(ctx, p) _, err := dht.Ping(ctx, p)
if err != nil { if err != nil {
log.Debugf("Ping error: %s", err) log.Debugf("Ping error: %s", err)
} }
cancel()
} }
case <-dht.Closing(): case <-dht.Closing():
return return

View File

@ -86,7 +86,8 @@ func (r *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
} }
r.flush() r.flush()
ctx, _ := context.WithTimeout(context.TODO(), time.Second*60) ctx, cancel := context.WithTimeout(context.TODO(), time.Second*60)
defer cancel()
for i, ng := range r.dag.GetDAG(ctx, dagnode) { for i, ng := range r.dag.GetDAG(ctx, dagnode) {
childNode, err := ng.Get() childNode, err := ng.Get()