// Storage for torrent data.
storage storage.Torrent
+ metainfo metainfo.MetaInfo
+
// The info dict. nil if we don't have it (yet).
info *metainfo.InfoEx
// Active peer connections, running message stream loops.
// Reserve of peers to connect to. A peer can be both here and in the
// active connections if were told about the peer after connecting with
// them. That encourages us to reconnect to peers that are well known.
- peers map[peersKey]Peer
- wantPeers sync.Cond
+ peers map[peersKey]Peer
+ wantPeersEvent missinggo.Event
- // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
- // mirror their respective URLs from the announce-list metainfo key.
- trackers []trackerTier
+ // An announcer for each tracker URL.
+ trackerAnnouncers map[string]*trackerScraper
// Name used if the info name isn't available.
displayName string
// The bencoded bytes of the info dict.
})
fmt.Fprintln(w)
fmt.Fprintf(w, "Trackers: ")
- for _, tier := range t.trackers {
- for _, tr := range tier {
- fmt.Fprintf(w, "%q ", tr)
- }
+ for _url := range t.trackerAnnouncers {
+ fmt.Fprintf(w, "%q ", _url)
}
fmt.Fprintf(w, "\n")
fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
// TODO: Include URIs that weren't converted to tracker clients.
func (t *Torrent) announceList() (al [][]string) {
- missinggo.CastSlice(&al, t.trackers)
- return
+ return t.metainfo.AnnounceList
}
// Returns a run-time generated MetaInfo that includes the info bytes and
// announce-list as currently known to the client.
-func (t *Torrent) metainfo() *metainfo.MetaInfo {
+func (t *Torrent) newMetaInfo() *metainfo.MetaInfo {
if t.metadataBytes == nil {
panic("info bytes not set")
}
})
}
-func (t *Torrent) addTrackers(announceList [][]string) {
- newTrackers := copyTrackers(t.trackers)
- for tierIndex, tier := range announceList {
- if tierIndex < len(newTrackers) {
- newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier)
- } else {
- newTrackers = append(newTrackers, mergeTier(nil, tier))
+func appendMissingStrings(old, new []string) (ret []string) {
+ ret = old
+new:
+ for _, n := range new {
+ for _, o := range old {
+ if o == n {
+ continue new
+ }
}
- shuffleTier(newTrackers[tierIndex])
+ ret = append(ret, n)
+ }
+ return
+}
+
+func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
+ ret = existing
+ for minNumTiers > len(ret) {
+ ret = append(ret, nil)
+ }
+ return
+}
+
+func (t *Torrent) addTrackers(announceList [][]string) {
+ fullAnnounceList := &t.metainfo.AnnounceList
+ t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
+ for tierIndex, trackerURLs := range announceList {
+ (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
}
- t.trackers = newTrackers
+ t.startMissingTrackerScrapers()
}
// Don't call this before the info is available.
}
}
-// Returns true when peers are required, or false if the torrent is closing.
-func (t *Torrent) waitWantPeers() bool {
- t.cl.mu.Lock()
- defer t.cl.mu.Unlock()
- for {
- if t.closed.IsSet() {
- return false
- }
- if len(t.peers) > torrentPeersLowWater {
- goto wait
- }
- if t.needData() || t.seeding() {
- return true
- }
- wait:
- t.wantPeers.Wait()
+func (t *Torrent) wantPeers() bool {
+ if t.closed.IsSet() {
+ return false
+ }
+ if len(t.peers) > torrentPeersLowWater {
+ return false
+ }
+ return t.needData() || t.seeding()
+}
+
+func (t *Torrent) updateWantPeersEvent() {
+ if t.wantPeers() {
+ t.wantPeersEvent.Set()
+ } else {
+ t.wantPeersEvent.Clear()
}
}
return true
}
-// Announce torrent to its trackers.
-func (t *Torrent) announceTrackers() {
- cl := t.cl
- req := tracker.AnnounceRequest{
- Event: tracker.Started,
- NumWant: -1,
- Port: uint16(cl.incomingPeerPort()),
- PeerId: cl.peerID,
- InfoHash: t.infoHash,
- }
- if !t.waitWantPeers() {
- return
- }
- cl.mu.RLock()
- req.Left = t.bytesLeftAnnounce()
- trackers := t.trackers
- cl.mu.RUnlock()
- if t.announceTrackersFastStart(&req, trackers) {
- req.Event = tracker.None
- }
-newAnnounce:
- for t.waitWantPeers() {
- cl.mu.RLock()
- req.Left = t.bytesLeftAnnounce()
- trackers = t.trackers
- cl.mu.RUnlock()
- numTrackersTried := 0
- for _, tier := range trackers {
- for trIndex, tr := range tier {
- numTrackersTried++
- interval, err := t.announceSingleTracker(tr, &req)
- if err != nil {
- // Try the next tracker.
- continue
- }
- // Float the successful announce to the top of the tier. If
- // the trackers list has been changed, we'll be modifying an
- // old copy so it won't matter.
- cl.mu.Lock()
- tier[0], tier[trIndex] = tier[trIndex], tier[0]
- cl.mu.Unlock()
-
- req.Event = tracker.None
- // Wait the interval before attempting another announce.
- time.Sleep(interval)
- continue newAnnounce
+// Adds and starts tracker scrapers for tracker URLs that aren't already
+// running.
+func (t *Torrent) startMissingTrackerScrapers() {
+ for _, tier := range t.announceList() {
+ for _, trackerURL := range tier {
+ if _, ok := t.trackerAnnouncers[trackerURL]; ok {
+ continue
}
- }
- if numTrackersTried != 0 {
- log.Printf("%s: all trackers failed", t)
- }
- // TODO: Wait until trackers are added if there are none.
- time.Sleep(10 * time.Second)
- }
-}
-
-func (t *Torrent) announceTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier) (atLeastOne bool) {
- oks := make(chan bool)
- outstanding := 0
- for _, tier := range trackers {
- for _, tr := range tier {
- outstanding++
- go func(tr string) {
- _, err := t.announceSingleTracker(tr, req)
- oks <- err == nil
- }(tr)
- }
- }
- for outstanding > 0 {
- ok := <-oks
- outstanding--
- if ok {
- atLeastOne = true
+ newAnnouncer := &trackerScraper{
+ url: trackerURL,
+ t: t,
+ }
+ if t.trackerAnnouncers == nil {
+ t.trackerAnnouncers = make(map[string]*trackerScraper)
+ }
+ t.trackerAnnouncers[trackerURL] = newAnnouncer
+ go newAnnouncer.Run()
}
}
- return
}
-func (t *Torrent) announceSingleTracker(tr string, req *tracker.AnnounceRequest) (interval time.Duration, err error) {
- blocked, err := t.cl.trackerBlockedUnlocked(tr)
- if err != nil {
- err = fmt.Errorf("error determining if tracker blocked: %s", err)
- return
- }
- if blocked {
- err = errors.New("tracker has blocked IP")
- return
- }
- resp, err := tracker.Announce(tr, req)
- if err != nil {
- return
- }
- var peers []Peer
- for _, peer := range resp.Peers {
- peers = append(peers, Peer{
- IP: peer.IP,
- Port: peer.Port,
- })
+// Returns an AnnounceRequest with fields filled out to defaults and current
+// values.
+func (t *Torrent) announceRequest() tracker.AnnounceRequest {
+ return tracker.AnnounceRequest{
+ Event: tracker.None,
+ NumWant: -1,
+ Port: uint16(t.cl.incomingPeerPort()),
+ PeerId: t.cl.peerID,
+ InfoHash: t.infoHash,
+ Left: t.bytesLeftAnnounce(),
}
- t.AddPeers(peers)
- interval = time.Second * time.Duration(resp.Interval)
- return
}