]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Prioritize pending nodes with BEP 40
authorMatt Joiner <anacrolix@gmail.com>
Wed, 4 Apr 2018 07:59:28 +0000 (17:59 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 4 Apr 2018 07:59:28 +0000 (17:59 +1000)
Peer.go [new file with mode: 0644]
client.go
prioritized_peers.go [new file with mode: 0644]
torrent.go

diff --git a/Peer.go b/Peer.go
new file mode 100644 (file)
index 0000000..9efed03
--- /dev/null
+++ b/Peer.go
@@ -0,0 +1,32 @@
+package torrent
+
+import (
+       "net"
+
+       "github.com/anacrolix/dht/krpc"
+)
+
+type Peer struct {
+       Id     [20]byte
+       IP     net.IP
+       Port   int
+       Source peerSource
+       // Peer is known to support encryption.
+       SupportsEncryption bool
+       pexPeerFlags
+}
+
+func (me *Peer) FromPex(na krpc.NodeAddr, fs pexPeerFlags) {
+       me.IP = append([]byte(nil), na.IP...)
+       me.Port = na.Port
+       me.Source = peerSourcePEX
+       // If they prefer encryption, they must support it.
+       if fs.Get(pexPrefersEncryption) {
+               me.SupportsEncryption = true
+       }
+       me.pexPeerFlags = fs
+}
+
+func (me Peer) addr() ipPort {
+       return ipPort{me.IP, uint16(me.Port)}
+}
index fc774ba1cd4ad616d7bbea0fc2c3f89db4f1e4a3..d932cd7f23918a77316997e87aa4ac31a3ae6aae 100644 (file)
--- a/client.go
+++ b/client.go
@@ -24,6 +24,7 @@ import (
        "github.com/anacrolix/missinggo/slices"
        "github.com/anacrolix/sync"
        "github.com/dustin/go-humanize"
+       "github.com/google/btree"
        "golang.org/x/time/rate"
 
        "github.com/anacrolix/torrent/bencode"
@@ -1029,8 +1030,13 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (
        t = &Torrent{
                cl:       cl,
                infoHash: ih,
-               peers:    make(map[peersKey]Peer),
-               conns:    make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
+               peers: prioritizedPeers{
+                       om: btree.New(2),
+                       getPrio: func(p Peer) peerPriority {
+                               return bep40Priority(cl.publicAddr(p.IP), p.addr())
+                       },
+               },
+               conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
 
                halfOpen:          make(map[string]Peer),
                pieceStateChanges: pubsub.NewPubSub(),
@@ -1251,3 +1257,26 @@ func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
                Source: peerSourceDHTAnnouncePeer,
        }})
 }
