diff --git a/routing/dht/dht.go b/routing/dht/dht.go index e0553c9a7..91ad57307 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -1,7 +1,6 @@ package dht import ( - "bytes" "crypto/rand" "errors" "fmt" @@ -341,62 +340,67 @@ type providerInfo struct { func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) { key := u.Key(pmes.GetKey()) - u.DOut("[%s] Adding [%s] as a provider for '%s'\n", dht.self.ID.Pretty(), p.ID.Pretty(), peer.ID(key).Pretty()) + u.DOut("[%s] Adding [%s] as a provider for '%s'\n", + dht.self.ID.Pretty(), p.ID.Pretty(), peer.ID(key).Pretty()) dht.providers.AddProvider(key, p) } // Halt stops all communications from this peer and shut down +// TODO -- remove this in favor of context func (dht *IpfsDHT) Halt() { dht.shutdown <- struct{}{} dht.network.Close() dht.providers.Halt() - dht.listener.Halt() } // NOTE: not yet finished, low priority -func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *Message) { +func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *Message) (*Message, error) { seq := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10) - listenChan := dht.listener.Listen(pmes.GetId(), len(seq), time.Second*30) for _, ps := range seq { - mes := swarm.NewMessage(ps, pmes) - dht.netChan.Outgoing <- mes - } - - buf := new(bytes.Buffer) - di := dht.getDiagInfo() - buf.Write(di.Marshal()) - - // NOTE: this shouldnt be a hardcoded value - after := time.After(time.Second * 20) - count := len(seq) - for count > 0 { - select { - case <-after: - //Timeout, return what we have - goto out - case reqResp := <-listenChan: - pmesOut := new(Message) - err := proto.Unmarshal(reqResp.Data, pmesOut) - if err != nil { - // It broke? eh, whatever, keep going - continue - } - buf.Write(reqResp.Data) - count-- + mes, err := msg.FromObject(ps, pmes) + if err != nil { + u.PErr("handleDiagnostics error creating message: %v\n", err) + continue } + // dht.sender.SendRequest(context.TODO(), mes) } + return nil, errors.New("not yet ported back") -out: - resp := Message{ - Type: Message_DIAGNOSTIC, - ID: pmes.GetId(), - Value: buf.Bytes(), - Response: true, - } - - mes := swarm.NewMessage(p, resp.ToProtobuf()) - dht.netChan.Outgoing <- mes + // buf := new(bytes.Buffer) + // di := dht.getDiagInfo() + // buf.Write(di.Marshal()) + // + // // NOTE: this shouldnt be a hardcoded value + // after := time.After(time.Second * 20) + // count := len(seq) + // for count > 0 { + // select { + // case <-after: + // //Timeout, return what we have + // goto out + // case reqResp := <-listenChan: + // pmesOut := new(Message) + // err := proto.Unmarshal(reqResp.Data, pmesOut) + // if err != nil { + // // It broke? eh, whatever, keep going + // continue + // } + // buf.Write(reqResp.Data) + // count-- + // } + // } + // + // out: + // resp := Message{ + // Type: Message_DIAGNOSTIC, + // ID: pmes.GetId(), + // Value: buf.Bytes(), + // Response: true, + // } + // + // mes := swarm.NewMessage(p, resp.ToProtobuf()) + // dht.netChan.Outgoing <- mes } func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Duration, level int) ([]byte, []*peer.Peer, error) {