]> Sergey Matveev's repositories - btrtrc.git/blobdiff - torrent.go
Rework torrent tracker scraping
[btrtrc.git] / torrent.go
index ec3624546d78a0e75f70892c5121aa98d9a34639..ea2c69ec16ec5debb1ad0e6742c2a662790de399 100644 (file)
@@ -56,6 +56,8 @@ type Torrent struct {
        // Storage for torrent data.
        storage storage.Torrent
 
+       metainfo metainfo.MetaInfo
+
        // The info dict. nil if we don't have it (yet).
        info *metainfo.InfoEx
        // Active peer connections, running message stream loops.
@@ -67,12 +69,11 @@ type Torrent struct {
        // Reserve of peers to connect to. A peer can be both here and in the
        // active connections if were told about the peer after connecting with
        // them. That encourages us to reconnect to peers that are well known.
-       peers     map[peersKey]Peer
-       wantPeers sync.Cond
+       peers          map[peersKey]Peer
+       wantPeersEvent missinggo.Event
 
-       // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
-       // mirror their respective URLs from the announce-list metainfo key.
-       trackers []trackerTier
+       // An announcer for each tracker URL.
+       trackerAnnouncers map[string]*trackerScraper
        // Name used if the info name isn't available.
        displayName string
        // The bencoded bytes of the info dict.
@@ -425,10 +426,8 @@ func (t *Torrent) writeStatus(w io.Writer, cl *Client) {
        })
        fmt.Fprintln(w)
        fmt.Fprintf(w, "Trackers: ")
-       for _, tier := range t.trackers {
-               for _, tr := range tier {
-                       fmt.Fprintf(w, "%q ", tr)
-               }
+       for _url := range t.trackerAnnouncers {
+               fmt.Fprintf(w, "%q ", _url)
        }
        fmt.Fprintf(w, "\n")
        fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
@@ -451,13 +450,12 @@ func (t *Torrent) haveInfo() bool {
 
 // TODO: Include URIs that weren't converted to tracker clients.
 func (t *Torrent) announceList() (al [][]string) {
-       missinggo.CastSlice(&al, t.trackers)
-       return
+       return t.metainfo.AnnounceList
 }
 
 // Returns a run-time generated MetaInfo that includes the info bytes and
 // announce-list as currently known to the client.
-func (t *Torrent) metainfo() *metainfo.MetaInfo {
+func (t *Torrent) newMetaInfo() *metainfo.MetaInfo {
        if t.metadataBytes == nil {
                panic("info bytes not set")
        }
@@ -1049,17 +1047,35 @@ func (t *Torrent) needData() bool {
        })
 }
 
-func (t *Torrent) addTrackers(announceList [][]string) {
-       newTrackers := copyTrackers(t.trackers)
-       for tierIndex, tier := range announceList {
-               if tierIndex < len(newTrackers) {
-                       newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier)
-               } else {
-                       newTrackers = append(newTrackers, mergeTier(nil, tier))
+func appendMissingStrings(old, new []string) (ret []string) {
+       ret = old
+new:
+       for _, n := range new {
+               for _, o := range old {
+                       if o == n {
+                               continue new
+                       }
                }
-               shuffleTier(newTrackers[tierIndex])
+               ret = append(ret, n)
+       }
+       return
+}
+
+func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
+       ret = existing
+       for minNumTiers > len(ret) {
+               ret = append(ret, nil)
+       }
+       return
+}
+
+func (t *Torrent) addTrackers(announceList [][]string) {
+       fullAnnounceList := &t.metainfo.AnnounceList
+       t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
+       for tierIndex, trackerURLs := range announceList {
+               (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
        }
-       t.trackers = newTrackers
+       t.startMissingTrackerScrapers()
 }
 
 // Don't call this before the info is available.
@@ -1103,22 +1119,21 @@ func (t *Torrent) dropConnection(c *connection) {
        }
 }
 
-// Returns true when peers are required, or false if the torrent is closing.
-func (t *Torrent) waitWantPeers() bool {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
-       for {
-               if t.closed.IsSet() {
-                       return false
-               }
-               if len(t.peers) > torrentPeersLowWater {
-                       goto wait
-               }
-               if t.needData() || t.seeding() {
-                       return true
-               }
-       wait:
-               t.wantPeers.Wait()
+func (t *Torrent) wantPeers() bool {
+       if t.closed.IsSet() {
+               return false
+       }
+       if len(t.peers) > torrentPeersLowWater {
+               return false
+       }
+       return t.needData() || t.seeding()
+}
+
+func (t *Torrent) updateWantPeersEvent() {
+       if t.wantPeers() {
+               t.wantPeersEvent.Set()
+       } else {
+               t.wantPeersEvent.Clear()
        }
 }
 
@@ -1137,106 +1152,36 @@ func (t *Torrent) seeding() bool {
        return true
 }
 
-// Announce torrent to its trackers.
-func (t *Torrent) announceTrackers() {
-       cl := t.cl
-       req := tracker.AnnounceRequest{
-               Event:    tracker.Started,
-               NumWant:  -1,
-               Port:     uint16(cl.incomingPeerPort()),
-               PeerId:   cl.peerID,
-               InfoHash: t.infoHash,
-       }
-       if !t.waitWantPeers() {
-               return
-       }
-       cl.mu.RLock()
-       req.Left = t.bytesLeftAnnounce()
-       trackers := t.trackers
-       cl.mu.RUnlock()
-       if t.announceTrackersFastStart(&req, trackers) {
-               req.Event = tracker.None
-       }
-newAnnounce:
-       for t.waitWantPeers() {
-               cl.mu.RLock()
-               req.Left = t.bytesLeftAnnounce()
-               trackers = t.trackers
-               cl.mu.RUnlock()
-               numTrackersTried := 0
-               for _, tier := range trackers {
-                       for trIndex, tr := range tier {
-                               numTrackersTried++
-                               interval, err := t.announceSingleTracker(tr, &req)
-                               if err != nil {
-                                       // Try the next tracker.
-                                       continue
-                               }
-                               // Float the successful announce to the top of the tier. If
-                               // the trackers list has been changed, we'll be modifying an
-                               // old copy so it won't matter.
-                               cl.mu.Lock()
-                               tier[0], tier[trIndex] = tier[trIndex], tier[0]
-                               cl.mu.Unlock()
-
-                               req.Event = tracker.None
-                               // Wait the interval before attempting another announce.
-                               time.Sleep(interval)
-                               continue newAnnounce
+// Adds and starts tracker scrapers for tracker URLs that aren't already
+// running.
+func (t *Torrent) startMissingTrackerScrapers() {
+       for _, tier := range t.announceList() {
+               for _, trackerURL := range tier {
+                       if _, ok := t.trackerAnnouncers[trackerURL]; ok {
+                               continue
                        }
-               }
-               if numTrackersTried != 0 {
-                       log.Printf("%s: all trackers failed", t)
-               }
-               // TODO: Wait until trackers are added if there are none.
-               time.Sleep(10 * time.Second)
-       }
-}
-
-func (t *Torrent) announceTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier) (atLeastOne bool) {
-       oks := make(chan bool)
-       outstanding := 0
-       for _, tier := range trackers {
-               for _, tr := range tier {
-                       outstanding++
-                       go func(tr string) {
-                               _, err := t.announceSingleTracker(tr, req)
-                               oks <- err == nil
-                       }(tr)
-               }
-       }
-       for outstanding > 0 {
-               ok := <-oks
-               outstanding--
-               if ok {
-                       atLeastOne = true
+                       newAnnouncer := &trackerScraper{
+                               url: trackerURL,
+                               t:   t,
+                       }
+                       if t.trackerAnnouncers == nil {
+                               t.trackerAnnouncers = make(map[string]*trackerScraper)
+                       }
+                       t.trackerAnnouncers[trackerURL] = newAnnouncer
+                       go newAnnouncer.Run()
                }
        }
-       return
 }
 
-func (t *Torrent) announceSingleTracker(tr string, req *tracker.AnnounceRequest) (interval time.Duration, err error) {
-       blocked, err := t.cl.trackerBlockedUnlocked(tr)
-       if err != nil {
-               err = fmt.Errorf("error determining if tracker blocked: %s", err)
-               return
-       }
-       if blocked {
-               err = errors.New("tracker has blocked IP")
-               return
-       }
-       resp, err := tracker.Announce(tr, req)
-       if err != nil {
-               return
-       }
-       var peers []Peer
-       for _, peer := range resp.Peers {
-               peers = append(peers, Peer{
-                       IP:   peer.IP,
-                       Port: peer.Port,
-               })
+// Returns an AnnounceRequest with fields filled out to defaults and current
+// values.
+func (t *Torrent) announceRequest() tracker.AnnounceRequest {
+       return tracker.AnnounceRequest{
+               Event:    tracker.None,
+               NumWant:  -1,
+               Port:     uint16(t.cl.incomingPeerPort()),
+               PeerId:   t.cl.peerID,
+               InfoHash: t.infoHash,
+               Left:     t.bytesLeftAnnounce(),
        }
-       t.AddPeers(peers)
-       interval = time.Second * time.Duration(resp.Interval)
-       return
 }