1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-27 16:07:42 +08:00

net/diag: recursively decrement timeouts.

Not sure this works. we dont have tests for net diag.
We should make some.
cc @whyrusleeping.
This commit is contained in:
Juan Batiz-Benet
2015-01-17 16:44:56 -08:00
parent 898b9696ca
commit f6278735ef
5 changed files with 79 additions and 56 deletions

View File

@ -3,7 +3,7 @@ package main
import ( import (
"os" "os"
"github.com/op/go-logging" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-logging"
) )
var log = logging.MustGetLogger("example") var log = logging.MustGetLogger("example")

View File

@ -7,6 +7,7 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"io" "io"
"sync" "sync"
"time" "time"
@ -33,6 +34,7 @@ var log = util.Logger("diagnostics")
var ProtocolDiag protocol.ID = "/ipfs/diagnostics" var ProtocolDiag protocol.ID = "/ipfs/diagnostics"
const ResponseTimeout = time.Second * 10 const ResponseTimeout = time.Second * 10
const HopTimeoutDecrement = time.Second * 2
// Diagnostics is a net service that manages requesting and responding to diagnostic // Diagnostics is a net service that manages requesting and responding to diagnostic
// requests // requests
@ -149,39 +151,24 @@ func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*DiagInfo, error)
peers := d.getPeers() peers := d.getPeers()
log.Debugf("Sending diagnostic request to %d peers.", len(peers)) log.Debugf("Sending diagnostic request to %d peers.", len(peers))
pmes := newMessage(diagID)
pmes.SetTimeoutDuration(timeout - HopTimeoutDecrement) // decrease timeout per hop
dpeers, err := d.getDiagnosticFromPeers(ctx, d.getPeers(), pmes)
if err != nil {
return nil, fmt.Errorf("diagnostic from peers err: %s", err)
}
var out []*DiagInfo var out []*DiagInfo
di := d.getDiagInfo() di := d.getDiagInfo()
out = append(out, di) out = append(out, di)
for _, dpi := range dpeers {
pmes := newMessage(diagID) out = appendDiagnostics(out, dpi)
respdata := make(chan []byte)
sends := 0
for p, _ := range peers {
log.Debugf("Sending getDiagnostic to: %s", p)
sends++
go func(p peer.ID) {
data, err := d.getDiagnosticFromPeer(ctx, p, pmes)
if err != nil {
log.Errorf("GetDiagnostic error: %v", err)
respdata <- nil
return
}
respdata <- data
}(p)
}
for i := 0; i < sends; i++ {
data := <-respdata
if data == nil {
continue
}
out = appendDiagnostics(data, out)
} }
return out, nil return out, nil
} }
func appendDiagnostics(data []byte, cur []*DiagInfo) []*DiagInfo { func appendDiagnostics(cur []*DiagInfo, data []byte) []*DiagInfo {
buf := bytes.NewBuffer(data) buf := bytes.NewBuffer(data)
dec := json.NewDecoder(buf) dec := json.NewDecoder(buf)
for { for {
@ -198,6 +185,38 @@ func appendDiagnostics(data []byte, cur []*DiagInfo) []*DiagInfo {
return cur return cur
} }
func (d *Diagnostics) getDiagnosticFromPeers(ctx context.Context, peers map[peer.ID]int, pmes *pb.Message) ([][]byte, error) {
timeout := pmes.GetTimeoutDuration()
if timeout < 1 {
return nil, fmt.Errorf("timeout too short: %s", timeout)
}
ctx, _ = context.WithTimeout(ctx, timeout)
respdata := make(chan []byte)
sendcount := 0
for p, _ := range peers {
log.Debugf("Sending diagnostic request to peer: %s", p)
sendcount++
go func(p peer.ID) {
out, err := d.getDiagnosticFromPeer(ctx, p, pmes)
if err != nil {
log.Errorf("getDiagnostic error: %v", err)
respdata <- nil
return
}
respdata <- out
}(p)
}
outall := make([][]byte, 0, len(peers))
for i := 0; i < sendcount; i++ {
out := <-respdata
outall = append(outall, out)
}
return outall, nil
}
// TODO: this method no longer needed. // TODO: this method no longer needed.
func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p peer.ID, mes *pb.Message) ([]byte, error) { func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p peer.ID, mes *pb.Message) ([]byte, error) {
rpmes, err := d.sendRequest(ctx, p, mes) rpmes, err := d.sendRequest(ctx, p, mes)
@ -259,38 +278,17 @@ func (d *Diagnostics) handleDiagnostic(p peer.ID, pmes *pb.Message) (*pb.Message
d.diagMap[pmes.GetDiagID()] = time.Now() d.diagMap[pmes.GetDiagID()] = time.Now()
d.diagLock.Unlock() d.diagLock.Unlock()
buf := new(bytes.Buffer)
di := d.getDiagInfo() di := d.getDiagInfo()
buf.Write(di.Marshal()) resp.Data = di.Marshal()
dpeers, err := d.getDiagnosticFromPeers(context.TODO(), d.getPeers(), pmes)
ctx, _ := context.WithTimeout(context.TODO(), ResponseTimeout)
respdata := make(chan []byte)
sendcount := 0
for p, _ := range d.getPeers() {
log.Debugf("Sending diagnostic request to peer: %s", p)
sendcount++
go func(p peer.ID) {
out, err := d.getDiagnosticFromPeer(ctx, p, pmes)
if err != nil { if err != nil {
log.Errorf("getDiagnostic error: %v", err) log.Errorf("diagnostic from peers err: %s", err)
respdata <- nil } else {
return for _, b := range dpeers {
} resp.Data = append(resp.Data, b...) // concatenate them all.
respdata <- out
}(p)
}
for i := 0; i < sendcount; i++ {
out := <-respdata
_, err := buf.Write(out)
if err != nil {
log.Errorf("getDiagnostic write output error: %v", err)
continue
} }
} }
resp.Data = buf.Bytes()
return resp, nil return resp, nil
} }

View File

@ -14,15 +14,18 @@ It has these top-level messages:
package diagnostics_pb package diagnostics_pb
import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto" import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto"
import json "encoding/json"
import math "math" import math "math"
// Reference imports to suppress errors if they are not otherwise used. // Reference proto, json, and math imports to suppress error if they are not otherwise used.
var _ = proto.Marshal var _ = proto.Marshal
var _ = &json.SyntaxError{}
var _ = math.Inf var _ = math.Inf
type Message struct { type Message struct {
DiagID *string `protobuf:"bytes,1,req" json:"DiagID,omitempty"` DiagID *string `protobuf:"bytes,1,req" json:"DiagID,omitempty"`
Data []byte `protobuf:"bytes,2,opt" json:"Data,omitempty"` Data []byte `protobuf:"bytes,2,opt" json:"Data,omitempty"`
Timeout *int64 `protobuf:"varint,3,opt" json:"Timeout,omitempty"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
} }
@ -44,5 +47,12 @@ func (m *Message) GetData() []byte {
return nil return nil
} }
func (m *Message) GetTimeout() int64 {
if m != nil && m.Timeout != nil {
return *m.Timeout
}
return 0
}
func init() { func init() {
} }

View File

@ -3,4 +3,5 @@ package diagnostics.pb;
message Message { message Message {
required string DiagID = 1; required string DiagID = 1;
optional bytes Data = 2; optional bytes Data = 2;
optional int64 Timeout = 3; // in nanoseconds
} }

View File

@ -0,0 +1,14 @@
package diagnostics_pb
import (
"time"
)
func (m *Message) GetTimeoutDuration() time.Duration {
return time.Duration(m.GetTimeout())
}
func (m *Message) SetTimeoutDuration(t time.Duration) {
it := int64(t)
m.Timeout = &it
}