--- /dev/null
+package torrent
+
+import (
+ "net"
+
+ "github.com/anacrolix/dht/krpc"
+)
+
+type Peer struct {
+ Id [20]byte
+ IP net.IP
+ Port int
+ Source peerSource
+ // Peer is known to support encryption.
+ SupportsEncryption bool
+ pexPeerFlags
+}
+
+func (me *Peer) FromPex(na krpc.NodeAddr, fs pexPeerFlags) {
+ me.IP = append([]byte(nil), na.IP...)
+ me.Port = na.Port
+ me.Source = peerSourcePEX
+ // If they prefer encryption, they must support it.
+ if fs.Get(pexPrefersEncryption) {
+ me.SupportsEncryption = true
+ }
+ me.pexPeerFlags = fs
+}
+
+func (me Peer) addr() ipPort {
+ return ipPort{me.IP, uint16(me.Port)}
+}
"github.com/anacrolix/missinggo/slices"
"github.com/anacrolix/sync"
"github.com/dustin/go-humanize"
+ "github.com/google/btree"
"golang.org/x/time/rate"
"github.com/anacrolix/torrent/bencode"
t = &Torrent{
cl: cl,
infoHash: ih,
- peers: make(map[peersKey]Peer),
- conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
+ peers: prioritizedPeers{
+ om: btree.New(2),
+ getPrio: func(p Peer) peerPriority {
+ return bep40Priority(cl.publicAddr(p.IP), p.addr())
+ },
+ },
+ conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
halfOpen: make(map[string]Peer),
pieceStateChanges: pubsub.NewPubSub(),
Source: peerSourceDHTAnnouncePeer,
}})
}
+
+func firstNotNil(ips ...net.IP) net.IP {
+ for _, ip := range ips {
+ if ip != nil {
+ return ip
+ }
+ }
+ return nil
+}
+
+func (cl *Client) publicIp(peer net.IP) net.IP {
+ // TODO: Use BEP 10 to determine how peers are seeing us.
+ if peer.To4() != nil {
+ return firstNotNil(cl.config.PublicIp4, missinggo.AddrIP(cl.ListenAddr()).To4())
+ } else {
+ return firstNotNil(cl.config.PublicIp6, missinggo.AddrIP(cl.ListenAddr()).To16())
+ }
+}
+
+// Our IP as a peer should see it.
+func (cl *Client) publicAddr(peer net.IP) ipPort {
+ return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
+}
--- /dev/null
+package torrent
+
+import "github.com/google/btree"
+
+// Peers are stored with their priority at insertion. Their priority may
+// change if our apparent IP changes, we don't currently handle that.
+type prioritizedPeersItem struct {
+ prio peerPriority
+ p Peer
+}
+
+func (me prioritizedPeersItem) Less(than btree.Item) bool {
+ return me.prio < than.(prioritizedPeersItem).prio
+}
+
+type prioritizedPeers struct {
+ om *btree.BTree
+ getPrio func(Peer) peerPriority
+}
+
+func (me *prioritizedPeers) Each(f func(Peer)) {
+ me.om.Ascend(func(i btree.Item) bool {
+ f(i.(prioritizedPeersItem).p)
+ return true
+ })
+}
+
+func (me *prioritizedPeers) Len() int {
+ return me.om.Len()
+}
+
+// Returns true if a peer is replaced.
+func (me *prioritizedPeers) Add(p Peer) bool {
+ return me.om.ReplaceOrInsert(prioritizedPeersItem{me.getPrio(p), p}) != nil
+}
+
+func (me *prioritizedPeers) DeleteMin() {
+ me.om.DeleteMin()
+}
+
+func (me *prioritizedPeers) PopMax() Peer {
+ return me.om.DeleteMax().(prioritizedPeersItem).p
+}
"time"
"github.com/anacrolix/dht"
- "github.com/anacrolix/dht/krpc"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/bitmap"
// active connections if were told about the peer after connecting with
// them. That encourages us to reconnect to peers that are well known in
// the swarm.
- peers map[peersKey]Peer
+ peers prioritizedPeers
wantPeersEvent missinggo.Event
// An announcer for each tracker URL.
trackerAnnouncers map[string]*trackerScraper
// pending, and half-open peers.
func (t *Torrent) KnownSwarm() (ks []Peer) {
// Add pending peers to the list
- for _, peer := range t.peers {
+ t.peers.Each(func(peer Peer) {
ks = append(ks, peer)
- }
+ })
// Add half-open peers to the list
for _, peer := range t.halfOpen {
torrent.Add("peers not added because of bad addr", 1)
return
}
- t.openNewConns()
- if len(t.peers) >= cl.config.TorrentPeersHighWater {
- return
+ if t.peers.Add(p) {
+ torrent.Add("peers replaced", 1)
}
- t.peers[peersKey{string(p.IP), p.Port}] = p
t.openNewConns()
-
+ for t.peers.Len() > cl.config.TorrentPeersHighWater {
+ t.peers.DeleteMin()
+ torrent.Add("peers discarded", 1)
+ }
}
func (t *Torrent) invalidateMetadata() {
t.pieces[pieceIndex].dirtyChunks.Clear()
}
-type Peer struct {
- Id [20]byte
- IP net.IP
- Port int
- Source peerSource
- // Peer is known to support encryption.
- SupportsEncryption bool
- pexPeerFlags
-}
-
-func (me *Peer) FromPex(na krpc.NodeAddr, fs pexPeerFlags) {
- me.IP = append([]byte(nil), na.IP...)
- me.Port = na.Port
- me.Source = peerSourcePEX
- // If they prefer encryption, they must support it.
- if fs.Get(pexPrefersEncryption) {
- me.SupportsEncryption = true
- }
- me.pexPeerFlags = fs
-}
-
func (t *Torrent) pieceLength(piece int) pp.Integer {
if t.info.PieceLength == 0 {
// There will be no variance amongst pieces. Only pain.
func (t *Torrent) openNewConns() {
defer t.updateWantPeersEvent()
- for len(t.peers) != 0 {
+ for t.peers.Len() != 0 {
if !t.wantConns() {
return
}
if len(t.halfOpen) >= t.maxHalfOpen() {
return
}
- var (
- k peersKey
- p Peer
- )
- for k, p = range t.peers {
- break
- }
- delete(t.peers, k)
+ p := t.peers.PopMax()
t.initiateConn(p)
}
}
if t.closed.IsSet() {
return false
}
- if len(t.peers) > t.cl.config.TorrentPeersLowWater {
+ if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
return false
}
return t.needData() || t.seeding()
}
cl.mu.Lock()
t.addPeers(addPeers)
- numPeers := len(t.peers)
+ numPeers := t.peers.Len()
cl.mu.Unlock()
if numPeers >= cl.config.TorrentPeersHighWater {
return
func (t *Torrent) statsLocked() TorrentStats {
t.stats.ActivePeers = len(t.conns)
t.stats.HalfOpenPeers = len(t.halfOpen)
- t.stats.PendingPeers = len(t.peers)
+ t.stats.PendingPeers = t.peers.Len()
t.stats.TotalPeers = t.numTotalPeers()
t.stats.ConnectedSeeders = 0
for c := range t.conns {
for addr := range t.halfOpen {
peers[addr] = struct{}{}
}
- for _, peer := range t.peers {
+ t.peers.Each(func(peer Peer) {
peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{}
- }
+ })
return len(peers)
}