]> Sergey Matveev's repositories - btrtrc.git/blob - pex.go
PEX: impede full-meching in tracker-less swarms by adding a cooldown minute
[btrtrc.git] / pex.go
1 package torrent
2
3 import (
4         "net"
5         "sync"
6         "time"
7
8         "github.com/anacrolix/dht/v2/krpc"
9         pp "github.com/anacrolix/torrent/peer_protocol"
10 )
11
12 type pexEventType int
13
14 const (
15         pexAdd pexEventType = iota
16         pexDrop
17 )
18
19 // internal, based on BEP11
20 const (
21         pexTargAdded = 25 // put drops on hold when the number of alive connections is lower than this
22         pexMaxHold   = 25 // length of the drop hold-back buffer
23         pexMaxDelta  = 50 // upper bound on added+added6 and dropped+dropped6 in a single PEX message
24 )
25
26 // represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event
27 type pexEvent struct {
28         t    pexEventType
29         addr PeerRemoteAddr
30         f    pp.PexPeerFlags
31 }
32
33 // facilitates efficient de-duplication while generating PEX messages
34 type pexMsgFactory struct {
35         msg     pp.PexMsg
36         added   map[addrKey]struct{}
37         dropped map[addrKey]struct{}
38 }
39
40 func (me *pexMsgFactory) DeltaLen() int {
41         return int(max(
42                 int64(len(me.added)),
43                 int64(len(me.dropped))))
44 }
45
46 type addrKey string
47
48 // Returns the key to use to identify a given addr in the factory.
49 func (me *pexMsgFactory) addrKey(addr PeerRemoteAddr) addrKey {
50         return addrKey(addr.String())
51 }
52
53 func addrEqual(a, b *krpc.NodeAddr) bool {
54         return a.IP.Equal(b.IP) && a.Port == b.Port
55 }
56
57 func addrIndex(v []krpc.NodeAddr, a *krpc.NodeAddr) int {
58         for i := range v {
59                 if addrEqual(&v[i], a) {
60                         return i
61                 }
62         }
63         return -1
64 }
65
66 // Returns whether the entry was added (we can check if we're cancelling out another entry and so
67 // won't hit the limit consuming this event).
68 func (me *pexMsgFactory) add(e pexEvent) {
69         key := me.addrKey(e.addr)
70         if _, ok := me.added[key]; ok {
71                 return
72         }
73         if me.added == nil {
74                 me.added = make(map[addrKey]struct{}, pexMaxDelta)
75         }
76         addr, ok := nodeAddr(e.addr)
77         if !ok {
78                 return
79         }
80         m := &me.msg
81         switch {
82         case addr.IP.To4() != nil:
83                 if _, ok := me.dropped[key]; ok {
84                         if i := addrIndex(m.Dropped.NodeAddrs(), &addr); i >= 0 {
85                                 m.Dropped = append(m.Dropped[:i], m.Dropped[i+1:]...)
86                         }
87                         delete(me.dropped, key)
88                         return
89                 }
90                 m.Added = append(m.Added, addr)
91                 m.AddedFlags = append(m.AddedFlags, e.f)
92         case len(addr.IP) == net.IPv6len:
93                 if _, ok := me.dropped[key]; ok {
94                         if i := addrIndex(m.Dropped6.NodeAddrs(), &addr); i >= 0 {
95                                 m.Dropped6 = append(m.Dropped6[:i], m.Dropped6[i+1:]...)
96                         }
97                         delete(me.dropped, key)
98                         return
99                 }
100                 m.Added6 = append(m.Added6, addr)
101                 m.Added6Flags = append(m.Added6Flags, e.f)
102         default:
103                 panic(addr)
104         }
105         me.added[key] = struct{}{}
106 }
107
108 // Returns whether the entry was added (we can check if we're cancelling out another entry and so
109 // won't hit the limit consuming this event).
110 func (me *pexMsgFactory) drop(e pexEvent) {
111         addr, ok := nodeAddr(e.addr)
112         if !ok {
113                 return
114         }
115         key := me.addrKey(e.addr)
116         if me.dropped == nil {
117                 me.dropped = make(map[addrKey]struct{}, pexMaxDelta)
118         }
119         if _, ok := me.dropped[key]; ok {
120                 return
121         }
122         m := &me.msg
123         switch {
124         case addr.IP.To4() != nil:
125                 if _, ok := me.added[key]; ok {
126                         if i := addrIndex(m.Added.NodeAddrs(), &addr); i >= 0 {
127                                 m.Added = append(m.Added[:i], m.Added[i+1:]...)
128                                 m.AddedFlags = append(m.AddedFlags[:i], m.AddedFlags[i+1:]...)
129                         }
130                         delete(me.added, key)
131                         return
132                 }
133                 m.Dropped = append(m.Dropped, addr)
134         case len(addr.IP) == net.IPv6len:
135                 if _, ok := me.added[key]; ok {
136                         if i := addrIndex(m.Added6.NodeAddrs(), &addr); i >= 0 {
137                                 m.Added6 = append(m.Added6[:i], m.Added6[i+1:]...)
138                                 m.Added6Flags = append(m.Added6Flags[:i], m.Added6Flags[i+1:]...)
139                         }
140                         delete(me.added, key)
141                         return
142                 }
143                 m.Dropped6 = append(m.Dropped6, addr)
144         }
145         me.dropped[key] = struct{}{}
146 }
147
148 func (me *pexMsgFactory) addEvent(event pexEvent) {
149         switch event.t {
150         case pexAdd:
151                 me.add(event)
152         case pexDrop:
153                 me.drop(event)
154         default:
155                 panic(event.t)
156         }
157 }
158
159 func (me *pexMsgFactory) PexMsg() pp.PexMsg {
160         return me.msg
161 }
162
163 // Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr
164 // format.
165 func nodeAddr(addr PeerRemoteAddr) (krpc.NodeAddr, bool) {
166         ipport, _ := tryIpPortFromNetAddr(addr)
167         ok := ipport.IP != nil
168         return krpc.NodeAddr{IP: shortestIP(ipport.IP), Port: ipport.Port}, ok
169 }
170
171 // mainly for the krpc marshallers
172 func shortestIP(ip net.IP) net.IP {
173         if ip4 := ip.To4(); ip4 != nil {
174                 return ip4
175         }
176         return ip
177 }
178
179 // Per-torrent PEX state
180 type pexState struct {
181         ev        []pexEvent    // event feed, append-only
182         hold      []pexEvent    // delayed drops
183         rest      time.Time     // cooldown deadline on inbound
184         nc        int           // net number of alive conns
185         initCache pexMsgFactory // last generated initial message
186         initSeq   int           // number of events which went into initCache
187         initLock  sync.RWMutex  // serialise access to initCache and initSeq
188 }
189
190 // Reset wipes the state clean, releasing resources. Called from Torrent.Close().
191 func (s *pexState) Reset() {
192         s.ev = nil
193         s.hold = nil
194         s.nc = 0
195         s.rest = time.Time{}
196         s.initLock.Lock()
197         s.initCache = pexMsgFactory{}
198         s.initSeq = 0
199         s.initLock.Unlock()
200 }
201
202 func (s *pexState) Add(c *PeerConn) {
203         s.nc++
204         if s.nc >= pexTargAdded {
205                 s.ev = append(s.ev, s.hold...)
206                 s.hold = s.hold[:0]
207         }
208         e := c.pexEvent(pexAdd)
209         s.ev = append(s.ev, e)
210         c.pex.Listed = true
211 }
212
213 func (s *pexState) Drop(c *PeerConn) {
214         if !c.pex.Listed {
215                 // skip connections which were not previously Added
216                 return
217         }
218         e := c.pexEvent(pexDrop)
219         s.nc--
220         if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
221                 s.hold = append(s.hold, e)
222         } else {
223                 s.ev = append(s.ev, e)
224         }
225 }
226
227 // Generate a PEX message based on the event feed. Also returns an index to pass to the subsequent
228 // calls, producing incremental deltas.
229 func (s *pexState) Genmsg(start int) (pp.PexMsg, int) {
230         if start == 0 {
231                 return s.genmsg0()
232         }
233
234         var factory pexMsgFactory
235         n := start
236         for _, e := range s.ev[start:] {
237                 if start > 0 && factory.DeltaLen() >= pexMaxDelta {
238                         break
239                 }
240                 factory.addEvent(e)
241                 n++
242         }
243         return factory.PexMsg(), n
244 }
245
246 func (s *pexState) genmsg0() (pp.PexMsg, int) {
247         s.initLock.Lock()
248         for _, e := range s.ev[s.initSeq:] {
249                 s.initCache.addEvent(e)
250                 s.initSeq++
251         }
252         s.initLock.Unlock()
253         s.initLock.RLock()
254         n := s.initSeq
255         msg := s.initCache.PexMsg()
256         s.initLock.RUnlock()
257         return msg, n
258 }