]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Restart DHT announces at regular intervals
authorMatt Joiner <anacrolix@gmail.com>
Mon, 21 Jan 2019 02:46:26 +0000 (03:46 +0100)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 21 Jan 2019 21:54:03 +0000 (22:54 +0100)
The existing implementation would crawl the DHT until the address bloom filter was full. This could take ages unless enough peers were found to fill the pending nodes for the torrent to the high water mark. Fixes #301.

torrent.go

index a343f10a90380cdaf6a5a091b3c9f43a09562930..9e25349272eeb0b904a555a0a0385fb03954b6a8 100644 (file)
@@ -8,7 +8,6 @@ import (
        "io"
        "math"
        "math/rand"
-       "net"
        "net/url"
        "os"
        "sync"
@@ -1319,55 +1318,38 @@ func (t *Torrent) announceRequest() tracker.AnnounceRequest {
 
 // Adds peers revealed in an announce until the announce ends, or we have
 // enough peers.
-func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
+func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
        cl := t.cl
-       // Count all the unique addresses we got during this announce.
-       allAddrs := make(map[string]struct{})
-       for {
-               select {
-               case v, ok := <-pvs:
-                       if !ok {
-                               return
-                       }
-                       addPeers := make([]Peer, 0, len(v.Peers))
-                       for _, cp := range v.Peers {
-                               if cp.Port == 0 {
-                                       // Can't do anything with this.
-                                       continue
-                               }
-                               addPeers = append(addPeers, Peer{
-                                       IP:     cp.IP[:],
-                                       Port:   cp.Port,
-                                       Source: peerSourceDHTGetPeers,
-                               })
-                               key := (&net.UDPAddr{
-                                       IP:   cp.IP[:],
-                                       Port: cp.Port,
-                               }).String()
-                               allAddrs[key] = struct{}{}
-                       }
-                       cl.lock()
-                       t.addPeers(addPeers)
-                       numPeers := t.peers.Len()
-                       cl.unlock()
-                       if numPeers >= cl.config.TorrentPeersHighWater {
-                               return
+       for v := range pvs {
+               cl.lock()
+               for _, cp := range v.Peers {
+                       if cp.Port == 0 {
+                               // Can't do anything with this.
+                               continue
                        }
-               case <-t.closed.LockedChan(cl.locker()):
-                       return
+                       t.addPeer(Peer{
+                               IP:     cp.IP[:],
+                               Port:   cp.Port,
+                               Source: peerSourceDHTGetPeers,
+                       })
                }
+               cl.unlock()
        }
 }
 
-func (t *Torrent) announceDHT(impliedPort bool, s *dht.Server) (err error) {
-       cl := t.cl
-       ps, err := s.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort)
+func (t *Torrent) announceToDht(impliedPort bool, s *dht.Server) error {
+       ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), impliedPort)
        if err != nil {
-               return
+               return err
+       }
+       go t.consumeDhtAnnouncePeers(ps.Peers)
+       select {
+       case <-t.closed.LockedChan(t.cl.locker()):
+       case <-time.After(5 * time.Minute):
+       case <-t.wantPeersEvent.LockedChan(t.cl.locker()):
        }
-       t.consumeDHTAnnounce(ps.Peers)
        ps.Close()
-       return
+       return nil
 }
 
 func (t *Torrent) dhtAnnouncer(s *dht.Server) {
@@ -1378,7 +1360,7 @@ func (t *Torrent) dhtAnnouncer(s *dht.Server) {
                case <-t.closed.LockedChan(cl.locker()):
                        return
                }
-               err := t.announceDHT(true, s)
+               err := t.announceToDht(true, s)
                func() {
                        cl.lock()
                        defer cl.unlock()
@@ -1388,11 +1370,6 @@ func (t *Torrent) dhtAnnouncer(s *dht.Server) {
                                t.logger.Printf("error announcing %q to DHT: %s", t, err)
                        }
                }()
-               select {
-               case <-t.closed.LockedChan(cl.locker()):
-                       return
-               case <-time.After(5 * time.Minute):
-               }
        }
 }