From 5f1d937b6298a909becacccf7b2c682d6bab00df Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 18 Dec 2019 13:52:00 +1100 Subject: [PATCH] Add connection trust flag, and more tests with small caches Thanks to observations and feedback from @ccampbell. --- Peer.go | 3 +++ client.go | 6 ++++-- client_test.go | 19 ++++++++++++++++--- connection.go | 17 +++++++++++++++++ misc.go | 4 ++-- torrent.go | 36 ++++++++++++++++++++---------------- 6 files changed, 62 insertions(+), 23 deletions(-) diff --git a/Peer.go b/Peer.go index 6a017330..080fad4d 100644 --- a/Peer.go +++ b/Peer.go @@ -8,6 +8,7 @@ import ( "github.com/anacrolix/torrent/peer_protocol" ) +// Peer connection info, handed about publicly. type Peer struct { Id [20]byte IP net.IP @@ -16,6 +17,8 @@ type Peer struct { // Peer is known to support encryption. SupportsEncryption bool peer_protocol.PexPeerFlags + // Whether we can ignore poor or bad behaviour from the peer. + Trusted bool } func (me *Peer) FromPex(na krpc.NodeAddr, fs peer_protocol.PexPeerFlags) { diff --git a/client.go b/client.go index 05e9fdfd..866aa235 100644 --- a/client.go +++ b/client.go @@ -19,12 +19,12 @@ import ( "github.com/anacrolix/dht/v2" "github.com/anacrolix/dht/v2/krpc" "github.com/anacrolix/log" - "github.com/anacrolix/missinggo" "github.com/anacrolix/missinggo/bitmap" "github.com/anacrolix/missinggo/perf" "github.com/anacrolix/missinggo/pproffd" "github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/missinggo/slices" + "github.com/anacrolix/missinggo/v2" "github.com/anacrolix/missinggo/v2/conntrack" "github.com/anacrolix/sync" "github.com/davecgh/go-spew/spew" @@ -690,7 +690,7 @@ func (cl *Client) establishOutgoingConn(t *Torrent, addr IpPort) (c *connection, // Called to dial out and run a connection. The addr we're given is already // considered half-open. -func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource) { +func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource, trusted bool) { cl.dialRateLimiter.Wait(context.Background()) c, err := cl.establishOutgoingConn(t, addr) cl.lock() @@ -706,6 +706,7 @@ func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource) { } defer c.Close() c.Discovery = ps + c.trusted = trusted cl.runHandshookConn(c, t) } @@ -1212,6 +1213,7 @@ func (cl *Client) AddDHTNodes(nodes []string) { } func (cl *Client) banPeerIP(ip net.IP) { + cl.logger.Printf("banning ip %v", ip) if cl.badPeerIPs == nil { cl.badPeerIPs = make(map[string]struct{}) } diff --git a/client_test.go b/client_test.go index 0ad01eec..a4520bae 100644 --- a/client_test.go +++ b/client_test.go @@ -228,7 +228,7 @@ func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl { return storage.NewResourcePieces(fc.AsResourceProvider()) } -func TestClientTransferSmallCache(t *testing.T) { +func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) { testClientTransfer(t, testClientTransferParams{ LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{ SetCapacity: true, @@ -237,14 +237,26 @@ func TestClientTransferSmallCache(t *testing.T) { Capacity: 5, Wrapper: fileCachePieceResourceStorage, }), - SetReadahead: true, + SetReadahead: setReadahead, // Can't readahead too far or the cache will thrash and drop data we // thought we had. - Readahead: 0, + Readahead: readahead, ExportClientStatus: true, }) } +func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) { + testClientTransferSmallCache(t, true, 5) +} + +func TestClientTransferSmallCacheLargeReadahead(t *testing.T) { + testClientTransferSmallCache(t, true, 15) +} + +func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) { + testClientTransferSmallCache(t, false, -1) +} + func TestClientTransferVarious(t *testing.T) { // Leecher storage for _, ls := range []storageFactory{ @@ -342,6 +354,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) { cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter } cfg.Seed = false + //cfg.Debug = true leecher, err := NewClient(cfg) require.NoError(t, err) defer leecher.Close() diff --git a/connection.go b/connection.go index 8e415a50..3c2adea2 100644 --- a/connection.go +++ b/connection.go @@ -55,6 +55,7 @@ type connection struct { headerEncrypted bool cryptoMethod mse.CryptoMethod Discovery peerSource + trusted bool closed missinggo.Event // Set true after we've added our ConnStats generated during handshake to // other ConnStat instances as determined when the *Torrent became known. @@ -1568,3 +1569,19 @@ func (c *connection) remoteIpPort() IpPort { func (c *connection) String() string { return fmt.Sprintf("connection %p", c) } + +func (c *connection) trust() connectionTrust { + return connectionTrust{c.trusted, c.netGoodPiecesDirtied()} +} + +type connectionTrust struct { + Implicit bool + NetGoodPiecesDirted int64 +} + +func (l connectionTrust) Less(r connectionTrust) bool { + var ml missinggo.MultiLess + ml.NextBool(!l.Implicit, !r.Implicit) + ml.StrictNext(l.NetGoodPiecesDirted == r.NetGoodPiecesDirted, l.NetGoodPiecesDirted < r.NetGoodPiecesDirted) + return ml.Less() +} diff --git a/misc.go b/misc.go index bdf53189..9e1628a9 100644 --- a/misc.go +++ b/misc.go @@ -4,7 +4,7 @@ import ( "errors" "net" - "github.com/anacrolix/missinggo" + "github.com/anacrolix/missinggo/v2" "golang.org/x/time/rate" "github.com/anacrolix/torrent/metainfo" @@ -107,7 +107,7 @@ func chunkIndexSpec(index pp.Integer, pieceLength, chunkSize pp.Integer) chunkSp } func connLessTrusted(l, r *connection) bool { - return l.netGoodPiecesDirtied() < r.netGoodPiecesDirtied() + return l.trust().Less(r.trust()) } func connIsIpv6(nc interface { diff --git a/torrent.go b/torrent.go index d0e17a20..8a7a72f1 100644 --- a/torrent.go +++ b/torrent.go @@ -1548,8 +1548,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) { } } else { if len(touchers) != 0 { - // Don't increment stats above connection-level for every involved - // connection. + // Don't increment stats above connection-level for every involved connection. t.allStats((*ConnStats).incrementPiecesDirtiedBad) for _, c := range touchers { // Y u do dis peer?! @@ -1557,16 +1556,20 @@ func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) { } slices.Sort(touchers, connLessTrusted) if t.cl.config.Debug { - t.logger.Printf("dropping first corresponding conn from trust: %v", func() (ret []int64) { - for _, c := range touchers { - ret = append(ret, c.netGoodPiecesDirtied()) - } - return - }()) + t.logger.Printf("conns by trust for piece %d: %v", + piece, + func() (ret []connectionTrust) { + for _, c := range touchers { + ret = append(ret, c.trust()) + } + return + }()) } c := touchers[0] - t.cl.banPeerIP(c.remoteAddr.IP) - c.Drop() + if !c.trust().Implicit { + t.cl.banPeerIP(c.remoteAddr.IP) + c.Drop() + } } t.onIncompletePiece(piece) p.Storage().MarkNotComplete() @@ -1702,13 +1705,12 @@ func (t *Torrent) VerifyData() { } } -// Start the process of connecting to the given peer for the given torrent if -// appropriate. +// Start the process of connecting to the given peer for the given torrent if appropriate. func (t *Torrent) initiateConn(peer Peer) { if peer.Id == t.cl.peerID { return } - if t.cl.badPeerIPPort(peer.IP, peer.Port) { + if t.cl.badPeerIPPort(peer.IP, peer.Port) && !peer.Trusted { return } addr := IpPort{peer.IP, uint16(peer.Port)} @@ -1716,15 +1718,17 @@ func (t *Torrent) initiateConn(peer Peer) { return } t.halfOpen[addr.String()] = peer - go t.cl.outgoingConnection(t, addr, peer.Source) + go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted) } +// Adds each a trusted, pending peer for each of the Client's addresses. func (t *Torrent) AddClientPeer(cl *Client) { t.AddPeers(func() (ps []Peer) { for _, la := range cl.ListenAddrs() { ps = append(ps, Peer{ - IP: missinggo.AddrIP(la), - Port: missinggo.AddrPort(la), + IP: missinggo.AddrIP(la), + Port: missinggo.AddrPort(la), + Trusted: true, }) } return -- 2.44.0