]> Sergey Matveev's repositories - btrtrc.git/blob - pex.go
Attempt holepunch after initial dial fails
[btrtrc.git] / pex.go
1 package torrent
2
3 import (
4         "net"
5         "sync"
6
7         "github.com/anacrolix/dht/v2/krpc"
8
9         pp "github.com/anacrolix/torrent/peer_protocol"
10 )
11
12 type pexEventType int
13
14 const (
15         pexAdd pexEventType = iota
16         pexDrop
17 )
18
19 // internal, based on BEP11
20 const (
21         pexTargAdded = 25 // put drops on hold when the number of alive connections is lower than this
22         pexMaxHold   = 25 // length of the drop hold-back buffer
23         pexMaxDelta  = 50 // upper bound on added+added6 and dropped+dropped6 in a single PEX message
24 )
25
26 // represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event
27 type pexEvent struct {
28         t    pexEventType
29         addr PeerRemoteAddr
30         f    pp.PexPeerFlags
31         next *pexEvent // event feed list
32 }
33
34 // facilitates efficient de-duplication while generating PEX messages
35 type pexMsgFactory struct {
36         msg     pp.PexMsg
37         added   map[addrKey]struct{}
38         dropped map[addrKey]struct{}
39 }
40
41 func (me *pexMsgFactory) DeltaLen() int {
42         return int(max(
43                 int64(len(me.added)),
44                 int64(len(me.dropped))))
45 }
46
47 type addrKey string
48
49 // Returns the key to use to identify a given addr in the factory.
50 func (me *pexMsgFactory) addrKey(addr PeerRemoteAddr) addrKey {
51         return addrKey(addr.String())
52 }
53
54 // Returns whether the entry was added (we can check if we're cancelling out another entry and so
55 // won't hit the limit consuming this event).
56 func (me *pexMsgFactory) add(e pexEvent) {
57         key := me.addrKey(e.addr)
58         if _, ok := me.added[key]; ok {
59                 return
60         }
61         if me.added == nil {
62                 me.added = make(map[addrKey]struct{}, pexMaxDelta)
63         }
64         addr, ok := nodeAddr(e.addr)
65         if !ok {
66                 return
67         }
68         m := &me.msg
69         switch {
70         case addr.IP.To4() != nil:
71                 if _, ok := me.dropped[key]; ok {
72                         if i := m.Dropped.Index(addr); i >= 0 {
73                                 m.Dropped = append(m.Dropped[:i], m.Dropped[i+1:]...)
74                         }
75                         delete(me.dropped, key)
76                         return
77                 }
78                 m.Added = append(m.Added, addr)
79                 m.AddedFlags = append(m.AddedFlags, e.f)
80         case len(addr.IP) == net.IPv6len:
81                 if _, ok := me.dropped[key]; ok {
82                         if i := m.Dropped6.Index(addr); i >= 0 {
83                                 m.Dropped6 = append(m.Dropped6[:i], m.Dropped6[i+1:]...)
84                         }
85                         delete(me.dropped, key)
86                         return
87                 }
88                 m.Added6 = append(m.Added6, addr)
89                 m.Added6Flags = append(m.Added6Flags, e.f)
90         default:
91                 panic(addr)
92         }
93         me.added[key] = struct{}{}
94 }
95
96 // Returns whether the entry was added (we can check if we're cancelling out another entry and so
97 // won't hit the limit consuming this event).
98 func (me *pexMsgFactory) drop(e pexEvent) {
99         addr, ok := nodeAddr(e.addr)
100         if !ok {
101                 return
102         }
103         key := me.addrKey(e.addr)
104         if me.dropped == nil {
105                 me.dropped = make(map[addrKey]struct{}, pexMaxDelta)
106         }
107         if _, ok := me.dropped[key]; ok {
108                 return
109         }
110         m := &me.msg
111         switch {
112         case addr.IP.To4() != nil:
113                 if _, ok := me.added[key]; ok {
114                         if i := m.Added.Index(addr); i >= 0 {
115                                 m.Added = append(m.Added[:i], m.Added[i+1:]...)
116                                 m.AddedFlags = append(m.AddedFlags[:i], m.AddedFlags[i+1:]...)
117                         }
118                         delete(me.added, key)
119                         return
120                 }
121                 m.Dropped = append(m.Dropped, addr)
122         case len(addr.IP) == net.IPv6len:
123                 if _, ok := me.added[key]; ok {
124                         if i := m.Added6.Index(addr); i >= 0 {
125                                 m.Added6 = append(m.Added6[:i], m.Added6[i+1:]...)
126                                 m.Added6Flags = append(m.Added6Flags[:i], m.Added6Flags[i+1:]...)
127                         }
128                         delete(me.added, key)
129                         return
130                 }
131                 m.Dropped6 = append(m.Dropped6, addr)
132         }
133         me.dropped[key] = struct{}{}
134 }
135
136 func (me *pexMsgFactory) append(event pexEvent) {
137         switch event.t {
138         case pexAdd:
139                 me.add(event)
140         case pexDrop:
141                 me.drop(event)
142         default:
143                 panic(event.t)
144         }
145 }
146
147 func (me *pexMsgFactory) PexMsg() *pp.PexMsg {
148         return &me.msg
149 }
150
151 // Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr
152 // format.
153 func nodeAddr(addr PeerRemoteAddr) (krpc.NodeAddr, bool) {
154         ipport, _ := tryIpPortFromNetAddr(addr)
155         ok := ipport.IP != nil
156         return krpc.NodeAddr{IP: ipport.IP, Port: ipport.Port}, ok
157 }
158
159 // Per-torrent PEX state
160 type pexState struct {
161         sync.RWMutex
162         tail *pexEvent     // event feed list
163         hold []pexEvent    // delayed drops
164         nc   int           // net number of alive conns
165         msg0 pexMsgFactory // initial message
166 }
167
168 // Reset wipes the state clean, releasing resources. Called from Torrent.Close().
169 func (s *pexState) Reset() {
170         s.Lock()
171         defer s.Unlock()
172         s.tail = nil
173         s.hold = nil
174         s.nc = 0
175         s.msg0 = pexMsgFactory{}
176 }
177
178 func (s *pexState) append(e *pexEvent) {
179         if s.tail != nil {
180                 s.tail.next = e
181         }
182         s.tail = e
183         s.msg0.append(*e)
184 }
185
186 func (s *pexState) Add(c *PeerConn) {
187         s.Lock()
188         defer s.Unlock()
189         s.nc++
190         if s.nc >= pexTargAdded {
191                 for _, e := range s.hold {
192                         ne := e
193                         s.append(&ne)
194                 }
195                 s.hold = s.hold[:0]
196         }
197         e := c.pexEvent(pexAdd)
198         c.pex.Listed = true
199         s.append(&e)
200 }
201
202 func (s *pexState) Drop(c *PeerConn) {
203         if !c.pex.Listed {
204                 // skip connections which were not previously Added
205                 return
206         }
207         s.Lock()
208         defer s.Unlock()
209         e := c.pexEvent(pexDrop)
210         s.nc--
211         if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
212                 s.hold = append(s.hold, e)
213         } else {
214                 s.append(&e)
215         }
216 }
217
218 // Generate a PEX message based on the event feed.
219 // Also returns a pointer to pass to the subsequent calls
220 // to produce incremental deltas.
221 func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) {
222         s.RLock()
223         defer s.RUnlock()
224         if start == nil {
225                 return *s.msg0.PexMsg(), s.tail
226         }
227         var msg pexMsgFactory
228         last := start
229         for e := start.next; e != nil; e = e.next {
230                 if msg.DeltaLen() >= pexMaxDelta {
231                         break
232                 }
233                 msg.append(*e)
234                 last = e
235         }
236         return *msg.PexMsg(), last
237 }
238
239 // The same as Genmsg but just counts up the distinct events that haven't been sent.
240 func (s *pexState) numPending(start *pexEvent) (num int) {
241         s.RLock()
242         defer s.RUnlock()
243         if start == nil {
244                 return s.msg0.PexMsg().Len()
245         }
246         for e := start.next; e != nil; e = e.next {
247                 num++
248         }
249         return
250 }