client.go | 104 ----------------------------------------------------- torrent.go | 104 +++++++++++++++++++++++++++++++++++++++++++++++++++++ diff --git a/client.go b/client.go index dbebd1ba4e59616bb6ca4e951cc411f3f6a96e33..ab7583a1299fbf546f1e1248e1a765be4aca3e49 100644 --- a/client.go +++ b/client.go @@ -33,7 +33,6 @@ "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/mse" pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/storage" - "github.com/anacrolix/torrent/tracker" ) // Currently doesn't really queue, but should in the future. @@ -1645,109 +1644,6 @@ cl.mu.RLock() _, blocked = cl.ipBlockRange(addr.IP) cl.mu.RUnlock() return -} - -func (cl *Client) announceTorrentSingleTracker(tr string, req *tracker.AnnounceRequest, t *Torrent) (interval time.Duration, err error) { - blocked, err := cl.trackerBlockedUnlocked(tr) - if err != nil { - err = fmt.Errorf("error determining if tracker blocked: %s", err) - return - } - if blocked { - err = errors.New("tracker has blocked IP") - return - } - resp, err := tracker.Announce(tr, req) - if err != nil { - return - } - var peers []Peer - for _, peer := range resp.Peers { - peers = append(peers, Peer{ - IP: peer.IP, - Port: peer.Port, - }) - } - t.AddPeers(peers) - interval = time.Second * time.Duration(resp.Interval) - return -} - -func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier, t *Torrent) (atLeastOne bool) { - oks := make(chan bool) - outstanding := 0 - for _, tier := range trackers { - for _, tr := range tier { - outstanding++ - go func(tr string) { - _, err := cl.announceTorrentSingleTracker(tr, req, t) - oks <- err == nil - }(tr) - } - } - for outstanding > 0 { - ok := <-oks - outstanding-- - if ok { - atLeastOne = true - } - } - return -} - -// Announce torrent to its trackers. -func (cl *Client) announceTorrentTrackers(t *Torrent) { - req := tracker.AnnounceRequest{ - Event: tracker.Started, - NumWant: -1, - Port: uint16(cl.incomingPeerPort()), - PeerId: cl.peerID, - InfoHash: t.infoHash, - } - if !t.waitWantPeers() { - return - } - cl.mu.RLock() - req.Left = t.bytesLeftAnnounce() - trackers := t.trackers - cl.mu.RUnlock() - if cl.announceTorrentTrackersFastStart(&req, trackers, t) { - req.Event = tracker.None - } -newAnnounce: - for t.waitWantPeers() { - cl.mu.RLock() - req.Left = t.bytesLeftAnnounce() - trackers = t.trackers - cl.mu.RUnlock() - numTrackersTried := 0 - for _, tier := range trackers { - for trIndex, tr := range tier { - numTrackersTried++ - interval, err := cl.announceTorrentSingleTracker(tr, &req, t) - if err != nil { - // Try the next tracker. - continue - } - // Float the successful announce to the top of the tier. If - // the trackers list has been changed, we'll be modifying an - // old copy so it won't matter. - cl.mu.Lock() - tier[0], tier[trIndex] = tier[trIndex], tier[0] - cl.mu.Unlock() - - req.Event = tracker.None - // Wait the interval before attempting another announce. - time.Sleep(interval) - continue newAnnounce - } - } - if numTrackersTried != 0 { - log.Printf("%s: all trackers failed", t) - } - // TODO: Wait until trackers are added if there are none. - time.Sleep(10 * time.Second) - } } func (cl *Client) allTorrentsCompleted() bool { diff --git a/torrent.go b/torrent.go index b89ce948dbe0a049edbc813980ac8ce6df8ff146..124753bd751488ac5703c68beb1b1ed7bdd82da6 100644 --- a/torrent.go +++ b/torrent.go @@ -25,6 +25,7 @@ "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/storage" + "github.com/anacrolix/torrent/tracker" ) func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec { @@ -1135,3 +1136,106 @@ return false } return true } + +// Announce torrent to its trackers. +func (cl *Client) announceTorrentTrackers(t *Torrent) { + req := tracker.AnnounceRequest{ + Event: tracker.Started, + NumWant: -1, + Port: uint16(cl.incomingPeerPort()), + PeerId: cl.peerID, + InfoHash: t.infoHash, + } + if !t.waitWantPeers() { + return + } + cl.mu.RLock() + req.Left = t.bytesLeftAnnounce() + trackers := t.trackers + cl.mu.RUnlock() + if cl.announceTorrentTrackersFastStart(&req, trackers, t) { + req.Event = tracker.None + } +newAnnounce: + for t.waitWantPeers() { + cl.mu.RLock() + req.Left = t.bytesLeftAnnounce() + trackers = t.trackers + cl.mu.RUnlock() + numTrackersTried := 0 + for _, tier := range trackers { + for trIndex, tr := range tier { + numTrackersTried++ + interval, err := cl.announceTorrentSingleTracker(tr, &req, t) + if err != nil { + // Try the next tracker. + continue + } + // Float the successful announce to the top of the tier. If + // the trackers list has been changed, we'll be modifying an + // old copy so it won't matter. + cl.mu.Lock() + tier[0], tier[trIndex] = tier[trIndex], tier[0] + cl.mu.Unlock() + + req.Event = tracker.None + // Wait the interval before attempting another announce. + time.Sleep(interval) + continue newAnnounce + } + } + if numTrackersTried != 0 { + log.Printf("%s: all trackers failed", t) + } + // TODO: Wait until trackers are added if there are none. + time.Sleep(10 * time.Second) + } +} + +func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier, t *Torrent) (atLeastOne bool) { + oks := make(chan bool) + outstanding := 0 + for _, tier := range trackers { + for _, tr := range tier { + outstanding++ + go func(tr string) { + _, err := cl.announceTorrentSingleTracker(tr, req, t) + oks <- err == nil + }(tr) + } + } + for outstanding > 0 { + ok := <-oks + outstanding-- + if ok { + atLeastOne = true + } + } + return +} + +func (cl *Client) announceTorrentSingleTracker(tr string, req *tracker.AnnounceRequest, t *Torrent) (interval time.Duration, err error) { + blocked, err := cl.trackerBlockedUnlocked(tr) + if err != nil { + err = fmt.Errorf("error determining if tracker blocked: %s", err) + return + } + if blocked { + err = errors.New("tracker has blocked IP") + return + } + resp, err := tracker.Announce(tr, req) + if err != nil { + return + } + var peers []Peer + for _, peer := range resp.Peers { + peers = append(peers, Peer{ + IP: peer.IP, + Port: peer.Port, + }) + } + t.AddPeers(peers) + interval = time.Second * time.Duration(resp.Interval) + return +}