From 980cd69ab22d9e3e0178d21eed0f7481717d0a7b Mon Sep 17 00:00:00 2001
From: Matt Joiner <anacrolix@gmail.com>
Date: Sat, 23 Jul 2016 22:38:31 +1000
Subject: [PATCH] Break up the DHT announcer code into smaller functions, and
 add a 5 minute delay between announces

---
 client.go  |   2 +-
 torrent.go | 114 +++++++++++++++++++++++++++++++----------------------
 2 files changed, 67 insertions(+), 49 deletions(-)

diff --git a/client.go b/client.go
index 2ee5239b..27b340a5 100644
--- a/client.go
+++ b/client.go
@@ -1473,7 +1473,7 @@ func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bo
 	new = true
 	t = cl.newTorrent(infoHash)
 	if cl.dHT != nil {
-		go t.announceDHT(true)
+		go t.dhtAnnouncer()
 	}
 	cl.torrents[infoHash] = t
 	t.updateWantPeersEvent()
diff --git a/torrent.go b/torrent.go
index 29eec4c1..039e572d 100644
--- a/torrent.go
+++ b/torrent.go
@@ -23,6 +23,7 @@ import (
 	"github.com/bradfitz/iter"
 
 	"github.com/anacrolix/torrent/bencode"
+	"github.com/anacrolix/torrent/dht"
 	"github.com/anacrolix/torrent/metainfo"
 	pp "github.com/anacrolix/torrent/peer_protocol"
 	"github.com/anacrolix/torrent/storage"
@@ -1208,63 +1209,80 @@ func (t *Torrent) announceRequest() tracker.AnnounceRequest {
 	}
 }
 
-func (t *Torrent) announceDHT(impliedPort bool) {
+// Adds peers revealed in an announce until the announce ends, or we have
+// enough peers.
+func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
 	cl := t.cl
+	// Count all the unique addresses we got during this announce.
+	allAddrs := make(map[string]struct{})
 	for {
 		select {
-		case <-t.wantPeersEvent.LockedChan(&cl.mu):
+		case v, ok := <-pvs:
+			if !ok {
+				return
+			}
+			addPeers := make([]Peer, 0, len(v.Peers))
+			for _, cp := range v.Peers {
+				if cp.Port == 0 {
+					// Can't do anything with this.
+					continue
+				}
+				addPeers = append(addPeers, Peer{
+					IP:     cp.IP[:],
+					Port:   cp.Port,
+					Source: peerSourceDHT,
+				})
+				key := (&net.UDPAddr{
+					IP:   cp.IP[:],
+					Port: cp.Port,
+				}).String()
+				allAddrs[key] = struct{}{}
+			}
+			cl.mu.Lock()
+			t.addPeers(addPeers)
+			numPeers := len(t.peers)
+			cl.mu.Unlock()
+			if numPeers >= torrentPeersHighWater {
+				return
+			}
 		case <-t.closed.LockedChan(&cl.mu):
 			return
 		}
-		// log.Printf("getting peers for %q from DHT", t)
-		ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
-		if err != nil {
-			log.Printf("error getting peers from dht: %s", err)
+	}
+}
+
+func (t *Torrent) announceDHT(impliedPort bool) (err error) {
+	cl := t.cl
+	ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
+	if err != nil {
+		return
+	}
+	t.consumeDHTAnnounce(ps.Peers)
+	ps.Close()
+	return
+}
+
+func (t *Torrent) dhtAnnouncer() {
+	cl := t.cl
+	for {
+		select {
+		case <-t.wantPeersEvent.LockedChan(&cl.mu):
+		case <-t.closed.LockedChan(&cl.mu):
 			return
 		}
-		cl.mu.Lock()
-		t.numDHTAnnounces++
-		cl.mu.Unlock()
-		// Count all the unique addresses we got during this announce.
-		allAddrs := make(map[string]struct{})
-	getPeers:
-		for {
-			select {
-			case v, ok := <-ps.Peers:
-				if !ok {
-					break getPeers
-				}
-				addPeers := make([]Peer, 0, len(v.Peers))
-				for _, cp := range v.Peers {
-					if cp.Port == 0 {
-						// Can't do anything with this.
-						continue
-					}
-					addPeers = append(addPeers, Peer{
-						IP:     cp.IP[:],
-						Port:   cp.Port,
-						Source: peerSourceDHT,
-					})
-					key := (&net.UDPAddr{
-						IP:   cp.IP[:],
-						Port: cp.Port,
-					}).String()
-					allAddrs[key] = struct{}{}
-				}
-				cl.mu.Lock()
-				t.addPeers(addPeers)
-				numPeers := len(t.peers)
-				cl.mu.Unlock()
-				if numPeers >= torrentPeersHighWater {
-					break getPeers
-				}
-			case <-t.closed.LockedChan(&cl.mu):
-				ps.Close()
-				return
-			}
+		err := t.announceDHT(true)
+		if err == nil {
+			cl.mu.Lock()
+			t.numDHTAnnounces++
+			cl.mu.Unlock()
+		} else {
+			log.Printf("error announcing %q to DHT: %s", t, err)
+		}
+		select {
+		case <-t.closed.LockedChan(&cl.mu):
+			return
+		case <-time.After(5 * time.Minute):
 		}
-		ps.Close()
-		// log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
 	}
 }
 
-- 
2.51.0