]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Rework torrent tracker scraping
authorMatt Joiner <anacrolix@gmail.com>
Sun, 22 May 2016 12:45:08 +0000 (22:45 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 22 May 2016 12:45:08 +0000 (22:45 +1000)
New scrapers are added anytime trackers are added to a torrent. In the future they will also be stopped as soon as they're removed. All trackers are concurrently scraped, the old-style of sticking to one tracker that works is abandoned for now.

client.go
client_test.go
t.go
torrent.go
tracker_scraper.go [new file with mode: 0644]

index 33d6865b7407448e726f0cccbea553979d840cfe..5af08dcd3d38dc1200e793abc4577164c643213d 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1372,6 +1372,7 @@ func (cl *Client) wantConns(t *Torrent) bool {
 }
 
 func (cl *Client) openNewConns(t *Torrent) {
+       defer t.updateWantPeersEvent()
        for len(t.peers) != 0 {
                if !cl.wantConns(t) {
                        return
@@ -1389,7 +1390,6 @@ func (cl *Client) openNewConns(t *Torrent) {
                delete(t.peers, k)
                cl.initiateConn(p, t)
        }
-       t.wantPeers.Broadcast()
 }
 
 func (cl *Client) addPeers(t *Torrent, peers []Peer) {
@@ -1426,7 +1426,6 @@ func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) {
 
                storageOpener: cl.defaultStorage,
        }
-       t.wantPeers.L = &cl.mu
        return
 }
 
@@ -1447,26 +1446,6 @@ func shuffleTier(tier trackerTier) {
        }
 }
 
-func copyTrackers(base []trackerTier) (copy []trackerTier) {
-       for _, tier := range base {
-               copy = append(copy, append(trackerTier(nil), tier...))
-       }
-       return
-}
-
-func mergeTier(tier trackerTier, newURLs []string) trackerTier {
-nextURL:
-       for _, url := range newURLs {
-               for _, trURL := range tier {
-                       if trURL == url {
-                               continue nextURL
-                       }
-               }
-               tier = append(tier, url)
-       }
-       return tier
-}
-
 // A file-like handle to some torrent data resource.
 type Handle interface {
        io.Reader
@@ -1527,9 +1506,6 @@ func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bo
        }
        new = true
        t = cl.newTorrent(infoHash)
-       if !cl.config.DisableTrackers {
-               go t.announceTrackers()
-       }
        if cl.dHT != nil {
                go cl.announceTorrentDHT(t, true)
        }
@@ -1577,7 +1553,12 @@ func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
 }
 
 func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
-       for t.waitWantPeers() {
+       for {
+               select {
+               case <-t.wantPeersEvent.LockedChan(&cl.mu):
+               case <-t.closed.LockedChan(&cl.mu):
+                       return
+               }
                // log.Printf("getting peers for %q from DHT", t)
                ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
                if err != nil {
@@ -1627,22 +1608,27 @@ func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
        }
 }
 
-func (cl *Client) trackerBlockedUnlocked(trRawURL string) (blocked bool, err error) {
-       url_, err := url.Parse(trRawURL)
+func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) {
+       _url, err := url.Parse(announceURL)
        if err != nil {
                return
        }
-       host, _, err := net.SplitHostPort(url_.Host)
-       if err != nil {
-               host = url_.Host
+       hmp := missinggo.SplitHostMaybePort(_url.Host)
+       if hmp.Err != nil {
+               err = hmp.Err
+               return
        }
-       addr, err := net.ResolveIPAddr("ip", host)
+       addr, err := net.ResolveIPAddr("ip", hmp.Host)
        if err != nil {
                return
        }
        cl.mu.RLock()
        _, blocked = cl.ipBlockRange(addr.IP)
        cl.mu.RUnlock()
+       host = _url.Host
+       hmp.Host = addr.String()
+       _url.Host = hmp.String()
+       urlToUse = _url.String()
        return
 }
 
index 5da6b2cc3ed66279e76e672e5e4d570ef7b85e1f..d10a7567b759346ce0df76ad23832e9e9e638e0a 100644 (file)
@@ -453,11 +453,10 @@ func TestMergingTrackersByAddingSpecs(t *testing.T) {
        }
        spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
        _, new, _ = cl.AddTorrentSpec(&spec)
-       if new {
-               t.FailNow()
-       }
-       assert.EqualValues(t, T.trackers[0][0], "http://a")
-       assert.EqualValues(t, T.trackers[1][0], "udp://b")
+       assert.False(t, new)
+       assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
+       // Because trackers are disabled in TestingConfig.
+       assert.EqualValues(t, 0, len(T.trackerAnnouncers))
 }
 
 type badStorage struct{}
@@ -762,7 +761,7 @@ func TestAddMetainfoWithNodes(t *testing.T) {
        assert.EqualValues(t, cl.DHT().NumNodes(), 0)
        tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
        require.NoError(t, err)
-       assert.Len(t, tt.trackers, 5)
+       assert.Len(t, tt.metainfo.AnnounceList, 5)
        assert.EqualValues(t, 6, cl.DHT().NumNodes())
 }
 
@@ -889,3 +888,12 @@ func TestPieceCompletedInStorageButNotClient(t *testing.T) {
                Info: &greetingMetainfo.Info,
        })
 }
