From: Matt Joiner Date: Tue, 9 Dec 2014 03:58:49 +0000 (-0600) Subject: Add pending peers high water mark to stop excessively long running DHT queries X-Git-Tag: v1.0.0~1398 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=48582796ed80c4a7ebb714d3160de0288bdff5fa;p=btrtrc.git Add pending peers high water mark to stop excessively long running DHT queries --- diff --git a/client.go b/client.go index f0315762..aeff48c0 100644 --- a/client.go +++ b/client.go @@ -76,7 +76,9 @@ const ( // DHT: http://www.bittorrent.org/beps/bep_0005.html extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01" - socketsPerTorrent = 40 + socketsPerTorrent = 40 + torrentPeersHighWater = 1000 + torrentPeersLowWater = socketsPerTorrent * 5 ) // Currently doesn't really queue, but should in the future. @@ -1451,14 +1453,7 @@ func (me *Client) openNewConns(t *torrent) { t.wantPeers.Broadcast() } -// Adds peers to the swarm for the torrent corresponding to infoHash. -func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error { - me.mu.Lock() - defer me.mu.Unlock() - t := me.torrent(infoHash) - if t == nil { - return errors.New("no such torrent") - } +func (me *Client) addPeers(t *torrent, peers []Peer) { blocked := 0 for i, p := range peers { if me.ipBlockRange(p.IP) == nil { @@ -1474,6 +1469,17 @@ func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error { } t.AddPeers(peers) me.openNewConns(t) +} + +// Adds peers to the swarm for the torrent corresponding to infoHash. +func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error { + me.mu.Lock() + defer me.mu.Unlock() + t := me.torrent(infoHash) + if t == nil { + return errors.New("no such torrent") + } + me.addPeers(t, peers) return nil } @@ -1783,7 +1789,7 @@ func (cl *Client) waitWantPeers(t *torrent) bool { return false default: } - if len(t.Peers) < socketsPerTorrent*5 { + if len(t.Peers) < torrentPeersLowWater { return true } cl.mu.Unlock() @@ -1818,7 +1824,8 @@ func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) { }).String()] = struct{}{} } // log.Printf("%s: %d new peers from DHT", t, len(v.Peers)) - err = cl.AddPeers(t.InfoHash, func() (ret []Peer) { + cl.mu.Lock() + cl.addPeers(t, func() (ret []Peer) { for _, cp := range v.Peers { ret = append(ret, Peer{ IP: cp.IP[:], @@ -1828,8 +1835,9 @@ func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) { } return }()) - if err != nil { - log.Printf("error adding peers from dht for torrent %q: %s", t, err) + numPeers := len(t.Peers) + cl.mu.Unlock() + if numPeers >= torrentPeersHighWater { break getPeers } case <-t.ceasingNetworking: