]> Sergey Matveev's repositories - btrtrc.git/blobdiff - torrent.go
No Web*
[btrtrc.git] / torrent.go
index 81e80ddb038476de2291b622424070412921c447..b15b980326584a6bf0289835960cf635c2cae7ac 100644 (file)
@@ -3,11 +3,11 @@ package torrent
 import (
        "bytes"
        "container/heap"
-       "context"
        "crypto/sha1"
        "errors"
        "fmt"
        "io"
+       "math/rand"
        "net/netip"
        "net/url"
        "sort"
@@ -29,20 +29,18 @@ import (
        "github.com/anacrolix/missinggo/v2/pubsub"
        "github.com/anacrolix/multiless"
        "github.com/anacrolix/sync"
-       request_strategy "github.com/anacrolix/torrent/request-strategy"
-       typedRoaring "github.com/anacrolix/torrent/typed-roaring"
        "github.com/davecgh/go-spew/spew"
-       "github.com/pion/datachannel"
 
        "github.com/anacrolix/torrent/bencode"
        "github.com/anacrolix/torrent/common"
        "github.com/anacrolix/torrent/metainfo"
        pp "github.com/anacrolix/torrent/peer_protocol"
+       request_strategy "github.com/anacrolix/torrent/request-strategy"
        "github.com/anacrolix/torrent/segments"
        "github.com/anacrolix/torrent/storage"
        "github.com/anacrolix/torrent/tracker"
+       typedRoaring "github.com/anacrolix/torrent/typed-roaring"
        "github.com/anacrolix/torrent/webseed"
-       "github.com/anacrolix/torrent/webtorrent"
 )
 
 // Maintains state of torrent within a Client. Many methods should not be called before the info is
@@ -60,8 +58,12 @@ type Torrent struct {
        userOnWriteChunkErr    func(error)
 
        closed   chansync.SetOnce
+       onClose  []func()
        infoHash metainfo.Hash
        pieces   []Piece
+
+       // The order pieces are requested if there's no stronger reason like availability or priority.
+       pieceRequestOrder []int
        // Values are the piece indices that changed.
        pieceStateChanges pubsub.PubSub[PieceStateChange]
        // The size of chunks to request from peers over the wire. This is
@@ -70,7 +72,7 @@ type Torrent struct {
        chunkPool sync.Pool
        // Total length of the torrent in bytes. Stored because it's not O(1) to
        // get this from the info dict.
-       length *int64
+       _length Option[int64]
 
        // The storage to open when the info dict becomes available.
        storageOpener *storage.Client
@@ -87,6 +89,8 @@ type Torrent struct {
        fileIndex segments.Index
        files     *[]*File
 
+       _chunksPerRegularPiece chunkIndexType
+
        webSeeds map[string]*Peer
        // Active peer connections, running message stream loops. TODO: Make this
        // open (not-closed) connections only.
@@ -140,9 +144,8 @@ type Torrent struct {
        initialPieceCheckDisabled bool
 
        connsWithAllPieces map[*Peer]struct{}
-       // Count of each request across active connections.
-       pendingRequests map[RequestIndex]*Peer
-       lastRequested   map[RequestIndex]time.Time
+
+       requestState map[RequestIndex]requestState
        // Chunks we've written to since the corresponding piece was last checked.
        dirtyChunks typedRoaring.Bitmap[RequestIndex]
 
@@ -156,6 +159,14 @@ type Torrent struct {
        sourcesLogger log.Logger
 
        smartBanCache smartBanCache
+
+       // Large allocations reused between request state updates.
+       requestPieceStates []request_strategy.PieceRequestOrderState
+       requestIndexes     []RequestIndex
+}
+
+func (t *Torrent) length() int64 {
+       return t._length.Value
 }
 
 func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
@@ -381,11 +392,6 @@ func (t *Torrent) makePieces() {
                beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
                endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
                piece.files = files[beginFile:endFile]
-               piece.undirtiedChunksIter = undirtiedChunksIter{
-                       TorrentDirtyChunks: &t.dirtyChunks,
-                       StartRequestIndex:  piece.requestIndexOffset(),
-                       EndRequestIndex:    piece.requestIndexOffset() + piece.numChunks(),
-               }
        }
 }
 
@@ -416,7 +422,7 @@ func (t *Torrent) cacheLength() {
        for _, f := range t.info.UpvertedFiles() {
                l += f.Length
        }
-       t.length = &l
+       t._length = Some(l)
 }
 
 // TODO: This shouldn't fail for storage reasons. Instead we should handle storage failure
@@ -435,6 +441,7 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
        t.nameMu.Lock()
        t.info = info
        t.nameMu.Unlock()
+       t._chunksPerRegularPiece = chunkIndexType((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
        t.updateComplete()
        t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
        t.displayName = "" // Save a few bytes lol.
@@ -453,7 +460,9 @@ func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrder
 
 // This seems to be all the follow-up tasks after info is set, that can't fail.
 func (t *Torrent) onSetInfo() {
+       t.pieceRequestOrder = rand.Perm(t.numPieces())
        t.initPieceRequestOrder()
+       MakeSliceWithLength(&t.requestPieceStates, t.numPieces())
        for i := range t.pieces {
                p := &t.pieces[i]
                // Need to add relativeAvailability before updating piece completion, as that may result in conns
@@ -472,8 +481,7 @@ func (t *Torrent) onSetInfo() {
        t.cl.event.Broadcast()
        close(t.gotMetainfoC)
        t.updateWantPeersEvent()
-       t.pendingRequests = make(map[RequestIndex]*Peer)
-       t.lastRequested = make(map[RequestIndex]time.Time)
+       t.requestState = make(map[RequestIndex]requestState)
        t.tryCreateMorePieceHashers()
        t.iterPeers(func(p *Peer) {
                p.onGotInfo(t.info)
@@ -524,7 +532,7 @@ func (t *Torrent) setMetadataSize(size int) (err error) {
                return
        }
        if uint32(size) > maxMetadataSize {
-               return errors.New("bad size")
+               return log.WithLevel(log.Warning, errors.New("bad size"))
        }
        if len(t.metadataBytes) == size {
                return
@@ -759,9 +767,14 @@ func (t *Torrent) writeStatus(w io.Writer) {
                }
                return worseConn(i, j)
        })
+       var buf bytes.Buffer
        for i, c := range peers {
                fmt.Fprintf(w, "%2d. ", i+1)
-               c.writeStatus(w, t)
+               buf.Reset()
+               c.writeStatus(&buf, t)
+               w.Write(bytes.TrimRight(
+                       bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n    ")),
+                       " "))
        }
 }
 
@@ -843,7 +856,7 @@ func (t *Torrent) usualPieceSize() int {
 }
 
 func (t *Torrent) numPieces() pieceIndex {
-       return pieceIndex(t.info.NumPieces())
+       return t.info.NumPieces()
 }
 
 func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
@@ -855,6 +868,9 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
                err = errors.New("already closed")
                return
        }
+       for _, f := range t.onClose {
+               f()
+       }
        if t.storage != nil {
                wg.Add(1)
                go func() {
@@ -875,27 +891,31 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
        if t.storage != nil {
                t.deletePieceRequestOrder()
        }
+       t.assertAllPiecesRelativeAvailabilityZero()
+       t.pex.Reset()
+       t.cl.event.Broadcast()
+       t.pieceStateChanges.Close()
+       t.updateWantPeersEvent()
+       return
+}
+
+func (t *Torrent) assertAllPiecesRelativeAvailabilityZero() {
        for i := range t.pieces {
                p := t.piece(i)
                if p.relativeAvailability != 0 {
                        panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability))
                }
        }
-       t.pex.Reset()
-       t.cl.event.Broadcast()
-       t.pieceStateChanges.Close()
-       t.updateWantPeersEvent()
-       return
 }
 
 func (t *Torrent) requestOffset(r Request) int64 {
-       return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
+       return torrentRequestOffset(t.length(), int64(t.usualPieceSize()), r)
 }
 
 // Return the request that would include the given offset into the torrent data. Returns !ok if
 // there is no such request.
 func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
-       return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
+       return torrentOffsetRequest(t.length(), t.info.PieceLength, int64(t.chunkSize), off)
 }
 
 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
@@ -921,10 +941,10 @@ func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
 }
 
 func (t *Torrent) chunksPerRegularPiece() chunkIndexType {
-       return chunkIndexType((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
+       return t._chunksPerRegularPiece
 }
 
-func (t *Torrent) numRequests() RequestIndex {
+func (t *Torrent) numChunks() RequestIndex {
        if t.numPieces() == 0 {
                return 0
        }
@@ -943,7 +963,7 @@ func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
                return 0
        }
        if piece == t.numPieces()-1 {
-               ret := pp.Integer(*t.length % t.info.PieceLength)
+               ret := pp.Integer(t.length() % t.info.PieceLength)
                if ret != 0 {
                        return ret
                }
@@ -1219,7 +1239,7 @@ func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason string) {
 
 // Returns the range of pieces [begin, end) that contains the extent of bytes.
 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
-       if off >= *t.length {
+       if off >= t.length() {
                return
        }
        if off < 0 {
@@ -1436,7 +1456,7 @@ func (t *Torrent) bytesCompleted() int64 {
        if !t.haveInfo() {
                return 0
        }
-       return *t.length - t.bytesLeft()
+       return t.length() - t.bytesLeft()
 }
 
 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
@@ -1489,7 +1509,7 @@ func (t *Torrent) assertPendingRequests() {
        }
        // var actual pendingRequests
        // if t.haveInfo() {
-       //      actual.m = make([]int, t.numRequests())
+       //      actual.m = make([]int, t.numChunks())
        // }
        // t.iterPeers(func(p *Peer) {
        //      p.requestState.Requests.Iterate(func(x uint32) bool {
@@ -1551,51 +1571,10 @@ func (t *Torrent) seeding() bool {
        return true
 }
 
-func (t *Torrent) onWebRtcConn(
-       c datachannel.ReadWriteCloser,
-       dcc webtorrent.DataChannelContext,
-) {
-       defer c.Close()
-       netConn := webrtcNetConn{
-               ReadWriteCloser:    c,
-               DataChannelContext: dcc,
-       }
-       peerRemoteAddr := netConn.RemoteAddr()
-       if t.cl.badPeerAddr(peerRemoteAddr) {
-               return
-       }
-       pc, err := t.cl.initiateProtocolHandshakes(
-               context.Background(),
-               netConn,
-               t,
-               dcc.LocalOffered,
-               false,
-               netConn.RemoteAddr(),
-               webrtcNetwork,
-               fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
-       )
-       if err != nil {
-               t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
-               return
-       }
-       if dcc.LocalOffered {
-               pc.Discovery = PeerSourceTracker
-       } else {
-               pc.Discovery = PeerSourceIncoming
-       }
-       pc.conn.SetWriteDeadline(time.Time{})
-       t.cl.lock()
-       defer t.cl.unlock()
-       err = t.cl.runHandshookConn(pc, t)
-       if err != nil {
-               t.logger.WithDefaultLevel(log.Critical).Printf("error running handshook webrtc conn: %v", err)
-       }
-}
-
 func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
        err := t.cl.runHandshookConn(pc, t)
        if err != nil || logAll {
-               t.logger.WithDefaultLevel(level).Printf("error running handshook conn: %v", err)
+               t.logger.WithDefaultLevel(level).Levelf(log.ErrorLevel(err), "error running handshook conn: %v", err)
        }
 }
 
@@ -1603,25 +1582,6 @@ func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
        t.logRunHandshookConn(pc, false, log.Debug)
 }
 
-func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
-       wtc, release := t.cl.websocketTrackers.Get(u.String())
-       go func() {
-               <-t.closed.Done()
-               release()
-       }()
-       wst := websocketTrackerStatus{u, wtc}
-       go func() {
-               err := wtc.Announce(tracker.Started, t.infoHash)
-               if err != nil {
-                       t.logger.WithDefaultLevel(log.Warning).Printf(
-                               "error in initial announce to %q: %v",
-                               u.String(), err,
-                       )
-               }
-       }()
-       return wst
-}
-
 func (t *Torrent) startScrapingTracker(_url string) {
        if _url == "" {
                return
@@ -1647,11 +1607,6 @@ func (t *Torrent) startScrapingTracker(_url string) {
        }
        sl := func() torrentTrackerAnnouncer {
                switch u.Scheme {
-               case "ws", "wss":
-                       if t.cl.config.DisableWebtorrent {
-                               return nil
-                       }
-                       return t.startWebsocketAnnouncer(*u)
                case "udp4":
                        if t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4 {
                                return nil
@@ -1701,7 +1656,7 @@ func (t *Torrent) announceRequest(event tracker.AnnounceEvent) tracker.AnnounceR
                Event: event,
                NumWant: func() int32 {
                        if t.wantPeers() && len(t.cl.dialers) > 0 {
-                               return -1
+                               return 200 // Win has UDP packet limit. See: https://github.com/anacrolix/torrent/issues/764
                        } else {
                                return 0
                        }
@@ -1997,7 +1952,11 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
                        c._stats.incrementPiecesDirtiedGood()
                }
                t.clearPieceTouchers(piece)
+               hasDirty := p.hasDirtyChunks()
                t.cl.unlock()
+               if hasDirty {
+                       p.Flush() // You can be synchronous here!
+               }
                err := p.Storage().MarkComplete()
                if err != nil {
                        t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
@@ -2063,7 +2022,9 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
 }
 
 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
-       for ri := t.pieceRequestIndexOffset(piece); ri < t.pieceRequestIndexOffset(piece+1); ri++ {
+       start := t.pieceRequestIndexOffset(piece)
+       end := start + t.pieceNumChunks(piece)
+       for ri := start; ri < end; ri++ {
                t.cancelRequest(ri)
        }
 }
@@ -2145,7 +2106,7 @@ func (t *Torrent) dropBannedPeers() {
        t.iterPeers(func(p *Peer) {
                remoteIp := p.remoteIp()
                if remoteIp == nil {
-                       if p.bannableAddr.Ok() {
+                       if p.bannableAddr.Ok {
                                t.logger.WithDefaultLevel(log.Debug).Printf("can't get remote ip for peer %v", p)
                        }
                        return
@@ -2401,7 +2362,6 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
                        },
                },
                activeRequests: make(map[Request]webseed.Request, maxRequests),
-               maxRequests:    maxRequests,
        }
        ws.peer.initRequestState()
        for _, opt := range opts {
@@ -2453,16 +2413,20 @@ func (t *Torrent) updateComplete() {
 }
 
 func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
-       p := t.pendingRequests[r]
+       p := t.requestingPeer(r)
        if p != nil {
                p.cancel(r)
        }
-       delete(t.pendingRequests, r)
+       // TODO: This is a check that an old invariant holds. It can be removed after some testing.
+       //delete(t.pendingRequests, r)
+       if _, ok := t.requestState[r]; ok {
+               panic("expected request state to be gone")
+       }
        return p
 }
 
 func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
-       return t.pendingRequests[r]
+       return t.requestState[r].peer
 }
 
 func (t *Torrent) addConnWithAllPieces(p *Peer) {
@@ -2494,3 +2458,41 @@ func (t *Torrent) hasStorageCap() bool {
 func (t *Torrent) pieceIndexOfRequestIndex(ri RequestIndex) pieceIndex {
        return pieceIndex(ri / t.chunksPerRegularPiece())
 }
+
+func (t *Torrent) iterUndirtiedRequestIndexesInPiece(
+       reuseIter *typedRoaring.Iterator[RequestIndex],
+       piece pieceIndex,
+       f func(RequestIndex),
+) {
+       reuseIter.Initialize(&t.dirtyChunks)
+       pieceRequestIndexOffset := t.pieceRequestIndexOffset(piece)
+       iterBitmapUnsetInRange(
+               reuseIter,
+               pieceRequestIndexOffset, pieceRequestIndexOffset+t.pieceNumChunks(piece),
+               f,
+       )
+}
+
+type requestState struct {
+       peer *Peer
+       when time.Time
+}
+
+// Returns an error if a received chunk is out of bounds in someway.
+func (t *Torrent) checkValidReceiveChunk(r Request) error {
+       if !t.haveInfo() {
+               return errors.New("torrent missing info")
+       }
+       if int(r.Index) >= t.numPieces() {
+               return fmt.Errorf("chunk index %v, torrent num pieces %v", r.Index, t.numPieces())
+       }
+       pieceLength := t.pieceLength(pieceIndex(r.Index))
+       if r.Begin >= pieceLength {
+               return fmt.Errorf("chunk begins beyond end of piece (%v >= %v)", r.Begin, pieceLength)
+       }
+       // We could check chunk lengths here, but chunk request size is not changed often, and tricky
+       // for peers to manipulate as they need to send potentially large buffers to begin with. There
+       // should be considerable checks elsewhere for this case due to the network overhead. We should
+       // catch most of the overflow manipulation stuff by checking index and begin above.
+       return nil
+}