]> Sergey Matveev's repositories - btrtrc.git/blobdiff - torrent.go
Merge pull request #410 from anacrolix/webseeds
[btrtrc.git] / torrent.go
index d231a8e54c6c77ceb1212ac38f3c82073a0b2f59..c2f936e2e2a41a08f4e9d357844d675ea9b3a2a2 100644 (file)
@@ -8,12 +8,17 @@ import (
        "fmt"
        "io"
        "math/rand"
+       "net/http"
        "net/url"
+       "sort"
        "sync"
        "text/tabwriter"
        "time"
        "unsafe"
 
+       "github.com/anacrolix/torrent/common"
+       "github.com/anacrolix/torrent/segments"
+       "github.com/anacrolix/torrent/webseed"
        "github.com/davecgh/go-spew/spew"
        "github.com/pion/datachannel"
 
@@ -75,8 +80,11 @@ type Torrent struct {
        metainfo metainfo.MetaInfo
 
        // The info dict. nil if we don't have it (yet).
-       info  *metainfo.Info
-       files *[]*File
+       info      *metainfo.Info
+       fileIndex segments.Index
+       files     *[]*File
+
+       webSeeds map[string]*peer
 
        // Active peer connections, running message stream loops. TODO: Make this
        // open (not-closed) connections only.
@@ -84,8 +92,8 @@ type Torrent struct {
        maxEstablishedConns int
        // Set of addrs to which we're attempting to connect. Connections are
        // half-open until all handshakes are completed.
-       halfOpen    map[string]Peer
-       fastestConn *PeerConn
+       halfOpen    map[string]PeerInfo
+       fastestPeer *peer
 
        // Reserve of peers to connect to. A peer can be both here and in the
        // active connections if were told about the peer after connecting with
@@ -182,9 +190,9 @@ func (t *Torrent) Closed() <-chan struct{} {
 
 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
 // pending, and half-open peers.
-func (t *Torrent) KnownSwarm() (ks []Peer) {
+func (t *Torrent) KnownSwarm() (ks []PeerInfo) {
        // Add pending peers to the list
-       t.peers.Each(func(peer Peer) {
+       t.peers.Each(func(peer PeerInfo) {
                ks = append(ks, peer)
        })
 
@@ -196,7 +204,7 @@ func (t *Torrent) KnownSwarm() (ks []Peer) {
        // Add active peers to the list
        for conn := range t.conns {
 
-               ks = append(ks, Peer{
+               ks = append(ks, PeerInfo{
                        Id:     conn.PeerID,
                        Addr:   conn.remoteAddr,
                        Source: conn.Discovery,
@@ -255,7 +263,7 @@ func (t *Torrent) unclosedConnsAsSlice() (ret []*PeerConn) {
        return
 }
 
-func (t *Torrent) addPeer(p Peer) (added bool) {
+func (t *Torrent) addPeer(p PeerInfo) (added bool) {
        cl := t.cl
        torrent.Add(fmt.Sprintf("peers added by source %q", p.Source), 1)
        if t.closed.IsSet() {
@@ -270,7 +278,7 @@ func (t *Torrent) addPeer(p Peer) (added bool) {
        }
        if replaced, ok := t.peers.AddReturningReplacedPeer(p); ok {
                torrent.Add("peers replaced", 1)
-               if !replaced.Equal(p) {
+               if !replaced.equal(p) {
                        t.logger.WithDefaultLevel(log.Debug).Printf("added %v replacing %v", p, replaced)
                        added = true
                }
@@ -391,6 +399,7 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
        t.nameMu.Lock()
        t.info = info
        t.nameMu.Unlock()
+       t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
        t.displayName = "" // Save a few bytes lol.
        t.initFiles()
        t.cacheLength()
@@ -398,13 +407,11 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
        return nil
 }
 
+// This seems to be all the follow-up tasks after info is set, that can't fail.
 func (t *Torrent) onSetInfo() {
-       for conn := range t.conns {
-               if err := conn.setNumPieces(t.numPieces()); err != nil {
-                       t.logger.Printf("closing connection: %s", err)
-                       conn.close()
-               }
-       }
+       t.iterPeers(func(p *peer) {
+               p.onGotInfo(t.info)
+       })
        for i := range t.pieces {
                t.updatePieceCompletion(pieceIndex(i))
                p := &t.pieces[i]
@@ -629,9 +636,11 @@ func (t *Torrent) writeStatus(w io.Writer) {
        spew.NewDefaultConfig()
        spew.Fdump(w, t.statsLocked())
 
-       conns := t.connsAsSlice()
-       slices.Sort(conns, worseConn)
-       for i, c := range conns {
+       peers := t.peersAsSlice()
+       sort.Slice(peers, func(i, j int) bool {
+               return worseConn(peers[i], peers[j])
+       })
+       for i, c := range peers {
                fmt.Fprintf(w, "%2d. ", i+1)
                c.writeStatus(w, t)
        }
@@ -731,8 +740,8 @@ func (t *Torrent) requestOffset(r request) int64 {
        return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
 }
 
-// Return the request that would include the given offset into the torrent
-// data. Returns !ok if there is no such request.
+// Return the request that would include the given offset into the torrent data. Returns !ok if
+// there is no such request.
 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
        return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
 }
@@ -848,10 +857,9 @@ func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
        })
 }
 
-// The worst connection is one that hasn't been sent, or sent anything useful
-// for the longest. A bad connection is one that usually sends us unwanted
-// pieces, or has been in worser half of the established connections for more
-// than a minute.
+// The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
+// connection is one that usually sends us unwanted pieces, or has been in worser half of the
+// established connections for more than a minute.
 func (t *Torrent) worstBadConn() *PeerConn {
        wcs := worseConnSlice{t.unclosedConnsAsSlice()}
        heap.Init(&wcs)
@@ -943,12 +951,12 @@ func (t *Torrent) maybeNewConns() {
 
 func (t *Torrent) piecePriorityChanged(piece pieceIndex) {
        // t.logger.Printf("piece %d priority changed", piece)
-       for c := range t.conns {
+       t.iterPeers(func(c *peer) {
                if c.updatePiecePriority(piece) {
                        // log.Print("conn piece priority changed")
                        c.updateRequests()
                }
-       }
+       })
        t.maybeNewConns()
        t.publishPieceChange(piece)
 }
@@ -1227,12 +1235,19 @@ func (t *Torrent) deleteConnection(c *PeerConn) (ret bool) {
        }
        torrent.Add("deleted connections", 1)
        c.deleteAllRequests()
-       if len(t.conns) == 0 {
+       if t.numActivePeers() == 0 {
                t.assertNoPendingRequests()
        }
        return
 }
 
+func (t *Torrent) numActivePeers() (num int) {
+       t.iterPeers(func(*peer) {
+               num++
+       })
+       return
+}
+
 func (t *Torrent) assertNoPendingRequests() {
        if len(t.pendingRequests) != 0 {
                panic(t.pendingRequests)
@@ -1378,6 +1393,9 @@ func (t *Torrent) startScrapingTracker(_url string) {
        sl := func() torrentTrackerAnnouncer {
                switch u.Scheme {
                case "ws", "wss":
+                       if t.cl.config.DisableWebtorrent {
+                               return nil
+                       }
                        return t.startWebsocketAnnouncer(*u)
                }
                if u.Scheme == "udp4" && (t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4) {
@@ -1455,7 +1473,7 @@ func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
                                // Can't do anything with this.
                                continue
                        }
-                       t.addPeer(Peer{
+                       t.addPeer(PeerInfo{
                                Addr:   ipPortAddr{cp.IP, cp.Port},
                                Source: PeerSourceDhtGetPeers,
                        })
@@ -1510,7 +1528,7 @@ func (t *Torrent) dhtAnnouncer(s DhtServer) {
        }
 }
 
-func (t *Torrent) addPeers(peers []Peer) (added int) {
+func (t *Torrent) addPeers(peers []PeerInfo) (added int) {
        for _, p := range peers {
                if t.addPeer(p) {
                        added++
@@ -1556,7 +1574,7 @@ func (t *Torrent) numTotalPeers() int {
        for addr := range t.halfOpen {
                peers[addr] = struct{}{}
        }
-       t.peers.Each(func(peer Peer) {
+       t.peers.Each(func(peer PeerInfo) {
                peers[peer.Addr.String()] = struct{}{}
        })
        return len(peers)
@@ -1642,7 +1660,9 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
        defer t.cl.unlock()
        oldMax = t.maxEstablishedConns
        t.maxEstablishedConns = max
-       wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
+       wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), func(l, r *PeerConn) bool {
+               return worseConn(&l.peer, &r.peer)
+       })
        for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
                t.dropConnection(wcs.Pop().(*PeerConn))
        }
@@ -1695,7 +1715,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
                                c.stats().incrementPiecesDirtiedBad()
                        }
 
-                       bannableTouchers := make([]*PeerConn, 0, len(p.dirtiers))
+                       bannableTouchers := make([]*peer, 0, len(p.dirtiers))
                        for c := range p.dirtiers {
                                if !c.trusted {
                                        bannableTouchers = append(bannableTouchers, c)
@@ -1764,11 +1784,11 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) {
        //              c.drop()
        //      }
        // }
-       for conn := range t.conns {
+       t.iterPeers(func(conn *peer) {
                if conn.peerHasPiece(piece) {
                        conn.updateRequests()
                }
-       }
+       })
 }
 
 func (t *Torrent) tryCreateMorePieceHashers() {
@@ -1836,10 +1856,10 @@ func (t *Torrent) clearPieceTouchers(pi pieceIndex) {
        }
 }
 
-func (t *Torrent) connsAsSlice() (ret []*PeerConn) {
-       for c := range t.conns {
-               ret = append(ret, c)
-       }
+func (t *Torrent) peersAsSlice() (ret []*peer) {
+       t.iterPeers(func(p *peer) {
+               ret = append(ret, p)
+       })
        return
 }
 
@@ -1863,7 +1883,7 @@ func (t *Torrent) VerifyData() {
 }
 
 // Start the process of connecting to the given peer for the given torrent if appropriate.
-func (t *Torrent) initiateConn(peer Peer) {
+func (t *Torrent) initiateConn(peer PeerInfo) {
        if peer.Id == t.cl.peerID {
                return
        }
@@ -1882,9 +1902,9 @@ func (t *Torrent) initiateConn(peer Peer) {
 // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
 // quickly make one Client visible to the Torrent of another Client.
 func (t *Torrent) AddClientPeer(cl *Client) int {
-       return t.AddPeers(func() (ps []Peer) {
+       return t.AddPeers(func() (ps []PeerInfo) {
                for _, la := range cl.ListenAddrs() {
-                       ps = append(ps, Peer{
+                       ps = append(ps, PeerInfo{
                                Addr:    la,
                                Trusted: true,
                        })
@@ -1928,11 +1948,11 @@ func (cb torrentRequestStrategyCallbacks) requestTimedOut(r request) {
        torrent.Add("request timeouts", 1)
        cb.t.cl.lock()
        defer cb.t.cl.unlock()
-       for cn := range cb.t.conns {
+       cb.t.iterPeers(func(cn *peer) {
                if cn.peerHasPiece(pieceIndex(r.Index)) {
                        cn.updateRequests()
                }
-       }
+       })
 
 }
 
@@ -1957,9 +1977,9 @@ func (t *Torrent) DisallowDataDownload() {
 func (t *Torrent) disallowDataDownloadLocked() {
        log.Printf("disallowing data download")
        t.dataDownloadDisallowed = true
-       for c := range t.conns {
+       t.iterPeers(func(c *peer) {
                c.updateRequests()
-       }
+       })
 }
 
 func (t *Torrent) AllowDataDownload() {
@@ -1967,10 +1987,9 @@ func (t *Torrent) AllowDataDownload() {
        defer t.cl.unlock()
        log.Printf("AllowDataDownload")
        t.dataDownloadDisallowed = false
-       for c := range t.conns {
+       t.iterPeers(func(c *peer) {
                c.updateRequests()
-       }
-
+       })
 }
 
 func (t *Torrent) AllowDataUpload() {
@@ -1998,3 +2017,52 @@ func (t *Torrent) SetOnWriteChunkError(f func(error)) {
        defer t.cl.unlock()
        t.userOnWriteChunkErr = f
 }
+
+func (t *Torrent) iterPeers(f func(*peer)) {
+       for pc := range t.conns {
+               f(&pc.peer)
+       }
+       for _, ws := range t.webSeeds {
+               f(ws)
+       }
+}
+
+func (t *Torrent) addWebSeed(url string) {
+       if t.cl.config.DisableWebseeds {
+               return
+       }
+       if _, ok := t.webSeeds[url]; ok {
+               return
+       }
+       const maxRequests = 10
+       ws := webSeed{
+               peer: peer{
+                       t:                        t,
+                       connString:               url,
+                       outgoing:                 true,
+                       network:                  "http",
+                       reconciledHandshakeStats: true,
+                       peerSentHaveAll:          true,
+                       PeerMaxRequests:          maxRequests,
+               },
+               client: webseed.Client{
+                       HttpClient: http.DefaultClient,
+                       Url:        url,
+               },
+               requests: make(map[request]webseed.Request, maxRequests),
+       }
+       ws.peer.peerImpl = &ws
+       if t.haveInfo() {
+               ws.onGotInfo(t.info)
+       }
+       t.webSeeds[url] = &ws.peer
+}
+
+func (t *Torrent) peerIsActive(p *peer) (active bool) {
+       t.iterPeers(func(p1 *peer) {
+               if p1 == p {
+                       active = true
+               }
+       })
+       return
+}