]> Sergey Matveev's repositories - btrtrc.git/blob - pex.go
optimise PEX by avoiding intermediate storage while preparing PEX messages
[btrtrc.git] / pex.go
1 package torrent
2
3 import (
4         "net"
5         "sync"
6
7         "github.com/anacrolix/dht/v2/krpc"
8         pp "github.com/anacrolix/torrent/peer_protocol"
9 )
10
11 type pexEventType int
12
13 const (
14         pexAdd pexEventType = iota
15         pexDrop
16 )
17
18 // internal, based on BEP11
19 const (
20         pexTargAdded = 25 // put drops on hold when the number of alive connections is lower than this
21         pexMaxHold   = 25 // length of the drop hold-back buffer
22         pexMaxDelta  = 50 // upper bound on added+added6 and dropped+dropped6 in a single PEX message
23 )
24
25 // represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event
26 type pexEvent struct {
27         t    pexEventType
28         addr net.Addr
29         f    pp.PexPeerFlags
30 }
31
32 // facilitates efficient de-duplication while generating PEX messages
33 type pexMsgFactory struct {
34         msg     pp.PexMsg
35         added   map[addrKey]struct{}
36         dropped map[addrKey]struct{}
37 }
38
39 func (me *pexMsgFactory) DeltaLen() int {
40         return int(max(
41                 int64(len(me.added)),
42                 int64(len(me.dropped))))
43 }
44
45 type addrKey string
46
47 // Returns the key to use to identify a given addr in the factory.
48 func (me *pexMsgFactory) addrKey(addr net.Addr) addrKey {
49         return addrKey(addr.String())
50 }
51
52 func addrEqual(a, b *krpc.NodeAddr) bool {
53         return a.IP.Equal(b.IP) && a.Port == b.Port
54 }
55
56 func addrIndex(v []krpc.NodeAddr, a *krpc.NodeAddr) int {
57         for i := range v {
58                 if addrEqual(&v[i], a) {
59                         return i
60                 }
61         }
62         return -1
63 }
64
65 // Returns whether the entry was added (we can check if we're cancelling out another entry and so
66 // won't hit the limit consuming this event).
67 func (me *pexMsgFactory) add(e pexEvent) {
68         key := me.addrKey(e.addr)
69         if _, ok := me.added[key]; ok {
70                 return
71         }
72         if me.added == nil {
73                 me.added = make(map[addrKey]struct{}, pexMaxDelta)
74         }
75         addr, ok := nodeAddr(e.addr)
76         if !ok {
77                 return
78         }
79         m := &me.msg
80         switch {
81         case addr.IP.To4() != nil:
82                 if _, ok := me.dropped[key]; ok {
83                         if i := addrIndex(m.Dropped.NodeAddrs(), &addr); i >= 0 {
84                                 m.Dropped = append(m.Dropped[:i], m.Dropped[i+1:]...)
85                         }
86                         delete(me.dropped, key)
87                         return
88                 }
89                 m.Added = append(m.Added, addr)
90                 m.AddedFlags = append(m.AddedFlags, e.f)
91         case len(addr.IP) == net.IPv6len:
92                 if _, ok := me.dropped[key]; ok {
93                         if i := addrIndex(m.Dropped6.NodeAddrs(), &addr); i >= 0 {
94                                 m.Dropped6 = append(m.Dropped6[:i], m.Dropped6[i+1:]...)
95                         }
96                         delete(me.dropped, key)
97                         return
98                 }
99                 m.Added6 = append(m.Added6, addr)
100                 m.Added6Flags = append(m.Added6Flags, e.f)
101         default:
102                 panic(addr)
103         }
104         me.added[key] = struct{}{}
105 }
106
107 // Returns whether the entry was added (we can check if we're cancelling out another entry and so
108 // won't hit the limit consuming this event).
109 func (me *pexMsgFactory) drop(e pexEvent) {
110         addr, ok := nodeAddr(e.addr)
111         if !ok {
112                 return
113         }
114         key := me.addrKey(e.addr)
115         if me.dropped == nil {
116                 me.dropped = make(map[addrKey]struct{}, pexMaxDelta)
117         }
118         if _, ok := me.dropped[key]; ok {
119                 return
120         }
121         m := &me.msg
122         switch {
123         case addr.IP.To4() != nil:
124                 if _, ok := me.added[key]; ok {
125                         if i := addrIndex(m.Added.NodeAddrs(), &addr); i >= 0 {
126                                 m.Added = append(m.Added[:i], m.Added[i+1:]...)
127                                 m.AddedFlags = append(m.AddedFlags[:i], m.AddedFlags[i+1:]...)
128                         }
129                         delete(me.added, key)
130                         return
131                 }
132                 m.Dropped = append(m.Dropped, addr)
133         case len(addr.IP) == net.IPv6len:
134                 if _, ok := me.added[key]; ok {
135                         if i := addrIndex(m.Added6.NodeAddrs(), &addr); i >= 0 {
136                                 m.Added6 = append(m.Added6[:i], m.Added6[i+1:]...)
137                                 m.Added6Flags = append(m.Added6Flags[:i], m.Added6Flags[i+1:]...)
138                         }
139                         delete(me.added, key)
140                         return
141                 }
142                 m.Dropped6 = append(m.Dropped6, addr)
143         }
144         me.dropped[key] = struct{}{}
145 }
146
147 func (me *pexMsgFactory) addEvent(event pexEvent) {
148         switch event.t {
149         case pexAdd:
150                 me.add(event)
151         case pexDrop:
152                 me.drop(event)
153         default:
154                 panic(event.t)
155         }
156 }
157
158 func (me *pexMsgFactory) PexMsg() pp.PexMsg {
159         return me.msg
160 }
161
162 // Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr
163 // format.
164 func nodeAddr(addr net.Addr) (_ krpc.NodeAddr, ok bool) {
165         ipport, ok := tryIpPortFromNetAddr(addr)
166         if !ok {
167                 return
168         }
169         return krpc.NodeAddr{IP: shortestIP(ipport.IP), Port: ipport.Port}, true
170 }
171
172 // mainly for the krpc marshallers
173 func shortestIP(ip net.IP) net.IP {
174         if ip4 := ip.To4(); ip4 != nil {
175                 return ip4
176         }
177         return ip
178 }
179
180 // Per-torrent PEX state
181 type pexState struct {
182         ev        []pexEvent    // event feed, append-only
183         hold      []pexEvent    // delayed drops
184         nc        int           // net number of alive conns
185         initCache pexMsgFactory // last generated initial message
186         initSeq   int           // number of events which went into initCache
187         initLock  sync.RWMutex  // serialise access to initCache and initSeq
188 }
189
190 // Reset wipes the state clean, releasing resources. Called from Torrent.Close().
191 func (s *pexState) Reset() {
192         s.ev = nil
193         s.hold = nil
194         s.nc = 0
195         s.initLock.Lock()
196         s.initCache = pexMsgFactory{}
197         s.initSeq = 0
198         s.initLock.Unlock()
199 }
200
201 func (s *pexState) Add(c *PeerConn) {
202         s.nc++
203         if s.nc >= pexTargAdded {
204                 s.ev = append(s.ev, s.hold...)
205                 s.hold = s.hold[:0]
206         }
207         e := c.pexEvent(pexAdd)
208         s.ev = append(s.ev, e)
209         c.pex.Listed = true
210 }
211
212 func (s *pexState) Drop(c *PeerConn) {
213         if !c.pex.Listed {
214                 // skip connections which were not previously Added
215                 return
216         }
217         e := c.pexEvent(pexDrop)
218         s.nc--
219         if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
220                 s.hold = append(s.hold, e)
221         } else {
222                 s.ev = append(s.ev, e)
223         }
224 }
225
226 // Generate a PEX message based on the event feed. Also returns an index to pass to the subsequent
227 // calls, producing incremental deltas.
228 func (s *pexState) Genmsg(start int) (pp.PexMsg, int) {
229         if start == 0 {
230                 return s.genmsg0()
231         }
232
233         var factory pexMsgFactory
234         n := start
235         for _, e := range s.ev[start:] {
236                 if start > 0 && factory.DeltaLen() >= pexMaxDelta {
237                         break
238                 }
239                 factory.addEvent(e)
240                 n++
241         }
242         return factory.PexMsg(), n
243 }
244
245 func (s *pexState) genmsg0() (pp.PexMsg, int) {
246         s.initLock.Lock()
247         for _, e := range s.ev[s.initSeq:] {
248                 s.initCache.addEvent(e)
249                 s.initSeq++
250         }
251         s.initLock.Unlock()
252         s.initLock.RLock()
253         n := s.initSeq
254         msg := s.initCache.PexMsg()
255         s.initLock.RUnlock()
256         return msg, n
257 }