]> Sergey Matveev's repositories - btrtrc.git/blob - pex.go
PEX: use new NodeAddr search methods in krpc
[btrtrc.git] / pex.go
1 package torrent
2
3 import (
4         "net"
5         "sync"
6         "time"
7
8         "github.com/anacrolix/dht/v2/krpc"
9         pp "github.com/anacrolix/torrent/peer_protocol"
10 )
11
12 type pexEventType int
13
14 const (
15         pexAdd pexEventType = iota
16         pexDrop
17 )
18
19 // internal, based on BEP11
20 const (
21         pexTargAdded = 25 // put drops on hold when the number of alive connections is lower than this
22         pexMaxHold   = 25 // length of the drop hold-back buffer
23         pexMaxDelta  = 50 // upper bound on added+added6 and dropped+dropped6 in a single PEX message
24 )
25
26 // represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event
27 type pexEvent struct {
28         t    pexEventType
29         addr PeerRemoteAddr
30         f    pp.PexPeerFlags
31         next *pexEvent // event feed list
32 }
33
34 // facilitates efficient de-duplication while generating PEX messages
35 type pexMsgFactory struct {
36         msg     pp.PexMsg
37         added   map[addrKey]struct{}
38         dropped map[addrKey]struct{}
39 }
40
41 func (me *pexMsgFactory) DeltaLen() int {
42         return int(max(
43                 int64(len(me.added)),
44                 int64(len(me.dropped))))
45 }
46
47 type addrKey string
48
49 // Returns the key to use to identify a given addr in the factory.
50 func (me *pexMsgFactory) addrKey(addr PeerRemoteAddr) addrKey {
51         return addrKey(addr.String())
52 }
53
54 // Returns whether the entry was added (we can check if we're cancelling out another entry and so
55 // won't hit the limit consuming this event).
56 func (me *pexMsgFactory) add(e pexEvent) {
57         key := me.addrKey(e.addr)
58         if _, ok := me.added[key]; ok {
59                 return
60         }
61         if me.added == nil {
62                 me.added = make(map[addrKey]struct{}, pexMaxDelta)
63         }
64         addr, ok := nodeAddr(e.addr)
65         if !ok {
66                 return
67         }
68         m := &me.msg
69         switch {
70         case addr.IP.To4() != nil:
71                 if _, ok := me.dropped[key]; ok {
72                         if i := m.Dropped.Index(addr); i >= 0 {
73                                 m.Dropped = append(m.Dropped[:i], m.Dropped[i+1:]...)
74                         }
75                         delete(me.dropped, key)
76                         return
77                 }
78                 m.Added = append(m.Added, addr)
79                 m.AddedFlags = append(m.AddedFlags, e.f)
80         case len(addr.IP) == net.IPv6len:
81                 if _, ok := me.dropped[key]; ok {
82                         if i := m.Dropped6.Index(addr); i >= 0 {
83                                 m.Dropped6 = append(m.Dropped6[:i], m.Dropped6[i+1:]...)
84                         }
85                         delete(me.dropped, key)
86                         return
87                 }
88                 m.Added6 = append(m.Added6, addr)
89                 m.Added6Flags = append(m.Added6Flags, e.f)
90         default:
91                 panic(addr)
92         }
93         me.added[key] = struct{}{}
94 }
95
96 // Returns whether the entry was added (we can check if we're cancelling out another entry and so
97 // won't hit the limit consuming this event).
98 func (me *pexMsgFactory) drop(e pexEvent) {
99         addr, ok := nodeAddr(e.addr)
100         if !ok {
101                 return
102         }
103         key := me.addrKey(e.addr)
104         if me.dropped == nil {
105                 me.dropped = make(map[addrKey]struct{}, pexMaxDelta)
106         }
107         if _, ok := me.dropped[key]; ok {
108                 return
109         }
110         m := &me.msg
111         switch {
112         case addr.IP.To4() != nil:
113                 if _, ok := me.added[key]; ok {
114                         if i := m.Added.Index(addr); i >= 0 {
115                                 m.Added = append(m.Added[:i], m.Added[i+1:]...)
116                                 m.AddedFlags = append(m.AddedFlags[:i], m.AddedFlags[i+1:]...)
117                         }
118                         delete(me.added, key)
119                         return
120                 }
121                 m.Dropped = append(m.Dropped, addr)
122         case len(addr.IP) == net.IPv6len:
123                 if _, ok := me.added[key]; ok {
124                         if i := m.Added6.Index(addr); i >= 0 {
125                                 m.Added6 = append(m.Added6[:i], m.Added6[i+1:]...)
126                                 m.Added6Flags = append(m.Added6Flags[:i], m.Added6Flags[i+1:]...)
127                         }
128                         delete(me.added, key)
129                         return
130                 }
131                 m.Dropped6 = append(m.Dropped6, addr)
132         }
133         me.dropped[key] = struct{}{}
134 }
135
136 func (me *pexMsgFactory) append(event pexEvent) {
137         switch event.t {
138         case pexAdd:
139                 me.add(event)
140         case pexDrop:
141                 me.drop(event)
142         default:
143                 panic(event.t)
144         }
145 }
146
147 func (me *pexMsgFactory) PexMsg() pp.PexMsg {
148         return me.msg
149 }
150
151 // Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr
152 // format.
153 func nodeAddr(addr PeerRemoteAddr) (krpc.NodeAddr, bool) {
154         ipport, _ := tryIpPortFromNetAddr(addr)
155         ok := ipport.IP != nil
156         return krpc.NodeAddr{IP: shortestIP(ipport.IP), Port: ipport.Port}, ok
157 }
158
159 // mainly for the krpc marshallers
160 func shortestIP(ip net.IP) net.IP {
161         if ip4 := ip.To4(); ip4 != nil {
162                 return ip4
163         }
164         return ip
165 }
166
167 // Per-torrent PEX state
168 type pexState struct {
169         sync.RWMutex
170         tail *pexEvent     // event feed list
171         hold []pexEvent    // delayed drops
172         rest time.Time     // cooldown deadline on inbound
173         nc   int           // net number of alive conns
174         msg0 pexMsgFactory // initial message
175 }
176
177 // Reset wipes the state clean, releasing resources. Called from Torrent.Close().
178 func (s *pexState) Reset() {
179         s.Lock()
180         defer s.Unlock()
181         s.tail = nil
182         s.hold = nil
183         s.nc = 0
184         s.rest = time.Time{}
185         s.msg0 = pexMsgFactory{}
186 }
187
188 func (s *pexState) append(e *pexEvent) {
189         if s.tail != nil {
190                 s.tail.next = e
191         }
192         s.tail = e
193         s.msg0.append(*e)
194 }
195
196 func (s *pexState) Add(c *PeerConn) {
197         s.Lock()
198         defer s.Unlock()
199         s.nc++
200         if s.nc >= pexTargAdded {
201                 for _, e := range s.hold {
202                         ne := e
203                         s.append(&ne)
204                 }
205                 s.hold = s.hold[:0]
206         }
207         e := c.pexEvent(pexAdd)
208         c.pex.Listed = true
209         s.append(&e)
210 }
211
212 func (s *pexState) Drop(c *PeerConn) {
213         if !c.pex.Listed {
214                 // skip connections which were not previously Added
215                 return
216         }
217         s.Lock()
218         defer s.Unlock()
219         e := c.pexEvent(pexDrop)
220         s.nc--
221         if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
222                 s.hold = append(s.hold, e)
223         } else {
224                 s.append(&e)
225         }
226 }
227
228 // Generate a PEX message based on the event feed.
229 // Also returns a pointer to pass to the subsequent calls
230 // to produce incremental deltas.
231 func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) {
232         s.RLock()
233         defer s.RUnlock()
234         if start == nil {
235                 return s.msg0.PexMsg(), s.tail
236         }
237         var msg pexMsgFactory
238         last := start
239         for e := start.next; e != nil; e = e.next {
240                 if msg.DeltaLen() >= pexMaxDelta {
241                         break
242                 }
243                 msg.append(*e)
244                 last = e
245         }
246         return msg.PexMsg(), last
247 }