]> Sergey Matveev's repositories - btrtrc.git/blob - pex.go
PEX: fluid event log
[btrtrc.git] / pex.go
1 package torrent
2
3 import (
4         "net"
5         "sync"
6         "time"
7
8         "github.com/anacrolix/dht/v2/krpc"
9         pp "github.com/anacrolix/torrent/peer_protocol"
10 )
11
12 type pexEventType int
13
14 const (
15         pexAdd pexEventType = iota
16         pexDrop
17 )
18
19 // internal, based on BEP11
20 const (
21         pexTargAdded = 25 // put drops on hold when the number of alive connections is lower than this
22         pexMaxHold   = 25 // length of the drop hold-back buffer
23         pexMaxDelta  = 50 // upper bound on added+added6 and dropped+dropped6 in a single PEX message
24 )
25
26 // represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event
27 type pexEvent struct {
28         t    pexEventType
29         addr PeerRemoteAddr
30         f    pp.PexPeerFlags
31         next *pexEvent // event feed list
32 }
33
34 // facilitates efficient de-duplication while generating PEX messages
35 type pexMsgFactory struct {
36         msg     pp.PexMsg
37         added   map[addrKey]struct{}
38         dropped map[addrKey]struct{}
39 }
40
41 func (me *pexMsgFactory) DeltaLen() int {
42         return int(max(
43                 int64(len(me.added)),
44                 int64(len(me.dropped))))
45 }
46
47 type addrKey string
48
49 // Returns the key to use to identify a given addr in the factory.
50 func (me *pexMsgFactory) addrKey(addr PeerRemoteAddr) addrKey {
51         return addrKey(addr.String())
52 }
53
54 func addrEqual(a, b *krpc.NodeAddr) bool {
55         return a.IP.Equal(b.IP) && a.Port == b.Port
56 }
57
58 func addrIndex(v []krpc.NodeAddr, a *krpc.NodeAddr) int {
59         for i := 0; i < len(v); i += 1 {
60                 if addrEqual(&v[i], a) {
61                         return i
62                 }
63         }
64         return -1
65 }
66
67 // Returns whether the entry was added (we can check if we're cancelling out another entry and so
68 // won't hit the limit consuming this event).
69 func (me *pexMsgFactory) add(e pexEvent) {
70         key := me.addrKey(e.addr)
71         if _, ok := me.added[key]; ok {
72                 return
73         }
74         if me.added == nil {
75                 me.added = make(map[addrKey]struct{}, pexMaxDelta)
76         }
77         addr, ok := nodeAddr(e.addr)
78         if !ok {
79                 return
80         }
81         m := &me.msg
82         switch {
83         case addr.IP.To4() != nil:
84                 if _, ok := me.dropped[key]; ok {
85                         if i := addrIndex(m.Dropped.NodeAddrs(), &addr); i >= 0 {
86                                 m.Dropped = append(m.Dropped[:i], m.Dropped[i+1:]...)
87                         }
88                         delete(me.dropped, key)
89                         return
90                 }
91                 m.Added = append(m.Added, addr)
92                 m.AddedFlags = append(m.AddedFlags, e.f)
93         case len(addr.IP) == net.IPv6len:
94                 if _, ok := me.dropped[key]; ok {
95                         if i := addrIndex(m.Dropped6.NodeAddrs(), &addr); i >= 0 {
96                                 m.Dropped6 = append(m.Dropped6[:i], m.Dropped6[i+1:]...)
97                         }
98                         delete(me.dropped, key)
99                         return
100                 }
101                 m.Added6 = append(m.Added6, addr)
102                 m.Added6Flags = append(m.Added6Flags, e.f)
103         default:
104                 panic(addr)
105         }
106         me.added[key] = struct{}{}
107 }
108
109 // Returns whether the entry was added (we can check if we're cancelling out another entry and so
110 // won't hit the limit consuming this event).
111 func (me *pexMsgFactory) drop(e pexEvent) {
112         addr, ok := nodeAddr(e.addr)
113         if !ok {
114                 return
115         }
116         key := me.addrKey(e.addr)
117         if me.dropped == nil {
118                 me.dropped = make(map[addrKey]struct{}, pexMaxDelta)
119         }
120         if _, ok := me.dropped[key]; ok {
121                 return
122         }
123         m := &me.msg
124         switch {
125         case addr.IP.To4() != nil:
126                 if _, ok := me.added[key]; ok {
127                         if i := addrIndex(m.Added.NodeAddrs(), &addr); i >= 0 {
128                                 m.Added = append(m.Added[:i], m.Added[i+1:]...)
129                                 m.AddedFlags = append(m.AddedFlags[:i], m.AddedFlags[i+1:]...)
130                         }
131                         delete(me.added, key)
132                         return
133                 }
134                 m.Dropped = append(m.Dropped, addr)
135         case len(addr.IP) == net.IPv6len:
136                 if _, ok := me.added[key]; ok {
137                         if i := addrIndex(m.Added6.NodeAddrs(), &addr); i >= 0 {
138                                 m.Added6 = append(m.Added6[:i], m.Added6[i+1:]...)
139                                 m.Added6Flags = append(m.Added6Flags[:i], m.Added6Flags[i+1:]...)
140                         }
141                         delete(me.added, key)
142                         return
143                 }
144                 m.Dropped6 = append(m.Dropped6, addr)
145         }
146         me.dropped[key] = struct{}{}
147 }
148
149 func (me *pexMsgFactory) append(event pexEvent) {
150         switch event.t {
151         case pexAdd:
152                 me.add(event)
153         case pexDrop:
154                 me.drop(event)
155         default:
156                 panic(event.t)
157         }
158 }
159
160 func (me *pexMsgFactory) PexMsg() pp.PexMsg {
161         return me.msg
162 }
163
164 // Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr
165 // format.
166 func nodeAddr(addr PeerRemoteAddr) (krpc.NodeAddr, bool) {
167         ipport, _ := tryIpPortFromNetAddr(addr)
168         ok := ipport.IP != nil
169         return krpc.NodeAddr{IP: shortestIP(ipport.IP), Port: ipport.Port}, ok
170 }
171
172 // mainly for the krpc marshallers
173 func shortestIP(ip net.IP) net.IP {
174         if ip4 := ip.To4(); ip4 != nil {
175                 return ip4
176         }
177         return ip
178 }
179
180 // Per-torrent PEX state
181 type pexState struct {
182         sync.RWMutex
183         tail *pexEvent     // event feed list
184         hold []pexEvent    // delayed drops
185         rest time.Time     // cooldown deadline on inbound
186         nc   int           // net number of alive conns
187         msg0 pexMsgFactory // initial message
188 }
189
190 // Reset wipes the state clean, releasing resources. Called from Torrent.Close().
191 func (s *pexState) Reset() {
192         s.Lock()
193         defer s.Unlock()
194         s.tail = nil
195         s.hold = nil
196         s.nc = 0
197         s.rest = time.Time{}
198         s.msg0 = pexMsgFactory{}
199 }
200
201 func (s *pexState) append(e *pexEvent) {
202         if s.tail != nil {
203                 s.tail.next = e
204         }
205         s.tail = e
206         s.msg0.append(*e)
207 }
208
209 func (s *pexState) Add(c *PeerConn) {
210         s.Lock()
211         defer s.Unlock()
212         s.nc++
213         if s.nc >= pexTargAdded {
214                 for _, e := range s.hold {
215                         ne := e
216                         s.append(&ne)
217                 }
218                 s.hold = s.hold[:0]
219         }
220         e := c.pexEvent(pexAdd)
221         c.pex.Listed = true
222         s.append(&e)
223 }
224
225 func (s *pexState) Drop(c *PeerConn) {
226         if !c.pex.Listed {
227                 // skip connections which were not previously Added
228                 return
229         }
230         s.Lock()
231         defer s.Unlock()
232         e := c.pexEvent(pexDrop)
233         s.nc--
234         if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
235                 s.hold = append(s.hold, e)
236         } else {
237                 s.append(&e)
238         }
239 }
240
241 // Generate a PEX message based on the event feed.
242 // Also returns a pointer to pass to the subsequent calls
243 // to produce incremental deltas.
244 func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) {
245         s.RLock()
246         defer s.RUnlock()
247         if start == nil {
248                 return s.msg0.PexMsg(), s.tail
249         }
250         var msg pexMsgFactory
251         last := start
252         for e := start.next; e != nil; e = e.next {
253                 if msg.DeltaLen() >= pexMaxDelta {
254                         break
255                 }
256                 msg.append(*e)
257                 last = e
258         }
259         return msg.PexMsg(), last
260 }