From: Matt Joiner Date: Mon, 21 Jan 2019 02:46:26 +0000 (+0100) Subject: Restart DHT announces at regular intervals X-Git-Tag: v1.1.0~20 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=69c39f003623606c32050346098d46f53b101487;p=btrtrc.git Restart DHT announces at regular intervals The existing implementation would crawl the DHT until the address bloom filter was full. This could take ages unless enough peers were found to fill the pending nodes for the torrent to the high water mark. Fixes #301. --- diff --git a/torrent.go b/torrent.go index a343f10a..9e253492 100644 --- a/torrent.go +++ b/torrent.go @@ -8,7 +8,6 @@ import ( "io" "math" "math/rand" - "net" "net/url" "os" "sync" @@ -1319,55 +1318,38 @@ func (t *Torrent) announceRequest() tracker.AnnounceRequest { // Adds peers revealed in an announce until the announce ends, or we have // enough peers. -func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) { +func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) { cl := t.cl - // Count all the unique addresses we got during this announce. - allAddrs := make(map[string]struct{}) - for { - select { - case v, ok := <-pvs: - if !ok { - return - } - addPeers := make([]Peer, 0, len(v.Peers)) - for _, cp := range v.Peers { - if cp.Port == 0 { - // Can't do anything with this. - continue - } - addPeers = append(addPeers, Peer{ - IP: cp.IP[:], - Port: cp.Port, - Source: peerSourceDHTGetPeers, - }) - key := (&net.UDPAddr{ - IP: cp.IP[:], - Port: cp.Port, - }).String() - allAddrs[key] = struct{}{} - } - cl.lock() - t.addPeers(addPeers) - numPeers := t.peers.Len() - cl.unlock() - if numPeers >= cl.config.TorrentPeersHighWater { - return + for v := range pvs { + cl.lock() + for _, cp := range v.Peers { + if cp.Port == 0 { + // Can't do anything with this. + continue } - case <-t.closed.LockedChan(cl.locker()): - return + t.addPeer(Peer{ + IP: cp.IP[:], + Port: cp.Port, + Source: peerSourceDHTGetPeers, + }) } + cl.unlock() } } -func (t *Torrent) announceDHT(impliedPort bool, s *dht.Server) (err error) { - cl := t.cl - ps, err := s.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort) +func (t *Torrent) announceToDht(impliedPort bool, s *dht.Server) error { + ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), impliedPort) if err != nil { - return + return err + } + go t.consumeDhtAnnouncePeers(ps.Peers) + select { + case <-t.closed.LockedChan(t.cl.locker()): + case <-time.After(5 * time.Minute): + case <-t.wantPeersEvent.LockedChan(t.cl.locker()): } - t.consumeDHTAnnounce(ps.Peers) ps.Close() - return + return nil } func (t *Torrent) dhtAnnouncer(s *dht.Server) { @@ -1378,7 +1360,7 @@ func (t *Torrent) dhtAnnouncer(s *dht.Server) { case <-t.closed.LockedChan(cl.locker()): return } - err := t.announceDHT(true, s) + err := t.announceToDht(true, s) func() { cl.lock() defer cl.unlock() @@ -1388,11 +1370,6 @@ func (t *Torrent) dhtAnnouncer(s *dht.Server) { t.logger.Printf("error announcing %q to DHT: %s", t, err) } }() - select { - case <-t.closed.LockedChan(cl.locker()): - return - case <-time.After(5 * time.Minute): - } } }