Use reusable roaring iterators
diff --git a/torrent.go b/torrent.go
index 5c8dca25d58db702d5eba5e4cb5c0b663c565d44..bd7b75738471913dc9ddb2da784f82b71f814800 100644
--- a/torrent.go
+++ b/torrent.go
@@ -8,6 +8,8 @@ import (
        "errors"
        "fmt"
        "io"
+       "math/rand"
+       "net/netip"
        "net/url"
        "sort"
        "strings"
@@ -19,15 +21,17 @@ import (
        "github.com/anacrolix/chansync"
        "github.com/anacrolix/chansync/events"
        "github.com/anacrolix/dht/v2"
+       . "github.com/anacrolix/generics"
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/perf"
-       "github.com/anacrolix/missinggo/pubsub"
        "github.com/anacrolix/missinggo/slices"
        "github.com/anacrolix/missinggo/v2"
        "github.com/anacrolix/missinggo/v2/bitmap"
+       "github.com/anacrolix/missinggo/v2/pubsub"
        "github.com/anacrolix/multiless"
        "github.com/anacrolix/sync"
        request_strategy "github.com/anacrolix/torrent/request-strategy"
+       typedRoaring "github.com/anacrolix/torrent/typed-roaring"
        "github.com/davecgh/go-spew/spew"
        "github.com/pion/datachannel"
 
@@ -59,8 +63,11 @@ type Torrent struct {
        closed   chansync.SetOnce
        infoHash metainfo.Hash
        pieces   []Piece
+
+       // The order in which pieces are requested when there's no stronger reason, such as availability or priority.
+       pieceRequestOrder []int
        // Values are the piece indices that changed.
-       pieceStateChanges *pubsub.PubSub
+       pieceStateChanges pubsub.PubSub[PieceStateChange]
        // The size of chunks to request from peers over the wire. This is
        // normally 16KiB by convention these days.
        chunkSize pp.Integer
@@ -84,8 +91,9 @@ type Torrent struct {
        fileIndex segments.Index
        files     *[]*File
 
-       webSeeds map[string]*Peer
+       _chunksPerRegularPiece chunkIndexType
 
+       webSeeds map[string]*Peer
        // Active peer connections, running message stream loops. TODO: Make this
        // open (not-closed) connections only.
        conns               map[*PeerConn]struct{}
@@ -137,20 +145,34 @@ type Torrent struct {
        activePieceHashes         int
        initialPieceCheckDisabled bool
 
-       // Count of each request across active connections.
-       pendingRequests map[RequestIndex]*Peer
-       lastRequested   map[RequestIndex]time.Time
+       connsWithAllPieces map[*Peer]struct{}
+
+       requestState []requestState
        // Chunks we've written to since the corresponding piece was last checked.
-       dirtyChunks roaring.Bitmap
+       dirtyChunks typedRoaring.Bitmap[RequestIndex]
 
        pex pexState
 
        // Is On when all pieces are complete.
        Complete chansync.Flag
+
+       // Torrent sources in use keyed by the source string.
+       activeSources sync.Map
+       sourcesLogger log.Logger
+
+       smartBanCache smartBanCache
+
+       // Large allocations reused between request state updates.
+       requestPieceStates []request_strategy.PieceRequestOrderState
+       requestIndexes     []RequestIndex
 }
 
-func (t *Torrent) pieceAvailabilityFromPeers(i pieceIndex) (count int) {
+func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
+       // This could be done with roaring.BitSliceIndexing.
        t.iterPeers(func(peer *Peer) {
+               if _, ok := t.connsWithAllPieces[peer]; ok {
+                       return
+               }
                if peer.peerHasPiece(i) {
                        count++
                }
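
Note on the rename: relativeAvailability deliberately excludes connections known to have every piece (tracked in connsWithAllPieces), which is why selectivePieceAvailabilityFromPeers skips them. A minimal sketch of the resulting bookkeeping, assuming the effective per-piece count is recovered by adding the seeder count back on (the real accessor lives in piece.go, not in this patch):

    // Sketch only: relative availability plus a single count of connections
    // that have everything.
    type availabilitySketch struct {
        relativeAvailability []int // per piece, excludes full peers
        connsWithAllPieces   int   // len(t.connsWithAllPieces) in the real code
    }

    // Effective availability of one piece.
    func (a availabilitySketch) availability(piece int) int {
        return a.relativeAvailability[piece] + a.connsWithAllPieces
    }

This presumably keeps per-piece counters small and avoids touching every piece counter when a seeder connects or disconnects.
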
@@ -163,10 +185,10 @@ func (t *Torrent) decPieceAvailability(i pieceIndex) {
                return
        }
        p := t.piece(i)
-       if p.availability <= 0 {
-               panic(p.availability)
+       if p.relativeAvailability <= 0 {
+               panic(p.relativeAvailability)
        }
-       p.availability--
+       p.relativeAvailability--
        t.updatePieceRequestOrder(i)
 }
 
@@ -174,7 +196,7 @@ func (t *Torrent) incPieceAvailability(i pieceIndex) {
        // If we don't have the info, this should be reconciled when we do.
        if t.haveInfo() {
                p := t.piece(i)
-               p.availability++
+               p.relativeAvailability++
                t.updatePieceRequestOrder(i)
        }
 }
@@ -243,6 +265,9 @@ func (t *Torrent) pieceComplete(piece pieceIndex) bool {
 }
 
 func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
+       if t.storage == nil {
+               return storage.Completion{Complete: false, Ok: true}
+       }
        return t.pieces[piece].Storage().Completion()
 }
 
@@ -365,11 +390,6 @@ func (t *Torrent) makePieces() {
                beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
                endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
                piece.files = files[beginFile:endFile]
-               piece.undirtiedChunksIter = undirtiedChunksIter{
-                       TorrentDirtyChunks: &t.dirtyChunks,
-                       StartRequestIndex:  piece.requestIndexOffset(),
-                       EndRequestIndex:    piece.requestIndexOffset() + piece.numChunks(),
-               }
        }
 }
 
@@ -419,6 +439,7 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
        t.nameMu.Lock()
        t.info = info
        t.nameMu.Unlock()
+       t._chunksPerRegularPiece = chunkIndexType((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
        t.updateComplete()
        t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
        t.displayName = "" // Save a few bytes lol.
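
The cached _chunksPerRegularPiece is just a ceiling division of the usual piece size by the wire chunk size. A worked example with assumed sizes:

    // Sketch: ceiling division, with example sizes (not taken from any real torrent).
    const (
        pieceSize = 256 << 10 // 256 KiB usual piece size
        chunkSize = 16 << 10  // 16 KiB, the conventional wire chunk size
    )

    // (262144 + 16383) / 16384 == 16 chunks per regular piece.
    const chunksPerRegularPiece = (pieceSize + chunkSize - 1) / chunkSize
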
@@ -437,27 +458,28 @@ func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrder
 
 // This seems to be all the follow-up tasks after info is set, that can't fail.
 func (t *Torrent) onSetInfo() {
+       t.pieceRequestOrder = rand.Perm(t.numPieces())
        t.initPieceRequestOrder()
+       MakeSliceWithLength(&t.requestState, t.numChunks())
+       MakeSliceWithLength(&t.requestPieceStates, t.numPieces())
        for i := range t.pieces {
                p := &t.pieces[i]
-               // Need to add availability before updating piece completion, as that may result in conns
+               // Need to add relativeAvailability before updating piece completion, as that may result in conns
                // being dropped.
-               if p.availability != 0 {
-                       panic(p.availability)
+               if p.relativeAvailability != 0 {
+                       panic(p.relativeAvailability)
                }
-               p.availability = int64(t.pieceAvailabilityFromPeers(i))
+               p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i)
                t.addRequestOrderPiece(i)
-               t.updatePieceCompletion(pieceIndex(i))
+               t.updatePieceCompletion(i)
                if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
                        // t.logger.Printf("piece %s completion unknown, queueing check", p)
-                       t.queuePieceCheck(pieceIndex(i))
+                       t.queuePieceCheck(i)
                }
        }
        t.cl.event.Broadcast()
        close(t.gotMetainfoC)
        t.updateWantPeersEvent()
-       t.pendingRequests = make(map[RequestIndex]*Peer)
-       t.lastRequested = make(map[RequestIndex]time.Time)
        t.tryCreateMorePieceHashers()
        t.iterPeers(func(p *Peer) {
                p.onGotInfo(t.info)
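
onSetInfo now seeds pieceRequestOrder with rand.Perm, i.e. a uniformly random permutation of the piece indices, matching the field's comment above: it is the ordering used when nothing stronger (availability, priority) distinguishes pieces. For illustration:

    package main

    import (
        "fmt"
        "math/rand"
    )

    func main() {
        // rand.Perm(n) returns a pseudo-random permutation of [0, n); for 8
        // pieces it might print [3 0 6 1 7 2 5 4].
        fmt.Println(rand.Perm(8))
    }
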
@@ -528,7 +550,7 @@ func (t *Torrent) name() string {
        t.nameMu.RLock()
        defer t.nameMu.RUnlock()
        if t.haveInfo() {
-               return t.info.Name
+               return t.info.BestName()
        }
        if t.displayName != "" {
                return t.displayName
@@ -567,25 +589,33 @@ func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType pp.ExtendedMe
 }
 
 type pieceAvailabilityRun struct {
-       count        pieceIndex
-       availability int64
+       Count        pieceIndex
+       Availability int
 }
 
 func (me pieceAvailabilityRun) String() string {
-       return fmt.Sprintf("%v(%v)", me.count, me.availability)
+       return fmt.Sprintf("%v(%v)", me.Count, me.Availability)
 }
 
 func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
        rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
-               ret = append(ret, pieceAvailabilityRun{availability: el.(int64), count: int(count)})
+               ret = append(ret, pieceAvailabilityRun{Availability: el.(int), Count: int(count)})
        })
        for i := range t.pieces {
-               rle.Append(t.pieces[i].availability, 1)
+               rle.Append(t.pieces[i].availability(), 1)
        }
        rle.Flush()
        return
 }
 
+func (t *Torrent) pieceAvailabilityFrequencies() (freqs []int) {
+       freqs = make([]int, t.numActivePeers()+1)
+       for i := range t.pieces {
+               freqs[t.piece(i).availability()]++
+       }
+       return
+}
+
 func (t *Torrent) pieceStateRuns() (ret PieceStateRuns) {
        rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
                ret = append(ret, PieceStateRun{
@@ -670,12 +700,27 @@ func (t *Torrent) writeStatus(w io.Writer) {
        if t.info != nil {
                fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
                fmt.Fprintf(w, "Piece States: %s\n", t.pieceStateRuns())
-               fmt.Fprintf(w, "Piece availability: %v\n", strings.Join(func() (ret []string) {
-                       for _, run := range t.pieceAvailabilityRuns() {
-                               ret = append(ret, run.String())
-                       }
-                       return
-               }(), " "))
+               // Generates a huge, unhelpful listing when piece availability is very scattered. Prefer
+               // availability frequencies instead.
+               if false {
+                       fmt.Fprintf(w, "Piece availability: %v\n", strings.Join(func() (ret []string) {
+                               for _, run := range t.pieceAvailabilityRuns() {
+                                       ret = append(ret, run.String())
+                               }
+                               return
+                       }(), " "))
+               }
+               fmt.Fprintf(w, "Piece availability frequency: %v\n", strings.Join(
+                       func() (ret []string) {
+                               for avail, freq := range t.pieceAvailabilityFrequencies() {
+                                       if freq == 0 {
+                                               continue
+                                       }
+                                       ret = append(ret, fmt.Sprintf("%v: %v", avail, freq))
+                               }
+                               return
+                       }(),
+                       ", "))
        }
        fmt.Fprintf(w, "Reader Pieces:")
        t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
@@ -804,7 +849,7 @@ func (t *Torrent) usualPieceSize() int {
 }
 
 func (t *Torrent) numPieces() pieceIndex {
-       return pieceIndex(t.info.NumPieces())
+       return t.info.NumPieces()
 }
 
 func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
@@ -812,7 +857,10 @@ func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
 }
 
 func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
-       t.closed.Set()
+       if !t.closed.Set() {
+               err = errors.New("already closed")
+               return
+       }
        if t.storage != nil {
                wg.Add(1)
                go func() {
@@ -833,6 +881,12 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
        if t.storage != nil {
                t.deletePieceRequestOrder()
        }
+       for i := range t.pieces {
+               p := t.piece(i)
+               if p.relativeAvailability != 0 {
+                       panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability))
+               }
+       }
        t.pex.Reset()
        t.cl.event.Broadcast()
        t.pieceStateChanges.Close()
@@ -872,15 +926,15 @@ func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
        return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
 }
 
-func (t *Torrent) chunksPerRegularPiece() uint32 {
-       return uint32((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
+func (t *Torrent) chunksPerRegularPiece() chunkIndexType {
+       return t._chunksPerRegularPiece
 }
 
-func (t *Torrent) numRequests() RequestIndex {
+func (t *Torrent) numChunks() RequestIndex {
        if t.numPieces() == 0 {
                return 0
        }
-       return uint32(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1)
+       return RequestIndex(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1)
 }
 
 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
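
numChunks (renamed from numRequests) is the total chunk count for the torrent: every piece except the last contributes chunksPerRegularPiece, and the final, possibly shorter piece contributes whatever pieceNumChunks reports. A worked example with assumed sizes:

    // Sketch with example sizes: ten pieces, 16 KiB chunks, 256 KiB regular
    // pieces, and a 100 KiB final piece.
    const (
        numPieces             = 10
        chunksPerRegularPiece = 16 // 256 KiB / 16 KiB
        finalPieceChunks      = 7  // ceil(100 KiB / 16 KiB)
    )

    // Nine regular pieces plus the final one: 9*16 + 7 == 151 chunks.
    const numChunks = (numPieces-1)*chunksPerRegularPiece + finalPieceChunks
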
@@ -903,7 +957,20 @@ func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
        return pp.Integer(t.info.PieceLength)
 }
 
-func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) {
+func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter {
+       return &blockCheckingWriter{
+               cache:        &t.smartBanCache,
+               requestIndex: t.pieceRequestIndexOffset(piece),
+               chunkSize:    t.chunkSize.Int(),
+       }
+}
+
+func (t *Torrent) hashPiece(piece pieceIndex) (
+       ret metainfo.Hash,
+       // These are peers that sent us blocks that differ from what we hash here.
+       differingPeers map[bannableAddr]struct{},
+       err error,
+) {
        p := t.piece(piece)
        p.waitNoPendingWrites()
        storagePiece := t.pieces[piece].Storage()
@@ -919,13 +986,18 @@ func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) {
 
        hash := pieceHash.New()
        const logPieceContents = false
+       smartBanWriter := t.smartBanBlockCheckingWriter(piece)
+       writers := []io.Writer{hash, smartBanWriter}
+       var examineBuf bytes.Buffer
        if logPieceContents {
-               var examineBuf bytes.Buffer
-               _, err = storagePiece.WriteTo(io.MultiWriter(hash, &examineBuf))
-               log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
-       } else {
-               _, err = storagePiece.WriteTo(hash)
+               writers = append(writers, &examineBuf)
        }
+       _, err = storagePiece.WriteTo(io.MultiWriter(writers...))
+       if logPieceContents {
+               t.logger.WithDefaultLevel(log.Debug).Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
+       }
+       smartBanWriter.Flush()
+       differingPeers = smartBanWriter.badPeers
        missinggo.CopyExact(&ret, hash.Sum(nil))
        return
 }
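
hashPiece now streams the piece once and fans the bytes out with io.MultiWriter: the hash, the smart-ban block checker, and (when logPieceContents is enabled) a debug buffer all see the same single pass over storage. A self-contained sketch of the same pattern with stand-in writers:

    package main

    import (
        "bytes"
        "crypto/sha1"
        "fmt"
        "io"
        "strings"
    )

    func main() {
        src := strings.NewReader("piece data")
        hash := sha1.New()
        var examineBuf bytes.Buffer

        // One read of the source feeds every writer; a write error from any of
        // them aborts the copy, as it does in hashPiece.
        if _, err := io.Copy(io.MultiWriter(hash, &examineBuf), src); err != nil {
            panic(err)
        }
        fmt.Printf("%x %q\n", hash.Sum(nil), examineBuf.Bytes())
    }
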
@@ -1108,7 +1180,7 @@ func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) {
                        if !c.peerHasPiece(piece) {
                                return
                        }
-                       if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(uint32(piece)) {
+                       if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
                                return
                        }
                        c.updateRequests(reason)
@@ -1192,7 +1264,7 @@ func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
 }
 
 func (t *Torrent) pendRequest(req RequestIndex) {
-       t.piece(int(req / t.chunksPerRegularPiece())).pendChunkIndex(req % t.chunksPerRegularPiece())
+       t.piece(t.pieceIndexOfRequestIndex(req)).pendChunkIndex(req % t.chunksPerRegularPiece())
 }
 
 func (t *Torrent) pieceCompletionChanged(piece pieceIndex, reason string) {
@@ -1266,7 +1338,7 @@ func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
                t.logger.Printf("marked piece %v complete but still has dirtiers", piece)
        }
        if changed {
-               log.Fstr("piece %d completion changed: %+v -> %+v", piece, cached, uncached).SetLevel(log.Debug).Log(t.logger)
+               log.Fstr("piece %d completion changed: %+v -> %+v", piece, cached, uncached).LogLevel(log.Debug, t.logger)
                t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion")
        }
        return changed
@@ -1396,22 +1468,22 @@ func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
                }
        }
        torrent.Add("deleted connections", 1)
-       if !c.deleteAllRequests().IsEmpty() {
-               t.iterPeers(func(p *Peer) {
-                       if p.isLowOnRequests() {
-                               p.updateRequests("Torrent.deletePeerConn")
-                       }
-               })
-       }
+       c.deleteAllRequests("Torrent.deletePeerConn")
        t.assertPendingRequests()
+       if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 {
+               panic(t.connsWithAllPieces)
+       }
        return
 }
 
 func (t *Torrent) decPeerPieceAvailability(p *Peer) {
+       if t.deleteConnWithAllPieces(p) {
+               return
+       }
        if !t.haveInfo() {
                return
        }
-       p.newPeerPieces().Iterate(func(i uint32) bool {
+       p.peerPieces().Iterate(func(i uint32) bool {
                p.t.decPieceAvailability(pieceIndex(i))
                return true
        })
@@ -1423,7 +1495,7 @@ func (t *Torrent) assertPendingRequests() {
        }
        // var actual pendingRequests
        // if t.haveInfo() {
-       //      actual.m = make([]int, t.numRequests())
+       //      actual.m = make([]int, t.numChunks())
        // }
        // t.iterPeers(func(p *Peer) {
        //      p.requestState.Requests.Iterate(func(x uint32) bool {
@@ -1490,15 +1562,23 @@ func (t *Torrent) onWebRtcConn(
        dcc webtorrent.DataChannelContext,
 ) {
        defer c.Close()
+       netConn := webrtcNetConn{
+               ReadWriteCloser:    c,
+               DataChannelContext: dcc,
+       }
+       peerRemoteAddr := netConn.RemoteAddr()
+       if t.cl.badPeerAddr(peerRemoteAddr) {
+               return
+       }
        pc, err := t.cl.initiateProtocolHandshakes(
                context.Background(),
-               webrtcNetConn{c, dcc},
+               netConn,
                t,
                dcc.LocalOffered,
                false,
-               webrtcNetAddr{dcc.Remote},
+               netConn.RemoteAddr(),
                webrtcNetwork,
-               fmt.Sprintf("webrtc offer_id %x", dcc.OfferId),
+               fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
        )
        if err != nil {
                t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
@@ -1824,7 +1904,7 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
                if !t.cl.config.DropDuplicatePeerIds {
                        continue
                }
-               if left, ok := c.hasPreferredNetworkOver(c0); ok && left {
+               if c.hasPreferredNetworkOver(c0) {
                        c0.close()
                        t.deletePeerConn(c0)
                } else {
@@ -1882,7 +1962,9 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
 }
 
 func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
-       t.logger.Log(log.Fstr("hashed piece %d (passed=%t)", piece, passed).SetLevel(log.Debug))
+       t.logger.LazyLog(log.Debug, func() log.Msg {
+               return log.Fstr("hashed piece %d (passed=%t)", piece, passed)
+       })
        p := t.piece(piece)
        p.numVerifies++
        t.cl.event.Broadcast()
@@ -1897,7 +1979,10 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
                } else {
                        log.Fmsg(
                                "piece %d failed hash: %d connections contributed", piece, len(p.dirtiers),
-                       ).AddValues(t, p).SetLevel(log.Debug).Log(t.logger)
+                       ).AddValues(t, p).LogLevel(log.Debug, t.logger)
                        pieceHashedNotCorrect.Add(1)
                }
        }
@@ -1965,8 +2050,16 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
 
                        if len(bannableTouchers) >= 1 {
                                c := bannableTouchers[0]
-                               t.cl.banPeerIP(c.remoteIp())
-                               c.drop()
+                               if len(bannableTouchers) != 1 {
+                                       t.logger.Levelf(log.Warning, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece)
+                               } else {
+                                       // Turns out it's still useful to ban peers like this because if there's only a
+                                       // single peer for a piece, and we never progress that piece to completion, we
+                                       // will never smart-ban them. Discovered in
+                                       // https://github.com/anacrolix/torrent/issues/715.
+                                       t.logger.Levelf(log.Warning, "banning %v for being sole dirtier of piece %v after failed piece check", c, piece)
+                                       c.ban()
+                               }
                        }
                }
                t.onIncompletePiece(piece)
@@ -1976,9 +2069,10 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
 }
 
 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
-       // TODO: Make faster
-       for cn := range t.conns {
-               cn.tickleWriter()
+       start := t.pieceRequestIndexOffset(piece)
+       end := start + t.pieceNumChunks(piece)
+       for ri := start; ri < end; ri++ {
+               t.cancelRequest(ri)
        }
 }
 
@@ -2055,9 +2149,32 @@ func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
        return
 }
 
+func (t *Torrent) dropBannedPeers() {
+       t.iterPeers(func(p *Peer) {
+               remoteIp := p.remoteIp()
+               if remoteIp == nil {
+                       if p.bannableAddr.Ok() {
+                               t.logger.WithDefaultLevel(log.Debug).Printf("can't get remote ip for peer %v", p)
+                       }
+                       return
+               }
+               netipAddr := netip.MustParseAddr(remoteIp.String())
+               if Some(netipAddr) != p.bannableAddr {
+                       t.logger.WithDefaultLevel(log.Debug).Printf(
+                               "peer remote ip does not match its bannable addr [peer=%v, remote ip=%v, bannable addr=%v]",
+                               p, remoteIp, p.bannableAddr)
+               }
+               if _, ok := t.cl.badPeerIPs[netipAddr]; ok {
+                       // Should this be a close?
+                       p.drop()
+                       t.logger.WithDefaultLevel(log.Debug).Printf("dropped %v for banned remote IP %v", p, netipAddr)
+               }
+       })
+}
+
 func (t *Torrent) pieceHasher(index pieceIndex) {
        p := t.piece(index)
-       sum, copyErr := t.hashPiece(index)
+       sum, failedPeers, copyErr := t.hashPiece(index)
        correct := sum == *p.hash
        switch copyErr {
        case nil, io.EOF:
@@ -2067,6 +2184,16 @@ func (t *Torrent) pieceHasher(index pieceIndex) {
        t.storageLock.RUnlock()
        t.cl.lock()
        defer t.cl.unlock()
+       if correct {
+               for peer := range failedPeers {
+                       t.cl.banPeerIP(peer.AsSlice())
+                       t.logger.WithDefaultLevel(log.Debug).Printf("smart banned %v for piece %v", peer, index)
+               }
+               t.dropBannedPeers()
+               for ri := t.pieceRequestIndexOffset(index); ri < t.pieceRequestIndexOffset(index+1); ri++ {
+                       t.smartBanCache.ForgetBlock(ri)
+               }
+       }
        p.hashing = false
        t.pieceHashed(index, correct, copyErr)
        t.updatePiecePriority(index, "Torrent.pieceHasher")
@@ -2200,6 +2327,7 @@ func (t *Torrent) DisallowDataUpload() {
        defer t.cl.unlock()
        t.dataUploadDisallowed = true
        for c := range t.conns {
+               // TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead?
                c.updateRequests("disallow data upload")
        }
 }
@@ -2225,7 +2353,24 @@ func (t *Torrent) callbacks() *Callbacks {
        return &t.cl.config.Callbacks
 }
 
-func (t *Torrent) addWebSeed(url string) {
+type AddWebSeedsOpt func(*webseed.Client)
+
+// Sets the WebSeed trailing path escaper for a webseed.Client.
+func WebSeedPathEscaper(custom webseed.PathEscaper) AddWebSeedsOpt {
+       return func(c *webseed.Client) {
+               c.PathEscaper = custom
+       }
+}
+
+func (t *Torrent) AddWebSeeds(urls []string, opts ...AddWebSeedsOpt) {
+       t.cl.lock()
+       defer t.cl.unlock()
+       for _, u := range urls {
+               t.addWebSeed(u, opts...)
+       }
+}
+
+func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
        if t.cl.config.DisableWebseeds {
                return
        }
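
AddWebSeeds is the new exported way to add web seeds, with options applied to the underlying webseed.Client. A hedged usage sketch: the URL is an example, the helper is hypothetical, and the escaper signature is assumed to match webseed.PathEscaper (a func(pathComps []string) string in the webseed package):

    // Sketch: hypothetical caller; assumes "strings" is imported.
    func addExampleWebSeeds(t *Torrent) {
        t.AddWebSeeds(
            []string{"https://example.com/webseed/"},
            WebSeedPathEscaper(func(pathComps []string) string {
                // Join the path components unescaped, purely for illustration.
                return strings.Join(pathComps, "/")
            }),
        )
    }
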
@@ -2249,16 +2394,27 @@ func (t *Torrent) addWebSeed(url string) {
                        // requests mark more often, so recomputation is probably sooner than with regular peer
                        // conns. ~4x maxRequests would be about right.
                        PeerMaxRequests: 128,
-                       RemoteAddr:      remoteAddrFromUrl(url),
-                       callbacks:       t.callbacks(),
+                       // TODO: Set ban prefix?
+                       RemoteAddr: remoteAddrFromUrl(url),
+                       callbacks:  t.callbacks(),
                },
                client: webseed.Client{
-                       HttpClient: t.cl.webseedHttpClient,
+                       HttpClient: t.cl.httpClient,
                        Url:        url,
+                       ResponseBodyWrapper: func(r io.Reader) io.Reader {
+                               return &rateLimitedReader{
+                                       l: t.cl.config.DownloadRateLimiter,
+                                       r: r,
+                               }
+                       },
                },
                activeRequests: make(map[Request]webseed.Request, maxRequests),
                maxRequests:    maxRequests,
        }
+       ws.peer.initRequestState()
+       for _, opt := range opts {
+               opt(&ws.client)
+       }
        ws.peer.initUpdateRequestsTimer()
        ws.requesterCond.L = t.cl.locker()
        for i := 0; i < maxRequests; i += 1 {
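
The ResponseBodyWrapper hooks every webseed HTTP response body up to a rateLimitedReader, so downloads from web seeds count against the client's DownloadRateLimiter. That type isn't defined in this file; a minimal sketch of what such a reader could look like, assuming golang.org/x/time/rate is the limiter in use (the field names mirror the literal above, but the real implementation may differ):

    package sketch

    import (
        "context"
        "io"

        "golang.org/x/time/rate"
    )

    // rateLimitedReader reads from r, then blocks until the limiter allows the
    // number of bytes actually read.
    type rateLimitedReader struct {
        l *rate.Limiter
        r io.Reader
    }

    func (me rateLimitedReader) Read(b []byte) (n int, err error) {
        n, err = me.r.Read(b)
        if n > 0 {
            if waitErr := me.l.WaitN(context.Background(), n); waitErr != nil && err == nil {
                err = waitErr
            }
        }
        return
    }
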
@@ -2285,15 +2441,15 @@ func (t *Torrent) peerIsActive(p *Peer) (active bool) {
 }
 
 func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request {
-       index := ri / t.chunksPerRegularPiece()
+       index := t.pieceIndexOfRequestIndex(ri)
        return Request{
                pp.Integer(index),
-               t.piece(int(index)).chunkIndexSpec(ri % t.chunksPerRegularPiece()),
+               t.piece(index).chunkIndexSpec(ri % t.chunksPerRegularPiece()),
        }
 }
 
 func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
-       return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + uint32(r.Begin/t.chunkSize)
+       return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + RequestIndex(r.Begin/t.chunkSize)
 }
 
 func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex {
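
Request values and flat request indexes are interconvertible: requestIndexFromRequest is pieceRequestIndexOffset(piece), i.e. piece*chunksPerRegularPiece, plus Begin/chunkSize, and requestIndexToRequest inverts that with integer division and modulo. A worked round trip with assumed sizes:

    // Sketch: 16 KiB chunks, 16 chunks per regular piece (example values).
    const (
        chunkSize             = 16 << 10
        chunksPerRegularPiece = 16
    )

    // Request{Index: 3, Begin: 2 * chunkSize} maps to request index 3*16 + 2 == 50.
    const ri = 3*chunksPerRegularPiece + 2

    // And back: piece 3 (pieceIndexOfRequestIndex), chunk 2 within that piece.
    const (
        piece = ri / chunksPerRegularPiece // 3
        chunk = ri % chunksPerRegularPiece // 2
    )
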
@@ -2305,14 +2461,68 @@ func (t *Torrent) updateComplete() {
 }
 
 func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
-       p := t.pendingRequests[r]
+       p := t.requestingPeer(r)
        if p != nil {
                p.cancel(r)
        }
-       delete(t.pendingRequests, r)
+       // TODO: This is a check that an old invariant holds. It can be removed after some testing.
+       //delete(t.pendingRequests, r)
+       var zeroRequestState requestState
+       if t.requestState[r] != zeroRequestState {
+               panic("expected request state to be gone")
+       }
        return p
 }
 
 func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
-       return t.pendingRequests[r]
+       return t.requestState[r].peer
+}
+
+func (t *Torrent) addConnWithAllPieces(p *Peer) {
+       if t.connsWithAllPieces == nil {
+               t.connsWithAllPieces = make(map[*Peer]struct{}, t.maxEstablishedConns)
+       }
+       t.connsWithAllPieces[p] = struct{}{}
+}
+
+func (t *Torrent) deleteConnWithAllPieces(p *Peer) bool {
+       _, ok := t.connsWithAllPieces[p]
+       delete(t.connsWithAllPieces, p)
+       return ok
+}
+
+func (t *Torrent) numActivePeers() int {
+       return len(t.conns) + len(t.webSeeds)
+}
+
+func (t *Torrent) hasStorageCap() bool {
+       f := t.storage.Capacity
+       if f == nil {
+               return false
+       }
+       _, ok := (*f)()
+       return ok
+}
+
+func (t *Torrent) pieceIndexOfRequestIndex(ri RequestIndex) pieceIndex {
+       return pieceIndex(ri / t.chunksPerRegularPiece())
+}
+
+func (t *Torrent) iterUndirtiedRequestIndexesInPiece(
+       reuseIter *typedRoaring.Iterator[RequestIndex],
+       piece pieceIndex,
+       f func(RequestIndex),
+) {
+       reuseIter.Initialize(&t.dirtyChunks)
+       pieceRequestIndexOffset := t.pieceRequestIndexOffset(piece)
+       iterBitmapUnsetInRange(
+               reuseIter,
+               pieceRequestIndexOffset, pieceRequestIndexOffset+t.pieceNumChunks(piece),
+               f,
+       )
+}
+
+type requestState struct {
+       peer *Peer
+       when time.Time
 }