]> Sergey Matveev's repositories - btrtrc.git/blob - pex.go
optimise generation of the initial PEX
[btrtrc.git] / pex.go
1 package torrent
2
3 import (
4         "net"
5         "sync"
6
7         "github.com/anacrolix/dht/v2/krpc"
8         pp "github.com/anacrolix/torrent/peer_protocol"
9 )
10
11 type pexEventType int
12
13 const (
14         pexAdd pexEventType = iota
15         pexDrop
16 )
17
18 // internal, based on BEP11
19 const (
20         pexTargAdded = 25 // put drops on hold when the number of alive connections is lower than this
21         pexMaxHold   = 25 // length of the drop hold-back buffer
22         pexMaxDelta  = 50 // upper bound on added+added6 and dropped+dropped6 in a single PEX message
23 )
24
25 // represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event
26 type pexEvent struct {
27         t    pexEventType
28         addr net.Addr
29         f    pp.PexPeerFlags
30 }
31
32 // facilitates efficient de-duplication while generating PEX messages
33 type pexMsgFactory struct {
34         added   map[addrKey]pexEvent
35         dropped map[addrKey]pexEvent
36 }
37
38 func (me *pexMsgFactory) DeltaLen() int {
39         return int(max(
40                 int64(len(me.added)),
41                 int64(len(me.dropped))))
42 }
43
44 type addrKey string
45
46 // Returns the key to use to identify a given addr in the factory.
47 func (me *pexMsgFactory) addrKey(addr net.Addr) addrKey {
48         return addrKey(addr.String())
49 }
50
51 // Returns whether the entry was added (we can check if we're cancelling out another entry and so
52 // won't hit the limit consuming this event).
53 func (me *pexMsgFactory) add(e pexEvent) {
54         key := me.addrKey(e.addr)
55         if _, ok := me.dropped[key]; ok {
56                 delete(me.dropped, key)
57                 return
58         }
59         if me.added == nil {
60                 me.added = make(map[addrKey]pexEvent, pexMaxDelta)
61         }
62         me.added[key] = e
63 }
64
65 // Returns whether the entry was added (we can check if we're cancelling out another entry and so
66 // won't hit the limit consuming this event).
67 func (me *pexMsgFactory) drop(e pexEvent) {
68         key := me.addrKey(e.addr)
69         if _, ok := me.added[key]; ok {
70                 delete(me.added, key)
71                 return
72         }
73         if me.dropped == nil {
74                 me.dropped = make(map[addrKey]pexEvent, pexMaxDelta)
75         }
76         me.dropped[key] = e
77 }
78
79 func (me *pexMsgFactory) addEvent(event pexEvent) {
80         switch event.t {
81         case pexAdd:
82                 me.add(event)
83         case pexDrop:
84                 me.drop(event)
85         default:
86                 panic(event.t)
87         }
88 }
89
90 func (me *pexMsgFactory) PexMsg() (ret pp.PexMsg) {
91         for key, added := range me.added {
92                 addr, ok := nodeAddr(added.addr)
93                 if !ok {
94                         continue
95                 }
96                 switch len(addr.IP) {
97                 case net.IPv4len:
98                         ret.Added = append(ret.Added, addr)
99                         ret.AddedFlags = append(ret.AddedFlags, added.f)
100                 case net.IPv6len:
101                         ret.Added6 = append(ret.Added6, addr)
102                         ret.Added6Flags = append(ret.Added6Flags, added.f)
103                 default:
104                         panic(key)
105                 }
106         }
107         for key, dropped := range me.dropped {
108                 addr, ok := nodeAddr(dropped.addr)
109                 if !ok {
110                         continue
111                 }
112                 switch len(addr.IP) {
113                 case net.IPv4len:
114                         ret.Dropped = append(ret.Dropped, addr)
115                 case net.IPv6len:
116                         ret.Dropped6 = append(ret.Dropped6, addr)
117                 default:
118                         panic(key)
119                 }
120         }
121         return
122 }
123
124 // Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr
125 // format.
126 func nodeAddr(addr net.Addr) (_ krpc.NodeAddr, ok bool) {
127         ipport, ok := tryIpPortFromNetAddr(addr)
128         if !ok {
129                 return
130         }
131         return krpc.NodeAddr{IP: shortestIP(ipport.IP), Port: ipport.Port}, true
132 }
133
134 // mainly for the krpc marshallers
135 func shortestIP(ip net.IP) net.IP {
136         if ip4 := ip.To4(); ip4 != nil {
137                 return ip4
138         }
139         return ip
140 }
141
142 // Per-torrent PEX state
143 type pexState struct {
144         ev        []pexEvent    // event feed, append-only
145         hold      []pexEvent    // delayed drops
146         nc        int           // net number of alive conns
147         initCache pexMsgFactory // last generated initial message
148         initSeq   int           // number of events which went into initCache
149         initLock  sync.RWMutex  // serialise access to initCache and initSeq
150 }
151
152 // Reset wipes the state clean, releasing resources. Called from Torrent.Close().
153 func (s *pexState) Reset() {
154         s.ev = nil
155         s.hold = nil
156         s.nc = 0
157         s.initLock.Lock()
158         s.initCache = pexMsgFactory{}
159         s.initSeq = 0
160         s.initLock.Unlock()
161 }
162
163 func (s *pexState) Add(c *PeerConn) {
164         s.nc++
165         if s.nc >= pexTargAdded {
166                 s.ev = append(s.ev, s.hold...)
167                 s.hold = s.hold[:0]
168         }
169         e := c.pexEvent(pexAdd)
170         s.ev = append(s.ev, e)
171         c.pex.Listed = true
172 }
173
174 func (s *pexState) Drop(c *PeerConn) {
175         if !c.pex.Listed {
176                 // skip connections which were not previously Added
177                 return
178         }
179         e := c.pexEvent(pexDrop)
180         s.nc--
181         if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
182                 s.hold = append(s.hold, e)
183         } else {
184                 s.ev = append(s.ev, e)
185         }
186 }
187
188 // Generate a PEX message based on the event feed. Also returns an index to pass to the subsequent
189 // calls, producing incremental deltas.
190 func (s *pexState) Genmsg(start int) (pp.PexMsg, int) {
191         if start == 0 {
192                 return s.genmsg0()
193         }
194
195         var factory pexMsgFactory
196         n := start
197         for _, e := range s.ev[start:] {
198                 if start > 0 && factory.DeltaLen() >= pexMaxDelta {
199                         break
200                 }
201                 factory.addEvent(e)
202                 n++
203         }
204         return factory.PexMsg(), n
205 }
206
207 func (s *pexState) genmsg0() (pp.PexMsg, int) {
208         s.initLock.Lock()
209         for _, e := range s.ev[s.initSeq:] {
210                 s.initCache.addEvent(e)
211                 s.initSeq++
212         }
213         s.initLock.Unlock()
214         s.initLock.RLock()
215         n := s.initSeq
216         msg := s.initCache.PexMsg()
217         s.initLock.RUnlock()
218         return msg, n
219 }