]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Don't open new connections or announce when new connections aren't wanted
authorMatt Joiner <anacrolix@gmail.com>
Fri, 21 Nov 2014 06:09:55 +0000 (00:09 -0600)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 21 Nov 2014 06:09:55 +0000 (00:09 -0600)
client.go
torrent.go

index afec2b887b0bc951d24f6efad4c69c8a771253a1..75a04fb1b06e5e2abc90eabc9602c8416a901b56 100644 (file)
--- a/client.go
+++ b/client.go
@@ -109,6 +109,7 @@ func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error {
        for _, cn := range t.Conns {
                me.replenishConnRequests(t, cn)
        }
+       me.openNewConns(t)
        return nil
 }
 
@@ -277,7 +278,7 @@ func NewClient(cfg *Config) (cl *Client, err error) {
                noUpload:         cfg.NoUpload,
                disableTrackers:  cfg.DisableTrackers,
                downloadStrategy: cfg.DownloadStrategy,
-               halfOpenLimit:    100,
+               halfOpenLimit:    socketsPerTorrent,
                dataDir:          cfg.DataDir,
                disableUTP:       cfg.DisableUTP,
 
@@ -493,7 +494,7 @@ func (me *Client) initiateConn(peer Peer, t *torrent) {
                                panic("invariant broken")
                        }
                        delete(t.HalfOpen, addr)
-                       me.openNewConns()
+                       me.openNewConns(t)
                }()
                if res.Conn == nil {
                        return
@@ -1131,7 +1132,7 @@ func (me *Client) dropConnection(torrent *torrent, conn *connection) {
                        torrent.Conns[i0] = torrent.Conns[i1]
                }
                torrent.Conns = torrent.Conns[:i1]
-               me.openNewConns()
+               me.openNewConns(torrent)
                return
        }
        panic("connection not found")
@@ -1153,6 +1154,8 @@ func (me *Client) addConnection(t *torrent, c *connection) bool {
                }
        }
        t.Conns = append(t.Conns, c)
