]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Break up the DHT announcer code into smaller functions, and add a 5 minute delay...
authorMatt Joiner <anacrolix@gmail.com>
Sat, 23 Jul 2016 12:38:31 +0000 (22:38 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sat, 23 Jul 2016 12:38:31 +0000 (22:38 +1000)
client.go
torrent.go

index 2ee5239b6e35bfa92be2e826155604c38df82c4c..27b340a59773f9ad1324406451e6f1000d81fe20 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1473,7 +1473,7 @@ func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bo
        new = true
        t = cl.newTorrent(infoHash)
        if cl.dHT != nil {
-               go t.announceDHT(true)
+               go t.dhtAnnouncer()
        }
        cl.torrents[infoHash] = t
        t.updateWantPeersEvent()
index 29eec4c11b0e78be30517c40bbdae0415d79696f..039e572dc2b2bb92b1a884ecc8cdacd002c03180 100644 (file)
@@ -23,6 +23,7 @@ import (
        "github.com/bradfitz/iter"
 
        "github.com/anacrolix/torrent/bencode"
+       "github.com/anacrolix/torrent/dht"
        "github.com/anacrolix/torrent/metainfo"
        pp "github.com/anacrolix/torrent/peer_protocol"
        "github.com/anacrolix/torrent/storage"
@@ -1208,63 +1209,80 @@ func (t *Torrent) announceRequest() tracker.AnnounceRequest {
        }
 }
 
-func (t *Torrent) announceDHT(impliedPort bool) {
+// Adds peers revealed in an announce until the announce ends, or we have
+// enough peers.
+func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
        cl := t.cl
+       // Count all the unique addresses we got during this announce.
+       allAddrs := make(map[string]struct{})
        for {
                select {
-               case <-t.wantPeersEvent.LockedChan(&cl.mu):
+               case v, ok := <-pvs:
+                       if !ok {
+                               return
+                       }
+                       addPeers := make([]Peer, 0, len(v.Peers))
+                       for _, cp := range v.Peers {
+                               if cp.Port == 0 {
+                                       // Can't do anything with this.
+                                       continue
+                               }
+                               addPeers = append(addPeers, Peer{
+                                       IP:     cp.IP[:],
+                                       Port:   cp.Port,
+                                       Source: peerSourceDHT,
+                               })
+                               key := (&net.UDPAddr{
+                                       IP:   cp.IP[:],
+                                       Port: cp.Port,
+                               }).String()
+                               allAddrs[key] = struct{}{}
+                       }
+                       cl.mu.Lock()
+                       t.addPeers(addPeers)
+                       numPeers := len(t.peers)
+                       cl.mu.Unlock()
+                       if numPeers >= torrentPeersHighWater {
+                               return
+                       }
                case <-t.closed.LockedChan(&cl.mu):
                        return
                }
-               // log.Printf("getting peers for %q from DHT", t)
-               ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
-               if err != nil {
-                       log.Printf("error getting peers from dht: %s", err)
+       }
+}
+
+func (t *Torrent) announceDHT(impliedPort bool) (err error) {
+       cl := t.cl
+       ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
+       if err != nil {
+               return
+       }
+       t.consumeDHTAnnounce(ps.Peers)
+       ps.Close()
+       return
+}
+
+func (t *Torrent) dhtAnnouncer() {
+       cl := t.cl
+       for {
+               select {
+               case <-t.wantPeersEvent.LockedChan(&cl.mu):
+               case <-t.closed.LockedChan(&cl.mu):
                        return
                }
-               cl.mu.Lock()
-               t.numDHTAnnounces++
-               cl.mu.Unlock()
-               // Count all the unique addresses we got during this announce.
-               allAddrs := make(map[string]struct{})
-       getPeers:
-               for {
-                       select {
-                       case v, ok := <-ps.Peers:
-                               if !ok {
-                                       break getPeers
-                               }
-                               addPeers := make([]Peer, 0, len(v.Peers))
-                               for _, cp := range v.Peers {
-                                       if cp.Port == 0 {
-                                               // Can't do anything with this.
-                                               continue
-                                       }
-                                       addPeers = append(addPeers, Peer{
-                                               IP:     cp.IP[:],
-                                               Port:   cp.Port,
-                                               Source: peerSourceDHT,
-                                       })
-                                       key := (&net.UDPAddr{
-                                               IP:   cp.IP[:],
-                                               Port: cp.Port,
-                                       }).String()
-                                       allAddrs[key] = struct{}{}
-                               }
-                               cl.mu.Lock()
-                               t.addPeers(addPeers)
-                               numPeers := len(t.peers)
-                               cl.mu.Unlock()
-                               if numPeers >= torrentPeersHighWater {
-                                       break getPeers
-                               }
-                       case <-t.closed.LockedChan(&cl.mu):
-                               ps.Close()
-                               return
-                       }
+               err := t.announceDHT(true)
+               if err == nil {
+                       cl.mu.Lock()
+                       t.numDHTAnnounces++
+                       cl.mu.Unlock()
+               } else {
+                       log.Printf("error announcing %q to DHT: %s", t, err)
+               }
+               select {
+               case <-t.closed.LockedChan(&cl.mu):
+                       return
+               case <-time.After(5 * time.Minute):
                }
-               ps.Close()
-               // log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
        }
 }