From: Matt Joiner Date: Sat, 23 Jul 2016 12:38:31 +0000 (+1000) Subject: Break up the DHT announcer code into smaller functions, and add a 5 minute delay... X-Git-Tag: v1.0.0~630 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=980cd69ab22d9e3e0178d21eed0f7481717d0a7b;p=btrtrc.git Break up the DHT announcer code into smaller functions, and add a 5 minute delay between announces --- diff --git a/client.go b/client.go index 2ee5239b..27b340a5 100644 --- a/client.go +++ b/client.go @@ -1473,7 +1473,7 @@ func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bo new = true t = cl.newTorrent(infoHash) if cl.dHT != nil { - go t.announceDHT(true) + go t.dhtAnnouncer() } cl.torrents[infoHash] = t t.updateWantPeersEvent() diff --git a/torrent.go b/torrent.go index 29eec4c1..039e572d 100644 --- a/torrent.go +++ b/torrent.go @@ -23,6 +23,7 @@ import ( "github.com/bradfitz/iter" "github.com/anacrolix/torrent/bencode" + "github.com/anacrolix/torrent/dht" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/storage" @@ -1208,63 +1209,80 @@ func (t *Torrent) announceRequest() tracker.AnnounceRequest { } } -func (t *Torrent) announceDHT(impliedPort bool) { +// Adds peers revealed in an announce until the announce ends, or we have +// enough peers. +func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) { cl := t.cl + // Count all the unique addresses we got during this announce. + allAddrs := make(map[string]struct{}) for { select { - case <-t.wantPeersEvent.LockedChan(&cl.mu): + case v, ok := <-pvs: + if !ok { + return + } + addPeers := make([]Peer, 0, len(v.Peers)) + for _, cp := range v.Peers { + if cp.Port == 0 { + // Can't do anything with this. + continue + } + addPeers = append(addPeers, Peer{ + IP: cp.IP[:], + Port: cp.Port, + Source: peerSourceDHT, + }) + key := (&net.UDPAddr{ + IP: cp.IP[:], + Port: cp.Port, + }).String() + allAddrs[key] = struct{}{} + } + cl.mu.Lock() + t.addPeers(addPeers) + numPeers := len(t.peers) + cl.mu.Unlock() + if numPeers >= torrentPeersHighWater { + return + } case <-t.closed.LockedChan(&cl.mu): return } - // log.Printf("getting peers for %q from DHT", t) - ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort) - if err != nil { - log.Printf("error getting peers from dht: %s", err) + } +} + +func (t *Torrent) announceDHT(impliedPort bool) (err error) { + cl := t.cl + ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort) + if err != nil { + return + } + t.consumeDHTAnnounce(ps.Peers) + ps.Close() + return +} + +func (t *Torrent) dhtAnnouncer() { + cl := t.cl + for { + select { + case <-t.wantPeersEvent.LockedChan(&cl.mu): + case <-t.closed.LockedChan(&cl.mu): return } - cl.mu.Lock() - t.numDHTAnnounces++ - cl.mu.Unlock() - // Count all the unique addresses we got during this announce. - allAddrs := make(map[string]struct{}) - getPeers: - for { - select { - case v, ok := <-ps.Peers: - if !ok { - break getPeers - } - addPeers := make([]Peer, 0, len(v.Peers)) - for _, cp := range v.Peers { - if cp.Port == 0 { - // Can't do anything with this. - continue - } - addPeers = append(addPeers, Peer{ - IP: cp.IP[:], - Port: cp.Port, - Source: peerSourceDHT, - }) - key := (&net.UDPAddr{ - IP: cp.IP[:], - Port: cp.Port, - }).String() - allAddrs[key] = struct{}{} - } - cl.mu.Lock() - t.addPeers(addPeers) - numPeers := len(t.peers) - cl.mu.Unlock() - if numPeers >= torrentPeersHighWater { - break getPeers - } - case <-t.closed.LockedChan(&cl.mu): - ps.Close() - return - } + err := t.announceDHT(true) + if err == nil { + cl.mu.Lock() + t.numDHTAnnounces++ + cl.mu.Unlock() + } else { + log.Printf("error announcing %q to DHT: %s", t, err) + } + select { + case <-t.closed.LockedChan(&cl.mu): + return + case <-time.After(5 * time.Minute): } - ps.Close() - // log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs)) } }