+       // TODO: This should probably be done by a routine that kills off bad
+       // connections, and extra connections killed here instead.
        if len(t.Conns) > socketsPerTorrent {
                wcs := t.worstConnsHeap()
                heap.Pop(wcs).(*connection).Close()
@@ -1160,31 +1163,33 @@ func (me *Client) addConnection(t *torrent, c *connection) bool {
        return true
 }
 
-func (me *Client) openNewConns() {
-       for _, t := range me.torrents {
-               select {
-               case <-t.ceasingNetworking:
-                       continue
-               default:
+func (me *Client) openNewConns(t *torrent) {
+       select {
+       case <-t.ceasingNetworking:
+               return
+       default:
+       }
+       if t.haveInfo() && !me.downloadStrategy.PendingData(t) {
+               return
+       }
+       for len(t.Peers) != 0 {
+               if len(t.Conns) >= socketsPerTorrent {
+                       break
                }
-               for len(t.Peers) != 0 {
-                       if len(t.HalfOpen) >= me.halfOpenLimit {
-                               return
-                       }
-                       if len(t.HalfOpen)+me.handshaking+len(t.Conns) >= socketsPerTorrent {
-                               break
-                       }
-                       var (
-                               k peersKey
-                               p Peer
-                       )
-                       for k, p = range t.Peers {
-                               break
-                       }
-                       delete(t.Peers, k)
-                       me.initiateConn(p, t)
+               if len(t.HalfOpen)+me.handshaking >= me.halfOpenLimit {
+                       break
+               }
+               var (
+                       k peersKey
+                       p Peer
+               )
+               for k, p = range t.Peers {
+                       break
                }
+               delete(t.Peers, k)
+               me.initiateConn(p, t)
        }
+       t.wantPeers.Broadcast()
 }
 
 // Adds peers to the swarm for the torrent corresponding to infoHash.
@@ -1195,11 +1200,8 @@ func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
        if t == nil {
                return errors.New("no such torrent")
        }
-       // for _, p := range peers {
-       //      log.Printf("adding peer for %q: %s", infoHash, p)
-       // }
        t.AddPeers(peers)
-       me.openNewConns()
+       me.openNewConns(t)
        return nil
 }
 
@@ -1257,28 +1259,57 @@ func newTorrent(ih InfoHash, announceList [][]string, halfOpenLimit int) (t *tor
 
                HalfOpen: make(map[string]struct{}, halfOpenLimit),
        }
+       t.wantPeers.L = &t.stateMu
        t.GotMetainfo = t.gotMetainfo
-       t.Trackers = make([][]tracker.Client, len(announceList))
-       for tierIndex := range announceList {
-               tier := t.Trackers[tierIndex]
-               for _, url := range announceList[tierIndex] {
-                       tr, err := tracker.New(url)
-                       if err != nil {
-                               log.Print(err)
-                               continue
+       t.addTrackers(announceList)
+       return
+}
+
+// The trackers within each tier must be shuffled before use.
+// http://stackoverflow.com/a/12267471/149482
+// http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
+func shuffleTier(tier []tracker.Client) {
+       for i := range tier {
+               j := mathRand.Intn(i + 1)
+               tier[i], tier[j] = tier[j], tier[i]
+       }
+}
+
+func copyTrackers(base [][]tracker.Client) (copy [][]tracker.Client) {
+       for _, tier := range base {
+               copy = append(copy, tier)
+       }
+       return
+}
+
+func mergeTier(tier []tracker.Client, newURLs []string) []tracker.Client {
+nextURL:
+       for _, url := range newURLs {
+               for _, tr := range tier {
+                       if tr.URL() == url {
+                               continue nextURL
                        }
-                       tier = append(tier, tr)
                }
-               // The trackers within each tier must be shuffled before use.
-               // http://stackoverflow.com/a/12267471/149482
-               // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
-               for i := range tier {
-                       j := mathRand.Intn(i + 1)
-                       tier[i], tier[j] = tier[j], tier[i]
+               tr, err := tracker.New(url)
+               if err != nil {
+                       log.Printf("error creating tracker client for %q: %s", url, err)
+                       continue
                }
-               t.Trackers[tierIndex] = tier
+               tier = append(tier, tr)
        }
-       return
+       return tier
+}
+
+func (t *torrent) addTrackers(announceList [][]string) {
+       newTrackers := copyTrackers(t.Trackers)
+       for tierIndex, tier := range announceList {
+               if tierIndex < len(newTrackers) {
+                       newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier)
+               } else {
+                       newTrackers = append(newTrackers, mergeTier(nil, tier))
+               }
+       }
+       t.Trackers = newTrackers
 }
 
 type Torrent struct {
@@ -1409,19 +1440,40 @@ func (me *Client) AddTorrentFromFile(name string) (err error) {
        return me.AddTorrent(mi)
 }
 
-func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
+// Returns true when peers are required, or false if the torrent is closing.
+func (cl *Client) waitWantPeers(t *torrent) bool {
+       cl.mu.Lock()
+       defer cl.mu.Unlock()
+       t.stateMu.Lock()
+       defer t.stateMu.Unlock()
        for {
+               select {
+               case <-t.ceasingNetworking:
+                       return false
+               default:
+               }
+               if len(t.Peers) < socketsPerTorrent*5 {
+                       return true
+               }
+               cl.mu.Unlock()
+               t.wantPeers.Wait()
+               t.stateMu.Unlock()
+               cl.mu.Lock()
+               t.stateMu.Lock()
+       }
+}
+
+func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
+       for cl.waitWantPeers(t) {
+               log.Printf("announcing torrent %q to DHT", t)
                ps, err := cl.dHT.GetPeers(string(t.InfoHash[:]))
                if err != nil {
                        log.Printf("error getting peers from dht: %s", err)
                        return
                }
-               nextScrape := time.After(1 * time.Minute)
        getPeers:
                for {
                        select {
-                       case <-nextScrape:
-                               break getPeers
                        case v, ok := <-ps.Values:
                                if !ok {
                                        break getPeers
@@ -1447,6 +1499,7 @@ func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
                        }
                }
                ps.Close()
+               log.Printf("finished DHT peer scrape for %s", t)
 
                // After a GetPeers, we can announce on the best nodes that gave us an
                // announce token.
@@ -1457,7 +1510,12 @@ func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
                if port != 0 {
                        // We can't allow the port to be implied as long as the UTP and
                        // DHT ports are different.
-                       cl.dHT.AnnouncePeer(port, impliedPort, t.InfoHash.AsString())
+                       err := cl.dHT.AnnouncePeer(port, impliedPort, t.InfoHash.AsString())
+                       if err != nil {
+                               log.Printf("error announcing torrent to DHT: %s", err)
+                       } else {
+                               log.Printf("announced %q to DHT", t)
+                       }
                }
        }
 }
@@ -1471,16 +1529,12 @@ func (cl *Client) announceTorrent(t *torrent) {
                InfoHash: t.InfoHash,
        }
 newAnnounce:
-       for {
-               select {
-               case <-t.ceasingNetworking:
-                       return
-               default:
-               }
+       for cl.waitWantPeers(t) {
                cl.mu.Lock()
                req.Left = t.BytesLeft()
+               trackers := t.Trackers
                cl.mu.Unlock()
-               for _, tier := range t.Trackers {
+               for _, tier := range trackers {
                        for trIndex, tr := range tier {
                                if err := tr.Connect(); err != nil {
                                        log.Print(err)
index 1fa05be14963eb45f1a5869b468d080c2b1c2fe4..be9ca7116aec3e131cf5dfd822c0f0fd38d72739 100644 (file)
@@ -46,8 +46,11 @@ type peersKey struct {
 }
 
 type torrent struct {
-       stateMu           sync.Mutex
-       closing           chan struct{}
+       stateMu sync.Mutex
+       closing chan struct{}
+
+       // Closed when no more network activity is desired. This includes
+       // announcing, and communicating with peers.
        ceasingNetworking chan struct{}
 
        InfoHash                    InfoHash
@@ -63,10 +66,12 @@ type torrent struct {
        Conns []*connection
        // Set of addrs to which we're attempting to connect.
        HalfOpen map[string]struct{}
+
        // Reserve of peers to connect to. A peer can be both here and in the
        // active connections if were told about the peer after connecting with
        // them. That encourages us to reconnect to peers that are well known.
-       Peers map[peersKey]Peer
+       Peers     map[peersKey]Peer
+       wantPeers sync.Cond
 
        // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
        // mirror their respective URLs from the announce-list key.