"errors"
"fmt"
"io"
+ "net/netip"
"net/url"
"sort"
"strings"
"github.com/anacrolix/chansync"
"github.com/anacrolix/chansync/events"
"github.com/anacrolix/dht/v2"
+ . "github.com/anacrolix/generics"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/perf"
"github.com/anacrolix/missinggo/slices"
// Torrent sources in use keyed by the source string.
activeSources sync.Map
sourcesLogger log.Logger
+
+ smartBanCache smartBanCache
}
func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
return pp.Integer(t.info.PieceLength)
}
-func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) {
+func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter {
+ return &blockCheckingWriter{
+ cache: &t.smartBanCache,
+ requestIndex: t.pieceRequestIndexOffset(piece),
+ chunkSize: t.chunkSize.Int(),
+ }
+}
+
+func (t *Torrent) hashPiece(piece pieceIndex) (
+ ret metainfo.Hash,
+ // These are peers that sent us blocks that differ from what we hash here.
+ differingPeers map[bannableAddr]struct{},
+ err error,
+) {
p := t.piece(piece)
p.waitNoPendingWrites()
storagePiece := t.pieces[piece].Storage()
hash := pieceHash.New()
const logPieceContents = false
+ smartBanWriter := t.smartBanBlockCheckingWriter(piece)
+ writers := []io.Writer{hash, smartBanWriter}
+ var examineBuf bytes.Buffer
+ if logPieceContents {
+ writers = append(writers, &examineBuf)
+ }
+ _, err = storagePiece.WriteTo(io.MultiWriter(writers...))
if logPieceContents {
- var examineBuf bytes.Buffer
- _, err = storagePiece.WriteTo(io.MultiWriter(hash, &examineBuf))
log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
- } else {
- _, err = storagePiece.WriteTo(hash)
}
+ smartBanWriter.Flush()
+ differingPeers = smartBanWriter.badPeers
missinggo.CopyExact(&ret, hash.Sum(nil))
return
}
if len(bannableTouchers) >= 1 {
c := bannableTouchers[0]
- c.ban()
+ if len(bannableTouchers) != 1 {
+ t.logger.Levelf(log.Warning, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece)
+ } else {
+ // Turns out it's still useful to ban peers like this because if there's only a
+ // single peer for a piece, and we never progress that piece to completion, we
+ // will never smart-ban them. Discovered in
+ // https://github.com/anacrolix/torrent/issues/715.
+ t.logger.Levelf(log.Warning, "banning %v for being sole dirtier of piece %v after failed piece check", c, piece)
+ c.ban()
+ }
}
}
t.onIncompletePiece(piece)
return
}
+func (t *Torrent) dropBannedPeers() {
+ t.iterPeers(func(p *Peer) {
+ remoteIp := p.remoteIp()
+ if remoteIp == nil {
+ if p.bannableAddr.Ok() {
+ log.Printf("can't get remote ip for peer %v", p)
+ }
+ return
+ }
+ netipAddr := netip.MustParseAddr(remoteIp.String())
+ if Some(netipAddr) != p.bannableAddr {
+ log.Printf(
+ "peer remote ip does not match its bannable addr [peer=%v, remote ip=%v, bannable addr=%v]",
+ p, remoteIp, p.bannableAddr)
+ }
+ if _, ok := t.cl.badPeerIPs[netipAddr]; ok {
+ // Should this be a close?
+ p.drop()
+ log.Printf("dropped %v for banned remote IP %v", p, netipAddr)
+ }
+ })
+}
+
func (t *Torrent) pieceHasher(index pieceIndex) {
p := t.piece(index)
- sum, copyErr := t.hashPiece(index)
+ sum, failedPeers, copyErr := t.hashPiece(index)
correct := sum == *p.hash
switch copyErr {
case nil, io.EOF:
t.storageLock.RUnlock()
t.cl.lock()
defer t.cl.unlock()
+ if correct {
+ for peer := range failedPeers {
+ t.cl.banPeerIP(peer.AsSlice())
+ log.Printf("smart banned %v for piece %v", peer, index)
+ }
+ t.dropBannedPeers()
+ for ri := t.pieceRequestIndexOffset(index); ri < t.pieceRequestIndexOffset(index+1); ri++ {
+ t.smartBanCache.ForgetBlock(ri)
+ }
+ }
p.hashing = false
t.pieceHashed(index, correct, copyErr)
t.updatePiecePriority(index, "Torrent.pieceHasher")
// requests mark more often, so recomputation is probably sooner than with regular peer
// conns. ~4x maxRequests would be about right.
PeerMaxRequests: 128,
- RemoteAddr: remoteAddrFromUrl(url),
- callbacks: t.callbacks(),
+ // TODO: Set ban prefix?
+ RemoteAddr: remoteAddrFromUrl(url),
+ callbacks: t.callbacks(),
},
client: webseed.Client{
HttpClient: t.cl.httpClient,