Peer.go | 32 ++++++++++++++++++++++++++++++++ client.go | 33 +++++++++++++++++++++++++++++++-- prioritized_peers.go | 43 +++++++++++++++++++++++++++++++++++++++++++ torrent.go | 60 ++++++++++++++--------------------------------------- diff --git a/Peer.go b/Peer.go new file mode 100644 index 0000000000000000000000000000000000000000..9efed032a8320d70c0c6c6a0cfdceb6dd5ffba21 --- /dev/null +++ b/Peer.go @@ -0,0 +1,32 @@ +package torrent + +import ( + "net" + + "github.com/anacrolix/dht/krpc" +) + +type Peer struct { + Id [20]byte + IP net.IP + Port int + Source peerSource + // Peer is known to support encryption. + SupportsEncryption bool + pexPeerFlags +} + +func (me *Peer) FromPex(na krpc.NodeAddr, fs pexPeerFlags) { + me.IP = append([]byte(nil), na.IP...) + me.Port = na.Port + me.Source = peerSourcePEX + // If they prefer encryption, they must support it. + if fs.Get(pexPrefersEncryption) { + me.SupportsEncryption = true + } + me.pexPeerFlags = fs +} + +func (me Peer) addr() ipPort { + return ipPort{me.IP, uint16(me.Port)} +} diff --git a/client.go b/client.go index fc774ba1cd4ad616d7bbea0fc2c3f89db4f1e4a3..d932cd7f23918a77316997e87aa4ac31a3ae6aae 100644 --- a/client.go +++ b/client.go @@ -24,6 +24,7 @@ "github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/missinggo/slices" "github.com/anacrolix/sync" "github.com/dustin/go-humanize" + "github.com/google/btree" "golang.org/x/time/rate" "github.com/anacrolix/torrent/bencode" @@ -1029,8 +1030,13 @@ t = &Torrent{ cl: cl, infoHash: ih, - peers: make(map[peersKey]Peer), - conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent), + peers: prioritizedPeers{ + om: btree.New(2), + getPrio: func(p Peer) peerPriority { + return bep40Priority(cl.publicAddr(p.IP), p.addr()) + }, + }, + conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent), halfOpen: make(map[string]Peer), pieceStateChanges: pubsub.NewPubSub(), @@ -1251,3 +1257,26 @@ Port: p.Port, Source: peerSourceDHTAnnouncePeer, }}) } + +func firstNotNil(ips ...net.IP) net.IP { + for _, ip := range ips { + if ip != nil { + return ip + } + } + return nil +} + +func (cl *Client) publicIp(peer net.IP) net.IP { + // TODO: Use BEP 10 to determine how peers are seeing us. + if peer.To4() != nil { + return firstNotNil(cl.config.PublicIp4, missinggo.AddrIP(cl.ListenAddr()).To4()) + } else { + return firstNotNil(cl.config.PublicIp6, missinggo.AddrIP(cl.ListenAddr()).To16()) + } +} + +// Our IP as a peer should see it. +func (cl *Client) publicAddr(peer net.IP) ipPort { + return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())} +} diff --git a/prioritized_peers.go b/prioritized_peers.go new file mode 100644 index 0000000000000000000000000000000000000000..4ea45919094a0fcfeb9e7bd1325463b6d8586c30 --- /dev/null +++ b/prioritized_peers.go @@ -0,0 +1,43 @@ +package torrent + +import "github.com/google/btree" + +// Peers are stored with their priority at insertion. Their priority may +// change if our apparent IP changes, we don't currently handle that. +type prioritizedPeersItem struct { + prio peerPriority + p Peer +} + +func (me prioritizedPeersItem) Less(than btree.Item) bool { + return me.prio < than.(prioritizedPeersItem).prio +} + +type prioritizedPeers struct { + om *btree.BTree + getPrio func(Peer) peerPriority +} + +func (me *prioritizedPeers) Each(f func(Peer)) { + me.om.Ascend(func(i btree.Item) bool { + f(i.(prioritizedPeersItem).p) + return true + }) +} + +func (me *prioritizedPeers) Len() int { + return me.om.Len() +} + +// Returns true if a peer is replaced. +func (me *prioritizedPeers) Add(p Peer) bool { + return me.om.ReplaceOrInsert(prioritizedPeersItem{me.getPrio(p), p}) != nil +} + +func (me *prioritizedPeers) DeleteMin() { + me.om.DeleteMin() +} + +func (me *prioritizedPeers) PopMax() Peer { + return me.om.DeleteMax().(prioritizedPeersItem).p +} diff --git a/torrent.go b/torrent.go index 0b14f3f2f07af6d30a1bf0cd4588cabb89e77f87..2387efb5413a7ee1594778d5e689e333f21832ce 100644 --- a/torrent.go +++ b/torrent.go @@ -17,7 +17,6 @@ "text/tabwriter" "time" "github.com/anacrolix/dht" - "github.com/anacrolix/dht/krpc" "github.com/anacrolix/log" "github.com/anacrolix/missinggo" "github.com/anacrolix/missinggo/bitmap" @@ -96,7 +95,7 @@ // Reserve of peers to connect to. A peer can be both here and in the // active connections if were told about the peer after connecting with // them. That encourages us to reconnect to peers that are well known in // the swarm. - peers map[peersKey]Peer + peers prioritizedPeers wantPeersEvent missinggo.Event // An announcer for each tracker URL. trackerAnnouncers map[string]*trackerScraper @@ -155,9 +154,9 @@ // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active, // pending, and half-open peers. func (t *Torrent) KnownSwarm() (ks []Peer) { // Add pending peers to the list - for _, peer := range t.peers { + t.peers.Each(func(peer Peer) { ks = append(ks, peer) - } + }) // Add half-open peers to the list for _, peer := range t.halfOpen { @@ -254,13 +253,14 @@ if cl.badPeerIPPort(p.IP, p.Port) { torrent.Add("peers not added because of bad addr", 1) return } - t.openNewConns() - if len(t.peers) >= cl.config.TorrentPeersHighWater { - return + if t.peers.Add(p) { + torrent.Add("peers replaced", 1) } - t.peers[peersKey{string(p.IP), p.Port}] = p t.openNewConns() - + for t.peers.Len() > cl.config.TorrentPeersHighWater { + t.peers.DeleteMin() + torrent.Add("peers discarded", 1) + } } func (t *Torrent) invalidateMetadata() { @@ -735,27 +735,6 @@ func (t *Torrent) pendAllChunkSpecs(pieceIndex int) { t.pieces[pieceIndex].dirtyChunks.Clear() } -type Peer struct { - Id [20]byte - IP net.IP - Port int - Source peerSource - // Peer is known to support encryption. - SupportsEncryption bool - pexPeerFlags -} - -func (me *Peer) FromPex(na krpc.NodeAddr, fs pexPeerFlags) { - me.IP = append([]byte(nil), na.IP...) - me.Port = na.Port - me.Source = peerSourcePEX - // If they prefer encryption, they must support it. - if fs.Get(pexPrefersEncryption) { - me.SupportsEncryption = true - } - me.pexPeerFlags = fs -} - func (t *Torrent) pieceLength(piece int) pp.Integer { if t.info.PieceLength == 0 { // There will be no variance amongst pieces. Only pain. @@ -1077,21 +1056,14 @@ } func (t *Torrent) openNewConns() { defer t.updateWantPeersEvent() - for len(t.peers) != 0 { + for t.peers.Len() != 0 { if !t.wantConns() { return } if len(t.halfOpen) >= t.maxHalfOpen() { return } - var ( - k peersKey - p Peer - ) - for k, p = range t.peers { - break - } - delete(t.peers, k) + p := t.peers.PopMax() t.initiateConn(p) } } @@ -1272,7 +1244,7 @@ func (t *Torrent) wantPeers() bool { if t.closed.IsSet() { return false } - if len(t.peers) > t.cl.config.TorrentPeersLowWater { + if t.peers.Len() > t.cl.config.TorrentPeersLowWater { return false } return t.needData() || t.seeding() @@ -1395,7 +1367,7 @@ allAddrs[key] = struct{}{} } cl.mu.Lock() t.addPeers(addPeers) - numPeers := len(t.peers) + numPeers := t.peers.Len() cl.mu.Unlock() if numPeers >= cl.config.TorrentPeersHighWater { return @@ -1458,7 +1430,7 @@ func (t *Torrent) statsLocked() TorrentStats { t.stats.ActivePeers = len(t.conns) t.stats.HalfOpenPeers = len(t.halfOpen) - t.stats.PendingPeers = len(t.peers) + t.stats.PendingPeers = t.peers.Len() t.stats.TotalPeers = t.numTotalPeers() t.stats.ConnectedSeeders = 0 for c := range t.conns { @@ -1483,9 +1455,9 @@ } for addr := range t.halfOpen { peers[addr] = struct{}{} } - for _, peer := range t.peers { + t.peers.Each(func(peer Peer) { peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{} - } + }) return len(peers) }