]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Merge branch 'master' of https://github.com/lovedboy/torrent
authorlovedboy <lovedboy.tk@qq.com>
Tue, 24 May 2016 05:19:55 +0000 (13:19 +0800)
committerlovedboy <lovedboy.tk@qq.com>
Tue, 24 May 2016 05:19:55 +0000 (13:19 +0800)
13 files changed:
client.go
client_test.go
cmd/torrent-metainfo-pprint/main.go
connection.go
global.go
internal/testutil/testutil.go
t.go
torrent.go
tracker/http.go
tracker/tracker.go
tracker/udp.go
tracker/udp_test.go
tracker_scraper.go [new file with mode: 0644]

index 3759bfd1dda63546de0e9d7d88084164fcbb301a..e03e3a3c698fa721fcf10102e403ee27a6330b60 100644 (file)
--- a/client.go
+++ b/client.go
@@ -33,7 +33,6 @@ import (
        "github.com/anacrolix/torrent/mse"
        pp "github.com/anacrolix/torrent/peer_protocol"
        "github.com/anacrolix/torrent/storage"
-       "github.com/anacrolix/torrent/tracker"
 )
 
 // Currently doesn't really queue, but should in the future.
@@ -74,6 +73,7 @@ type Client struct {
        // include ourselves if we end up trying to connect to our own address
        // through legitimate channels.
        dopplegangerAddrs map[string]struct{}
+       badPeerIPs        map[string]struct{}
 
        defaultStorage storage.Client
 
@@ -157,6 +157,7 @@ func (cl *Client) WriteStatus(_w io.Writer) {
                fmt.Fprintln(w, "Not listening!")
        }
        fmt.Fprintf(w, "Peer ID: %+q\n", cl.peerID)
+       fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPs))
        if cl.dHT != nil {
                dhtStats := cl.dHT.Stats()
                fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
@@ -397,12 +398,12 @@ func (cl *Client) acceptConnections(l net.Listener, utp bool) {
                        acceptTCP.Add(1)
                }
                cl.mu.RLock()
-               doppleganger := cl.dopplegangerAddr(conn.RemoteAddr().String())
-               _, blocked := cl.ipBlockRange(missinggo.AddrIP(conn.RemoteAddr()))
+               reject := cl.badPeerIPPort(
+                       missinggo.AddrIP(conn.RemoteAddr()),
+                       missinggo.AddrPort(conn.RemoteAddr()))
                cl.mu.RUnlock()
-               if blocked || doppleganger {
+               if reject {
                        acceptReject.Add(1)
-                       // log.Printf("inbound connection from %s blocked by %s", conn.RemoteAddr(), blockRange)
                        conn.Close()
                        continue
                }
@@ -474,13 +475,11 @@ func (cl *Client) initiateConn(peer Peer, t *Torrent) {
        if peer.Id == cl.peerID {
                return
        }
-       addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
-       if cl.dopplegangerAddr(addr) || t.addrActive(addr) {
-               duplicateConnsAvoided.Add(1)
+       if cl.badPeerIPPort(peer.IP, peer.Port) {
                return
        }
-       if r, ok := cl.ipBlockRange(peer.IP); ok {
-               log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
+       addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
+       if t.addrActive(addr) {
                return
        }
        t.halfOpen[addr] = struct{}{}
@@ -876,8 +875,6 @@ func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainf
 
 func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) {
        if c.PeerID == cl.peerID {
-               // Only if we initiated the connection is the remote address a
-               // listen addr for a doppleganger.
                connsToSelf.Add(1)
                addr := c.conn.RemoteAddr().String()
                cl.dopplegangerAddrs[addr] = struct{}{}
@@ -904,6 +901,10 @@ func (cl *Client) runReceivedConn(c *connection) {
        cl.mu.Lock()
        defer cl.mu.Unlock()
        if c.PeerID == cl.peerID {
+               // Because the remote address is not necessarily the same as its
+               // client's torrent listen address, we won't record the remote address
+               // as a doppleganger. Instead, the initiator can record *us* as the
+               // doppleganger.
                return
        }
        cl.runHandshookConn(c, t)
@@ -1375,6 +1376,7 @@ func (cl *Client) wantConns(t *Torrent) bool {
 }
 
 func (cl *Client) openNewConns(t *Torrent) {
+       defer t.updateWantPeersEvent()
        for len(t.peers) != 0 {
                if !cl.wantConns(t) {
                        return
@@ -1392,22 +1394,27 @@ func (cl *Client) openNewConns(t *Torrent) {
                delete(t.peers, k)
                cl.initiateConn(p, t)
        }
-       t.wantPeers.Broadcast()
+}
+
+func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
+       if port == 0 {
+               return true
+       }
+       if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
+               return true
+       }
+       if _, ok := cl.ipBlockRange(ip); ok {
+               return true
+       }
+       if _, ok := cl.badPeerIPs[ip.String()]; ok {
+               return true
+       }
+       return false
 }
 
 func (cl *Client) addPeers(t *Torrent, peers []Peer) {
        for _, p := range peers {
-               if cl.dopplegangerAddr(net.JoinHostPort(
-                       p.IP.String(),
-                       strconv.FormatInt(int64(p.Port), 10),
-               )) {
-                       continue
-               }
-               if _, ok := cl.ipBlockRange(p.IP); ok {
-                       continue
-               }
-               if p.Port == 0 {
-                       // The spec says to scrub these yourselves. Fine.
+               if cl.badPeerIPPort(p.IP, p.Port) {
                        continue
                }
                t.addPeer(p, cl)
@@ -1429,7 +1436,6 @@ func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) {
 
                storageOpener: cl.defaultStorage,
        }
-       t.wantPeers.L = &cl.mu
        return
 }
 
@@ -1450,26 +1456,6 @@ func shuffleTier(tier trackerTier) {
        }
 }
 
-func copyTrackers(base []trackerTier) (copy []trackerTier) {
-       for _, tier := range base {
-               copy = append(copy, append(trackerTier(nil), tier...))
-       }
-       return
-}
-
-func mergeTier(tier trackerTier, newURLs []string) trackerTier {
-nextURL:
-       for _, url := range newURLs {
-               for _, trURL := range tier {
-                       if trURL == url {
-                               continue nextURL
-                       }
-               }
-               tier = append(tier, url)
-       }
-       return tier
-}
-
 // A file-like handle to some torrent data resource.
 type Handle interface {
        io.Reader
@@ -1513,10 +1499,8 @@ func TorrentSpecFromMetaInfo(mi *metainfo.MetaInfo) (spec *TorrentSpec) {
                DisplayName: mi.Info.Name,
                InfoHash:    mi.Info.Hash(),
        }
-       if len(spec.Trackers) == 0 {
-               spec.Trackers = [][]string{[]string{mi.Announce}}
-       } else {
-               spec.Trackers[0] = append(spec.Trackers[0], mi.Announce)
+       if spec.Trackers == nil && mi.Announce != "" {
+               spec.Trackers = [][]string{{mi.Announce}}
        }
        return
 }
@@ -1530,13 +1514,11 @@ func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bo
        }
        new = true
        t = cl.newTorrent(infoHash)
-       if !cl.config.DisableTrackers {
-               go cl.announceTorrentTrackers(t)
-       }
        if cl.dHT != nil {
                go cl.announceTorrentDHT(t, true)
        }
        cl.torrents[infoHash] = t
+       t.updateWantPeersEvent()
        return
 }
 
@@ -1580,7 +1562,12 @@ func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
 }
 
 func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
-       for t.waitWantPeers() {
+       for {
+               select {
+               case <-t.wantPeersEvent.LockedChan(&cl.mu):
+               case <-t.closed.LockedChan(&cl.mu):
+                       return
+               }
                // log.Printf("getting peers for %q from DHT", t)
                ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
                if err != nil {
@@ -1630,128 +1617,30 @@ func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
        }
 }
 
-func (cl *Client) trackerBlockedUnlocked(trRawURL string) (blocked bool, err error) {
-       url_, err := url.Parse(trRawURL)
+func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) {
+       _url, err := url.Parse(announceURL)
        if err != nil {
                return
        }
-       host, _, err := net.SplitHostPort(url_.Host)
-       if err != nil {
-               host = url_.Host
+       hmp := missinggo.SplitHostMaybePort(_url.Host)
+       if hmp.Err != nil {
+               err = hmp.Err
+               return
        }
-       addr, err := net.ResolveIPAddr("ip", host)
+       addr, err := net.ResolveIPAddr("ip", hmp.Host)
        if err != nil {
                return
        }
        cl.mu.RLock()
        _, blocked = cl.ipBlockRange(addr.IP)
        cl.mu.RUnlock()
+       host = _url.Host
+       hmp.Host = addr.String()
+       _url.Host = hmp.String()
+       urlToUse = _url.String()
        return
 }
 
-func (cl *Client) announceTorrentSingleTracker(tr string, req *tracker.AnnounceRequest, t *Torrent) (interval time.Duration, err error) {
-       blocked, err := cl.trackerBlockedUnlocked(tr)
-       if err != nil {
-               err = fmt.Errorf("error determining if tracker blocked: %s", err)
-               return
-       }
-       if blocked {
-               err = errors.New("tracker has blocked IP")
-               return
-       }
-       resp, err := tracker.Announce(tr, req)
-       if err != nil {
-               return
-       }
-       var peers []Peer
-       for _, peer := range resp.Peers {
-               peers = append(peers, Peer{
-                       IP:   peer.IP,
-                       Port: peer.Port,
-               })
-       }
-       t.AddPeers(peers)
-       interval = time.Second * time.Duration(resp.Interval)
-       return
-}
-
-func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier, t *Torrent) (atLeastOne bool) {
-       oks := make(chan bool)
-       outstanding := 0
-       for _, tier := range trackers {
-               for _, tr := range tier {
-                       outstanding++
-                       go func(tr string) {
-                               _, err := cl.announceTorrentSingleTracker(tr, req, t)
-                               oks <- err == nil
-                       }(tr)
-               }
-       }
-       for outstanding > 0 {
-               ok := <-oks
-               outstanding--
-               if ok {
-                       atLeastOne = true
-               }
-       }
-       return
-}
-
-// Announce torrent to its trackers.
-func (cl *Client) announceTorrentTrackers(t *Torrent) {
-       req := tracker.AnnounceRequest{
-               Event:    tracker.Started,
-               NumWant:  -1,
-               Port:     uint16(cl.incomingPeerPort()),
-               PeerId:   cl.peerID,
-               InfoHash: t.infoHash,
-       }
-       if !t.waitWantPeers() {
-               return
-       }
-       cl.mu.RLock()
-       req.Left = t.bytesLeftAnnounce()
-       trackers := t.trackers
-       cl.mu.RUnlock()
-       if cl.announceTorrentTrackersFastStart(&req, trackers, t) {
-               req.Event = tracker.None
-       }
-newAnnounce:
-       for t.waitWantPeers() {
-               cl.mu.RLock()
-               req.Left = t.bytesLeftAnnounce()
-               trackers = t.trackers
-               cl.mu.RUnlock()
-               numTrackersTried := 0
-               for _, tier := range trackers {
-                       for trIndex, tr := range tier {
-                               numTrackersTried++
-                               interval, err := cl.announceTorrentSingleTracker(tr, &req, t)
-                               if err != nil {
-                                       // Try the next tracker.
-                                       continue
-                               }
-                               // Float the successful announce to the top of the tier. If
-                               // the trackers list has been changed, we'll be modifying an
-                               // old copy so it won't matter.
-                               cl.mu.Lock()
-                               tier[0], tier[trIndex] = tier[trIndex], tier[0]
-                               cl.mu.Unlock()
-
-                               req.Event = tracker.None
-                               // Wait the interval before attempting another announce.
-                               time.Sleep(interval)
-                               continue newAnnounce
-                       }
-               }
-               if numTrackersTried != 0 {
-                       log.Printf("%s: all trackers failed", t)
-               }
-               // TODO: Wait until trackers are added if there are none.
-               time.Sleep(10 * time.Second)
-       }
-}
-
 func (cl *Client) allTorrentsCompleted() bool {
        for _, t := range cl.torrents {
                if !t.haveInfo() {
@@ -1877,14 +1766,19 @@ func (cl *Client) pieceHashed(t *Torrent, piece int, correct bool) {
        p.EverHashed = true
        touchers := cl.reapPieceTouches(t, piece)
        if correct {
+               for _, c := range touchers {
+                       c.goodPiecesDirtied++
+               }
                err := p.Storage().MarkComplete()
                if err != nil {
                        log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
                }
                t.updatePieceCompletion(piece)
        } else if len(touchers) != 0 {
-               log.Printf("dropping %d conns that touched piece", len(touchers))
+               log.Printf("dropping and banning %d conns that touched piece", len(touchers))
                for _, c := range touchers {
+                       c.badPiecesDirtied++
+                       t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
                        t.dropConnection(c)
                }
        }
@@ -2014,3 +1908,10 @@ func (cl *Client) AddDHTNodes(nodes []string) {
                cl.DHT().AddNode(ni)
        }
 }
+
+func (cl *Client) banPeerIP(ip net.IP) {
+       if cl.badPeerIPs == nil {
+               cl.badPeerIPs = make(map[string]struct{})
+       }
+       cl.badPeerIPs[ip.String()] = struct{}{}
+}
index 5da6b2cc3ed66279e76e672e5e4d570ef7b85e1f..d10a7567b759346ce0df76ad23832e9e9e638e0a 100644 (file)
@@ -453,11 +453,10 @@ func TestMergingTrackersByAddingSpecs(t *testing.T) {
        }
        spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
        _, new, _ = cl.AddTorrentSpec(&spec)
-       if new {
-               t.FailNow()
-       }
-       assert.EqualValues(t, T.trackers[0][0], "http://a")
-       assert.EqualValues(t, T.trackers[1][0], "udp://b")
+       assert.False(t, new)
+       assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
+       // Because trackers are disabled in TestingConfig.
+       assert.EqualValues(t, 0, len(T.trackerAnnouncers))
 }
 
 type badStorage struct{}
@@ -762,7 +761,7 @@ func TestAddMetainfoWithNodes(t *testing.T) {
        assert.EqualValues(t, cl.DHT().NumNodes(), 0)
        tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
        require.NoError(t, err)
-       assert.Len(t, tt.trackers, 5)
+       assert.Len(t, tt.metainfo.AnnounceList, 5)
        assert.EqualValues(t, 6, cl.DHT().NumNodes())
 }
 
@@ -889,3 +888,12 @@ func TestPieceCompletedInStorageButNotClient(t *testing.T) {
                Info: &greetingMetainfo.Info,
        })
 }
+
+func TestPrepareTrackerAnnounce(t *testing.T) {
+       cl := &Client{}
+       blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
+       require.NoError(t, err)
+       assert.False(t, blocked)
+       assert.EqualValues(t, "localhost:1234", host)
+       assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
+}
index 96fba9a8f035934859dd4628ccb3935d6a233b2d..8495ae10d6546e3b9b695f83e8cb0174d61e79b9 100644 (file)
@@ -35,12 +35,14 @@ func main() {
                        continue
                }
                d := map[string]interface{}{
-                       "Name":        info.Name,
-                       "NumPieces":   info.NumPieces(),
-                       "PieceLength": info.PieceLength,
-                       "InfoHash":    metainfo.Info.Hash().HexString(),
-                       "NumFiles":    len(info.UpvertedFiles()),
-                       "TotalLength": info.TotalLength(),
+                       "Name":         info.Name,
+                       "NumPieces":    info.NumPieces(),
+                       "PieceLength":  info.PieceLength,
+                       "InfoHash":     metainfo.Info.Hash().HexString(),
+                       "NumFiles":     len(info.UpvertedFiles()),
+                       "TotalLength":  info.TotalLength(),
+                       "Announce":     metainfo.Announce,
+                       "AnnounceList": metainfo.AnnounceList,
                }
                if flags.Files {
                        d["Files"] = info.Files
index 9264c3f7c98b847d3db14aa619b4bc7804ed4379..cbcd1c1e83adb795bb1286814939850386370a4b 100644 (file)
@@ -47,6 +47,8 @@ type connection struct {
        UnwantedChunksReceived int
        UsefulChunksReceived   int
        chunksSent             int
+       goodPiecesDirtied      int
+       badPiecesDirtied       int
 
        lastMessageReceived     time.Time
        completedHandshake      time.Time
index 5cce77f8a3a1318f65157b225640ee05a7be8ec9..f938baa282d6fa87f39381446e805f28be8b31ad 100644 (file)
--- a/global.go
+++ b/global.go
@@ -60,10 +60,9 @@ var (
 
        peersAddedBySource = expvar.NewMap("peersAddedBySource")
 
-       uploadChunksPosted    = expvar.NewInt("uploadChunksPosted")
-       unexpectedCancels     = expvar.NewInt("unexpectedCancels")
-       postedCancels         = expvar.NewInt("postedCancels")
-       duplicateConnsAvoided = expvar.NewInt("duplicateConnsAvoided")
+       uploadChunksPosted = expvar.NewInt("uploadChunksPosted")
+       unexpectedCancels  = expvar.NewInt("unexpectedCancels")
+       postedCancels      = expvar.NewInt("postedCancels")
 
        pieceHashedCorrect    = expvar.NewInt("pieceHashedCorrect")
        pieceHashedNotCorrect = expvar.NewInt("pieceHashedNotCorrect")
index 1d2963969e38a0f31e5405526d273e0dec09f270..1f236a424e96b475c214739b97404287cb57075c 100644 (file)
@@ -34,7 +34,6 @@ func GreetingMetaInfo() (mi *metainfo.MetaInfo) {
        mi = new(metainfo.MetaInfo)
        mi.Info.Name = GreetingFileName
        mi.Info.Length = int64(len(GreetingFileContents))
-       mi.Announce = "lol://cheezburger"
        mi.Info.PieceLength = 5
        err := mi.Info.GeneratePieces(func(metainfo.FileInfo) (io.ReadCloser, error) {
                return ioutil.NopCloser(strings.NewReader(GreetingFileContents)), nil
diff --git a/t.go b/t.go
index 899f759f9dc1bb68477fd52befac1ecd7bfcde36..854e0d563c0fe852e13c695b4b29234c4cf44698 100644 (file)
--- a/t.go
+++ b/t.go
@@ -120,7 +120,7 @@ func (t *Torrent) Length() int64 {
 func (t *Torrent) Metainfo() *metainfo.MetaInfo {
        t.cl.mu.Lock()
        defer t.cl.mu.Unlock()
-       return t.metainfo()
+       return t.newMetaInfo()
 }
 
 func (t *Torrent) addReader(r *Reader) {
@@ -197,3 +197,9 @@ func (t *Torrent) String() string {
        }
        return s
 }
+
+func (t *Torrent) AddTrackers(announceList [][]string) {
+       t.cl.mu.Lock()
+       defer t.cl.mu.Unlock()
+       t.addTrackers(announceList)
+}
index b89ce948dbe0a049edbc813980ac8ce6df8ff146..6848a6fddf6ffc87468f6b5cf20b60301f74f434 100644 (file)
@@ -25,6 +25,7 @@ import (
        "github.com/anacrolix/torrent/metainfo"
        pp "github.com/anacrolix/torrent/peer_protocol"
        "github.com/anacrolix/torrent/storage"
+       "github.com/anacrolix/torrent/tracker"
 )
 
 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
@@ -55,6 +56,8 @@ type Torrent struct {
        // Storage for torrent data.
        storage storage.Torrent
 
+       metainfo metainfo.MetaInfo
+
        // The info dict. nil if we don't have it (yet).
        info *metainfo.InfoEx
        // Active peer connections, running message stream loops.
@@ -66,12 +69,11 @@ type Torrent struct {
        // Reserve of peers to connect to. A peer can be both here and in the
        // active connections if were told about the peer after connecting with
        // them. That encourages us to reconnect to peers that are well known.
-       peers     map[peersKey]Peer
-       wantPeers sync.Cond
+       peers          map[peersKey]Peer
+       wantPeersEvent missinggo.Event
 
-       // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
-       // mirror their respective URLs from the announce-list metainfo key.
-       trackers []trackerTier
+       // An announcer for each tracker URL.
+       trackerAnnouncers map[string]*trackerScraper
        // Name used if the info name isn't available.
        displayName string
        // The bencoded bytes of the info dict.
@@ -216,6 +218,7 @@ func (t *Torrent) setInfoBytes(b []byte) error {
        if err != nil {
                return fmt.Errorf("bad info: %s", err)
        }
+       defer t.updateWantPeersEvent()
        t.info = ie
        t.cl.event.Broadcast()
        t.gotMetainfo.Set()
@@ -424,10 +427,8 @@ func (t *Torrent) writeStatus(w io.Writer, cl *Client) {
        })
        fmt.Fprintln(w)
        fmt.Fprintf(w, "Trackers: ")
-       for _, tier := range t.trackers {
-               for _, tr := range tier {
-                       fmt.Fprintf(w, "%q ", tr)
-               }
+       for _url := range t.trackerAnnouncers {
+               fmt.Fprintf(w, "%q ", _url)
        }
        fmt.Fprintf(w, "\n")
        fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
@@ -450,23 +451,22 @@ func (t *Torrent) haveInfo() bool {
 
 // TODO: Include URIs that weren't converted to tracker clients.
 func (t *Torrent) announceList() (al [][]string) {
-       missinggo.CastSlice(&al, t.trackers)
-       return
+       return t.metainfo.AnnounceList
 }
 
 // Returns a run-time generated MetaInfo that includes the info bytes and
 // announce-list as currently known to the client.
-func (t *Torrent) metainfo() *metainfo.MetaInfo {
-       if t.metadataBytes == nil {
-               panic("info bytes not set")
-       }
-       return &metainfo.MetaInfo{
-               Info:         *t.info,
+func (t *Torrent) newMetaInfo() (mi *metainfo.MetaInfo) {
+       mi = &metainfo.MetaInfo{
                CreationDate: time.Now().Unix(),
                Comment:      "dynamic metainfo from client",
                CreatedBy:    "go.torrent",
                AnnounceList: t.announceList(),
        }
+       if t.info != nil {
+               mi.Info = *t.info
+       }
+       return
 }
 
 func (t *Torrent) bytesLeft() (left int64) {
@@ -520,6 +520,7 @@ func (t *Torrent) close() (err error) {
                conn.Close()
        }
        t.pieceStateChanges.Close()
+       t.updateWantPeersEvent()
        return
 }
 
@@ -1048,17 +1049,36 @@ func (t *Torrent) needData() bool {
        })
 }
 
-func (t *Torrent) addTrackers(announceList [][]string) {
-       newTrackers := copyTrackers(t.trackers)
-       for tierIndex, tier := range announceList {
-               if tierIndex < len(newTrackers) {
-                       newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier)
-               } else {
-                       newTrackers = append(newTrackers, mergeTier(nil, tier))
+func appendMissingStrings(old, new []string) (ret []string) {
+       ret = old
+new:
+       for _, n := range new {
+               for _, o := range old {
+                       if o == n {
+                               continue new
+                       }
                }
-               shuffleTier(newTrackers[tierIndex])
+               ret = append(ret, n)
        }
-       t.trackers = newTrackers
+       return
+}
+
+func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
+       ret = existing
+       for minNumTiers > len(ret) {
+               ret = append(ret, nil)
+       }
+       return
+}
+
+func (t *Torrent) addTrackers(announceList [][]string) {
+       fullAnnounceList := &t.metainfo.AnnounceList
+       t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
+       for tierIndex, trackerURLs := range announceList {
+               (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
+       }
+       t.startMissingTrackerScrapers()
+       t.updateWantPeersEvent()
 }
 
 // Don't call this before the info is available.
@@ -1102,22 +1122,21 @@ func (t *Torrent) dropConnection(c *connection) {
        }
 }
 
-// Returns true when peers are required, or false if the torrent is closing.
-func (t *Torrent) waitWantPeers() bool {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
-       for {
-               if t.closed.IsSet() {
-                       return false
-               }
-               if len(t.peers) > torrentPeersLowWater {
-                       goto wait
-               }
-               if t.needData() || t.seeding() {
-                       return true
-               }
-       wait:
-               t.wantPeers.Wait()
+func (t *Torrent) wantPeers() bool {
+       if t.closed.IsSet() {
+               return false
+       }
+       if len(t.peers) > torrentPeersLowWater {
+               return false
+       }
+       return t.needData() || t.seeding()
+}
+
+func (t *Torrent) updateWantPeersEvent() {
+       if t.wantPeers() {
+               t.wantPeersEvent.Set()
+       } else {
+               t.wantPeersEvent.Clear()
        }
 }
 
@@ -1135,3 +1154,40 @@ func (t *Torrent) seeding() bool {
        }
        return true
 }
+
+// Adds and starts tracker scrapers for tracker URLs that aren't already
+// running.
+func (t *Torrent) startMissingTrackerScrapers() {
+       if t.cl.config.DisableTrackers {
+               return
+       }
+       for _, tier := range t.announceList() {
+               for _, trackerURL := range tier {
+                       if _, ok := t.trackerAnnouncers[trackerURL]; ok {
+                               continue
+                       }
+                       newAnnouncer := &trackerScraper{
+                               url: trackerURL,
+                               t:   t,
+                       }
+                       if t.trackerAnnouncers == nil {
+                               t.trackerAnnouncers = make(map[string]*trackerScraper)
+                       }
+                       t.trackerAnnouncers[trackerURL] = newAnnouncer
+                       go newAnnouncer.Run()
+               }
+       }
+}
+
+// Returns an AnnounceRequest with fields filled out to defaults and current
+// values.
+func (t *Torrent) announceRequest() tracker.AnnounceRequest {
+       return tracker.AnnounceRequest{
+               Event:    tracker.None,
+               NumWant:  -1,
+               Port:     uint16(t.cl.incomingPeerPort()),
+               PeerId:   t.cl.peerID,
+               InfoHash: t.infoHash,
+               Left:     t.bytesLeftAnnounce(),
+       }
+}
index 0f5c6c30df28a5c9a0d27b747212bd86cf606c80..20678b8c1d90ac0f734b27c4dbe454ee13422a24 100644 (file)
@@ -10,26 +10,12 @@ import (
        "net/url"
        "strconv"
 
+       "github.com/anacrolix/missinggo/httptoo"
+
        "github.com/anacrolix/torrent/bencode"
        "github.com/anacrolix/torrent/util"
 )
 
-func init() {
-       registerClientScheme("http", newHTTPClient)
-}
-
-type httpClient struct {
-       url url.URL
-}
-
-func (httpClient) Close() error { return nil }
-
-func newHTTPClient(url *url.URL) client {
-       return &httpClient{
-               url: *url,
-       }
-}
-
 type httpResponse struct {
        FailureReason string      `bencode:"failure reason"`
        Interval      int32       `bencode:"interval"`
@@ -56,9 +42,8 @@ func (r *httpResponse) UnmarshalPeers() (ret []Peer, err error) {
        return
 }
 
-func (c *httpClient) Announce(ar *AnnounceRequest) (ret AnnounceResponse, err error) {
-       // retain query parameters from announce URL
-       q := c.url.Query()
+func setAnnounceParams(_url *url.URL, ar *AnnounceRequest) {
+       q := _url.Query()
 
        q.Set("info_hash", string(ar.InfoHash[:]))
        q.Set("peer_id", string(ar.PeerId[:]))
@@ -73,14 +58,21 @@ func (c *httpClient) Announce(ar *AnnounceRequest) (ret AnnounceResponse, err er
        q.Set("compact", "1")
        // According to https://wiki.vuze.com/w/Message_Stream_Encryption.
        q.Set("supportcrypto", "1")
-       var reqURL url.URL = c.url
-       reqURL.RawQuery = q.Encode()
-       resp, err := http.Get(reqURL.String())
+
+       _url.RawQuery = q.Encode()
+}
+
+func announceHTTP(ar *AnnounceRequest, _url *url.URL, host string) (ret AnnounceResponse, err error) {
+       _url = httptoo.CopyURL(_url)
+       setAnnounceParams(_url, ar)
+       req, err := http.NewRequest("GET", _url.String(), nil)
+       req.Host = host
+       resp, err := http.DefaultClient.Do(req)
        if err != nil {
                return
        }
        defer resp.Body.Close()
-       buf := bytes.Buffer{}
+       var buf bytes.Buffer
        io.Copy(&buf, resp.Body)
        if resp.StatusCode != 200 {
                err = fmt.Errorf("response from tracker: %s: %s", resp.Status, buf.String())
index 3c6633018f0e9c953381cc85ef46d63942748718..ed8c42d80009426658d1f2dcfcdde8cc53240fc7 100644 (file)
@@ -13,11 +13,13 @@ type AnnounceRequest struct {
        Downloaded int64
        Left       uint64
        Uploaded   int64
-       Event      AnnounceEvent
-       IPAddress  int32
-       Key        int32
-       NumWant    int32 // How many peer addresses are desired. -1 for default.
-       Port       uint16
+       // Apparently this is optional. None can be used for announces done at
+       // regular intervals.
+       Event     AnnounceEvent
+       IPAddress int32
+       Key       int32
+       NumWant   int32 // How many peer addresses are desired. -1 for default.
+       Port      uint16
 } // 82 bytes
 
 type AnnounceResponse struct {
@@ -46,42 +48,26 @@ const (
        Stopped                 // The local peer is leaving the swarm.
 )
 
-type client interface {
-       Announce(*AnnounceRequest) (AnnounceResponse, error)
-       Close() error
-}
-
 var (
        ErrBadScheme = errors.New("unknown scheme")
-
-       schemes = make(map[string]func(*url.URL) client)
 )
 
-func registerClientScheme(scheme string, newFunc func(*url.URL) client) {
-       schemes[scheme] = newFunc
+func Announce(urlStr string, req *AnnounceRequest) (res AnnounceResponse, err error) {
+       return AnnounceHost(urlStr, req, "")
 }
 
-// Returns ErrBadScheme if the tracker scheme isn't recognised.
-func newClient(rawurl string) (cl client, err error) {
-       url_s, err := url.Parse(rawurl)
+func AnnounceHost(urlStr string, req *AnnounceRequest, host string) (res AnnounceResponse, err error) {
+       _url, err := url.Parse(urlStr)
        if err != nil {
                return
        }
-       newFunc, ok := schemes[url_s.Scheme]
-       if !ok {
+       switch _url.Scheme {
+       case "http", "https":
+               return announceHTTP(req, _url, host)
+       case "udp":
+               return announceUDP(req, _url)
+       default:
                err = ErrBadScheme
                return
        }
-       cl = newFunc(url_s)
-       return
-}
-
-func Announce(urlStr string, req *AnnounceRequest) (res AnnounceResponse, err error) {
-       cl, err := newClient(urlStr)
-       if err != nil {
-               return
-       }
-       defer cl.Close()
-       return cl.Announce(req)
-
 }
index 6d1228a38b991ba9047aea4165ecb118a4e39b56..07c7f0ae2d0d19a9c38442f8745483b499dcf625 100644 (file)
@@ -60,16 +60,6 @@ type AnnounceResponseHeader struct {
        Seeders  int32
 }
 
-func init() {
-       registerClientScheme("udp", newUDPClient)
-}
-
-func newUDPClient(url *url.URL) client {
-       return &udpClient{
-               url: *url,
-       }
-}
-
 func newTransactionId() int32 {
        return int32(rand.Uint32())
 }
@@ -85,7 +75,7 @@ func timeout(contiguousTimeouts int) (d time.Duration) {
        return
 }
 
-type udpClient struct {
+type udpAnnounce struct {
        contiguousTimeouts   int
        connectionIdReceived time.Time
        connectionId         int64
@@ -93,14 +83,14 @@ type udpClient struct {
        url                  url.URL
 }
 
-func (c *udpClient) Close() error {
+func (c *udpAnnounce) Close() error {
        if c.socket != nil {
                return c.socket.Close()
        }
        return nil
 }
 
-func (c *udpClient) Announce(req *AnnounceRequest) (res AnnounceResponse, err error) {
+func (c *udpAnnounce) Do(req *AnnounceRequest) (res AnnounceResponse, err error) {
        err = c.connect()
        if err != nil {
                return
@@ -140,7 +130,7 @@ func (c *udpClient) Announce(req *AnnounceRequest) (res AnnounceResponse, err er
 
 // body is the binary serializable request body. trailer is optional data
 // following it, such as for BEP 41.
-func (c *udpClient) write(h *RequestHeader, body interface{}, trailer []byte) (err error) {
+func (c *udpAnnounce) write(h *RequestHeader, body interface{}, trailer []byte) (err error) {
        var buf bytes.Buffer
        err = binary.Write(&buf, binary.BigEndian, h)
        if err != nil {
@@ -176,7 +166,7 @@ func write(w io.Writer, data interface{}) error {
 
 // args is the binary serializable request body. trailer is optional data
 // following it, such as for BEP 41.
-func (c *udpClient) request(action Action, args interface{}, options []byte) (responseBody *bytes.Buffer, err error) {
+func (c *udpAnnounce) request(action Action, args interface{}, options []byte) (responseBody *bytes.Buffer, err error) {
        tid := newTransactionId()
        err = c.write(&RequestHeader{
                ConnectionId:  c.connectionId,
@@ -232,11 +222,11 @@ func readBody(r io.Reader, data ...interface{}) (err error) {
        return
 }
 
-func (c *udpClient) connected() bool {
+func (c *udpAnnounce) connected() bool {
        return !c.connectionIdReceived.IsZero() && time.Now().Before(c.connectionIdReceived.Add(time.Minute))
 }
 
-func (c *udpClient) connect() (err error) {
+func (c *udpAnnounce) connect() (err error) {
        if c.connected() {
                return nil
        }
@@ -266,3 +256,11 @@ func (c *udpClient) connect() (err error) {
        c.connectionIdReceived = time.Now()
        return
 }
+
+func announceUDP(ar *AnnounceRequest, _url *url.URL) (AnnounceResponse, error) {
+       ua := udpAnnounce{
+               url: *_url,
+       }
+       defer ua.Close()
+       return ua.Do(ar)
+}
index d01830f68c28db4d582d0aac23760185095f0dbc..df639dbd83818bc4befc223dffcf061aa72676f2 100644 (file)
@@ -131,18 +131,9 @@ func TestUDPTracker(t *testing.T) {
        rand.Read(req.PeerId[:])
        copy(req.InfoHash[:], []uint8{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1})
        ar, err := Announce("udp://tracker.openbittorrent.com:80/announce", &req)
-       // Skip temporary errors as we don't control the server.
-       if ne, ok := err.(net.Error); ok {
-               if ne.Timeout() {
-                       t.Skip(err)
-               }
-       }
-       // Skip DNS errors because the network might not be available, and we
-       // don't control the domains we're testing.
-       if oe, ok := err.(*net.OpError); ok {
-               if _, ok := oe.Err.(*net.DNSError); ok {
-                       t.Skip(err)
-               }
+       // Skip any net errors as we don't control the server.
+       if _, ok := err.(net.Error); ok {
+               t.Skip(err)
        }
        require.NoError(t, err)
        t.Log(ar)
diff --git a/tracker_scraper.go b/tracker_scraper.go
new file mode 100644 (file)
index 0000000..c675e11
--- /dev/null
@@ -0,0 +1,77 @@
+package torrent
+
+import (
+       "log"
+       "time"
+
+       "github.com/anacrolix/missinggo"
+
+       "github.com/anacrolix/torrent/tracker"
+)
+
+// Announces a torrent to a tracker at regular intervals, when peers are
+// required.
+type trackerScraper struct {
+       url string
+       // Causes the trackerScraper to stop running.
+       stop missinggo.Event
+       t    *Torrent
+}
+
+func trackerToTorrentPeers(ps []tracker.Peer) (ret []Peer) {
+       ret = make([]Peer, 0, len(ps))
+       for _, p := range ps {
+               ret = append(ret, Peer{
+                       IP:     p.IP,
+                       Port:   p.Port,
+                       Source: peerSourceTracker,
+               })
+       }
+       return
+}
+
+// Return how long to wait before trying again. For most errors, we return 5
+// minutes, a relatively quick turn around for DNS changes.
+func (me *trackerScraper) announce() time.Duration {
+       blocked, urlToUse, host, err := me.t.cl.prepareTrackerAnnounceUnlocked(me.url)
+       if err != nil {
+               log.Printf("error preparing announce to %q: %s", me.url, err)
+               return 5 * time.Minute
+       }
+       if blocked {
+               log.Printf("announce to tracker %q blocked by IP", me.url)
+               return 5 * time.Minute
+       }
+       me.t.cl.mu.Lock()
+       req := me.t.announceRequest()
+       me.t.cl.mu.Unlock()
+       res, err := tracker.AnnounceHost(urlToUse, &req, host)
+       if err != nil {
+               log.Printf("error announcing %s %q to %q: %s", me.t.InfoHash().HexString(), me.t.Name(), me.url, err)
+               return 5 * time.Minute
+       }
+       me.t.AddPeers(trackerToTorrentPeers(res.Peers))
+       return time.Duration(res.Interval) * time.Second
+}
+
+func (me *trackerScraper) Run() {
+       for {
+               select {
+               case <-me.t.closed.LockedChan(&me.t.cl.mu):
+                       return
+               case <-me.stop.LockedChan(&me.t.cl.mu):
+                       return
+               case <-me.t.wantPeersEvent.LockedChan(&me.t.cl.mu):
+               }
+
+               intervalChan := time.After(me.announce())
+
+               select {
+               case <-me.t.closed.LockedChan(&me.t.cl.mu):
+                       return
+               case <-me.stop.LockedChan(&me.t.cl.mu):
+                       return
+               case <-intervalChan:
+               }
+       }
+}