From: Matt Joiner Date: Mon, 17 Nov 2014 23:47:36 +0000 (-0600) Subject: dht: Include the sender of peers in the peer stream X-Git-Tag: v1.0.0~1525 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=6d05994c2c394db9b88840c69d2cd882d739d06a;p=btrtrc.git dht: Include the sender of peers in the peer stream --- diff --git a/client.go b/client.go index eff77f3e..c3e99d0c 100644 --- a/client.go +++ b/client.go @@ -1310,13 +1310,13 @@ func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) { select { case <-nextScrape: break getPeers - case cps, ok := <-ps.Values: + case v, ok := <-ps.Values: if !ok { break getPeers } - peersFoundByDHT.Add(int64(len(cps))) + peersFoundByDHT.Add(int64(len(v.Peers))) err = cl.AddPeers(t.InfoHash, func() (ret []Peer) { - for _, cp := range cps { + for _, cp := range v.Peers { ret = append(ret, Peer{ IP: cp.IP[:], Port: int(cp.Port), diff --git a/cmd/dht-get-peers/main.go b/cmd/dht-get-peers/main.go index 550f958d..6fd582a1 100644 --- a/cmd/dht-get-peers/main.go +++ b/cmd/dht-get-peers/main.go @@ -87,7 +87,7 @@ func init() { if err != nil { log.Fatalf("error loading table: %s", err) } - log.Printf("dht server on %s, ID is %q", s.LocalAddr(), s.IDString()) + log.Printf("dht server on %s, ID is %x", s.LocalAddr(), s.IDString()) setupSignals() } @@ -138,8 +138,9 @@ getPeers: log.Fatal(err) } go func() { - for sl := range ps.Values { - for _, p := range sl { + for v := range ps.Values { + log.Printf("received %d peers from %x", len(v.Peers), v.NodeInfo.ID) + for _, p := range v.Peers { if _, ok := seen[p]; ok { continue } diff --git a/dht/dht.go b/dht/dht.go index 094a6bc6..b8acae4b 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -691,9 +691,14 @@ func (s *Server) findNode(addr dHTAddr, targetID string) (t *transaction, err er return } +type peerStreamValue struct { + Peers []util.CompactPeer // Peers given in get_peers response. + NodeInfo // The node that gave the response. +} + type peerStream struct { mu sync.Mutex - Values chan []util.CompactPeer + Values chan peerStreamValue stop chan struct{} } @@ -743,7 +748,7 @@ func extractValues(m Msg) (vs []util.CompactPeer) { func (s *Server) GetPeers(infoHash string) (ps *peerStream, err error) { ps = &peerStream{ - Values: make(chan []util.CompactPeer), + Values: make(chan peerStreamValue), stop: make(chan struct{}), } done := make(chan struct{}) @@ -761,8 +766,21 @@ func (s *Server) GetPeers(infoHash string) (ps *peerStream, err error) { case m := <-t.Response: vs := extractValues(m) if vs != nil { + nodeInfo := NodeInfo{ + Addr: t.remoteAddr, + } + id := func() string { + defer func() { + recover() + }() + return m["r"].(map[string]interface{})["id"].(string) + }() + copy(nodeInfo.ID[:], id) select { - case ps.Values <- vs: + case ps.Values <- peerStreamValue{ + Peers: vs, + NodeInfo: nodeInfo, + }: case <-ps.stop: } }