]> Sergey Matveev's repositories - btrtrc.git/blobdiff - torrent.go
Merge branch 'master' into lazylog
[btrtrc.git] / torrent.go
index 744c65e409f34fdb0f0b4a7d8de51d218241adaf..22b4bb57dfe2e3d03560830042132ade17c71eae 100644 (file)
@@ -8,6 +8,7 @@ import (
        "errors"
        "fmt"
        "io"
+       "net/netip"
        "net/url"
        "sort"
        "strings"
@@ -19,6 +20,7 @@ import (
        "github.com/anacrolix/chansync"
        "github.com/anacrolix/chansync/events"
        "github.com/anacrolix/dht/v2"
+       . "github.com/anacrolix/generics"
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/perf"
        "github.com/anacrolix/missinggo/pubsub"
@@ -147,6 +149,8 @@ type Torrent struct {
 
        // Is On when all pieces are complete.
        Complete chansync.Flag
+
+       smartBanCache smartBanCache
 }
 
 func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
@@ -939,7 +943,20 @@ func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
        return pp.Integer(t.info.PieceLength)
 }
 
-func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) {
+func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter {
+       return &blockCheckingWriter{
+               cache:        &t.smartBanCache,
+               requestIndex: t.pieceRequestIndexOffset(piece),
+               chunkSize:    t.chunkSize.Int(),
+       }
+}
+
+func (t *Torrent) hashPiece(piece pieceIndex) (
+       ret metainfo.Hash,
+       // These are peers that sent us blocks that differ from what we hash here.
+       differingPeers map[bannableAddr]struct{},
+       err error,
+) {
        p := t.piece(piece)
        p.waitNoPendingWrites()
        storagePiece := t.pieces[piece].Storage()
@@ -955,13 +972,18 @@ func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) {
 
        hash := pieceHash.New()
        const logPieceContents = false
+       smartBanWriter := t.smartBanBlockCheckingWriter(piece)
+       writers := []io.Writer{hash, smartBanWriter}
+       var examineBuf bytes.Buffer
+       if logPieceContents {
+               writers = append(writers, &examineBuf)
+       }
+       _, err = storagePiece.WriteTo(io.MultiWriter(writers...))
        if logPieceContents {
-               var examineBuf bytes.Buffer
-               _, err = storagePiece.WriteTo(io.MultiWriter(hash, &examineBuf))
                log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
-       } else {
-               _, err = storagePiece.WriteTo(hash)
        }
+       smartBanWriter.Flush()
+       differingPeers = smartBanWriter.badPeers
        missinggo.CopyExact(&ret, hash.Sum(nil))
        return
 }
@@ -1302,7 +1324,7 @@ func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
                t.logger.Printf("marked piece %v complete but still has dirtiers", piece)
        }
        if changed {
-               log.Fstr("piece %d completion changed: %+v -> %+v", piece, cached, uncached).SetLevel(log.Debug).Log(t.logger)
+               log.Fstr("piece %d completion changed: %+v -> %+v", piece, cached, uncached).LogLevel(log.Debug, t.logger)
                t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion")
        }
        return changed
@@ -1932,7 +1954,9 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
 }
 
 func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
-       t.logger.Log(log.Fstr("hashed piece %d (passed=%t)", piece, passed).SetLevel(log.Debug))
+       t.logger.LazyLog(log.Debug, func() log.Msg {
+               return log.Fstr("hashed piece %d (passed=%t)", piece, passed)
+       })
        p := t.piece(piece)
        p.numVerifies++
        t.cl.event.Broadcast()
@@ -1947,7 +1971,10 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
                } else {
                        log.Fmsg(
                                "piece %d failed hash: %d connections contributed", piece, len(p.dirtiers),
-                       ).AddValues(t, p).SetLevel(log.Debug).Log(t.logger)
+                       ).AddValues(t, p).LogLevel(
+
+                               log.Debug, t.logger)
+
                        pieceHashedNotCorrect.Add(1)
                }
        }
@@ -2015,8 +2042,11 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
 
                        if len(bannableTouchers) >= 1 {
                                c := bannableTouchers[0]
-                               t.cl.banPeerIP(c.remoteIp())
-                               c.drop()
+                               log.Printf("would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece)
+                               if false {
+                                       t.cl.banPeerIP(c.remoteIp())
+                                       c.drop()
+                               }
                        }
                }
                t.onIncompletePiece(piece)
@@ -2104,9 +2134,31 @@ func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
        return
 }
 
+func (t *Torrent) dropBannedPeers() {
+       t.iterPeers(func(p *Peer) {
+               remoteIp := p.remoteIp()
+               if remoteIp == nil {
+                       if p.bannableAddr.Ok() {
+                               log.Printf("can't get remote ip for peer %v", p)
+                       }
+                       return
+               }
+               netipAddr := netip.MustParseAddr(remoteIp.String())
+               if Some(netipAddr) != p.bannableAddr {
+                       log.Printf(
+                               "peer remote ip does not match its bannable addr [peer=%v, remote ip=%v, bannable addr=%v]",
+                               p, remoteIp, p.bannableAddr)
+               }
+               if _, ok := t.cl.badPeerIPs[netipAddr]; ok {
+                       p.drop()
+                       log.Printf("dropped %v for banned remote IP %v", p, netipAddr)
+               }
+       })
+}
+
 func (t *Torrent) pieceHasher(index pieceIndex) {
        p := t.piece(index)
-       sum, copyErr := t.hashPiece(index)
+       sum, failedPeers, copyErr := t.hashPiece(index)
        correct := sum == *p.hash
        switch copyErr {
        case nil, io.EOF:
@@ -2116,6 +2168,16 @@ func (t *Torrent) pieceHasher(index pieceIndex) {
        t.storageLock.RUnlock()
        t.cl.lock()
        defer t.cl.unlock()
+       if correct {
+               for peer := range failedPeers {
+                       t.cl.banPeerIP(peer.AsSlice())
+                       log.Printf("smart banned %v for piece %v", peer, index)
+               }
+               t.dropBannedPeers()
+               for ri := t.pieceRequestIndexOffset(index); ri < t.pieceRequestIndexOffset(index+1); ri++ {
+                       t.smartBanCache.ForgetBlock(ri)
+               }
+       }
        p.hashing = false
        t.pieceHashed(index, correct, copyErr)
        t.updatePiecePriority(index, "Torrent.pieceHasher")
@@ -2299,8 +2361,9 @@ func (t *Torrent) addWebSeed(url string) {
                        // requests mark more often, so recomputation is probably sooner than with regular peer
                        // conns. ~4x maxRequests would be about right.
                        PeerMaxRequests: 128,
-                       RemoteAddr:      remoteAddrFromUrl(url),
-                       callbacks:       t.callbacks(),
+                       // TODO: Set ban prefix?
+                       RemoteAddr: remoteAddrFromUrl(url),
+                       callbacks:  t.callbacks(),
                },
                client: webseed.Client{
                        HttpClient: t.cl.webseedHttpClient,