"github.com/anacrolix/torrent/mse"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/storage"
- "github.com/anacrolix/torrent/tracker"
)
// Currently doesn't really queue, but should in the future.
// include ourselves if we end up trying to connect to our own address
// through legitimate channels.
dopplegangerAddrs map[string]struct{}
+ badPeerIPs map[string]struct{}
defaultStorage storage.Client
fmt.Fprintln(w, "Not listening!")
}
fmt.Fprintf(w, "Peer ID: %+q\n", cl.peerID)
+ fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPs))
if cl.dHT != nil {
dhtStats := cl.dHT.Stats()
fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
acceptTCP.Add(1)
}
cl.mu.RLock()
- doppleganger := cl.dopplegangerAddr(conn.RemoteAddr().String())
- _, blocked := cl.ipBlockRange(missinggo.AddrIP(conn.RemoteAddr()))
+ reject := cl.badPeerIPPort(
+ missinggo.AddrIP(conn.RemoteAddr()),
+ missinggo.AddrPort(conn.RemoteAddr()))
cl.mu.RUnlock()
- if blocked || doppleganger {
+ if reject {
acceptReject.Add(1)
- // log.Printf("inbound connection from %s blocked by %s", conn.RemoteAddr(), blockRange)
conn.Close()
continue
}
if peer.Id == cl.peerID {
return
}
- addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
- if cl.dopplegangerAddr(addr) || t.addrActive(addr) {
- duplicateConnsAvoided.Add(1)
+ if cl.badPeerIPPort(peer.IP, peer.Port) {
return
}
- if r, ok := cl.ipBlockRange(peer.IP); ok {
- log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
+ addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
+ if t.addrActive(addr) {
return
}
t.halfOpen[addr] = struct{}{}
func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) {
if c.PeerID == cl.peerID {
- // Only if we initiated the connection is the remote address a
- // listen addr for a doppleganger.
connsToSelf.Add(1)
addr := c.conn.RemoteAddr().String()
cl.dopplegangerAddrs[addr] = struct{}{}
cl.mu.Lock()
defer cl.mu.Unlock()
if c.PeerID == cl.peerID {
+ // Because the remote address is not necessarily the same as its
+ // client's torrent listen address, we won't record the remote address
+ // as a doppleganger. Instead, the initiator can record *us* as the
+ // doppleganger.
return
}
cl.runHandshookConn(c, t)
}
func (cl *Client) openNewConns(t *Torrent) {
+ defer t.updateWantPeersEvent()
for len(t.peers) != 0 {
if !cl.wantConns(t) {
return
delete(t.peers, k)
cl.initiateConn(p, t)
}
- t.wantPeers.Broadcast()
+}
+
+func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
+ if port == 0 {
+ return true
+ }
+ if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
+ return true
+ }
+ if _, ok := cl.ipBlockRange(ip); ok {
+ return true
+ }
+ if _, ok := cl.badPeerIPs[ip.String()]; ok {
+ return true
+ }
+ return false
}
func (cl *Client) addPeers(t *Torrent, peers []Peer) {
for _, p := range peers {
- if cl.dopplegangerAddr(net.JoinHostPort(
- p.IP.String(),
- strconv.FormatInt(int64(p.Port), 10),
- )) {
- continue
- }
- if _, ok := cl.ipBlockRange(p.IP); ok {
- continue
- }
- if p.Port == 0 {
- // The spec says to scrub these yourselves. Fine.
+ if cl.badPeerIPPort(p.IP, p.Port) {
continue
}
t.addPeer(p, cl)
storageOpener: cl.defaultStorage,
}
- t.wantPeers.L = &cl.mu
return
}
}
}
-func copyTrackers(base []trackerTier) (copy []trackerTier) {
- for _, tier := range base {
- copy = append(copy, append(trackerTier(nil), tier...))
- }
- return
-}
-
-func mergeTier(tier trackerTier, newURLs []string) trackerTier {
-nextURL:
- for _, url := range newURLs {
- for _, trURL := range tier {
- if trURL == url {
- continue nextURL
- }
- }
- tier = append(tier, url)
- }
- return tier
-}
-
// A file-like handle to some torrent data resource.
type Handle interface {
io.Reader
DisplayName: mi.Info.Name,
InfoHash: mi.Info.Hash(),
}
- if len(spec.Trackers) == 0 {
- spec.Trackers = [][]string{[]string{mi.Announce}}
- } else {
- spec.Trackers[0] = append(spec.Trackers[0], mi.Announce)
+ if spec.Trackers == nil && mi.Announce != "" {
+ spec.Trackers = [][]string{{mi.Announce}}
}
return
}
}
new = true
t = cl.newTorrent(infoHash)
- if !cl.config.DisableTrackers {
- go cl.announceTorrentTrackers(t)
- }
if cl.dHT != nil {
go cl.announceTorrentDHT(t, true)
}
cl.torrents[infoHash] = t
+ t.updateWantPeersEvent()
return
}
}
func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
- for t.waitWantPeers() {
+ for {
+ select {
+ case <-t.wantPeersEvent.LockedChan(&cl.mu):
+ case <-t.closed.LockedChan(&cl.mu):
+ return
+ }
// log.Printf("getting peers for %q from DHT", t)
ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
if err != nil {
}
}
-func (cl *Client) trackerBlockedUnlocked(trRawURL string) (blocked bool, err error) {
- url_, err := url.Parse(trRawURL)
+func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) {
+ _url, err := url.Parse(announceURL)
if err != nil {
return
}
- host, _, err := net.SplitHostPort(url_.Host)
- if err != nil {
- host = url_.Host
+ hmp := missinggo.SplitHostMaybePort(_url.Host)
+ if hmp.Err != nil {
+ err = hmp.Err
+ return
}
- addr, err := net.ResolveIPAddr("ip", host)
+ addr, err := net.ResolveIPAddr("ip", hmp.Host)
if err != nil {
return
}
cl.mu.RLock()
_, blocked = cl.ipBlockRange(addr.IP)
cl.mu.RUnlock()
+ host = _url.Host
+ hmp.Host = addr.String()
+ _url.Host = hmp.String()
+ urlToUse = _url.String()
return
}
-func (cl *Client) announceTorrentSingleTracker(tr string, req *tracker.AnnounceRequest, t *Torrent) (interval time.Duration, err error) {
- blocked, err := cl.trackerBlockedUnlocked(tr)
- if err != nil {
- err = fmt.Errorf("error determining if tracker blocked: %s", err)
- return
- }
- if blocked {
- err = errors.New("tracker has blocked IP")
- return
- }
- resp, err := tracker.Announce(tr, req)
- if err != nil {
- return
- }
- var peers []Peer
- for _, peer := range resp.Peers {
- peers = append(peers, Peer{
- IP: peer.IP,
- Port: peer.Port,
- })
- }
- t.AddPeers(peers)
- interval = time.Second * time.Duration(resp.Interval)
- return
-}
-
-func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier, t *Torrent) (atLeastOne bool) {
- oks := make(chan bool)
- outstanding := 0
- for _, tier := range trackers {
- for _, tr := range tier {
- outstanding++
- go func(tr string) {
- _, err := cl.announceTorrentSingleTracker(tr, req, t)
- oks <- err == nil
- }(tr)
- }
- }
- for outstanding > 0 {
- ok := <-oks
- outstanding--
- if ok {
- atLeastOne = true
- }
- }
- return
-}
-
-// Announce torrent to its trackers.
-func (cl *Client) announceTorrentTrackers(t *Torrent) {
- req := tracker.AnnounceRequest{
- Event: tracker.Started,
- NumWant: -1,
- Port: uint16(cl.incomingPeerPort()),
- PeerId: cl.peerID,
- InfoHash: t.infoHash,
- }
- if !t.waitWantPeers() {
- return
- }
- cl.mu.RLock()
- req.Left = t.bytesLeftAnnounce()
- trackers := t.trackers
- cl.mu.RUnlock()
- if cl.announceTorrentTrackersFastStart(&req, trackers, t) {
- req.Event = tracker.None
- }
-newAnnounce:
- for t.waitWantPeers() {
- cl.mu.RLock()
- req.Left = t.bytesLeftAnnounce()
- trackers = t.trackers
- cl.mu.RUnlock()
- numTrackersTried := 0
- for _, tier := range trackers {
- for trIndex, tr := range tier {
- numTrackersTried++
- interval, err := cl.announceTorrentSingleTracker(tr, &req, t)
- if err != nil {
- // Try the next tracker.
- continue
- }
- // Float the successful announce to the top of the tier. If
- // the trackers list has been changed, we'll be modifying an
- // old copy so it won't matter.
- cl.mu.Lock()
- tier[0], tier[trIndex] = tier[trIndex], tier[0]
- cl.mu.Unlock()
-
- req.Event = tracker.None
- // Wait the interval before attempting another announce.
- time.Sleep(interval)
- continue newAnnounce
- }
- }
- if numTrackersTried != 0 {
- log.Printf("%s: all trackers failed", t)
- }
- // TODO: Wait until trackers are added if there are none.
- time.Sleep(10 * time.Second)
- }
-}
-
func (cl *Client) allTorrentsCompleted() bool {
for _, t := range cl.torrents {
if !t.haveInfo() {
p.EverHashed = true
touchers := cl.reapPieceTouches(t, piece)
if correct {
+ for _, c := range touchers {
+ c.goodPiecesDirtied++
+ }
err := p.Storage().MarkComplete()
if err != nil {
log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
}
t.updatePieceCompletion(piece)
} else if len(touchers) != 0 {
- log.Printf("dropping %d conns that touched piece", len(touchers))
+ log.Printf("dropping and banning %d conns that touched piece", len(touchers))
for _, c := range touchers {
+ c.badPiecesDirtied++
+ t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
t.dropConnection(c)
}
}
cl.DHT().AddNode(ni)
}
}
+
+func (cl *Client) banPeerIP(ip net.IP) {
+ if cl.badPeerIPs == nil {
+ cl.badPeerIPs = make(map[string]struct{})
+ }
+ cl.badPeerIPs[ip.String()] = struct{}{}
+}
}
spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
_, new, _ = cl.AddTorrentSpec(&spec)
- if new {
- t.FailNow()
- }
- assert.EqualValues(t, T.trackers[0][0], "http://a")
- assert.EqualValues(t, T.trackers[1][0], "udp://b")
+ assert.False(t, new)
+ assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
+ // Because trackers are disabled in TestingConfig.
+ assert.EqualValues(t, 0, len(T.trackerAnnouncers))
}
type badStorage struct{}
assert.EqualValues(t, cl.DHT().NumNodes(), 0)
tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
require.NoError(t, err)
- assert.Len(t, tt.trackers, 5)
+ assert.Len(t, tt.metainfo.AnnounceList, 5)
assert.EqualValues(t, 6, cl.DHT().NumNodes())
}
Info: &greetingMetainfo.Info,
})
}
+
+func TestPrepareTrackerAnnounce(t *testing.T) {
+ cl := &Client{}
+ blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
+ require.NoError(t, err)
+ assert.False(t, blocked)
+ assert.EqualValues(t, "localhost:1234", host)
+ assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
+}
continue
}
d := map[string]interface{}{
- "Name": info.Name,
- "NumPieces": info.NumPieces(),
- "PieceLength": info.PieceLength,
- "InfoHash": metainfo.Info.Hash().HexString(),
- "NumFiles": len(info.UpvertedFiles()),
- "TotalLength": info.TotalLength(),
+ "Name": info.Name,
+ "NumPieces": info.NumPieces(),
+ "PieceLength": info.PieceLength,
+ "InfoHash": metainfo.Info.Hash().HexString(),
+ "NumFiles": len(info.UpvertedFiles()),
+ "TotalLength": info.TotalLength(),
+ "Announce": metainfo.Announce,
+ "AnnounceList": metainfo.AnnounceList,
}
if flags.Files {
d["Files"] = info.Files
UnwantedChunksReceived int
UsefulChunksReceived int
chunksSent int
+ goodPiecesDirtied int
+ badPiecesDirtied int
lastMessageReceived time.Time
completedHandshake time.Time
peersAddedBySource = expvar.NewMap("peersAddedBySource")
- uploadChunksPosted = expvar.NewInt("uploadChunksPosted")
- unexpectedCancels = expvar.NewInt("unexpectedCancels")
- postedCancels = expvar.NewInt("postedCancels")
- duplicateConnsAvoided = expvar.NewInt("duplicateConnsAvoided")
+ uploadChunksPosted = expvar.NewInt("uploadChunksPosted")
+ unexpectedCancels = expvar.NewInt("unexpectedCancels")
+ postedCancels = expvar.NewInt("postedCancels")
pieceHashedCorrect = expvar.NewInt("pieceHashedCorrect")
pieceHashedNotCorrect = expvar.NewInt("pieceHashedNotCorrect")
mi = new(metainfo.MetaInfo)
mi.Info.Name = GreetingFileName
mi.Info.Length = int64(len(GreetingFileContents))
- mi.Announce = "lol://cheezburger"
mi.Info.PieceLength = 5
err := mi.Info.GeneratePieces(func(metainfo.FileInfo) (io.ReadCloser, error) {
return ioutil.NopCloser(strings.NewReader(GreetingFileContents)), nil
func (t *Torrent) Metainfo() *metainfo.MetaInfo {
t.cl.mu.Lock()
defer t.cl.mu.Unlock()
- return t.metainfo()
+ return t.newMetaInfo()
}
func (t *Torrent) addReader(r *Reader) {
}
return s
}
+
+func (t *Torrent) AddTrackers(announceList [][]string) {
+ t.cl.mu.Lock()
+ defer t.cl.mu.Unlock()
+ t.addTrackers(announceList)
+}
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/storage"
+ "github.com/anacrolix/torrent/tracker"
)
func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
// Storage for torrent data.
storage storage.Torrent
+ metainfo metainfo.MetaInfo
+
// The info dict. nil if we don't have it (yet).
info *metainfo.InfoEx
// Active peer connections, running message stream loops.
// Reserve of peers to connect to. A peer can be both here and in the
// active connections if were told about the peer after connecting with
// them. That encourages us to reconnect to peers that are well known.
- peers map[peersKey]Peer
- wantPeers sync.Cond
+ peers map[peersKey]Peer
+ wantPeersEvent missinggo.Event
- // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
- // mirror their respective URLs from the announce-list metainfo key.
- trackers []trackerTier
+ // An announcer for each tracker URL.
+ trackerAnnouncers map[string]*trackerScraper
// Name used if the info name isn't available.
displayName string
// The bencoded bytes of the info dict.
if err != nil {
return fmt.Errorf("bad info: %s", err)
}
+ defer t.updateWantPeersEvent()
t.info = ie
t.cl.event.Broadcast()
t.gotMetainfo.Set()
})
fmt.Fprintln(w)
fmt.Fprintf(w, "Trackers: ")
- for _, tier := range t.trackers {
- for _, tr := range tier {
- fmt.Fprintf(w, "%q ", tr)
- }
+ for _url := range t.trackerAnnouncers {
+ fmt.Fprintf(w, "%q ", _url)
}
fmt.Fprintf(w, "\n")
fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
// TODO: Include URIs that weren't converted to tracker clients.
func (t *Torrent) announceList() (al [][]string) {
- missinggo.CastSlice(&al, t.trackers)
- return
+ return t.metainfo.AnnounceList
}
// Returns a run-time generated MetaInfo that includes the info bytes and
// announce-list as currently known to the client.
-func (t *Torrent) metainfo() *metainfo.MetaInfo {
- if t.metadataBytes == nil {
- panic("info bytes not set")
- }
- return &metainfo.MetaInfo{
- Info: *t.info,
+func (t *Torrent) newMetaInfo() (mi *metainfo.MetaInfo) {
+ mi = &metainfo.MetaInfo{
CreationDate: time.Now().Unix(),
Comment: "dynamic metainfo from client",
CreatedBy: "go.torrent",
AnnounceList: t.announceList(),
}
+ if t.info != nil {
+ mi.Info = *t.info
+ }
+ return
}
func (t *Torrent) bytesLeft() (left int64) {
conn.Close()
}
t.pieceStateChanges.Close()
+ t.updateWantPeersEvent()
return
}
})
}
-func (t *Torrent) addTrackers(announceList [][]string) {
- newTrackers := copyTrackers(t.trackers)
- for tierIndex, tier := range announceList {
- if tierIndex < len(newTrackers) {
- newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier)
- } else {
- newTrackers = append(newTrackers, mergeTier(nil, tier))
+func appendMissingStrings(old, new []string) (ret []string) {
+ ret = old
+new:
+ for _, n := range new {
+ for _, o := range old {
+ if o == n {
+ continue new
+ }
}
- shuffleTier(newTrackers[tierIndex])
+ ret = append(ret, n)
}
- t.trackers = newTrackers
+ return
+}
+
+func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
+ ret = existing
+ for minNumTiers > len(ret) {
+ ret = append(ret, nil)
+ }
+ return
+}
+
+func (t *Torrent) addTrackers(announceList [][]string) {
+ fullAnnounceList := &t.metainfo.AnnounceList
+ t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
+ for tierIndex, trackerURLs := range announceList {
+ (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
+ }
+ t.startMissingTrackerScrapers()
+ t.updateWantPeersEvent()
}
// Don't call this before the info is available.
}
}
-// Returns true when peers are required, or false if the torrent is closing.
-func (t *Torrent) waitWantPeers() bool {
- t.cl.mu.Lock()
- defer t.cl.mu.Unlock()
- for {
- if t.closed.IsSet() {
- return false
- }
- if len(t.peers) > torrentPeersLowWater {
- goto wait
- }
- if t.needData() || t.seeding() {
- return true
- }
- wait:
- t.wantPeers.Wait()
+func (t *Torrent) wantPeers() bool {
+ if t.closed.IsSet() {
+ return false
+ }
+ if len(t.peers) > torrentPeersLowWater {
+ return false
+ }
+ return t.needData() || t.seeding()
+}
+
+func (t *Torrent) updateWantPeersEvent() {
+ if t.wantPeers() {
+ t.wantPeersEvent.Set()
+ } else {
+ t.wantPeersEvent.Clear()
}
}
}
return true
}
+
+// Adds and starts tracker scrapers for tracker URLs that aren't already
+// running.
+func (t *Torrent) startMissingTrackerScrapers() {
+ if t.cl.config.DisableTrackers {
+ return
+ }
+ for _, tier := range t.announceList() {
+ for _, trackerURL := range tier {
+ if _, ok := t.trackerAnnouncers[trackerURL]; ok {
+ continue
+ }
+ newAnnouncer := &trackerScraper{
+ url: trackerURL,
+ t: t,
+ }
+ if t.trackerAnnouncers == nil {
+ t.trackerAnnouncers = make(map[string]*trackerScraper)
+ }
+ t.trackerAnnouncers[trackerURL] = newAnnouncer
+ go newAnnouncer.Run()
+ }
+ }
+}
+
+// Returns an AnnounceRequest with fields filled out to defaults and current
+// values.
+func (t *Torrent) announceRequest() tracker.AnnounceRequest {
+ return tracker.AnnounceRequest{
+ Event: tracker.None,
+ NumWant: -1,
+ Port: uint16(t.cl.incomingPeerPort()),
+ PeerId: t.cl.peerID,
+ InfoHash: t.infoHash,
+ Left: t.bytesLeftAnnounce(),
+ }
+}
"net/url"
"strconv"
+ "github.com/anacrolix/missinggo/httptoo"
+
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/util"
)
-func init() {
- registerClientScheme("http", newHTTPClient)
-}
-
-type httpClient struct {
- url url.URL
-}
-
-func (httpClient) Close() error { return nil }
-
-func newHTTPClient(url *url.URL) client {
- return &httpClient{
- url: *url,
- }
-}
-
type httpResponse struct {
FailureReason string `bencode:"failure reason"`
Interval int32 `bencode:"interval"`
return
}
-func (c *httpClient) Announce(ar *AnnounceRequest) (ret AnnounceResponse, err error) {
- // retain query parameters from announce URL
- q := c.url.Query()
+func setAnnounceParams(_url *url.URL, ar *AnnounceRequest) {
+ q := _url.Query()
q.Set("info_hash", string(ar.InfoHash[:]))
q.Set("peer_id", string(ar.PeerId[:]))
q.Set("compact", "1")
// According to https://wiki.vuze.com/w/Message_Stream_Encryption.
q.Set("supportcrypto", "1")
- var reqURL url.URL = c.url
- reqURL.RawQuery = q.Encode()
- resp, err := http.Get(reqURL.String())
+
+ _url.RawQuery = q.Encode()
+}
+
+func announceHTTP(ar *AnnounceRequest, _url *url.URL, host string) (ret AnnounceResponse, err error) {
+ _url = httptoo.CopyURL(_url)
+ setAnnounceParams(_url, ar)
+ req, err := http.NewRequest("GET", _url.String(), nil)
+ req.Host = host
+ resp, err := http.DefaultClient.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
- buf := bytes.Buffer{}
+ var buf bytes.Buffer
io.Copy(&buf, resp.Body)
if resp.StatusCode != 200 {
err = fmt.Errorf("response from tracker: %s: %s", resp.Status, buf.String())
Downloaded int64
Left uint64
Uploaded int64
- Event AnnounceEvent
- IPAddress int32
- Key int32
- NumWant int32 // How many peer addresses are desired. -1 for default.
- Port uint16
+ // Apparently this is optional. None can be used for announces done at
+ // regular intervals.
+ Event AnnounceEvent
+ IPAddress int32
+ Key int32
+ NumWant int32 // How many peer addresses are desired. -1 for default.
+ Port uint16
} // 82 bytes
type AnnounceResponse struct {
Stopped // The local peer is leaving the swarm.
)
-type client interface {
- Announce(*AnnounceRequest) (AnnounceResponse, error)
- Close() error
-}
-
var (
ErrBadScheme = errors.New("unknown scheme")
-
- schemes = make(map[string]func(*url.URL) client)
)
-func registerClientScheme(scheme string, newFunc func(*url.URL) client) {
- schemes[scheme] = newFunc
+func Announce(urlStr string, req *AnnounceRequest) (res AnnounceResponse, err error) {
+ return AnnounceHost(urlStr, req, "")
}
-// Returns ErrBadScheme if the tracker scheme isn't recognised.
-func newClient(rawurl string) (cl client, err error) {
- url_s, err := url.Parse(rawurl)
+func AnnounceHost(urlStr string, req *AnnounceRequest, host string) (res AnnounceResponse, err error) {
+ _url, err := url.Parse(urlStr)
if err != nil {
return
}
- newFunc, ok := schemes[url_s.Scheme]
- if !ok {
+ switch _url.Scheme {
+ case "http", "https":
+ return announceHTTP(req, _url, host)
+ case "udp":
+ return announceUDP(req, _url)
+ default:
err = ErrBadScheme
return
}
- cl = newFunc(url_s)
- return
-}
-
-func Announce(urlStr string, req *AnnounceRequest) (res AnnounceResponse, err error) {
- cl, err := newClient(urlStr)
- if err != nil {
- return
- }
- defer cl.Close()
- return cl.Announce(req)
-
}
Seeders int32
}
-func init() {
- registerClientScheme("udp", newUDPClient)
-}
-
-func newUDPClient(url *url.URL) client {
- return &udpClient{
- url: *url,
- }
-}
-
func newTransactionId() int32 {
return int32(rand.Uint32())
}
return
}
-type udpClient struct {
+type udpAnnounce struct {
contiguousTimeouts int
connectionIdReceived time.Time
connectionId int64
url url.URL
}
-func (c *udpClient) Close() error {
+func (c *udpAnnounce) Close() error {
if c.socket != nil {
return c.socket.Close()
}
return nil
}
-func (c *udpClient) Announce(req *AnnounceRequest) (res AnnounceResponse, err error) {
+func (c *udpAnnounce) Do(req *AnnounceRequest) (res AnnounceResponse, err error) {
err = c.connect()
if err != nil {
return
// body is the binary serializable request body. trailer is optional data
// following it, such as for BEP 41.
-func (c *udpClient) write(h *RequestHeader, body interface{}, trailer []byte) (err error) {
+func (c *udpAnnounce) write(h *RequestHeader, body interface{}, trailer []byte) (err error) {
var buf bytes.Buffer
err = binary.Write(&buf, binary.BigEndian, h)
if err != nil {
// args is the binary serializable request body. trailer is optional data
// following it, such as for BEP 41.
-func (c *udpClient) request(action Action, args interface{}, options []byte) (responseBody *bytes.Buffer, err error) {
+func (c *udpAnnounce) request(action Action, args interface{}, options []byte) (responseBody *bytes.Buffer, err error) {
tid := newTransactionId()
err = c.write(&RequestHeader{
ConnectionId: c.connectionId,
return
}
-func (c *udpClient) connected() bool {
+func (c *udpAnnounce) connected() bool {
return !c.connectionIdReceived.IsZero() && time.Now().Before(c.connectionIdReceived.Add(time.Minute))
}
-func (c *udpClient) connect() (err error) {
+func (c *udpAnnounce) connect() (err error) {
if c.connected() {
return nil
}
c.connectionIdReceived = time.Now()
return
}
+
+func announceUDP(ar *AnnounceRequest, _url *url.URL) (AnnounceResponse, error) {
+ ua := udpAnnounce{
+ url: *_url,
+ }
+ defer ua.Close()
+ return ua.Do(ar)
+}
rand.Read(req.PeerId[:])
copy(req.InfoHash[:], []uint8{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1})
ar, err := Announce("udp://tracker.openbittorrent.com:80/announce", &req)
- // Skip temporary errors as we don't control the server.
- if ne, ok := err.(net.Error); ok {
- if ne.Timeout() {
- t.Skip(err)
- }
- }
- // Skip DNS errors because the network might not be available, and we
- // don't control the domains we're testing.
- if oe, ok := err.(*net.OpError); ok {
- if _, ok := oe.Err.(*net.DNSError); ok {
- t.Skip(err)
- }
+ // Skip any net errors as we don't control the server.
+ if _, ok := err.(net.Error); ok {
+ t.Skip(err)
}
require.NoError(t, err)
t.Log(ar)
--- /dev/null
+package torrent
+
+import (
+ "log"
+ "time"
+
+ "github.com/anacrolix/missinggo"
+
+ "github.com/anacrolix/torrent/tracker"
+)
+
+// Announces a torrent to a tracker at regular intervals, when peers are
+// required.
+type trackerScraper struct {
+ url string
+ // Causes the trackerScraper to stop running.
+ stop missinggo.Event
+ t *Torrent
+}
+
+func trackerToTorrentPeers(ps []tracker.Peer) (ret []Peer) {
+ ret = make([]Peer, 0, len(ps))
+ for _, p := range ps {
+ ret = append(ret, Peer{
+ IP: p.IP,
+ Port: p.Port,
+ Source: peerSourceTracker,
+ })
+ }
+ return
+}
+
+// Return how long to wait before trying again. For most errors, we return 5
+// minutes, a relatively quick turn around for DNS changes.
+func (me *trackerScraper) announce() time.Duration {
+ blocked, urlToUse, host, err := me.t.cl.prepareTrackerAnnounceUnlocked(me.url)
+ if err != nil {
+ log.Printf("error preparing announce to %q: %s", me.url, err)
+ return 5 * time.Minute
+ }
+ if blocked {
+ log.Printf("announce to tracker %q blocked by IP", me.url)
+ return 5 * time.Minute
+ }
+ me.t.cl.mu.Lock()
+ req := me.t.announceRequest()
+ me.t.cl.mu.Unlock()
+ res, err := tracker.AnnounceHost(urlToUse, &req, host)
+ if err != nil {
+ log.Printf("error announcing %s %q to %q: %s", me.t.InfoHash().HexString(), me.t.Name(), me.url, err)
+ return 5 * time.Minute
+ }
+ me.t.AddPeers(trackerToTorrentPeers(res.Peers))
+ return time.Duration(res.Interval) * time.Second
+}
+
+func (me *trackerScraper) Run() {
+ for {
+ select {
+ case <-me.t.closed.LockedChan(&me.t.cl.mu):
+ return
+ case <-me.stop.LockedChan(&me.t.cl.mu):
+ return
+ case <-me.t.wantPeersEvent.LockedChan(&me.t.cl.mu):
+ }
+
+ intervalChan := time.After(me.announce())
+
+ select {
+ case <-me.t.closed.LockedChan(&me.t.cl.mu):
+ return
+ case <-me.stop.LockedChan(&me.t.cl.mu):
+ return
+ case <-intervalChan:
+ }
+ }
+}