diff --git a/Godeps/_workspace/src/github.com/jbenet/go-logging/examples/example.go b/Godeps/_workspace/src/github.com/jbenet/go-logging/examples/example.go index 28168d00e..0ad8a7bee 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-logging/examples/example.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-logging/examples/example.go @@ -3,7 +3,7 @@ package main import ( "os" - "github.com/op/go-logging" + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-logging" ) var log = logging.MustGetLogger("example") diff --git a/diagnostics/diag.go b/diagnostics/diag.go index f745e2c28..c54b7dd51 100644 --- a/diagnostics/diag.go +++ b/diagnostics/diag.go @@ -7,6 +7,7 @@ import ( "bytes" "encoding/json" "errors" + "fmt" "io" "sync" "time" @@ -33,6 +34,7 @@ var log = util.Logger("diagnostics") var ProtocolDiag protocol.ID = "/ipfs/diagnostics" const ResponseTimeout = time.Second * 10 +const HopTimeoutDecrement = time.Second * 2 // Diagnostics is a net service that manages requesting and responding to diagnostic // requests @@ -149,39 +151,24 @@ func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*DiagInfo, error) peers := d.getPeers() log.Debugf("Sending diagnostic request to %d peers.", len(peers)) + pmes := newMessage(diagID) + + pmes.SetTimeoutDuration(timeout - HopTimeoutDecrement) // decrease timeout per hop + dpeers, err := d.getDiagnosticFromPeers(ctx, d.getPeers(), pmes) + if err != nil { + return nil, fmt.Errorf("diagnostic from peers err: %s", err) + } + var out []*DiagInfo di := d.getDiagInfo() out = append(out, di) - - pmes := newMessage(diagID) - - respdata := make(chan []byte) - sends := 0 - for p, _ := range peers { - log.Debugf("Sending getDiagnostic to: %s", p) - sends++ - go func(p peer.ID) { - data, err := d.getDiagnosticFromPeer(ctx, p, pmes) - if err != nil { - log.Errorf("GetDiagnostic error: %v", err) - respdata <- nil - return - } - respdata <- data - }(p) - } - - for i := 0; i < sends; i++ { - data := <-respdata - if data == nil { - continue - } - out = appendDiagnostics(data, out) + for _, dpi := range dpeers { + out = appendDiagnostics(out, dpi) } return out, nil } -func appendDiagnostics(data []byte, cur []*DiagInfo) []*DiagInfo { +func appendDiagnostics(cur []*DiagInfo, data []byte) []*DiagInfo { buf := bytes.NewBuffer(data) dec := json.NewDecoder(buf) for { @@ -198,6 +185,38 @@ func appendDiagnostics(data []byte, cur []*DiagInfo) []*DiagInfo { return cur } +func (d *Diagnostics) getDiagnosticFromPeers(ctx context.Context, peers map[peer.ID]int, pmes *pb.Message) ([][]byte, error) { + timeout := pmes.GetTimeoutDuration() + if timeout < 1 { + return nil, fmt.Errorf("timeout too short: %s", timeout) + } + ctx, _ = context.WithTimeout(ctx, timeout) + + respdata := make(chan []byte) + sendcount := 0 + for p, _ := range peers { + log.Debugf("Sending diagnostic request to peer: %s", p) + sendcount++ + go func(p peer.ID) { + out, err := d.getDiagnosticFromPeer(ctx, p, pmes) + if err != nil { + log.Errorf("getDiagnostic error: %v", err) + respdata <- nil + return + } + respdata <- out + }(p) + } + + outall := make([][]byte, 0, len(peers)) + for i := 0; i < sendcount; i++ { + out := <-respdata + outall = append(outall, out) + } + + return outall, nil +} + // TODO: this method no longer needed. func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p peer.ID, mes *pb.Message) ([]byte, error) { rpmes, err := d.sendRequest(ctx, p, mes) @@ -259,38 +278,17 @@ func (d *Diagnostics) handleDiagnostic(p peer.ID, pmes *pb.Message) (*pb.Message d.diagMap[pmes.GetDiagID()] = time.Now() d.diagLock.Unlock() - buf := new(bytes.Buffer) di := d.getDiagInfo() - buf.Write(di.Marshal()) - - ctx, _ := context.WithTimeout(context.TODO(), ResponseTimeout) - - respdata := make(chan []byte) - sendcount := 0 - for p, _ := range d.getPeers() { - log.Debugf("Sending diagnostic request to peer: %s", p) - sendcount++ - go func(p peer.ID) { - out, err := d.getDiagnosticFromPeer(ctx, p, pmes) - if err != nil { - log.Errorf("getDiagnostic error: %v", err) - respdata <- nil - return - } - respdata <- out - }(p) - } - - for i := 0; i < sendcount; i++ { - out := <-respdata - _, err := buf.Write(out) - if err != nil { - log.Errorf("getDiagnostic write output error: %v", err) - continue + resp.Data = di.Marshal() + dpeers, err := d.getDiagnosticFromPeers(context.TODO(), d.getPeers(), pmes) + if err != nil { + log.Errorf("diagnostic from peers err: %s", err) + } else { + for _, b := range dpeers { + resp.Data = append(resp.Data, b...) // concatenate them all. } } - resp.Data = buf.Bytes() return resp, nil } diff --git a/diagnostics/internal/pb/diagnostics.pb.go b/diagnostics/internal/pb/diagnostics.pb.go index 4aa721711..0da512c3e 100644 --- a/diagnostics/internal/pb/diagnostics.pb.go +++ b/diagnostics/internal/pb/diagnostics.pb.go @@ -14,15 +14,18 @@ It has these top-level messages: package diagnostics_pb import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto" +import json "encoding/json" import math "math" -// Reference imports to suppress errors if they are not otherwise used. +// Reference proto, json, and math imports to suppress error if they are not otherwise used. var _ = proto.Marshal +var _ = &json.SyntaxError{} var _ = math.Inf type Message struct { DiagID *string `protobuf:"bytes,1,req" json:"DiagID,omitempty"` Data []byte `protobuf:"bytes,2,opt" json:"Data,omitempty"` + Timeout *int64 `protobuf:"varint,3,opt" json:"Timeout,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -44,5 +47,12 @@ func (m *Message) GetData() []byte { return nil } +func (m *Message) GetTimeout() int64 { + if m != nil && m.Timeout != nil { + return *m.Timeout + } + return 0 +} + func init() { } diff --git a/diagnostics/internal/pb/diagnostics.proto b/diagnostics/internal/pb/diagnostics.proto index 3ffe2f523..2202f7c24 100644 --- a/diagnostics/internal/pb/diagnostics.proto +++ b/diagnostics/internal/pb/diagnostics.proto @@ -3,4 +3,5 @@ package diagnostics.pb; message Message { required string DiagID = 1; optional bytes Data = 2; + optional int64 Timeout = 3; // in nanoseconds } diff --git a/diagnostics/internal/pb/timeout.go b/diagnostics/internal/pb/timeout.go new file mode 100644 index 000000000..f2043c0e7 --- /dev/null +++ b/diagnostics/internal/pb/timeout.go @@ -0,0 +1,14 @@ +package diagnostics_pb + +import ( + "time" +) + +func (m *Message) GetTimeoutDuration() time.Duration { + return time.Duration(m.GetTimeout()) +} + +func (m *Message) SetTimeoutDuration(t time.Duration) { + it := int64(t) + m.Timeout = &it +}