diff --git a/routing/dht/dht.go b/routing/dht/dht.go index c8b3bdaba..da58089f3 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -159,8 +159,37 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg. return rmes, nil } -func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Duration, level int) ([]byte, []*peer.Peer, error) { - pmes, err := dht.getValueSingle(p, key, timeout, level) +// sendRequest sends out a request using dht.sender, but also makes sure to +// measure the RTT for latency measurements. +func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message) (*Message, error) { + + mes, err := msg.FromObject(p, pmes) + if err != nil { + return nil, err + } + + start := time.Now() + + rmes, err := dht.sender.SendRequest(ctx, mes) + if err != nil { + return nil, err + } + + rtt := time.Since(start) + rmes.Peer().SetLatency(rtt) + + rpmes := new(Message) + if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil { + return nil, err + } + + return rpmes, nil +} + +func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer, + key u.Key, level int) ([]byte, []*peer.Peer, error) { + + pmes, err := dht.getValueSingle(ctx, p, key, level) if err != nil { return nil, nil, err } @@ -202,39 +231,15 @@ func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Durati } // getValueSingle simply performs the get value RPC with the given parameters -func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duration, level int) (*Message, error) { - pmes := Message{ - Type: Message_GET_VALUE, - Key: string(key), - Value: []byte{byte(level)}, - ID: swarm.GenerateMessageID(), - } - responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute) +func (dht *IpfsDHT) getValueSingle(ctx context.Context, p *peer.Peer, + key u.Key, level int) (*Message, error) { - mes := swarm.NewMessage(p, pmes.ToProtobuf()) - t := time.Now() - dht.netChan.Outgoing <- mes + typ := Message_GET_VALUE + skey := string(key) + pmes := &Message{Type: &typ, Key: &skey} + pmes.SetClusterLevel(int32(level)) - // Wait for either the response or a timeout - timeup := time.After(timeout) - select { - case <-timeup: - dht.listener.Unlisten(pmes.ID) - return nil, u.ErrTimeout - case resp, ok := <-responseChan: - if !ok { - u.PErr("response channel closed before timeout, please investigate.\n") - return nil, u.ErrTimeout - } - roundtrip := time.Since(t) - resp.Peer.SetLatency(roundtrip) - pmesOut := new(Message) - err := proto.Unmarshal(resp.Data, pmesOut) - if err != nil { - return nil, err - } - return pmesOut, nil - } + return dht.sendRequest(ctx, p, pmes) } // TODO: Im not certain on this implementation, we get a list of peers/providers