]> Sergey Matveev's repositories - btrtrc.git/blob - pex.go
Fix addrPortOrZero for unix sockets on Windows
[btrtrc.git] / pex.go
1 package torrent
2
3 import (
4         "net"
5         "net/netip"
6         "sync"
7
8         pp "github.com/anacrolix/torrent/peer_protocol"
9 )
10
11 type pexEventType int
12
13 const (
14         pexAdd pexEventType = iota
15         pexDrop
16 )
17
18 // internal, based on BEP11
19 const (
20         pexTargAdded = 25 // put drops on hold when the number of alive connections is lower than this
21         pexMaxHold   = 25 // length of the drop hold-back buffer
22         pexMaxDelta  = 50 // upper bound on added+added6 and dropped+dropped6 in a single PEX message
23 )
24
25 // represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event
26 type pexEvent struct {
27         t    pexEventType
28         addr netip.AddrPort
29         f    pp.PexPeerFlags
30         next *pexEvent // event feed list
31 }
32
33 // facilitates efficient de-duplication while generating PEX messages
34 type pexMsgFactory struct {
35         msg     pp.PexMsg
36         added   map[netip.AddrPort]struct{}
37         dropped map[netip.AddrPort]struct{}
38 }
39
40 func (me *pexMsgFactory) DeltaLen() int {
41         return int(max(
42                 int64(len(me.added)),
43                 int64(len(me.dropped))))
44 }
45
46 type addrKey = netip.AddrPort
47
48 // Returns the key to use to identify a given addr in the factory.
49 func (me *pexMsgFactory) addrKey(addr netip.AddrPort) addrKey {
50         return addr
51 }
52
53 // Returns whether the entry was added (we can check if we're cancelling out another entry and so
54 // won't hit the limit consuming this event).
55 func (me *pexMsgFactory) add(e pexEvent) {
56         key := me.addrKey(e.addr)
57         if _, ok := me.added[key]; ok {
58                 return
59         }
60         if me.added == nil {
61                 me.added = make(map[addrKey]struct{}, pexMaxDelta)
62         }
63         addr := krpcNodeAddrFromAddrPort(e.addr)
64         m := &me.msg
65         switch {
66         case addr.IP.To4() != nil:
67                 if _, ok := me.dropped[key]; ok {
68                         if i := m.Dropped.Index(addr); i >= 0 {
69                                 m.Dropped = append(m.Dropped[:i], m.Dropped[i+1:]...)
70                         }
71                         delete(me.dropped, key)
72                         return
73                 }
74                 m.Added = append(m.Added, addr)
75                 m.AddedFlags = append(m.AddedFlags, e.f)
76         case len(addr.IP) == net.IPv6len:
77                 if _, ok := me.dropped[key]; ok {
78                         if i := m.Dropped6.Index(addr); i >= 0 {
79                                 m.Dropped6 = append(m.Dropped6[:i], m.Dropped6[i+1:]...)
80                         }
81                         delete(me.dropped, key)
82                         return
83                 }
84                 m.Added6 = append(m.Added6, addr)
85                 m.Added6Flags = append(m.Added6Flags, e.f)
86         default:
87                 panic(addr)
88         }
89         me.added[key] = struct{}{}
90 }
91
92 // Returns whether the entry was added (we can check if we're cancelling out another entry and so
93 // won't hit the limit consuming this event).
94 func (me *pexMsgFactory) drop(e pexEvent) {
95         addr := krpcNodeAddrFromAddrPort(e.addr)
96         key := me.addrKey(e.addr)
97         if me.dropped == nil {
98                 me.dropped = make(map[addrKey]struct{}, pexMaxDelta)
99         }
100         if _, ok := me.dropped[key]; ok {
101                 return
102         }
103         m := &me.msg
104         switch {
105         case addr.IP.To4() != nil:
106                 if _, ok := me.added[key]; ok {
107                         if i := m.Added.Index(addr); i >= 0 {
108                                 m.Added = append(m.Added[:i], m.Added[i+1:]...)
109                                 m.AddedFlags = append(m.AddedFlags[:i], m.AddedFlags[i+1:]...)
110                         }
111                         delete(me.added, key)
112                         return
113                 }
114                 m.Dropped = append(m.Dropped, addr)
115         case len(addr.IP) == net.IPv6len:
116                 if _, ok := me.added[key]; ok {
117                         if i := m.Added6.Index(addr); i >= 0 {
118                                 m.Added6 = append(m.Added6[:i], m.Added6[i+1:]...)
119                                 m.Added6Flags = append(m.Added6Flags[:i], m.Added6Flags[i+1:]...)
120                         }
121                         delete(me.added, key)
122                         return
123                 }
124                 m.Dropped6 = append(m.Dropped6, addr)
125         }
126         me.dropped[key] = struct{}{}
127 }
128
129 func (me *pexMsgFactory) append(event pexEvent) {
130         switch event.t {
131         case pexAdd:
132                 me.add(event)
133         case pexDrop:
134                 me.drop(event)
135         default:
136                 panic(event.t)
137         }
138 }
139
140 func (me *pexMsgFactory) PexMsg() *pp.PexMsg {
141         return &me.msg
142 }
143
144 // Per-torrent PEX state
145 type pexState struct {
146         sync.RWMutex
147         tail *pexEvent     // event feed list
148         hold []pexEvent    // delayed drops
149         nc   int           // net number of alive conns
150         msg0 pexMsgFactory // initial message
151 }
152
153 // Reset wipes the state clean, releasing resources. Called from Torrent.Close().
154 func (s *pexState) Reset() {
155         s.Lock()
156         defer s.Unlock()
157         s.tail = nil
158         s.hold = nil
159         s.nc = 0
160         s.msg0 = pexMsgFactory{}
161 }
162
163 func (s *pexState) append(e *pexEvent) {
164         if s.tail != nil {
165                 s.tail.next = e
166         }
167         s.tail = e
168         s.msg0.append(*e)
169 }
170
171 func (s *pexState) Add(c *PeerConn) {
172         e, err := c.pexEvent(pexAdd)
173         if err != nil {
174                 return
175         }
176         s.Lock()
177         defer s.Unlock()
178         s.nc++
179         if s.nc >= pexTargAdded {
180                 for _, e := range s.hold {
181                         ne := e
182                         s.append(&ne)
183                 }
184                 s.hold = s.hold[:0]
185         }
186         c.pex.Listed = true
187         s.append(&e)
188 }
189
190 func (s *pexState) Drop(c *PeerConn) {
191         if !c.pex.Listed {
192                 // skip connections which were not previously Added
193                 return
194         }
195         e, err := c.pexEvent(pexDrop)
196         if err != nil {
197                 return
198         }
199         s.Lock()
200         defer s.Unlock()
201         s.nc--
202         if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
203                 s.hold = append(s.hold, e)
204         } else {
205                 s.append(&e)
206         }
207 }
208
209 // Generate a PEX message based on the event feed.
210 // Also returns a pointer to pass to the subsequent calls
211 // to produce incremental deltas.
212 func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) {
213         s.RLock()
214         defer s.RUnlock()
215         if start == nil {
216                 return *s.msg0.PexMsg(), s.tail
217         }
218         var msg pexMsgFactory
219         last := start
220         for e := start.next; e != nil; e = e.next {
221                 if msg.DeltaLen() >= pexMaxDelta {
222                         break
223                 }
224                 msg.append(*e)
225                 last = e
226         }
227         return *msg.PexMsg(), last
228 }
229
230 // The same as Genmsg but just counts up the distinct events that haven't been sent.
231 func (s *pexState) numPending(start *pexEvent) (num int) {
232         s.RLock()
233         defer s.RUnlock()
234         if start == nil {
235                 return s.msg0.PexMsg().Len()
236         }
237         for e := start.next; e != nil; e = e.next {
238                 num++
239         }
240         return
241 }