From 92f6209c5f7cd438b9f6d255c78507ff1d370de5 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 4 Apr 2018 17:59:28 +1000 Subject: [PATCH] Prioritize pending nodes with BEP 40 --- Peer.go | 32 +++++++++++++++++++++++ client.go | 33 ++++++++++++++++++++++-- prioritized_peers.go | 43 +++++++++++++++++++++++++++++++ torrent.go | 60 ++++++++++++-------------------------------- 4 files changed, 122 insertions(+), 46 deletions(-) create mode 100644 Peer.go create mode 100644 prioritized_peers.go diff --git a/Peer.go b/Peer.go new file mode 100644 index 00000000..9efed032 --- /dev/null +++ b/Peer.go @@ -0,0 +1,32 @@ +package torrent + +import ( + "net" + + "github.com/anacrolix/dht/krpc" +) + +type Peer struct { + Id [20]byte + IP net.IP + Port int + Source peerSource + // Peer is known to support encryption. + SupportsEncryption bool + pexPeerFlags +} + +func (me *Peer) FromPex(na krpc.NodeAddr, fs pexPeerFlags) { + me.IP = append([]byte(nil), na.IP...) + me.Port = na.Port + me.Source = peerSourcePEX + // If they prefer encryption, they must support it. + if fs.Get(pexPrefersEncryption) { + me.SupportsEncryption = true + } + me.pexPeerFlags = fs +} + +func (me Peer) addr() ipPort { + return ipPort{me.IP, uint16(me.Port)} +} diff --git a/client.go b/client.go index fc774ba1..d932cd7f 100644 --- a/client.go +++ b/client.go @@ -24,6 +24,7 @@ import ( "github.com/anacrolix/missinggo/slices" "github.com/anacrolix/sync" "github.com/dustin/go-humanize" + "github.com/google/btree" "golang.org/x/time/rate" "github.com/anacrolix/torrent/bencode" @@ -1029,8 +1030,13 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) ( t = &Torrent{ cl: cl, infoHash: ih, - peers: make(map[peersKey]Peer), - conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent), + peers: prioritizedPeers{ + om: btree.New(2), + getPrio: func(p Peer) peerPriority { + return bep40Priority(cl.publicAddr(p.IP), p.addr()) + }, + }, + conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent), halfOpen: make(map[string]Peer), pieceStateChanges: pubsub.NewPubSub(), @@ -1251,3 +1257,26 @@ func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) { Source: peerSourceDHTAnnouncePeer, }}) } + +func firstNotNil(ips ...net.IP) net.IP { + for _, ip := range ips { + if ip != nil { + return ip + } + } + return nil +} + +func (cl *Client) publicIp(peer net.IP) net.IP { + // TODO: Use BEP 10 to determine how peers are seeing us. + if peer.To4() != nil { + return firstNotNil(cl.config.PublicIp4, missinggo.AddrIP(cl.ListenAddr()).To4()) + } else { + return firstNotNil(cl.config.PublicIp6, missinggo.AddrIP(cl.ListenAddr()).To16()) + } +} + +// Our IP as a peer should see it. +func (cl *Client) publicAddr(peer net.IP) ipPort { + return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())} +} diff --git a/prioritized_peers.go b/prioritized_peers.go new file mode 100644 index 00000000..4ea45919 --- /dev/null +++ b/prioritized_peers.go @@ -0,0 +1,43 @@ +package torrent + +import "github.com/google/btree" + +// Peers are stored with their priority at insertion. Their priority may +// change if our apparent IP changes, we don't currently handle that. +type prioritizedPeersItem struct { + prio peerPriority + p Peer +} + +func (me prioritizedPeersItem) Less(than btree.Item) bool { + return me.prio < than.(prioritizedPeersItem).prio +} + +type prioritizedPeers struct { + om *btree.BTree + getPrio func(Peer) peerPriority +} + +func (me *prioritizedPeers) Each(f func(Peer)) { + me.om.Ascend(func(i btree.Item) bool { + f(i.(prioritizedPeersItem).p) + return true + }) +} + +func (me *prioritizedPeers) Len() int { + return me.om.Len() +} + +// Returns true if a peer is replaced. +func (me *prioritizedPeers) Add(p Peer) bool { + return me.om.ReplaceOrInsert(prioritizedPeersItem{me.getPrio(p), p}) != nil +} + +func (me *prioritizedPeers) DeleteMin() { + me.om.DeleteMin() +} + +func (me *prioritizedPeers) PopMax() Peer { + return me.om.DeleteMax().(prioritizedPeersItem).p +} diff --git a/torrent.go b/torrent.go index 0b14f3f2..2387efb5 100644 --- a/torrent.go +++ b/torrent.go @@ -17,7 +17,6 @@ import ( "time" "github.com/anacrolix/dht" - "github.com/anacrolix/dht/krpc" "github.com/anacrolix/log" "github.com/anacrolix/missinggo" "github.com/anacrolix/missinggo/bitmap" @@ -96,7 +95,7 @@ type Torrent struct { // active connections if were told about the peer after connecting with // them. That encourages us to reconnect to peers that are well known in // the swarm. - peers map[peersKey]Peer + peers prioritizedPeers wantPeersEvent missinggo.Event // An announcer for each tracker URL. trackerAnnouncers map[string]*trackerScraper @@ -155,9 +154,9 @@ func (t *Torrent) Closed() <-chan struct{} { // pending, and half-open peers. func (t *Torrent) KnownSwarm() (ks []Peer) { // Add pending peers to the list - for _, peer := range t.peers { + t.peers.Each(func(peer Peer) { ks = append(ks, peer) - } + }) // Add half-open peers to the list for _, peer := range t.halfOpen { @@ -254,13 +253,14 @@ func (t *Torrent) addPeer(p Peer) { torrent.Add("peers not added because of bad addr", 1) return } - t.openNewConns() - if len(t.peers) >= cl.config.TorrentPeersHighWater { - return + if t.peers.Add(p) { + torrent.Add("peers replaced", 1) } - t.peers[peersKey{string(p.IP), p.Port}] = p t.openNewConns() - + for t.peers.Len() > cl.config.TorrentPeersHighWater { + t.peers.DeleteMin() + torrent.Add("peers discarded", 1) + } } func (t *Torrent) invalidateMetadata() { @@ -735,27 +735,6 @@ func (t *Torrent) pendAllChunkSpecs(pieceIndex int) { t.pieces[pieceIndex].dirtyChunks.Clear() } -type Peer struct { - Id [20]byte - IP net.IP - Port int - Source peerSource - // Peer is known to support encryption. - SupportsEncryption bool - pexPeerFlags -} - -func (me *Peer) FromPex(na krpc.NodeAddr, fs pexPeerFlags) { - me.IP = append([]byte(nil), na.IP...) - me.Port = na.Port - me.Source = peerSourcePEX - // If they prefer encryption, they must support it. - if fs.Get(pexPrefersEncryption) { - me.SupportsEncryption = true - } - me.pexPeerFlags = fs -} - func (t *Torrent) pieceLength(piece int) pp.Integer { if t.info.PieceLength == 0 { // There will be no variance amongst pieces. Only pain. @@ -1077,21 +1056,14 @@ func (t *Torrent) maxHalfOpen() int { func (t *Torrent) openNewConns() { defer t.updateWantPeersEvent() - for len(t.peers) != 0 { + for t.peers.Len() != 0 { if !t.wantConns() { return } if len(t.halfOpen) >= t.maxHalfOpen() { return } - var ( - k peersKey - p Peer - ) - for k, p = range t.peers { - break - } - delete(t.peers, k) + p := t.peers.PopMax() t.initiateConn(p) } } @@ -1272,7 +1244,7 @@ func (t *Torrent) wantPeers() bool { if t.closed.IsSet() { return false } - if len(t.peers) > t.cl.config.TorrentPeersLowWater { + if t.peers.Len() > t.cl.config.TorrentPeersLowWater { return false } return t.needData() || t.seeding() @@ -1395,7 +1367,7 @@ func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) { } cl.mu.Lock() t.addPeers(addPeers) - numPeers := len(t.peers) + numPeers := t.peers.Len() cl.mu.Unlock() if numPeers >= cl.config.TorrentPeersHighWater { return @@ -1458,7 +1430,7 @@ func (t *Torrent) Stats() TorrentStats { func (t *Torrent) statsLocked() TorrentStats { t.stats.ActivePeers = len(t.conns) t.stats.HalfOpenPeers = len(t.halfOpen) - t.stats.PendingPeers = len(t.peers) + t.stats.PendingPeers = t.peers.Len() t.stats.TotalPeers = t.numTotalPeers() t.stats.ConnectedSeeders = 0 for c := range t.conns { @@ -1483,9 +1455,9 @@ func (t *Torrent) numTotalPeers() int { for addr := range t.halfOpen { peers[addr] = struct{}{} } - for _, peer := range t.peers { + t.peers.Each(func(peer Peer) { peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{} - } + }) return len(peers) } -- 2.50.0