8 pp "github.com/anacrolix/torrent/peer_protocol"
14 pexAdd pexEventType = iota
18 // Internal limits, based on BEP 11 (Peer Exchange).
20 pexTargAdded = 25 // put drops on hold when the number of alive connections is lower than this
21 pexMaxHold = 25 // length of the drop hold-back buffer
22 pexMaxDelta = 50 // upper bound on added+added6 and dropped+dropped6 in a single PEX message
25 // represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event
26 type pexEvent struct {
30 next *pexEvent // event feed list
33 // facilitates efficient de-duplication while generating PEX messages
34 type pexMsgFactory struct {
36 added map[netip.AddrPort]struct{}
37 dropped map[netip.AddrPort]struct{}
40 func (me *pexMsgFactory) DeltaLen() int {
43 int64(len(me.dropped))))
46 type addrKey = netip.AddrPort // key type of the factory's added/dropped sets
48 // Returns the key to use to identify a given addr in the factory.
49 func (me *pexMsgFactory) addrKey(addr netip.AddrPort) addrKey {
53 // Applies an add event for e.addr: a pending drop of the same address is cancelled out. Nothing is
54 // returned, so callers cannot tell whether the event cancelled another entry or counts toward the limit.
55 func (me *pexMsgFactory) add(e pexEvent) {
56 key := me.addrKey(e.addr)
57 if _, ok := me.added[key]; ok {
61 me.added = make(map[addrKey]struct{}, pexMaxDelta)
63 addr := krpcNodeAddrFromAddrPort(e.addr)
66 case addr.IP.To4() != nil:
67 if _, ok := me.dropped[key]; ok {
68 if i := m.Dropped.Index(addr); i >= 0 {
69 m.Dropped = append(m.Dropped[:i], m.Dropped[i+1:]...)
71 delete(me.dropped, key)
74 m.Added = append(m.Added, addr)
75 m.AddedFlags = append(m.AddedFlags, e.f)
76 case len(addr.IP) == net.IPv6len:
77 if _, ok := me.dropped[key]; ok {
78 if i := m.Dropped6.Index(addr); i >= 0 {
79 m.Dropped6 = append(m.Dropped6[:i], m.Dropped6[i+1:]...)
81 delete(me.dropped, key)
84 m.Added6 = append(m.Added6, addr)
85 m.Added6Flags = append(m.Added6Flags, e.f)
89 me.added[key] = struct{}{}
92 // Applies a drop event for e.addr: a pending add of the same address is cancelled out. Nothing is
93 // returned, so callers cannot tell whether the event cancelled another entry or counts toward the limit.
94 func (me *pexMsgFactory) drop(e pexEvent) {
95 addr := krpcNodeAddrFromAddrPort(e.addr)
96 key := me.addrKey(e.addr)
97 if me.dropped == nil {
98 me.dropped = make(map[addrKey]struct{}, pexMaxDelta)
100 if _, ok := me.dropped[key]; ok {
105 case addr.IP.To4() != nil:
106 if _, ok := me.added[key]; ok {
107 if i := m.Added.Index(addr); i >= 0 {
108 m.Added = append(m.Added[:i], m.Added[i+1:]...)
109 m.AddedFlags = append(m.AddedFlags[:i], m.AddedFlags[i+1:]...)
111 delete(me.added, key)
114 m.Dropped = append(m.Dropped, addr)
115 case len(addr.IP) == net.IPv6len:
116 if _, ok := me.added[key]; ok {
117 if i := m.Added6.Index(addr); i >= 0 {
118 m.Added6 = append(m.Added6[:i], m.Added6[i+1:]...)
119 m.Added6Flags = append(m.Added6Flags[:i], m.Added6Flags[i+1:]...)
121 delete(me.added, key)
124 m.Dropped6 = append(m.Dropped6, addr)
126 me.dropped[key] = struct{}{}
129 func (me *pexMsgFactory) append(event pexEvent) {
140 func (me *pexMsgFactory) PexMsg() *pp.PexMsg {
144 // Per-torrent PEX state
145 type pexState struct {
147 tail *pexEvent // event feed list
148 hold []pexEvent // delayed drops
149 nc int // net number of alive conns
150 msg0 pexMsgFactory // initial message
153 // Reset wipes the state clean, releasing resources. Called from Torrent.Close().
154 func (s *pexState) Reset() {
160 s.msg0 = pexMsgFactory{}
163 func (s *pexState) append(e *pexEvent) {
171 func (s *pexState) Add(c *PeerConn) {
172 e, err := c.pexEvent(pexAdd)
179 if s.nc >= pexTargAdded {
180 for _, e := range s.hold {
190 func (s *pexState) Drop(c *PeerConn) {
192 // skip connections which were not previously Added
195 e, err := c.pexEvent(pexDrop)
202 if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
203 s.hold = append(s.hold, e)
209 // Genmsg generates a PEX message based on the event feed.
210 // It also returns a pointer to pass to subsequent calls
211 // in order to produce incremental deltas.
212 func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) {
216 return *s.msg0.PexMsg(), s.tail
218 var msg pexMsgFactory
220 for e := start.next; e != nil; e = e.next {
221 if msg.DeltaLen() >= pexMaxDelta {
227 return *msg.PexMsg(), last
230 // The same as Genmsg but just counts up the distinct events that haven't been sent.
231 func (s *pexState) numPending(start *pexEvent) (num int) {
235 return s.msg0.PexMsg().Len()
237 for e := start.next; e != nil; e = e.next {