+
+func TestPrepareTrackerAnnounce(t *testing.T) {
+       cl := &Client{}
+       blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
+       require.NoError(t, err)
+       assert.False(t, blocked)
+       assert.EqualValues(t, "localhost:1234", host)
+       assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
+}
diff --git a/t.go b/t.go
index 899f759f9dc1bb68477fd52befac1ecd7bfcde36..27d821a35c9d266beeed2b6f50fe21b1bc032f21 100644 (file)
--- a/t.go
+++ b/t.go
@@ -120,7 +120,7 @@ func (t *Torrent) Length() int64 {
 func (t *Torrent) Metainfo() *metainfo.MetaInfo {
        t.cl.mu.Lock()
        defer t.cl.mu.Unlock()
-       return t.metainfo()
+       return t.newMetaInfo()
 }
 
 func (t *Torrent) addReader(r *Reader) {
index ec3624546d78a0e75f70892c5121aa98d9a34639..ea2c69ec16ec5debb1ad0e6742c2a662790de399 100644 (file)
@@ -56,6 +56,8 @@ type Torrent struct {
        // Storage for torrent data.
        storage storage.Torrent
 
+       metainfo metainfo.MetaInfo
+
        // The info dict. nil if we don't have it (yet).
        info *metainfo.InfoEx
        // Active peer connections, running message stream loops.
@@ -67,12 +69,11 @@ type Torrent struct {
        // Reserve of peers to connect to. A peer can be both here and in the
        // active connections if were told about the peer after connecting with
        // them. That encourages us to reconnect to peers that are well known.
-       peers     map[peersKey]Peer
-       wantPeers sync.Cond
+       peers          map[peersKey]Peer
+       wantPeersEvent missinggo.Event
 
-       // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
-       // mirror their respective URLs from the announce-list metainfo key.
-       trackers []trackerTier
+       // An announcer for each tracker URL.
+       trackerAnnouncers map[string]*trackerScraper
        // Name used if the info name isn't available.
        displayName string
        // The bencoded bytes of the info dict.
@@ -425,10 +426,8 @@ func (t *Torrent) writeStatus(w io.Writer, cl *Client) {
        })
        fmt.Fprintln(w)
        fmt.Fprintf(w, "Trackers: ")
-       for _, tier := range t.trackers {
-               for _, tr := range tier {
-                       fmt.Fprintf(w, "%q ", tr)
-               }
+       for _url := range t.trackerAnnouncers {
+               fmt.Fprintf(w, "%q ", _url)
        }
        fmt.Fprintf(w, "\n")
        fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
@@ -451,13 +450,12 @@ func (t *Torrent) haveInfo() bool {
 
 // TODO: Include URIs that weren't converted to tracker clients.
 func (t *Torrent) announceList() (al [][]string) {
-       missinggo.CastSlice(&al, t.trackers)
-       return
+       return t.metainfo.AnnounceList
 }
 
 // Returns a run-time generated MetaInfo that includes the info bytes and
 // announce-list as currently known to the client.
-func (t *Torrent) metainfo() *metainfo.MetaInfo {
+func (t *Torrent) newMetaInfo() *metainfo.MetaInfo {
        if t.metadataBytes == nil {
                panic("info bytes not set")
        }
@@ -1049,17 +1047,35 @@ func (t *Torrent) needData() bool {
        })
 }
 
-func (t *Torrent) addTrackers(announceList [][]string) {
-       newTrackers := copyTrackers(t.trackers)
-       for tierIndex, tier := range announceList {
-               if tierIndex < len(newTrackers) {
-                       newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier)
-               } else {
-                       newTrackers = append(newTrackers, mergeTier(nil, tier))
+func appendMissingStrings(old, new []string) (ret []string) {
+       ret = old
+new:
+       for _, n := range new {
+               for _, o := range old {
+                       if o == n {
+                               continue new
+                       }
                }
-               shuffleTier(newTrackers[tierIndex])
+               ret = append(ret, n)
+       }
+       return
+}
+
+func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
+       ret = existing
+       for minNumTiers > len(ret) {
+               ret = append(ret, nil)
+       }
+       return
+}
+
+func (t *Torrent) addTrackers(announceList [][]string) {
+       fullAnnounceList := &t.metainfo.AnnounceList
+       t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
+       for tierIndex, trackerURLs := range announceList {
+               (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
        }
-       t.trackers = newTrackers
+       t.startMissingTrackerScrapers()
 }
 
 // Don't call this before the info is available.
@@ -1103,22 +1119,21 @@ func (t *Torrent) dropConnection(c *connection) {
        }
 }
 
-// Returns true when peers are required, or false if the torrent is closing.
-func (t *Torrent) waitWantPeers() bool {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
-       for {
-               if t.closed.IsSet() {
-                       return false
-               }
-               if len(t.peers) > torrentPeersLowWater {
-                       goto wait
-               }
-               if t.needData() || t.seeding() {
-                       return true
-               }
-       wait:
-               t.wantPeers.Wait()
+func (t *Torrent) wantPeers() bool {
+       if t.closed.IsSet() {
+               return false
+       }
+       if len(t.peers) > torrentPeersLowWater {
+               return false
+       }
+       return t.needData() || t.seeding()
+}
+
+func (t *Torrent) updateWantPeersEvent() {
+       if t.wantPeers() {
+               t.wantPeersEvent.Set()
+       } else {
+               t.wantPeersEvent.Clear()
        }
 }
 
@@ -1137,106 +1152,36 @@ func (t *Torrent) seeding() bool {
        return true
 }
 
-// Announce torrent to its trackers.
-func (t *Torrent) announceTrackers() {
-       cl := t.cl
-       req := tracker.AnnounceRequest{
-               Event:    tracker.Started,
-               NumWant:  -1,
-               Port:     uint16(cl.incomingPeerPort()),
-               PeerId:   cl.peerID,
-               InfoHash: t.infoHash,
-       }
-       if !t.waitWantPeers() {
-               return
-       }
-       cl.mu.RLock()
-       req.Left = t.bytesLeftAnnounce()
-       trackers := t.trackers
-       cl.mu.RUnlock()
-       if t.announceTrackersFastStart(&req, trackers) {
-               req.Event = tracker.None
-       }
-newAnnounce:
-       for t.waitWantPeers() {
-               cl.mu.RLock()
-               req.Left = t.bytesLeftAnnounce()
-               trackers = t.trackers
-               cl.mu.RUnlock()
-               numTrackersTried := 0
-               for _, tier := range trackers {
-                       for trIndex, tr := range tier {
-                               numTrackersTried++
-                               interval, err := t.announceSingleTracker(tr, &req)
-                               if err != nil {
-                                       // Try the next tracker.
-                                       continue
-                               }
-                               // Float the successful announce to the top of the tier. If
-                               // the trackers list has been changed, we'll be modifying an
-                               // old copy so it won't matter.
-                               cl.mu.Lock()
-                               tier[0], tier[trIndex] = tier[trIndex], tier[0]
-                               cl.mu.Unlock()
-
-                               req.Event = tracker.None
-                               // Wait the interval before attempting another announce.
-                               time.Sleep(interval)
-                               continue newAnnounce
+// Adds and starts tracker scrapers for tracker URLs that aren't already
+// running.
+func (t *Torrent) startMissingTrackerScrapers() {
+       for _, tier := range t.announceList() {
+               for _, trackerURL := range tier {
+                       if _, ok := t.trackerAnnouncers[trackerURL]; ok {
+                               continue
                        }
-               }
-               if numTrackersTried != 0 {
-                       log.Printf("%s: all trackers failed", t)
-               }
-               // TODO: Wait until trackers are added if there are none.
-               time.Sleep(10 * time.Second)
-       }
-}
-
-func (t *Torrent) announceTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier) (atLeastOne bool) {
-       oks := make(chan bool)
-       outstanding := 0
-       for _, tier := range trackers {
-               for _, tr := range tier {
-                       outstanding++
-                       go func(tr string) {
-                               _, err := t.announceSingleTracker(tr, req)
-                               oks <- err == nil
-                       }(tr)
-               }
-       }
-       for outstanding > 0 {
-               ok := <-oks
-               outstanding--
-               if ok {
-                       atLeastOne = true
+                       newAnnouncer := &trackerScraper{
+                               url: trackerURL,
+                               t:   t,
+                       }
+                       if t.trackerAnnouncers == nil {
+                               t.trackerAnnouncers = make(map[string]*trackerScraper)
+                       }
+                       t.trackerAnnouncers[trackerURL] = newAnnouncer
+                       go newAnnouncer.Run()
                }
        }
-       return
 }
 
-func (t *Torrent) announceSingleTracker(tr string, req *tracker.AnnounceRequest) (interval time.Duration, err error) {
-       blocked, err := t.cl.trackerBlockedUnlocked(tr)
-       if err != nil {
-               err = fmt.Errorf("error determining if tracker blocked: %s", err)
-               return
-       }
-       if blocked {
-               err = errors.New("tracker has blocked IP")
-               return
-       }
-       resp, err := tracker.Announce(tr, req)
-       if err != nil {
-               return
-       }
-       var peers []Peer
-       for _, peer := range resp.Peers {
-               peers = append(peers, Peer{
-                       IP:   peer.IP,
-                       Port: peer.Port,
-               })
+// Returns an AnnounceRequest with fields filled out to defaults and current
+// values.
+func (t *Torrent) announceRequest() tracker.AnnounceRequest {
+       return tracker.AnnounceRequest{
+               Event:    tracker.None,
+               NumWant:  -1,
+               Port:     uint16(t.cl.incomingPeerPort()),
+               PeerId:   t.cl.peerID,
+               InfoHash: t.infoHash,
+               Left:     t.bytesLeftAnnounce(),
        }
-       t.AddPeers(peers)
-       interval = time.Second * time.Duration(resp.Interval)
-       return
 }
diff --git a/tracker_scraper.go b/tracker_scraper.go
new file mode 100644 (file)
index 0000000..71493b2
--- /dev/null
@@ -0,0 +1,73 @@
+package torrent
+
+import (
+       "log"
+       "time"
+
+       "github.com/anacrolix/missinggo"
+
+       "github.com/anacrolix/torrent/tracker"
+)
+
+// Announces a torrent to a tracker at regular intervals, when peers are
+// required.
+type trackerScraper struct {
+       url string
+       // Causes the trackerScraper to stop running.
+       stop missinggo.Event
+       t    *Torrent
+}
+
+func trackerToTorrentPeers(ps []tracker.Peer) (ret []Peer) {
+       ret = make([]Peer, 0, len(ps))
+       for _, p := range ps {
+               ret = append(ret, Peer{
+                       IP:     p.IP,
+                       Port:   p.Port,
+                       Source: peerSourceTracker,
+               })
+       }
+       return
+}
+
+// Return how long to wait before trying again.
+func (me *trackerScraper) announce() time.Duration {
+       blocked, urlToUse, host, err := me.t.cl.prepareTrackerAnnounceUnlocked(me.url)
+       if blocked {
+               // Wait for DNS to potentially change. Very few people do it faster
+               // than 5 minutes.
+               return 5 * time.Minute
+       }
+       me.t.cl.mu.Lock()
+       req := me.t.announceRequest()
+       me.t.cl.mu.Unlock()
+       res, err := tracker.AnnounceHost(urlToUse, &req, host)
+       if err != nil {
+               log.Printf("error announcing %s %q to %q: %s", me.t.InfoHash().HexString(), me.t.Name(), me.url, err)
+               return 5 * time.Minute
+       }
+       me.t.AddPeers(trackerToTorrentPeers(res.Peers))
+       return time.Duration(res.Interval) * time.Second
+}
+
+func (me *trackerScraper) Run() {
+       for {
+               select {
+               case <-me.t.closed.LockedChan(&me.t.cl.mu):
+                       return
+               case <-me.stop.LockedChan(&me.t.cl.mu):
+                       return
+               case <-me.t.wantPeersEvent.LockedChan(&me.t.cl.mu):
+               }
+
+               intervalChan := time.After(me.announce())
+
+               select {
+               case <-me.t.closed.LockedChan(&me.t.cl.mu):
+                       return
+               case <-me.stop.LockedChan(&me.t.cl.mu):
+                       return
+               case <-intervalChan:
+               }
+       }
+}