From 20870ec4ff47a16f695f801d30315d7ba9244ebe Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 22 May 2016 22:45:08 +1000 Subject: [PATCH] Rework torrent tracker scraping New scrapers are added anytime trackers are added to a torrent. In the future they will also be stopped as soon as they're removed. All trackers are concurrently scraped, the old-style of sticking to one tracker that works is abandoned for now. --- client.go | 50 ++++------- client_test.go | 20 +++-- t.go | 2 +- torrent.go | 211 +++++++++++++++++---------------------------- tracker_scraper.go | 73 ++++++++++++++++ 5 files changed, 184 insertions(+), 172 deletions(-) create mode 100644 tracker_scraper.go diff --git a/client.go b/client.go index 33d6865b..5af08dcd 100644 --- a/client.go +++ b/client.go @@ -1372,6 +1372,7 @@ func (cl *Client) wantConns(t *Torrent) bool { } func (cl *Client) openNewConns(t *Torrent) { + defer t.updateWantPeersEvent() for len(t.peers) != 0 { if !cl.wantConns(t) { return @@ -1389,7 +1390,6 @@ func (cl *Client) openNewConns(t *Torrent) { delete(t.peers, k) cl.initiateConn(p, t) } - t.wantPeers.Broadcast() } func (cl *Client) addPeers(t *Torrent, peers []Peer) { @@ -1426,7 +1426,6 @@ func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) { storageOpener: cl.defaultStorage, } - t.wantPeers.L = &cl.mu return } @@ -1447,26 +1446,6 @@ func shuffleTier(tier trackerTier) { } } -func copyTrackers(base []trackerTier) (copy []trackerTier) { - for _, tier := range base { - copy = append(copy, append(trackerTier(nil), tier...)) - } - return -} - -func mergeTier(tier trackerTier, newURLs []string) trackerTier { -nextURL: - for _, url := range newURLs { - for _, trURL := range tier { - if trURL == url { - continue nextURL - } - } - tier = append(tier, url) - } - return tier -} - // A file-like handle to some torrent data resource. type Handle interface { io.Reader @@ -1527,9 +1506,6 @@ func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bo } new = true t = cl.newTorrent(infoHash) - if !cl.config.DisableTrackers { - go t.announceTrackers() - } if cl.dHT != nil { go cl.announceTorrentDHT(t, true) } @@ -1577,7 +1553,12 @@ func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) { } func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) { - for t.waitWantPeers() { + for { + select { + case <-t.wantPeersEvent.LockedChan(&cl.mu): + case <-t.closed.LockedChan(&cl.mu): + return + } // log.Printf("getting peers for %q from DHT", t) ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort) if err != nil { @@ -1627,22 +1608,27 @@ func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) { } } -func (cl *Client) trackerBlockedUnlocked(trRawURL string) (blocked bool, err error) { - url_, err := url.Parse(trRawURL) +func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) { + _url, err := url.Parse(announceURL) if err != nil { return } - host, _, err := net.SplitHostPort(url_.Host) - if err != nil { - host = url_.Host + hmp := missinggo.SplitHostMaybePort(_url.Host) + if hmp.Err != nil { + err = hmp.Err + return } - addr, err := net.ResolveIPAddr("ip", host) + addr, err := net.ResolveIPAddr("ip", hmp.Host) if err != nil { return } cl.mu.RLock() _, blocked = cl.ipBlockRange(addr.IP) cl.mu.RUnlock() + host = _url.Host + hmp.Host = addr.String() + _url.Host = hmp.String() + urlToUse = _url.String() return } diff --git a/client_test.go b/client_test.go index 5da6b2cc..d10a7567 100644 --- a/client_test.go +++ b/client_test.go @@ -453,11 +453,10 @@ func TestMergingTrackersByAddingSpecs(t *testing.T) { } spec.Trackers = [][]string{{"http://a"}, {"udp://b"}} _, new, _ = cl.AddTorrentSpec(&spec) - if new { - t.FailNow() - } - assert.EqualValues(t, T.trackers[0][0], "http://a") - assert.EqualValues(t, T.trackers[1][0], "udp://b") + assert.False(t, new) + assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList) + // Because trackers are disabled in TestingConfig. + assert.EqualValues(t, 0, len(T.trackerAnnouncers)) } type badStorage struct{} @@ -762,7 +761,7 @@ func TestAddMetainfoWithNodes(t *testing.T) { assert.EqualValues(t, cl.DHT().NumNodes(), 0) tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent") require.NoError(t, err) - assert.Len(t, tt.trackers, 5) + assert.Len(t, tt.metainfo.AnnounceList, 5) assert.EqualValues(t, 6, cl.DHT().NumNodes()) } @@ -889,3 +888,12 @@ func TestPieceCompletedInStorageButNotClient(t *testing.T) { Info: &greetingMetainfo.Info, }) } + +func TestPrepareTrackerAnnounce(t *testing.T) { + cl := &Client{} + blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp") + require.NoError(t, err) + assert.False(t, blocked) + assert.EqualValues(t, "localhost:1234", host) + assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse) +} diff --git a/t.go b/t.go index 899f759f..27d821a3 100644 --- a/t.go +++ b/t.go @@ -120,7 +120,7 @@ func (t *Torrent) Length() int64 { func (t *Torrent) Metainfo() *metainfo.MetaInfo { t.cl.mu.Lock() defer t.cl.mu.Unlock() - return t.metainfo() + return t.newMetaInfo() } func (t *Torrent) addReader(r *Reader) { diff --git a/torrent.go b/torrent.go index ec362454..ea2c69ec 100644 --- a/torrent.go +++ b/torrent.go @@ -56,6 +56,8 @@ type Torrent struct { // Storage for torrent data. storage storage.Torrent + metainfo metainfo.MetaInfo + // The info dict. nil if we don't have it (yet). info *metainfo.InfoEx // Active peer connections, running message stream loops. @@ -67,12 +69,11 @@ type Torrent struct { // Reserve of peers to connect to. A peer can be both here and in the // active connections if were told about the peer after connecting with // them. That encourages us to reconnect to peers that are well known. - peers map[peersKey]Peer - wantPeers sync.Cond + peers map[peersKey]Peer + wantPeersEvent missinggo.Event - // BEP 12 Multitracker Metadata Extension. The tracker.Client instances - // mirror their respective URLs from the announce-list metainfo key. - trackers []trackerTier + // An announcer for each tracker URL. + trackerAnnouncers map[string]*trackerScraper // Name used if the info name isn't available. displayName string // The bencoded bytes of the info dict. @@ -425,10 +426,8 @@ func (t *Torrent) writeStatus(w io.Writer, cl *Client) { }) fmt.Fprintln(w) fmt.Fprintf(w, "Trackers: ") - for _, tier := range t.trackers { - for _, tr := range tier { - fmt.Fprintf(w, "%q ", tr) - } + for _url := range t.trackerAnnouncers { + fmt.Fprintf(w, "%q ", _url) } fmt.Fprintf(w, "\n") fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers)) @@ -451,13 +450,12 @@ func (t *Torrent) haveInfo() bool { // TODO: Include URIs that weren't converted to tracker clients. func (t *Torrent) announceList() (al [][]string) { - missinggo.CastSlice(&al, t.trackers) - return + return t.metainfo.AnnounceList } // Returns a run-time generated MetaInfo that includes the info bytes and // announce-list as currently known to the client. -func (t *Torrent) metainfo() *metainfo.MetaInfo { +func (t *Torrent) newMetaInfo() *metainfo.MetaInfo { if t.metadataBytes == nil { panic("info bytes not set") } @@ -1049,17 +1047,35 @@ func (t *Torrent) needData() bool { }) } -func (t *Torrent) addTrackers(announceList [][]string) { - newTrackers := copyTrackers(t.trackers) - for tierIndex, tier := range announceList { - if tierIndex < len(newTrackers) { - newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier) - } else { - newTrackers = append(newTrackers, mergeTier(nil, tier)) +func appendMissingStrings(old, new []string) (ret []string) { + ret = old +new: + for _, n := range new { + for _, o := range old { + if o == n { + continue new + } } - shuffleTier(newTrackers[tierIndex]) + ret = append(ret, n) + } + return +} + +func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) { + ret = existing + for minNumTiers > len(ret) { + ret = append(ret, nil) + } + return +} + +func (t *Torrent) addTrackers(announceList [][]string) { + fullAnnounceList := &t.metainfo.AnnounceList + t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList)) + for tierIndex, trackerURLs := range announceList { + (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs) } - t.trackers = newTrackers + t.startMissingTrackerScrapers() } // Don't call this before the info is available. @@ -1103,22 +1119,21 @@ func (t *Torrent) dropConnection(c *connection) { } } -// Returns true when peers are required, or false if the torrent is closing. -func (t *Torrent) waitWantPeers() bool { - t.cl.mu.Lock() - defer t.cl.mu.Unlock() - for { - if t.closed.IsSet() { - return false - } - if len(t.peers) > torrentPeersLowWater { - goto wait - } - if t.needData() || t.seeding() { - return true - } - wait: - t.wantPeers.Wait() +func (t *Torrent) wantPeers() bool { + if t.closed.IsSet() { + return false + } + if len(t.peers) > torrentPeersLowWater { + return false + } + return t.needData() || t.seeding() +} + +func (t *Torrent) updateWantPeersEvent() { + if t.wantPeers() { + t.wantPeersEvent.Set() + } else { + t.wantPeersEvent.Clear() } } @@ -1137,106 +1152,36 @@ func (t *Torrent) seeding() bool { return true } -// Announce torrent to its trackers. -func (t *Torrent) announceTrackers() { - cl := t.cl - req := tracker.AnnounceRequest{ - Event: tracker.Started, - NumWant: -1, - Port: uint16(cl.incomingPeerPort()), - PeerId: cl.peerID, - InfoHash: t.infoHash, - } - if !t.waitWantPeers() { - return - } - cl.mu.RLock() - req.Left = t.bytesLeftAnnounce() - trackers := t.trackers - cl.mu.RUnlock() - if t.announceTrackersFastStart(&req, trackers) { - req.Event = tracker.None - } -newAnnounce: - for t.waitWantPeers() { - cl.mu.RLock() - req.Left = t.bytesLeftAnnounce() - trackers = t.trackers - cl.mu.RUnlock() - numTrackersTried := 0 - for _, tier := range trackers { - for trIndex, tr := range tier { - numTrackersTried++ - interval, err := t.announceSingleTracker(tr, &req) - if err != nil { - // Try the next tracker. - continue - } - // Float the successful announce to the top of the tier. If - // the trackers list has been changed, we'll be modifying an - // old copy so it won't matter. - cl.mu.Lock() - tier[0], tier[trIndex] = tier[trIndex], tier[0] - cl.mu.Unlock() - - req.Event = tracker.None - // Wait the interval before attempting another announce. - time.Sleep(interval) - continue newAnnounce +// Adds and starts tracker scrapers for tracker URLs that aren't already +// running. +func (t *Torrent) startMissingTrackerScrapers() { + for _, tier := range t.announceList() { + for _, trackerURL := range tier { + if _, ok := t.trackerAnnouncers[trackerURL]; ok { + continue } - } - if numTrackersTried != 0 { - log.Printf("%s: all trackers failed", t) - } - // TODO: Wait until trackers are added if there are none. - time.Sleep(10 * time.Second) - } -} - -func (t *Torrent) announceTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier) (atLeastOne bool) { - oks := make(chan bool) - outstanding := 0 - for _, tier := range trackers { - for _, tr := range tier { - outstanding++ - go func(tr string) { - _, err := t.announceSingleTracker(tr, req) - oks <- err == nil - }(tr) - } - } - for outstanding > 0 { - ok := <-oks - outstanding-- - if ok { - atLeastOne = true + newAnnouncer := &trackerScraper{ + url: trackerURL, + t: t, + } + if t.trackerAnnouncers == nil { + t.trackerAnnouncers = make(map[string]*trackerScraper) + } + t.trackerAnnouncers[trackerURL] = newAnnouncer + go newAnnouncer.Run() } } - return } -func (t *Torrent) announceSingleTracker(tr string, req *tracker.AnnounceRequest) (interval time.Duration, err error) { - blocked, err := t.cl.trackerBlockedUnlocked(tr) - if err != nil { - err = fmt.Errorf("error determining if tracker blocked: %s", err) - return - } - if blocked { - err = errors.New("tracker has blocked IP") - return - } - resp, err := tracker.Announce(tr, req) - if err != nil { - return - } - var peers []Peer - for _, peer := range resp.Peers { - peers = append(peers, Peer{ - IP: peer.IP, - Port: peer.Port, - }) +// Returns an AnnounceRequest with fields filled out to defaults and current +// values. +func (t *Torrent) announceRequest() tracker.AnnounceRequest { + return tracker.AnnounceRequest{ + Event: tracker.None, + NumWant: -1, + Port: uint16(t.cl.incomingPeerPort()), + PeerId: t.cl.peerID, + InfoHash: t.infoHash, + Left: t.bytesLeftAnnounce(), } - t.AddPeers(peers) - interval = time.Second * time.Duration(resp.Interval) - return } diff --git a/tracker_scraper.go b/tracker_scraper.go new file mode 100644 index 00000000..71493b2f --- /dev/null +++ b/tracker_scraper.go @@ -0,0 +1,73 @@ +package torrent + +import ( + "log" + "time" + + "github.com/anacrolix/missinggo" + + "github.com/anacrolix/torrent/tracker" +) + +// Announces a torrent to a tracker at regular intervals, when peers are +// required. +type trackerScraper struct { + url string + // Causes the trackerScraper to stop running. + stop missinggo.Event + t *Torrent +} + +func trackerToTorrentPeers(ps []tracker.Peer) (ret []Peer) { + ret = make([]Peer, 0, len(ps)) + for _, p := range ps { + ret = append(ret, Peer{ + IP: p.IP, + Port: p.Port, + Source: peerSourceTracker, + }) + } + return +} + +// Return how long to wait before trying again. +func (me *trackerScraper) announce() time.Duration { + blocked, urlToUse, host, err := me.t.cl.prepareTrackerAnnounceUnlocked(me.url) + if blocked { + // Wait for DNS to potentially change. Very few people do it faster + // than 5 minutes. + return 5 * time.Minute + } + me.t.cl.mu.Lock() + req := me.t.announceRequest() + me.t.cl.mu.Unlock() + res, err := tracker.AnnounceHost(urlToUse, &req, host) + if err != nil { + log.Printf("error announcing %s %q to %q: %s", me.t.InfoHash().HexString(), me.t.Name(), me.url, err) + return 5 * time.Minute + } + me.t.AddPeers(trackerToTorrentPeers(res.Peers)) + return time.Duration(res.Interval) * time.Second +} + +func (me *trackerScraper) Run() { + for { + select { + case <-me.t.closed.LockedChan(&me.t.cl.mu): + return + case <-me.stop.LockedChan(&me.t.cl.mu): + return + case <-me.t.wantPeersEvent.LockedChan(&me.t.cl.mu): + } + + intervalChan := time.After(me.announce()) + + select { + case <-me.t.closed.LockedChan(&me.t.cl.mu): + return + case <-me.stop.LockedChan(&me.t.cl.mu): + return + case <-intervalChan: + } + } +} -- 2.48.1