Thanks to observations and feedback from @ccampbell.
"github.com/anacrolix/torrent/peer_protocol"
)
+// Peer connection info, handed about publicly.
type Peer struct {
Id [20]byte
IP net.IP
// Peer is known to support encryption.
SupportsEncryption bool
peer_protocol.PexPeerFlags
+ // Whether we can ignore poor or bad behaviour from the peer.
+ Trusted bool
}
func (me *Peer) FromPex(na krpc.NodeAddr, fs peer_protocol.PexPeerFlags) {
"github.com/anacrolix/dht/v2"
"github.com/anacrolix/dht/v2/krpc"
"github.com/anacrolix/log"
- "github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/bitmap"
"github.com/anacrolix/missinggo/perf"
"github.com/anacrolix/missinggo/pproffd"
"github.com/anacrolix/missinggo/pubsub"
"github.com/anacrolix/missinggo/slices"
+ "github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/missinggo/v2/conntrack"
"github.com/anacrolix/sync"
"github.com/davecgh/go-spew/spew"
// Called to dial out and run a connection. The addr we're given is already
// considered half-open.
-func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource) {
+func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource, trusted bool) {
cl.dialRateLimiter.Wait(context.Background())
c, err := cl.establishOutgoingConn(t, addr)
cl.lock()
}
defer c.Close()
c.Discovery = ps
+ c.trusted = trusted
cl.runHandshookConn(c, t)
}
}
func (cl *Client) banPeerIP(ip net.IP) {
+ cl.logger.Printf("banning ip %v", ip)
if cl.badPeerIPs == nil {
cl.badPeerIPs = make(map[string]struct{})
}
return storage.NewResourcePieces(fc.AsResourceProvider())
}
-func TestClientTransferSmallCache(t *testing.T) {
+func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
testClientTransfer(t, testClientTransferParams{
LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
SetCapacity: true,
Capacity: 5,
Wrapper: fileCachePieceResourceStorage,
}),
- SetReadahead: true,
+ SetReadahead: setReadahead,
// Can't readahead too far or the cache will thrash and drop data we
// thought we had.
- Readahead: 0,
+ Readahead: readahead,
ExportClientStatus: true,
})
}
+func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
+ testClientTransferSmallCache(t, true, 5)
+}
+
+func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
+ testClientTransferSmallCache(t, true, 15)
+}
+
+func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
+ testClientTransferSmallCache(t, false, -1)
+}
+
func TestClientTransferVarious(t *testing.T) {
// Leecher storage
for _, ls := range []storageFactory{
cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
}
cfg.Seed = false
+ //cfg.Debug = true
leecher, err := NewClient(cfg)
require.NoError(t, err)
defer leecher.Close()
headerEncrypted bool
cryptoMethod mse.CryptoMethod
Discovery peerSource
+ trusted bool
closed missinggo.Event
// Set true after we've added our ConnStats generated during handshake to
// other ConnStat instances as determined when the *Torrent became known.
func (c *connection) String() string {
return fmt.Sprintf("connection %p", c)
}
+
+func (c *connection) trust() connectionTrust {
+ return connectionTrust{c.trusted, c.netGoodPiecesDirtied()}
+}
+
+type connectionTrust struct {
+ Implicit bool
+ NetGoodPiecesDirted int64
+}
+
+func (l connectionTrust) Less(r connectionTrust) bool {
+ var ml missinggo.MultiLess
+ ml.NextBool(!l.Implicit, !r.Implicit)
+ ml.StrictNext(l.NetGoodPiecesDirted == r.NetGoodPiecesDirted, l.NetGoodPiecesDirted < r.NetGoodPiecesDirted)
+ return ml.Less()
+}
"errors"
"net"
- "github.com/anacrolix/missinggo"
+ "github.com/anacrolix/missinggo/v2"
"golang.org/x/time/rate"
"github.com/anacrolix/torrent/metainfo"
}
func connLessTrusted(l, r *connection) bool {
- return l.netGoodPiecesDirtied() < r.netGoodPiecesDirtied()
+ return l.trust().Less(r.trust())
}
func connIsIpv6(nc interface {
}
} else {
if len(touchers) != 0 {
- // Don't increment stats above connection-level for every involved
- // connection.
+ // Don't increment stats above connection-level for every involved connection.
t.allStats((*ConnStats).incrementPiecesDirtiedBad)
for _, c := range touchers {
// Y u do dis peer?!
}
slices.Sort(touchers, connLessTrusted)
if t.cl.config.Debug {
- t.logger.Printf("dropping first corresponding conn from trust: %v", func() (ret []int64) {
- for _, c := range touchers {
- ret = append(ret, c.netGoodPiecesDirtied())
- }
- return
- }())
+ t.logger.Printf("conns by trust for piece %d: %v",
+ piece,
+ func() (ret []connectionTrust) {
+ for _, c := range touchers {
+ ret = append(ret, c.trust())
+ }
+ return
+ }())
}
c := touchers[0]
- t.cl.banPeerIP(c.remoteAddr.IP)
- c.Drop()
+ if !c.trust().Implicit {
+ t.cl.banPeerIP(c.remoteAddr.IP)
+ c.Drop()
+ }
}
t.onIncompletePiece(piece)
p.Storage().MarkNotComplete()
}
}
-// Start the process of connecting to the given peer for the given torrent if
-// appropriate.
+// Start the process of connecting to the given peer for the given torrent if appropriate.
func (t *Torrent) initiateConn(peer Peer) {
if peer.Id == t.cl.peerID {
return
}
- if t.cl.badPeerIPPort(peer.IP, peer.Port) {
+ if t.cl.badPeerIPPort(peer.IP, peer.Port) && !peer.Trusted {
return
}
addr := IpPort{peer.IP, uint16(peer.Port)}
return
}
t.halfOpen[addr.String()] = peer
- go t.cl.outgoingConnection(t, addr, peer.Source)
+ go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted)
}
+// Adds each a trusted, pending peer for each of the Client's addresses.
func (t *Torrent) AddClientPeer(cl *Client) {
t.AddPeers(func() (ps []Peer) {
for _, la := range cl.ListenAddrs() {
ps = append(ps, Peer{
- IP: missinggo.AddrIP(la),
- Port: missinggo.AddrPort(la),
+ IP: missinggo.AddrIP(la),
+ Port: missinggo.AddrPort(la),
+ Trusted: true,
})
}
return