From 69c39f003623606c32050346098d46f53b101487 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 21 Jan 2019 03:46:26 +0100 Subject: [PATCH] Restart DHT announces at regular intervals The existing implementation would crawl the DHT until the address bloom filter was full. This could take ages unless enough peers were found to fill the pending nodes for the torrent to the high water mark. Fixes #301. --- torrent.go | 71 ++++++++++++++++++------------------------------------ 1 file changed, 24 insertions(+), 47 deletions(-) diff --git a/torrent.go b/torrent.go index a343f10a..9e253492 100644 --- a/torrent.go +++ b/torrent.go @@ -8,7 +8,6 @@ import ( "io" "math" "math/rand" - "net" "net/url" "os" "sync" @@ -1319,55 +1318,38 @@ func (t *Torrent) announceRequest() tracker.AnnounceRequest { // Adds peers revealed in an announce until the announce ends, or we have // enough peers. -func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) { +func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) { cl := t.cl - // Count all the unique addresses we got during this announce. - allAddrs := make(map[string]struct{}) - for { - select { - case v, ok := <-pvs: - if !ok { - return - } - addPeers := make([]Peer, 0, len(v.Peers)) - for _, cp := range v.Peers { - if cp.Port == 0 { - // Can't do anything with this. - continue - } - addPeers = append(addPeers, Peer{ - IP: cp.IP[:], - Port: cp.Port, - Source: peerSourceDHTGetPeers, - }) - key := (&net.UDPAddr{ - IP: cp.IP[:], - Port: cp.Port, - }).String() - allAddrs[key] = struct{}{} - } - cl.lock() - t.addPeers(addPeers) - numPeers := t.peers.Len() - cl.unlock() - if numPeers >= cl.config.TorrentPeersHighWater { - return + for v := range pvs { + cl.lock() + for _, cp := range v.Peers { + if cp.Port == 0 { + // Can't do anything with this. + continue } - case <-t.closed.LockedChan(cl.locker()): - return + t.addPeer(Peer{ + IP: cp.IP[:], + Port: cp.Port, + Source: peerSourceDHTGetPeers, + }) } + cl.unlock() } } -func (t *Torrent) announceDHT(impliedPort bool, s *dht.Server) (err error) { - cl := t.cl - ps, err := s.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort) +func (t *Torrent) announceToDht(impliedPort bool, s *dht.Server) error { + ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), impliedPort) if err != nil { - return + return err + } + go t.consumeDhtAnnouncePeers(ps.Peers) + select { + case <-t.closed.LockedChan(t.cl.locker()): + case <-time.After(5 * time.Minute): + case <-t.wantPeersEvent.LockedChan(t.cl.locker()): } - t.consumeDHTAnnounce(ps.Peers) ps.Close() - return + return nil } func (t *Torrent) dhtAnnouncer(s *dht.Server) { @@ -1378,7 +1360,7 @@ func (t *Torrent) dhtAnnouncer(s *dht.Server) { case <-t.closed.LockedChan(cl.locker()): return } - err := t.announceDHT(true, s) + err := t.announceToDht(true, s) func() { cl.lock() defer cl.unlock() @@ -1388,11 +1370,6 @@ func (t *Torrent) dhtAnnouncer(s *dht.Server) { t.logger.Printf("error announcing %q to DHT: %s", t, err) } }() - select { - case <-t.closed.LockedChan(cl.locker()): - return - case <-time.After(5 * time.Minute): - } } } -- 2.48.1