Use reusable roaring iterators
[btrtrc.git] / torrent.go
index b9ecb0f636ac0764f25f395cd71bdcb38bd06224..bd7b75738471913dc9ddb2da784f82b71f814800 100644
@@ -8,7 +8,8 @@ import (
        "errors"
        "fmt"
        "io"
-       "net/http"
+       "math/rand"
+       "net/netip"
        "net/url"
        "sort"
        "strings"
@@ -20,16 +21,18 @@ import (
        "github.com/anacrolix/chansync"
        "github.com/anacrolix/chansync/events"
        "github.com/anacrolix/dht/v2"
+       . "github.com/anacrolix/generics"
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/perf"
-       "github.com/anacrolix/missinggo/pubsub"
        "github.com/anacrolix/missinggo/slices"
        "github.com/anacrolix/missinggo/v2"
        "github.com/anacrolix/missinggo/v2/bitmap"
+       "github.com/anacrolix/missinggo/v2/pubsub"
        "github.com/anacrolix/multiless"
        "github.com/anacrolix/sync"
+       request_strategy "github.com/anacrolix/torrent/request-strategy"
+       typedRoaring "github.com/anacrolix/torrent/typed-roaring"
        "github.com/davecgh/go-spew/spew"
-       "github.com/google/go-cmp/cmp"
        "github.com/pion/datachannel"
 
        "github.com/anacrolix/torrent/bencode"
@@ -60,8 +63,11 @@ type Torrent struct {
        closed   chansync.SetOnce
        infoHash metainfo.Hash
        pieces   []Piece
+
+       // The order in which pieces are requested if there's no stronger reason like availability or priority.
+       pieceRequestOrder []int
        // Values are the piece indices that changed.
-       pieceStateChanges *pubsub.PubSub
+       pieceStateChanges pubsub.PubSub[PieceStateChange]
        // The size of chunks to request from peers over the wire. This is
        // normally 16KiB by convention these days.
        chunkSize pp.Integer
@@ -85,8 +91,9 @@ type Torrent struct {
        fileIndex segments.Index
        files     *[]*File
 
-       webSeeds map[string]*Peer
+       _chunksPerRegularPiece chunkIndexType
 
+       webSeeds map[string]*Peer
        // Active peer connections, running message stream loops. TODO: Make this
        // open (not-closed) connections only.
        conns               map[*PeerConn]struct{}
@@ -138,19 +145,34 @@ type Torrent struct {
        activePieceHashes         int
        initialPieceCheckDisabled bool
 
-       // Count of each request across active connections.
-       pendingRequests pendingRequests
+       connsWithAllPieces map[*Peer]struct{}
+
+       requestState []requestState
        // Chunks we've written to since the corresponding piece was last checked.
-       dirtyChunks roaring.Bitmap
+       dirtyChunks typedRoaring.Bitmap[RequestIndex]
 
        pex pexState
 
        // Is On when all pieces are complete.
        Complete chansync.Flag
+
+       // Torrent sources in use keyed by the source string.
+       activeSources sync.Map
+       sourcesLogger log.Logger
+
+       smartBanCache smartBanCache
+
+       // Large allocations reused between request state updates.
+       requestPieceStates []request_strategy.PieceRequestOrderState
+       requestIndexes     []RequestIndex
 }
 
-func (t *Torrent) pieceAvailabilityFromPeers(i pieceIndex) (count int) {
+func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
+       // This could be done with roaring.BitSliceIndexing.
        t.iterPeers(func(peer *Peer) {
+               if _, ok := t.connsWithAllPieces[peer]; ok {
+                       return
+               }
                if peer.peerHasPiece(i) {
                        count++
                }
@@ -163,17 +185,19 @@ func (t *Torrent) decPieceAvailability(i pieceIndex) {
                return
        }
        p := t.piece(i)
-       if p.availability <= 0 {
-               panic(p.availability)
+       if p.relativeAvailability <= 0 {
+               panic(p.relativeAvailability)
        }
-       p.availability--
+       p.relativeAvailability--
+       t.updatePieceRequestOrder(i)
 }
 
 func (t *Torrent) incPieceAvailability(i pieceIndex) {
        // If we don't have the info, this should be reconciled when we do.
        if t.haveInfo() {
                p := t.piece(i)
-               p.availability++
+               p.relativeAvailability++
+               t.updatePieceRequestOrder(i)
        }
 }
 
@@ -241,6 +265,9 @@ func (t *Torrent) pieceComplete(piece pieceIndex) bool {
 }
 
 func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
+       if t.storage == nil {
+               return storage.Completion{Complete: false, Ok: true}
+       }
        return t.pieces[piece].Storage().Completion()
 }
 
@@ -259,8 +286,14 @@ func (t *Torrent) addrActive(addr string) bool {
 }
 
 func (t *Torrent) appendUnclosedConns(ret []*PeerConn) []*PeerConn {
+       return t.appendConns(ret, func(conn *PeerConn) bool {
+               return !conn.closed.IsSet()
+       })
+}
+
+func (t *Torrent) appendConns(ret []*PeerConn, f func(*PeerConn) bool) []*PeerConn {
        for c := range t.conns {
-               if !c.closed.IsSet() {
+               if f(c) {
                        ret = append(ret, c)
                }
        }
@@ -357,11 +390,6 @@ func (t *Torrent) makePieces() {
                beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
                endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
                piece.files = files[beginFile:endFile]
-               piece.undirtiedChunksIter = undirtiedChunksIter{
-                       TorrentDirtyChunks: &t.dirtyChunks,
-                       StartRequestIndex:  piece.requestIndexOffset(),
-                       EndRequestIndex:    piece.requestIndexOffset() + piece.numChunks(),
-               }
        }
 }
 
@@ -411,6 +439,7 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
        t.nameMu.Lock()
        t.info = info
        t.nameMu.Unlock()
+       t._chunksPerRegularPiece = chunkIndexType((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
        t.updateComplete()
        t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
        t.displayName = "" // Save a few bytes lol.
@@ -420,26 +449,37 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
        return nil
 }
 
+func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey {
+       return request_strategy.PieceRequestOrderKey{
+               InfoHash: t.infoHash,
+               Index:    i,
+       }
+}
+
 // These seem to be all the follow-up tasks after the info is set that can't fail.
 func (t *Torrent) onSetInfo() {
+       t.pieceRequestOrder = rand.Perm(t.numPieces())
+       t.initPieceRequestOrder()
+       MakeSliceWithLength(&t.requestState, t.numChunks())
+       MakeSliceWithLength(&t.requestPieceStates, t.numPieces())
        for i := range t.pieces {
                p := &t.pieces[i]
-               // Need to add availability before updating piece completion, as that may result in conns
+               // Need to add relativeAvailability before updating piece completion, as that may result in conns
                // being dropped.
-               if p.availability != 0 {
-                       panic(p.availability)
+               if p.relativeAvailability != 0 {
+                       panic(p.relativeAvailability)
                }
-               p.availability = int64(t.pieceAvailabilityFromPeers(i))
-               t.updatePieceCompletion(pieceIndex(i))
+               p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i)
+               t.addRequestOrderPiece(i)
+               t.updatePieceCompletion(i)
                if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
                        // t.logger.Printf("piece %s completion unknown, queueing check", p)
-                       t.queuePieceCheck(pieceIndex(i))
+                       t.queuePieceCheck(i)
                }
        }
        t.cl.event.Broadcast()
        close(t.gotMetainfoC)
        t.updateWantPeersEvent()
-       t.pendingRequests.Init(t.numRequests())
        t.tryCreateMorePieceHashers()
        t.iterPeers(func(p *Peer) {
                p.onGotInfo(t.info)
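Note: the hunk above seeds pieceRequestOrder with math/rand's Perm, so pieces of equal priority and availability are tried in a per-torrent random order rather than strictly ascending index order. A minimal standalone sketch of what rand.Perm supplies (the piece count here is hypothetical, not taken from the patch):

	package main

	import (
		"fmt"
		"math/rand"
	)

	func main() {
		const numPieces = 8           // hypothetical piece count
		order := rand.Perm(numPieces) // random permutation of 0..numPieces-1
		fmt.Println(order)            // e.g. [3 7 0 5 1 6 2 4]
	}
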
@@ -510,7 +550,7 @@ func (t *Torrent) name() string {
        t.nameMu.RLock()
        defer t.nameMu.RUnlock()
        if t.haveInfo() {
-               return t.info.Name
+               return t.info.BestName()
        }
        if t.displayName != "" {
                return t.displayName
@@ -549,25 +589,33 @@ func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType pp.ExtendedMe
 }
 
 type pieceAvailabilityRun struct {
-       count        pieceIndex
-       availability int64
+       Count        pieceIndex
+       Availability int
 }
 
 func (me pieceAvailabilityRun) String() string {
-       return fmt.Sprintf("%v(%v)", me.count, me.availability)
+       return fmt.Sprintf("%v(%v)", me.Count, me.Availability)
 }
 
 func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
        rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
-               ret = append(ret, pieceAvailabilityRun{availability: el.(int64), count: int(count)})
+               ret = append(ret, pieceAvailabilityRun{Availability: el.(int), Count: int(count)})
        })
        for i := range t.pieces {
-               rle.Append(t.pieces[i].availability, 1)
+               rle.Append(t.pieces[i].availability(), 1)
        }
        rle.Flush()
        return
 }
 
+func (t *Torrent) pieceAvailabilityFrequencies() (freqs []int) {
+       freqs = make([]int, t.numActivePeers()+1)
+       for i := range t.pieces {
+               freqs[t.piece(i).availability()]++
+       }
+       return
+}
+
 func (t *Torrent) pieceStateRuns() (ret PieceStateRuns) {
        rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
                ret = append(ret, PieceStateRun{
@@ -652,12 +700,27 @@ func (t *Torrent) writeStatus(w io.Writer) {
        if t.info != nil {
                fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
                fmt.Fprintf(w, "Piece States: %s\n", t.pieceStateRuns())
-               fmt.Fprintf(w, "Piece availability: %v\n", strings.Join(func() (ret []string) {
-                       for _, run := range t.pieceAvailabilityRuns() {
-                               ret = append(ret, run.String())
-                       }
-                       return
-               }(), " "))
+               // Generates a huge, unhelpful listing when piece availability is very scattered. Prefer
+               // availability frequencies instead.
+               if false {
+                       fmt.Fprintf(w, "Piece availability: %v\n", strings.Join(func() (ret []string) {
+                               for _, run := range t.pieceAvailabilityRuns() {
+                                       ret = append(ret, run.String())
+                               }
+                               return
+                       }(), " "))
+               }
+               fmt.Fprintf(w, "Piece availability frequency: %v\n", strings.Join(
+                       func() (ret []string) {
+                               for avail, freq := range t.pieceAvailabilityFrequencies() {
+                                       if freq == 0 {
+                                               continue
+                                       }
+                                       ret = append(ret, fmt.Sprintf("%v: %v", avail, freq))
+                               }
+                               return
+                       }(),
+                       ", "))
        }
        fmt.Fprintf(w, "Reader Pieces:")
        t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
@@ -786,7 +849,7 @@ func (t *Torrent) usualPieceSize() int {
 }
 
 func (t *Torrent) numPieces() pieceIndex {
-       return pieceIndex(t.info.NumPieces())
+       return t.info.NumPieces()
 }
 
 func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
@@ -794,7 +857,10 @@ func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
 }
 
 func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
-       t.closed.Set()
+       if !t.closed.Set() {
+               err = errors.New("already closed")
+               return
+       }
        if t.storage != nil {
                wg.Add(1)
                go func() {
@@ -812,6 +878,15 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
        t.iterPeers(func(p *Peer) {
                p.close()
        })
+       if t.storage != nil {
+               t.deletePieceRequestOrder()
+       }
+       for i := range t.pieces {
+               p := t.piece(i)
+               if p.relativeAvailability != 0 {
+                       panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability))
+               }
+       }
        t.pex.Reset()
        t.cl.event.Broadcast()
        t.pieceStateChanges.Close()
@@ -851,15 +926,15 @@ func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
        return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
 }
 
-func (t *Torrent) chunksPerRegularPiece() uint32 {
-       return uint32((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
+func (t *Torrent) chunksPerRegularPiece() chunkIndexType {
+       return t._chunksPerRegularPiece
 }
 
-func (t *Torrent) numRequests() RequestIndex {
+func (t *Torrent) numChunks() RequestIndex {
        if t.numPieces() == 0 {
                return 0
        }
-       return uint32(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1)
+       return RequestIndex(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1)
 }
 
 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
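Note: numChunks (renamed from numRequests) counts every chunk in the torrent: each regular piece contributes the cached chunksPerRegularPiece, and the final, possibly short piece contributes only its own chunk count. A hedged worked example with made-up sizes (256 KiB pieces, 16 KiB chunks, 1000 KiB total; none of these values come from the patch):

	package main

	import "fmt"

	func ceilDiv(a, b int64) int64 { return (a + b - 1) / b }

	func main() {
		const (
			pieceLength = 256 << 10  // hypothetical
			chunkSize   = 16 << 10   // the conventional wire chunk size
			totalLength = 1000 << 10 // hypothetical
		)
		numPieces := ceilDiv(totalLength, pieceLength)           // 4
		chunksPerRegularPiece := ceilDiv(pieceLength, chunkSize) // 16
		lastPieceLength := totalLength - (numPieces-1)*pieceLength
		numChunks := (numPieces-1)*chunksPerRegularPiece + ceilDiv(lastPieceLength, chunkSize)
		fmt.Println(numChunks) // (4-1)*16 + 15 = 63
	}
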
@@ -882,7 +957,20 @@ func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
        return pp.Integer(t.info.PieceLength)
 }
 
-func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) {
+func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter {
+       return &blockCheckingWriter{
+               cache:        &t.smartBanCache,
+               requestIndex: t.pieceRequestIndexOffset(piece),
+               chunkSize:    t.chunkSize.Int(),
+       }
+}
+
+func (t *Torrent) hashPiece(piece pieceIndex) (
+       ret metainfo.Hash,
+       // These are peers that sent us blocks that differ from what we hash here.
+       differingPeers map[bannableAddr]struct{},
+       err error,
+) {
        p := t.piece(piece)
        p.waitNoPendingWrites()
        storagePiece := t.pieces[piece].Storage()
@@ -898,19 +986,24 @@ func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) {
 
        hash := pieceHash.New()
        const logPieceContents = false
+       smartBanWriter := t.smartBanBlockCheckingWriter(piece)
+       writers := []io.Writer{hash, smartBanWriter}
+       var examineBuf bytes.Buffer
        if logPieceContents {
-               var examineBuf bytes.Buffer
-               _, err = storagePiece.WriteTo(io.MultiWriter(hash, &examineBuf))
-               log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
-       } else {
-               _, err = storagePiece.WriteTo(hash)
+               writers = append(writers, &examineBuf)
+       }
+       _, err = storagePiece.WriteTo(io.MultiWriter(writers...))
+       if logPieceContents {
+               t.logger.WithDefaultLevel(log.Debug).Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
        }
+       smartBanWriter.Flush()
+       differingPeers = smartBanWriter.badPeers
        missinggo.CopyExact(&ret, hash.Sum(nil))
        return
 }
 
 func (t *Torrent) haveAnyPieces() bool {
-       return t._completedPieces.GetCardinality() != 0
+       return !t._completedPieces.IsEmpty()
 }
 
 func (t *Torrent) haveAllPieces() bool {
@@ -971,21 +1064,23 @@ func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
 // conns (which is a map).
 var peerConnSlices sync.Pool
 
+func getPeerConnSlice(cap int) []*PeerConn {
+       getInterface := peerConnSlices.Get()
+       if getInterface == nil {
+               return make([]*PeerConn, 0, cap)
+       } else {
+               return getInterface.([]*PeerConn)[:0]
+       }
+}
+
 // The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
 // connection is one that usually sends us unwanted pieces, or has been in the worse half of the
 // established connections for more than a minute. This is O(n log n). If there was a way to not
 // consider the position of a conn relative to the total number, it could be reduced to O(n).
 func (t *Torrent) worstBadConn() (ret *PeerConn) {
-       var sl []*PeerConn
-       getInterface := peerConnSlices.Get()
-       if getInterface == nil {
-               sl = make([]*PeerConn, 0, len(t.conns))
-       } else {
-               sl = getInterface.([]*PeerConn)[:0]
-       }
-       sl = t.appendUnclosedConns(sl)
-       defer peerConnSlices.Put(sl)
-       wcs := worseConnSlice{sl}
+       wcs := worseConnSlice{conns: t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))}
+       defer peerConnSlices.Put(wcs.conns)
+       wcs.initKeys()
        heap.Init(&wcs)
        for wcs.Len() != 0 {
                c := heap.Pop(&wcs).(*PeerConn)
@@ -1076,15 +1171,18 @@ func (t *Torrent) maybeNewConns() {
 func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) {
        if t._pendingPieces.Contains(uint32(piece)) {
                t.iterPeers(func(c *Peer) {
-                       if c.actualRequestState.Interested {
-                               return
-                       }
+                       // if c.requestState.Interested {
+                       //      return
+                       // }
                        if !c.isLowOnRequests() {
                                return
                        }
                        if !c.peerHasPiece(piece) {
                                return
                        }
+                       if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
+                               return
+                       }
                        c.updateRequests(reason)
                })
        }
@@ -1093,6 +1191,11 @@ func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) {
 }
 
 func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) {
+       if !t.closed.IsSet() {
+               // It would be possible to filter on pure-priority changes here to avoid churning the piece
+               // request order.
+               t.updatePieceRequestOrder(piece)
+       }
        p := &t.pieces[piece]
        newPrio := p.uncachedPriority()
        // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
@@ -1161,7 +1264,7 @@ func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
 }
 
 func (t *Torrent) pendRequest(req RequestIndex) {
-       t.piece(int(req / t.chunksPerRegularPiece())).pendChunkIndex(req % t.chunksPerRegularPiece())
+       t.piece(t.pieceIndexOfRequestIndex(req)).pendChunkIndex(req % t.chunksPerRegularPiece())
 }
 
 func (t *Torrent) pieceCompletionChanged(piece pieceIndex, reason string) {
@@ -1225,15 +1328,17 @@ func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
        x := uint32(piece)
        if complete {
                t._completedPieces.Add(x)
+               t.openNewConns()
        } else {
                t._completedPieces.Remove(x)
        }
+       p.t.updatePieceRequestOrder(piece)
        t.updateComplete()
        if complete && len(p.dirtiers) != 0 {
                t.logger.Printf("marked piece %v complete but still has dirtiers", piece)
        }
        if changed {
-               log.Fstr("piece %d completion changed: %+v -> %+v", piece, cached, uncached).SetLevel(log.Debug).Log(t.logger)
+               log.Fstr("piece %d completion changed: %+v -> %+v", piece, cached, uncached).LogLevel(log.Debug, t.logger)
                t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion")
        }
        return changed
@@ -1363,16 +1468,22 @@ func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
                }
        }
        torrent.Add("deleted connections", 1)
-       c.deleteAllRequests()
+       c.deleteAllRequests("Torrent.deletePeerConn")
        t.assertPendingRequests()
+       if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 {
+               panic(t.connsWithAllPieces)
+       }
        return
 }
 
 func (t *Torrent) decPeerPieceAvailability(p *Peer) {
+       if t.deleteConnWithAllPieces(p) {
+               return
+       }
        if !t.haveInfo() {
                return
        }
-       p.newPeerPieces().Iterate(func(i uint32) bool {
+       p.peerPieces().Iterate(func(i uint32) bool {
                p.t.decPieceAvailability(pieceIndex(i))
                return true
        })
@@ -1382,20 +1493,20 @@ func (t *Torrent) assertPendingRequests() {
        if !check {
                return
        }
-       var actual pendingRequests
-       if t.haveInfo() {
-               actual.m = make([]int, t.numRequests())
-       }
-       t.iterPeers(func(p *Peer) {
-               p.actualRequestState.Requests.Iterate(func(x uint32) bool {
-                       actual.Inc(x)
-                       return true
-               })
-       })
-       diff := cmp.Diff(actual.m, t.pendingRequests.m)
-       if diff != "" {
-               panic(diff)
-       }
+       // var actual pendingRequests
+       // if t.haveInfo() {
+       //      actual.m = make([]int, t.numChunks())
+       // }
+       // t.iterPeers(func(p *Peer) {
+       //      p.requestState.Requests.Iterate(func(x uint32) bool {
+       //              actual.Inc(x)
+       //              return true
+       //      })
+       // })
+       // diff := cmp.Diff(actual.m, t.pendingRequests.m)
+       // if diff != "" {
+       //      panic(diff)
+       // }
 }
 
 func (t *Torrent) dropConnection(c *PeerConn) {
@@ -1406,6 +1517,7 @@ func (t *Torrent) dropConnection(c *PeerConn) {
        }
 }
 
+// Peers as in contact information for dialing out.
 func (t *Torrent) wantPeers() bool {
        if t.closed.IsSet() {
                return false
@@ -1413,7 +1525,7 @@ func (t *Torrent) wantPeers() bool {
        if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
                return false
        }
-       return t.needData() || t.seeding()
+       return t.wantConns()
 }
 
 func (t *Torrent) updateWantPeersEvent() {
@@ -1450,15 +1562,23 @@ func (t *Torrent) onWebRtcConn(
        dcc webtorrent.DataChannelContext,
 ) {
        defer c.Close()
+       netConn := webrtcNetConn{
+               ReadWriteCloser:    c,
+               DataChannelContext: dcc,
+       }
+       peerRemoteAddr := netConn.RemoteAddr()
+       if t.cl.badPeerAddr(peerRemoteAddr) {
+               return
+       }
        pc, err := t.cl.initiateProtocolHandshakes(
                context.Background(),
-               webrtcNetConn{c, dcc},
+               netConn,
                t,
                dcc.LocalOffered,
                false,
-               webrtcNetAddr{dcc.Remote},
+               netConn.RemoteAddr(),
                webrtcNetwork,
-               fmt.Sprintf("webrtc offer_id %x", dcc.OfferId),
+               fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
        )
        if err != nil {
                t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
@@ -1469,6 +1589,7 @@ func (t *Torrent) onWebRtcConn(
        } else {
                pc.Discovery = PeerSourceIncoming
        }
+       pc.conn.SetWriteDeadline(time.Time{})
        t.cl.lock()
        defer t.cl.unlock()
        err = t.cl.runHandshookConn(pc, t)
@@ -1670,7 +1791,10 @@ func (t *Torrent) dhtAnnouncer(s DhtServer) {
                        if t.closed.IsSet() {
                                return
                        }
-                       if !t.wantPeers() {
+                       // We're also announcing ourselves as a listener, so we don't just want peer addresses.
+                       // TODO: We can include the announce_peer step depending on whether we can receive
+                       // inbound connections. We should probably only announce once every 15 mins too.
+                       if !t.wantConns() {
                                goto wait
                        }
                        // TODO: Determine if there's a listener on the port we're announcing.
@@ -1780,7 +1904,7 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
                if !t.cl.config.DropDuplicatePeerIds {
                        continue
                }
-               if left, ok := c.hasPreferredNetworkOver(c0); ok && left {
+               if c.hasPreferredNetworkOver(c0) {
                        c0.close()
                        t.deletePeerConn(c0)
                } else {
@@ -1812,13 +1936,10 @@ func (t *Torrent) wantConns() bool {
        if t.closed.IsSet() {
                return false
        }
-       if !t.seeding() && !t.needData() {
+       if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
                return false
        }
-       if len(t.conns) < t.maxEstablishedConns {
-               return true
-       }
-       return t.worstBadConn() != nil
+       return len(t.conns) < t.maxEstablishedConns || t.worstBadConn() != nil
 }
 
 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
@@ -1826,18 +1947,24 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
        defer t.cl.unlock()
        oldMax = t.maxEstablishedConns
        t.maxEstablishedConns = max
-       wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), func(l, r *PeerConn) bool {
-               return worseConn(&l.Peer, &r.Peer)
-       })
+       wcs := worseConnSlice{
+               conns: t.appendConns(nil, func(*PeerConn) bool {
+                       return true
+               }),
+       }
+       wcs.initKeys()
+       heap.Init(&wcs)
        for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
-               t.dropConnection(wcs.Pop().(*PeerConn))
+               t.dropConnection(heap.Pop(&wcs).(*PeerConn))
        }
        t.openNewConns()
        return oldMax
 }
 
 func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
-       t.logger.Log(log.Fstr("hashed piece %d (passed=%t)", piece, passed).SetLevel(log.Debug))
+       t.logger.LazyLog(log.Debug, func() log.Msg {
+               return log.Fstr("hashed piece %d (passed=%t)", piece, passed)
+       })
        p := t.piece(piece)
        p.numVerifies++
        t.cl.event.Broadcast()
@@ -1852,7 +1979,10 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
                } else {
                        log.Fmsg(
                                "piece %d failed hash: %d connections contributed", piece, len(p.dirtiers),
-                       ).AddValues(t, p).SetLevel(log.Debug).Log(t.logger)
+                       ).AddValues(t, p).LogLevel(
+
+                               log.Debug, t.logger)
+
                        pieceHashedNotCorrect.Add(1)
                }
        }
@@ -1920,8 +2050,16 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
 
                        if len(bannableTouchers) >= 1 {
                                c := bannableTouchers[0]
-                               t.cl.banPeerIP(c.remoteIp())
-                               c.drop()
+                               if len(bannableTouchers) != 1 {
+                                       t.logger.Levelf(log.Warning, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece)
+                               } else {
+                                       // Turns out it's still useful to ban peers like this because if there's only a
+                                       // single peer for a piece, and we never progress that piece to completion, we
+                                       // will never smart-ban them. Discovered in
+                                       // https://github.com/anacrolix/torrent/issues/715.
+                                       t.logger.Levelf(log.Warning, "banning %v for being sole dirtier of piece %v after failed piece check", c, piece)
+                                       c.ban()
+                               }
                        }
                }
                t.onIncompletePiece(piece)
@@ -1931,9 +2069,10 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
 }
 
 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
-       // TODO: Make faster
-       for cn := range t.conns {
-               cn.tickleWriter()
+       start := t.pieceRequestIndexOffset(piece)
+       end := start + t.pieceNumChunks(piece)
+       for ri := start; ri < end; ri++ {
+               t.cancelRequest(ri)
        }
 }
 
@@ -2010,9 +2149,32 @@ func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
        return
 }
 
+func (t *Torrent) dropBannedPeers() {
+       t.iterPeers(func(p *Peer) {
+               remoteIp := p.remoteIp()
+               if remoteIp == nil {
+                       if p.bannableAddr.Ok() {
+                               t.logger.WithDefaultLevel(log.Debug).Printf("can't get remote ip for peer %v", p)
+                       }
+                       return
+               }
+               netipAddr := netip.MustParseAddr(remoteIp.String())
+               if Some(netipAddr) != p.bannableAddr {
+                       t.logger.WithDefaultLevel(log.Debug).Printf(
+                               "peer remote ip does not match its bannable addr [peer=%v, remote ip=%v, bannable addr=%v]",
+                               p, remoteIp, p.bannableAddr)
+               }
+               if _, ok := t.cl.badPeerIPs[netipAddr]; ok {
+                       // Should this be a close?
+                       p.drop()
+                       t.logger.WithDefaultLevel(log.Debug).Printf("dropped %v for banned remote IP %v", p, netipAddr)
+               }
+       })
+}
+
 func (t *Torrent) pieceHasher(index pieceIndex) {
        p := t.piece(index)
-       sum, copyErr := t.hashPiece(index)
+       sum, failedPeers, copyErr := t.hashPiece(index)
        correct := sum == *p.hash
        switch copyErr {
        case nil, io.EOF:
@@ -2022,6 +2184,16 @@ func (t *Torrent) pieceHasher(index pieceIndex) {
        t.storageLock.RUnlock()
        t.cl.lock()
        defer t.cl.unlock()
+       if correct {
+               for peer := range failedPeers {
+                       t.cl.banPeerIP(peer.AsSlice())
+                       t.logger.WithDefaultLevel(log.Debug).Printf("smart banned %v for piece %v", peer, index)
+               }
+               t.dropBannedPeers()
+               for ri := t.pieceRequestIndexOffset(index); ri < t.pieceRequestIndexOffset(index+1); ri++ {
+                       t.smartBanCache.ForgetBlock(ri)
+               }
+       }
        p.hashing = false
        t.pieceHashed(index, correct, copyErr)
        t.updatePiecePriority(index, "Torrent.pieceHasher")
@@ -2155,6 +2327,7 @@ func (t *Torrent) DisallowDataUpload() {
        defer t.cl.unlock()
        t.dataUploadDisallowed = true
        for c := range t.conns {
+               // TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead?
                c.updateRequests("disallow data upload")
        }
 }
@@ -2180,23 +2353,36 @@ func (t *Torrent) callbacks() *Callbacks {
        return &t.cl.config.Callbacks
 }
 
-var WebseedHttpClient = &http.Client{
-       Transport: &http.Transport{
-               MaxConnsPerHost: 10,
-       },
+type AddWebSeedsOpt func(*webseed.Client)
+
+// Sets the WebSeed trailing path escaper for a webseed.Client.
+func WebSeedPathEscaper(custom webseed.PathEscaper) AddWebSeedsOpt {
+       return func(c *webseed.Client) {
+               c.PathEscaper = custom
+       }
 }
 
-func (t *Torrent) addWebSeed(url string) {
+func (t *Torrent) AddWebSeeds(urls []string, opts ...AddWebSeedsOpt) {
+       t.cl.lock()
+       defer t.cl.unlock()
+       for _, u := range urls {
+               t.addWebSeed(u, opts...)
+       }
+}
+
+func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
        if t.cl.config.DisableWebseeds {
                return
        }
        if _, ok := t.webSeeds[url]; ok {
                return
        }
-       // I don't think Go http supports pipelining requests. However we can have more ready to go
+       // I don't think Go http supports pipelining requests. However, we can have more ready to go
        // right away. This value should be some multiple of the number of connections to a host. I
-       // would expect that double maxRequests plus a bit would be appropriate.
-       const maxRequests = 32
+       // would expect that double maxRequests plus a bit would be appropriate. This value is based on
+       // downloading Sintel (08ada5a7a6183aae1e09d831df6748d566095a10) from
+       // "https://webtorrent.io/torrents/".
+       const maxRequests = 16
        ws := webseedPeer{
                peer: Peer{
                        t:                        t,
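Note: AddWebSeeds is now exported and takes variadic AddWebSeedsOpt values, with WebSeedPathEscaper available for installing a custom webseed.PathEscaper on the underlying client. A hedged usage sketch (the torrent handle and URL are made up; only the call shape comes from the patch):

	// t is a *Torrent previously added to a Client.
	t.AddWebSeeds([]string{
		"https://example.com/files/", // hypothetical web seed URL
	})
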
@@ -2208,21 +2394,31 @@ func (t *Torrent) addWebSeed(url string) {
                        // requests mark more often, so recomputation is probably sooner than with regular peer
                        // conns. ~4x maxRequests would be about right.
                        PeerMaxRequests: 128,
-                       RemoteAddr:      remoteAddrFromUrl(url),
-                       callbacks:       t.callbacks(),
+                       // TODO: Set ban prefix?
+                       RemoteAddr: remoteAddrFromUrl(url),
+                       callbacks:  t.callbacks(),
                },
                client: webseed.Client{
-                       // Consider a MaxConnsPerHost in the transport for this, possibly in a global Client.
-                       HttpClient: WebseedHttpClient,
+                       HttpClient: t.cl.httpClient,
                        Url:        url,
+                       ResponseBodyWrapper: func(r io.Reader) io.Reader {
+                               return &rateLimitedReader{
+                                       l: t.cl.config.DownloadRateLimiter,
+                                       r: r,
+                               }
+                       },
                },
                activeRequests: make(map[Request]webseed.Request, maxRequests),
                maxRequests:    maxRequests,
        }
+       ws.peer.initRequestState()
+       for _, opt := range opts {
+               opt(&ws.client)
+       }
        ws.peer.initUpdateRequestsTimer()
        ws.requesterCond.L = t.cl.locker()
        for i := 0; i < maxRequests; i += 1 {
-               go ws.requester()
+               go ws.requester(i)
        }
        for _, f := range t.callbacks().NewPeer {
                f(&ws.peer)
@@ -2245,15 +2441,15 @@ func (t *Torrent) peerIsActive(p *Peer) (active bool) {
 }
 
 func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request {
-       index := ri / t.chunksPerRegularPiece()
+       index := t.pieceIndexOfRequestIndex(ri)
        return Request{
                pp.Integer(index),
-               t.piece(int(index)).chunkIndexSpec(ri % t.chunksPerRegularPiece()),
+               t.piece(index).chunkIndexSpec(ri % t.chunksPerRegularPiece()),
        }
 }
 
 func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
-       return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + uint32(r.Begin/t.chunkSize)
+       return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + RequestIndex(r.Begin/t.chunkSize)
 }
 
 func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex {
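Note: a RequestIndex encodes piece*chunksPerRegularPiece + chunkOffset, so requestIndexToRequest and requestIndexFromRequest above are inverses of each other. A standalone sketch of the same arithmetic (the chunks-per-piece value is hypothetical):

	package main

	import "fmt"

	const chunksPerRegularPiece = 16 // hypothetical

	func toPieceAndChunk(ri uint32) (piece, chunkOffset uint32) {
		return ri / chunksPerRegularPiece, ri % chunksPerRegularPiece
	}

	func toRequestIndex(piece, chunkOffset uint32) uint32 {
		return piece*chunksPerRegularPiece + chunkOffset
	}

	func main() {
		p, c := toPieceAndChunk(35)             // piece 2, chunk offset 3
		fmt.Println(p, c, toRequestIndex(p, c)) // 2 3 35
	}
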
@@ -2263,3 +2459,70 @@ func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex {
 func (t *Torrent) updateComplete() {
        t.Complete.SetBool(t.haveAllPieces())
 }
+
+func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
+       p := t.requestingPeer(r)
+       if p != nil {
+               p.cancel(r)
+       }
+       // TODO: This is a check that an old invariant holds. It can be removed after some testing.
+       //delete(t.pendingRequests, r)
+       var zeroRequestState requestState
+       if t.requestState[r] != zeroRequestState {
+               panic("expected request state to be gone")
+       }
+       return p
+}
+
+func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
+       return t.requestState[r].peer
+}
+
+func (t *Torrent) addConnWithAllPieces(p *Peer) {
+       if t.connsWithAllPieces == nil {
+               t.connsWithAllPieces = make(map[*Peer]struct{}, t.maxEstablishedConns)
+       }
+       t.connsWithAllPieces[p] = struct{}{}
+}
+
+func (t *Torrent) deleteConnWithAllPieces(p *Peer) bool {
+       _, ok := t.connsWithAllPieces[p]
+       delete(t.connsWithAllPieces, p)
+       return ok
+}
+
+func (t *Torrent) numActivePeers() int {
+       return len(t.conns) + len(t.webSeeds)
+}
+
+func (t *Torrent) hasStorageCap() bool {
+       f := t.storage.Capacity
+       if f == nil {
+               return false
+       }
+       _, ok := (*f)()
+       return ok
+}
+
+func (t *Torrent) pieceIndexOfRequestIndex(ri RequestIndex) pieceIndex {
+       return pieceIndex(ri / t.chunksPerRegularPiece())
+}
+
+func (t *Torrent) iterUndirtiedRequestIndexesInPiece(
+       reuseIter *typedRoaring.Iterator[RequestIndex],
+       piece pieceIndex,
+       f func(RequestIndex),
+) {
+       reuseIter.Initialize(&t.dirtyChunks)
+       pieceRequestIndexOffset := t.pieceRequestIndexOffset(piece)
+       iterBitmapUnsetInRange(
+               reuseIter,
+               pieceRequestIndexOffset, pieceRequestIndexOffset+t.pieceNumChunks(piece),
+               f,
+       )
+}
+
+type requestState struct {
+       peer *Peer
+       when time.Time
+       when time.Time
+}
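
Note: this is where the commit title comes in. iterUndirtiedRequestIndexesInPiece takes a caller-owned *typedRoaring.Iterator[RequestIndex] and re-Initializes it against dirtyChunks for each piece, replacing the per-piece undirtiedChunksIter removed from makePieces above. A hedged, standalone sketch of the same reuse pattern against the underlying RoaringBitmap library (assuming roaring's exported IntIterator with Initialize/HasNext/Next, which typed-roaring wraps; values are made up):

	package main

	import (
		"fmt"

		"github.com/RoaringBitmap/roaring"
	)

	func main() {
		dirty := roaring.New()
		dirty.AddRange(0, 5) // pretend chunks 0-4 have been written to

		var it roaring.IntIterator // one iterator, reused for every scan
		for scan := 0; scan < 3; scan++ {
			it.Initialize(dirty) // point the existing iterator at the bitmap; no new allocation
			for it.HasNext() {
				fmt.Println("scan", scan, "dirty chunk", it.Next())
			}
		}
	}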