6 "github.com/anacrolix/dht/v2/krpc"
7 pp "github.com/anacrolix/torrent/peer_protocol"
13 pexAdd pexEventType = iota
17 // internal, based on BEP11
19 pexTargAdded = 25 // put drops on hold when the number of alive connections is lower than this
20 pexMaxHold = 25 // length of the drop hold-back buffer
21 pexMaxDelta = 50 // upper bound on added+added6 and dropped+dropped6 in a single PEX message
24 // represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event
25 type pexEvent struct {
31 // facilitates efficient de-duplication while generating PEX messages
32 type pexMsgFactory struct {
33 added map[addrKey]pexEvent
34 dropped map[addrKey]pexEvent
37 func (me *pexMsgFactory) DeltaLen() int {
40 int64(len(me.dropped))))
45 // Returns the key to use to identify a given addr in the factory.
46 func (me *pexMsgFactory) addrKey(addr net.Addr) addrKey {
47 return addrKey(addr.String())
50 // Applies an add event. A pending drop for the same address is deleted (the add cancels it out).
51 // This method returns nothing, so callers cannot tell whether a new entry was stored.
52 func (me *pexMsgFactory) add(e pexEvent) {
53 key := me.addrKey(e.addr)
54 if _, ok := me.dropped[key]; ok {
55 delete(me.dropped, key)
59 me.added = make(map[addrKey]pexEvent, pexMaxDelta)
64 // Applies a drop event. A pending add for the same address is looked up first so the two can cancel
65 // out. This method returns nothing, so callers cannot tell whether a new entry was stored.
66 func (me *pexMsgFactory) drop(e pexEvent) {
67 key := me.addrKey(e.addr)
68 if _, ok := me.added[key]; ok {
72 if me.dropped == nil {
73 me.dropped = make(map[addrKey]pexEvent, pexMaxDelta)
78 func (me *pexMsgFactory) addEvent(event pexEvent) {
89 func (me *pexMsgFactory) PexMsg() (ret pp.PexMsg) {
90 for key, added := range me.added {
91 addr, ok := nodeAddr(added.addr)
97 ret.Added = append(ret.Added, addr)
98 ret.AddedFlags = append(ret.AddedFlags, added.f)
100 ret.Added6 = append(ret.Added6, addr)
101 ret.Added6Flags = append(ret.Added6Flags, added.f)
106 for key, dropped := range me.dropped {
107 addr, ok := nodeAddr(dropped.addr)
111 switch len(addr.IP) {
113 ret.Dropped = append(ret.Dropped, addr)
115 ret.Dropped6 = append(ret.Dropped6, addr)
123 // Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr
125 func nodeAddr(addr net.Addr) (_ krpc.NodeAddr, ok bool) {
126 ipport, ok := tryIpPortFromNetAddr(addr)
130 return krpc.NodeAddr{IP: shortestIP(ipport.IP), Port: ipport.Port}, true
133 // mainly for the krpc marshallers
134 func shortestIP(ip net.IP) net.IP {
135 if ip4 := ip.To4(); ip4 != nil {
141 // Per-torrent PEX state
142 type pexState struct {
143 ev []pexEvent // event feed, append-only
144 hold []pexEvent // delayed drops
145 nc int // net number of alive conns
148 func (s *pexState) Reset() {
154 func (s *pexState) Add(c *PeerConn) {
156 if s.nc >= pexTargAdded {
157 s.ev = append(s.ev, s.hold...)
160 e := c.pexEvent(pexAdd)
161 s.ev = append(s.ev, e)
165 func (s *pexState) Drop(c *PeerConn) {
167 // skip connections which were not previously Added
170 e := c.pexEvent(pexDrop)
172 if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
173 s.hold = append(s.hold, e)
175 s.ev = append(s.ev, e)
179 // Generate a PEX message based on the event feed. Also returns an index to pass to the subsequent
180 // calls, producing incremental deltas.
181 func (s *pexState) Genmsg(start int) (pp.PexMsg, int) {
182 var factory pexMsgFactory
184 for _, e := range s.ev[start:] {
185 if start > 0 && factory.DeltaLen() >= pexMaxDelta {
191 return factory.PexMsg(), n