]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add connection trust flag, and more tests with small caches
authorMatt Joiner <anacrolix@gmail.com>
Wed, 18 Dec 2019 02:52:00 +0000 (13:52 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 18 Dec 2019 02:52:00 +0000 (13:52 +1100)
Thanks to observations and feedback from @ccampbell.

Peer.go
client.go
client_test.go
connection.go
misc.go
torrent.go

diff --git a/Peer.go b/Peer.go
index 6a0173300c2cfb2a8b3a5de34bb077699e629a33..080fad4db1b066bf0c9209ed805f56ddef40e4e4 100644 (file)
--- a/Peer.go
+++ b/Peer.go
@@ -8,6 +8,7 @@ import (
        "github.com/anacrolix/torrent/peer_protocol"
 )
 
+// Peer connection info, handed about publicly.
 type Peer struct {
        Id     [20]byte
        IP     net.IP
@@ -16,6 +17,8 @@ type Peer struct {
        // Peer is known to support encryption.
        SupportsEncryption bool
        peer_protocol.PexPeerFlags
+       // Whether we can ignore poor or bad behaviour from the peer.
+       Trusted bool
 }
 
 func (me *Peer) FromPex(na krpc.NodeAddr, fs peer_protocol.PexPeerFlags) {
index 05e9fdfd6d0a4a2ed1e059c08cdad475b2053e4e..866aa2358578243fd4ab422c3a4dc34022b0ca2e 100644 (file)
--- a/client.go
+++ b/client.go
@@ -19,12 +19,12 @@ import (
        "github.com/anacrolix/dht/v2"
        "github.com/anacrolix/dht/v2/krpc"
        "github.com/anacrolix/log"
-       "github.com/anacrolix/missinggo"
        "github.com/anacrolix/missinggo/bitmap"
        "github.com/anacrolix/missinggo/perf"
        "github.com/anacrolix/missinggo/pproffd"
        "github.com/anacrolix/missinggo/pubsub"
        "github.com/anacrolix/missinggo/slices"
+       "github.com/anacrolix/missinggo/v2"
        "github.com/anacrolix/missinggo/v2/conntrack"
        "github.com/anacrolix/sync"
        "github.com/davecgh/go-spew/spew"
@@ -690,7 +690,7 @@ func (cl *Client) establishOutgoingConn(t *Torrent, addr IpPort) (c *connection,
 
 // Called to dial out and run a connection. The addr we're given is already
 // considered half-open.
-func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource) {
+func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource, trusted bool) {
        cl.dialRateLimiter.Wait(context.Background())
        c, err := cl.establishOutgoingConn(t, addr)
        cl.lock()
@@ -706,6 +706,7 @@ func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource) {
        }
        defer c.Close()
        c.Discovery = ps
+       c.trusted = trusted
        cl.runHandshookConn(c, t)
 }
 
@@ -1212,6 +1213,7 @@ func (cl *Client) AddDHTNodes(nodes []string) {
 }
 
 func (cl *Client) banPeerIP(ip net.IP) {
+       cl.logger.Printf("banning ip %v", ip)
        if cl.badPeerIPs == nil {
                cl.badPeerIPs = make(map[string]struct{})
        }
index 0ad01eecdc592659139c71fe4006f6a91045b58c..a4520baee1dfbbc27b79bb6bb2e23d3f7d264408 100644 (file)
@@ -228,7 +228,7 @@ func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
        return storage.NewResourcePieces(fc.AsResourceProvider())
 }
 
-func TestClientTransferSmallCache(t *testing.T) {
+func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
        testClientTransfer(t, testClientTransferParams{
                LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
                        SetCapacity: true,
@@ -237,14 +237,26 @@ func TestClientTransferSmallCache(t *testing.T) {
                        Capacity: 5,
                        Wrapper:  fileCachePieceResourceStorage,
                }),
-               SetReadahead: true,
+               SetReadahead: setReadahead,
                // Can't readahead too far or the cache will thrash and drop data we
                // thought we had.
-               Readahead:          0,
+               Readahead:          readahead,
                ExportClientStatus: true,
        })
 }
 
+func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
+       testClientTransferSmallCache(t, true, 5)
+}
+
+func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
+       testClientTransferSmallCache(t, true, 15)
+}
+
+func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
+       testClientTransferSmallCache(t, false, -1)
+}
+
 func TestClientTransferVarious(t *testing.T) {
        // Leecher storage
        for _, ls := range []storageFactory{
@@ -342,6 +354,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
                cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
        }
        cfg.Seed = false
+       //cfg.Debug = true
        leecher, err := NewClient(cfg)
        require.NoError(t, err)
        defer leecher.Close()
index 8e415a5039eff158a1168a93f484f16fd55cf985..3c2adea2d916a464ef2448060b4e9479fd7b3d3f 100644 (file)
@@ -55,6 +55,7 @@ type connection struct {
        headerEncrypted bool
        cryptoMethod    mse.CryptoMethod
        Discovery       peerSource
+       trusted         bool
        closed          missinggo.Event
        // Set true after we've added our ConnStats generated during handshake to
        // other ConnStat instances as determined when the *Torrent became known.
@@ -1568,3 +1569,19 @@ func (c *connection) remoteIpPort() IpPort {
 func (c *connection) String() string {
        return fmt.Sprintf("connection %p", c)
 }
+
+func (c *connection) trust() connectionTrust {
+       return connectionTrust{c.trusted, c.netGoodPiecesDirtied()}
+}
+
+type connectionTrust struct {
+       Implicit            bool
+       NetGoodPiecesDirted int64
+}
+
+func (l connectionTrust) Less(r connectionTrust) bool {
+       var ml missinggo.MultiLess
+       ml.NextBool(!l.Implicit, !r.Implicit)
+       ml.StrictNext(l.NetGoodPiecesDirted == r.NetGoodPiecesDirted, l.NetGoodPiecesDirted < r.NetGoodPiecesDirted)
+       return ml.Less()
+}
diff --git a/misc.go b/misc.go
index bdf531894157f5ecdb24dcfb5b2bf752b1f8b220..9e1628a9c99babed8c29065e5422eb1e1c3a152a 100644 (file)
--- a/misc.go
+++ b/misc.go
@@ -4,7 +4,7 @@ import (
        "errors"
        "net"
 
-       "github.com/anacrolix/missinggo"
+       "github.com/anacrolix/missinggo/v2"
        "golang.org/x/time/rate"
 
        "github.com/anacrolix/torrent/metainfo"
@@ -107,7 +107,7 @@ func chunkIndexSpec(index pp.Integer, pieceLength, chunkSize pp.Integer) chunkSp
 }
 
 func connLessTrusted(l, r *connection) bool {
-       return l.netGoodPiecesDirtied() < r.netGoodPiecesDirtied()
+       return l.trust().Less(r.trust())
 }
 
 func connIsIpv6(nc interface {
index d0e17a20d05c7bdc9c5bb69e51b487e5f0e68881..8a7a72f1a10191869738c13b00626f805f72a57e 100644 (file)
@@ -1548,8 +1548,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) {
                }
        } else {
                if len(touchers) != 0 {
-                       // Don't increment stats above connection-level for every involved
-                       // connection.
+                       // Don't increment stats above connection-level for every involved connection.
                        t.allStats((*ConnStats).incrementPiecesDirtiedBad)
                        for _, c := range touchers {
                                // Y u do dis peer?!
@@ -1557,16 +1556,20 @@ func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) {
                        }
                        slices.Sort(touchers, connLessTrusted)
                        if t.cl.config.Debug {
-                               t.logger.Printf("dropping first corresponding conn from trust: %v", func() (ret []int64) {
-                                       for _, c := range touchers {
-                                               ret = append(ret, c.netGoodPiecesDirtied())
-                                       }
-                                       return
-                               }())
+                               t.logger.Printf("conns by trust for piece %d: %v",
+                                       piece,
+                                       func() (ret []connectionTrust) {
+                                               for _, c := range touchers {
+                                                       ret = append(ret, c.trust())
+                                               }
+                                               return
+                                       }())
                        }
                        c := touchers[0]
-                       t.cl.banPeerIP(c.remoteAddr.IP)
-                       c.Drop()
+                       if !c.trust().Implicit {
+                               t.cl.banPeerIP(c.remoteAddr.IP)
+                               c.Drop()
+                       }
                }
                t.onIncompletePiece(piece)
                p.Storage().MarkNotComplete()
@@ -1702,13 +1705,12 @@ func (t *Torrent) VerifyData() {
        }
 }
 
-// Start the process of connecting to the given peer for the given torrent if
-// appropriate.
+// Start the process of connecting to the given peer for the given torrent if appropriate.
 func (t *Torrent) initiateConn(peer Peer) {
        if peer.Id == t.cl.peerID {
                return
        }
-       if t.cl.badPeerIPPort(peer.IP, peer.Port) {
+       if t.cl.badPeerIPPort(peer.IP, peer.Port) && !peer.Trusted {
                return
        }
        addr := IpPort{peer.IP, uint16(peer.Port)}
@@ -1716,15 +1718,17 @@ func (t *Torrent) initiateConn(peer Peer) {
                return
        }
        t.halfOpen[addr.String()] = peer
-       go t.cl.outgoingConnection(t, addr, peer.Source)
+       go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted)
 }
 
+// Adds each a trusted, pending peer for each of the Client's addresses.
 func (t *Torrent) AddClientPeer(cl *Client) {
        t.AddPeers(func() (ps []Peer) {
                for _, la := range cl.ListenAddrs() {
                        ps = append(ps, Peer{
-                               IP:   missinggo.AddrIP(la),
-                               Port: missinggo.AddrPort(la),
+                               IP:      missinggo.AddrIP(la),
+                               Port:    missinggo.AddrPort(la),
+                               Trusted: true,
                        })
                }
                return