+
+func firstNotNil(ips ...net.IP) net.IP {
+       for _, ip := range ips {
+               if ip != nil {
+                       return ip
+               }
+       }
+       return nil
+}
+
+func (cl *Client) publicIp(peer net.IP) net.IP {
+       // TODO: Use BEP 10 to determine how peers are seeing us.
+       if peer.To4() != nil {
+               return firstNotNil(cl.config.PublicIp4, missinggo.AddrIP(cl.ListenAddr()).To4())
+       } else {
+               return firstNotNil(cl.config.PublicIp6, missinggo.AddrIP(cl.ListenAddr()).To16())
+       }
+}
+
+// Our IP as a peer should see it.
+func (cl *Client) publicAddr(peer net.IP) ipPort {
+       return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
+}
diff --git a/prioritized_peers.go b/prioritized_peers.go
new file mode 100644 (file)
index 0000000..4ea4591
--- /dev/null
@@ -0,0 +1,43 @@
+package torrent
+
+import "github.com/google/btree"
+
+// Peers are stored with their priority at insertion. Their priority may
+// change if our apparent IP changes, we don't currently handle that.
+type prioritizedPeersItem struct {
+       prio peerPriority
+       p    Peer
+}
+
+func (me prioritizedPeersItem) Less(than btree.Item) bool {
+       return me.prio < than.(prioritizedPeersItem).prio
+}
+
+type prioritizedPeers struct {
+       om      *btree.BTree
+       getPrio func(Peer) peerPriority
+}
+
+func (me *prioritizedPeers) Each(f func(Peer)) {
+       me.om.Ascend(func(i btree.Item) bool {
+               f(i.(prioritizedPeersItem).p)
+               return true
+       })
+}
+
+func (me *prioritizedPeers) Len() int {
+       return me.om.Len()
+}
+
+// Returns true if a peer is replaced.
+func (me *prioritizedPeers) Add(p Peer) bool {
+       return me.om.ReplaceOrInsert(prioritizedPeersItem{me.getPrio(p), p}) != nil
+}
+
+func (me *prioritizedPeers) DeleteMin() {
+       me.om.DeleteMin()
+}
+
+func (me *prioritizedPeers) PopMax() Peer {
+       return me.om.DeleteMax().(prioritizedPeersItem).p
+}
index 0b14f3f2f07af6d30a1bf0cd4588cabb89e77f87..2387efb5413a7ee1594778d5e689e333f21832ce 100644 (file)
@@ -17,7 +17,6 @@ import (
        "time"
 
        "github.com/anacrolix/dht"
-       "github.com/anacrolix/dht/krpc"
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo"
        "github.com/anacrolix/missinggo/bitmap"
@@ -96,7 +95,7 @@ type Torrent struct {
        // active connections if were told about the peer after connecting with
        // them. That encourages us to reconnect to peers that are well known in
        // the swarm.
-       peers          map[peersKey]Peer
+       peers          prioritizedPeers
        wantPeersEvent missinggo.Event
        // An announcer for each tracker URL.
        trackerAnnouncers map[string]*trackerScraper
@@ -155,9 +154,9 @@ func (t *Torrent) Closed() <-chan struct{} {
 // pending, and half-open peers.
 func (t *Torrent) KnownSwarm() (ks []Peer) {
        // Add pending peers to the list
-       for _, peer := range t.peers {
+       t.peers.Each(func(peer Peer) {
                ks = append(ks, peer)
-       }
+       })
 
        // Add half-open peers to the list
        for _, peer := range t.halfOpen {
@@ -254,13 +253,14 @@ func (t *Torrent) addPeer(p Peer) {
                torrent.Add("peers not added because of bad addr", 1)
                return
        }
-       t.openNewConns()
-       if len(t.peers) >= cl.config.TorrentPeersHighWater {
-               return
+       if t.peers.Add(p) {
+               torrent.Add("peers replaced", 1)
        }
-       t.peers[peersKey{string(p.IP), p.Port}] = p
        t.openNewConns()
-
+       for t.peers.Len() > cl.config.TorrentPeersHighWater {
+               t.peers.DeleteMin()
+               torrent.Add("peers discarded", 1)
+       }
 }
 
 func (t *Torrent) invalidateMetadata() {
@@ -735,27 +735,6 @@ func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
        t.pieces[pieceIndex].dirtyChunks.Clear()
 }
 
-type Peer struct {
-       Id     [20]byte
-       IP     net.IP
-       Port   int
-       Source peerSource
-       // Peer is known to support encryption.
-       SupportsEncryption bool
-       pexPeerFlags
-}
-
-func (me *Peer) FromPex(na krpc.NodeAddr, fs pexPeerFlags) {
-       me.IP = append([]byte(nil), na.IP...)
-       me.Port = na.Port
-       me.Source = peerSourcePEX
-       // If they prefer encryption, they must support it.
-       if fs.Get(pexPrefersEncryption) {
-               me.SupportsEncryption = true
-       }
-       me.pexPeerFlags = fs
-}
-
 func (t *Torrent) pieceLength(piece int) pp.Integer {
        if t.info.PieceLength == 0 {
                // There will be no variance amongst pieces. Only pain.
@@ -1077,21 +1056,14 @@ func (t *Torrent) maxHalfOpen() int {
 
 func (t *Torrent) openNewConns() {
        defer t.updateWantPeersEvent()
-       for len(t.peers) != 0 {
+       for t.peers.Len() != 0 {
                if !t.wantConns() {
                        return
                }
                if len(t.halfOpen) >= t.maxHalfOpen() {
                        return
                }
-               var (
-                       k peersKey
-                       p Peer
-               )
-               for k, p = range t.peers {
-                       break
-               }
-               delete(t.peers, k)
+               p := t.peers.PopMax()
                t.initiateConn(p)
        }
 }
@@ -1272,7 +1244,7 @@ func (t *Torrent) wantPeers() bool {
        if t.closed.IsSet() {
                return false
        }
-       if len(t.peers) > t.cl.config.TorrentPeersLowWater {
+       if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
                return false
        }
        return t.needData() || t.seeding()
@@ -1395,7 +1367,7 @@ func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
                        }
                        cl.mu.Lock()
                        t.addPeers(addPeers)
-                       numPeers := len(t.peers)
+                       numPeers := t.peers.Len()
                        cl.mu.Unlock()
                        if numPeers >= cl.config.TorrentPeersHighWater {
                                return
@@ -1458,7 +1430,7 @@ func (t *Torrent) Stats() TorrentStats {
 func (t *Torrent) statsLocked() TorrentStats {
        t.stats.ActivePeers = len(t.conns)
        t.stats.HalfOpenPeers = len(t.halfOpen)
-       t.stats.PendingPeers = len(t.peers)
+       t.stats.PendingPeers = t.peers.Len()
        t.stats.TotalPeers = t.numTotalPeers()
        t.stats.ConnectedSeeders = 0
        for c := range t.conns {
@@ -1483,9 +1455,9 @@ func (t *Torrent) numTotalPeers() int {
        for addr := range t.halfOpen {
                peers[addr] = struct{}{}
        }
-       for _, peer := range t.peers {
+       t.peers.Each(func(peer Peer) {
                peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{}
-       }
+       })
        return len(peers)
 }