]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add pending peers high water mark to stop excessively long running DHT queries
authorMatt Joiner <anacrolix@gmail.com>
Tue, 9 Dec 2014 03:58:49 +0000 (21:58 -0600)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 9 Dec 2014 03:58:49 +0000 (21:58 -0600)
client.go

index f03157626477c26a03c4e509571706dfc17a30d5..aeff48c0b8a0eb257edeeb1490a186feb5facdb8 100644 (file)
--- a/client.go
+++ b/client.go
@@ -76,7 +76,9 @@ const (
        // DHT: http://www.bittorrent.org/beps/bep_0005.html
        extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01"
 
-       socketsPerTorrent = 40
+       socketsPerTorrent     = 40
+       torrentPeersHighWater = 1000
+       torrentPeersLowWater  = socketsPerTorrent * 5
 )
 
 // Currently doesn't really queue, but should in the future.
@@ -1451,14 +1453,7 @@ func (me *Client) openNewConns(t *torrent) {
        t.wantPeers.Broadcast()
 }
 
-// Adds peers to the swarm for the torrent corresponding to infoHash.
-func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
-       me.mu.Lock()
-       defer me.mu.Unlock()
-       t := me.torrent(infoHash)
-       if t == nil {
-               return errors.New("no such torrent")
-       }
+func (me *Client) addPeers(t *torrent, peers []Peer) {
        blocked := 0
        for i, p := range peers {
                if me.ipBlockRange(p.IP) == nil {
@@ -1474,6 +1469,17 @@ func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
        }
        t.AddPeers(peers)
        me.openNewConns(t)
+}
+
+// Adds peers to the swarm for the torrent corresponding to infoHash.
+func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
+       me.mu.Lock()
+       defer me.mu.Unlock()
+       t := me.torrent(infoHash)
+       if t == nil {
+               return errors.New("no such torrent")
+       }
+       me.addPeers(t, peers)
        return nil
 }
 
@@ -1783,7 +1789,7 @@ func (cl *Client) waitWantPeers(t *torrent) bool {
                        return false
                default:
                }
-               if len(t.Peers) < socketsPerTorrent*5 {
+               if len(t.Peers) < torrentPeersLowWater {
                        return true
                }
                cl.mu.Unlock()
@@ -1818,7 +1824,8 @@ func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
                                        }).String()] = struct{}{}
                                }
                                // log.Printf("%s: %d new peers from DHT", t, len(v.Peers))
-                               err = cl.AddPeers(t.InfoHash, func() (ret []Peer) {
+                               cl.mu.Lock()
+                               cl.addPeers(t, func() (ret []Peer) {
                                        for _, cp := range v.Peers {
                                                ret = append(ret, Peer{
                                                        IP:     cp.IP[:],
@@ -1828,8 +1835,9 @@ func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
                                        }
                                        return
                                }())
-                               if err != nil {
-                                       log.Printf("error adding peers from dht for torrent %q: %s", t, err)
+                               numPeers := len(t.Peers)
+                               cl.mu.Unlock()
+                               if numPeers >= torrentPeersHighWater {
                                        break getPeers
                                }
                        case <-t.ceasingNetworking: