]> Sergey Matveev's repositories - btrtrc.git/blob - pex.go
simplify pexMsgFactory
[btrtrc.git] / pex.go
1 package torrent
2
3 import (
4         "net"
5
6         "github.com/anacrolix/dht/v2/krpc"
7         pp "github.com/anacrolix/torrent/peer_protocol"
8 )
9
10 type pexEventType int
11
12 const (
13         pexAdd pexEventType = iota
14         pexDrop
15 )
16
17 // internal, based on BEP11
18 const (
19         pexTargAdded = 25 // put drops on hold when the number of alive connections is lower than this
20         pexMaxHold   = 25 // length of the drop hold-back buffer
21         pexMaxDelta  = 50 // upper bound on added+added6 and dropped+dropped6 in a single PEX message
22 )
23
24 // represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event
25 type pexEvent struct {
26         t    pexEventType
27         addr net.Addr
28         f    pp.PexPeerFlags
29 }
30
31 // facilitates efficient de-duplication while generating PEX messages
32 type pexMsgFactory struct {
33         added   map[addrKey]pexEvent
34         dropped map[addrKey]pexEvent
35 }
36
37 func (me *pexMsgFactory) DeltaLen() int {
38         return int(max(
39                 int64(len(me.added)),
40                 int64(len(me.dropped))))
41 }
42
43 type addrKey string
44
45 // Returns the key to use to identify a given addr in the factory.
46 func (me *pexMsgFactory) addrKey(addr net.Addr) addrKey {
47         return addrKey(addr.String())
48 }
49
50 // Returns whether the entry was added (we can check if we're cancelling out another entry and so
51 // won't hit the limit consuming this event).
52 func (me *pexMsgFactory) add(e pexEvent) {
53         key := me.addrKey(e.addr)
54         if _, ok := me.dropped[key]; ok {
55                 delete(me.dropped, key)
56                 return
57         }
58         if me.added == nil {
59                 me.added = make(map[addrKey]pexEvent, pexMaxDelta)
60         }
61         me.added[key] = e
62 }
63
64 // Returns whether the entry was added (we can check if we're cancelling out another entry and so
65 // won't hit the limit consuming this event).
66 func (me *pexMsgFactory) drop(e pexEvent) {
67         key := me.addrKey(e.addr)
68         if _, ok := me.added[key]; ok {
69                 delete(me.added, key)
70                 return
71         }
72         if me.dropped == nil {
73                 me.dropped = make(map[addrKey]pexEvent, pexMaxDelta)
74         }
75         me.dropped[key] = e
76 }
77
78 func (me *pexMsgFactory) addEvent(event pexEvent) {
79         switch event.t {
80         case pexAdd:
81                 me.add(event)
82         case pexDrop:
83                 me.drop(event)
84         default:
85                 panic(event.t)
86         }
87 }
88
89 func (me *pexMsgFactory) PexMsg() (ret pp.PexMsg) {
90         for key, added := range me.added {
91                 addr, ok := nodeAddr(added.addr)
92                 if !ok {
93                         continue
94                 }
95                 switch len(addr.IP) {
96                 case net.IPv4len:
97                         ret.Added = append(ret.Added, addr)
98                         ret.AddedFlags = append(ret.AddedFlags, added.f)
99                 case net.IPv6len:
100                         ret.Added6 = append(ret.Added6, addr)
101                         ret.Added6Flags = append(ret.Added6Flags, added.f)
102                 default:
103                         panic(key)
104                 }
105         }
106         for key, dropped := range me.dropped {
107                 addr, ok := nodeAddr(dropped.addr)
108                 if !ok {
109                         continue
110                 }
111                 switch len(addr.IP) {
112                 case net.IPv4len:
113                         ret.Dropped = append(ret.Dropped, addr)
114                 case net.IPv6len:
115                         ret.Dropped6 = append(ret.Dropped6, addr)
116                 default:
117                         panic(key)
118                 }
119         }
120         return
121 }
122
123 // Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr
124 // format.
125 func nodeAddr(addr net.Addr) (_ krpc.NodeAddr, ok bool) {
126         ipport, ok := tryIpPortFromNetAddr(addr)
127         if !ok {
128                 return
129         }
130         return krpc.NodeAddr{IP: shortestIP(ipport.IP), Port: ipport.Port}, true
131 }
132
133 // mainly for the krpc marshallers
134 func shortestIP(ip net.IP) net.IP {
135         if ip4 := ip.To4(); ip4 != nil {
136                 return ip4
137         }
138         return ip
139 }
140
141 // Per-torrent PEX state
142 type pexState struct {
143         ev   []pexEvent // event feed, append-only
144         hold []pexEvent // delayed drops
145         nc   int        // net number of alive conns
146 }
147
148 func (s *pexState) Reset() {
149         s.ev = nil
150         s.hold = nil
151         s.nc = 0
152 }
153
154 func (s *pexState) Add(c *PeerConn) {
155         s.nc++
156         if s.nc >= pexTargAdded {
157                 s.ev = append(s.ev, s.hold...)
158                 s.hold = s.hold[:0]
159         }
160         e := c.pexEvent(pexAdd)
161         s.ev = append(s.ev, e)
162         c.pex.Listed = true
163 }
164
165 func (s *pexState) Drop(c *PeerConn) {
166         if !c.pex.Listed {
167                 // skip connections which were not previously Added
168                 return
169         }
170         e := c.pexEvent(pexDrop)
171         s.nc--
172         if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
173                 s.hold = append(s.hold, e)
174         } else {
175                 s.ev = append(s.ev, e)
176         }
177 }
178
179 // Generate a PEX message based on the event feed. Also returns an index to pass to the subsequent
180 // calls, producing incremental deltas.
181 func (s *pexState) Genmsg(start int) (pp.PexMsg, int) {
182         var factory pexMsgFactory
183         n := start
184         for _, e := range s.ev[start:] {
185                 if start > 0 && factory.DeltaLen() >= pexMaxDelta {
186                         break
187                 }
188                 factory.addEvent(e)
189                 n++
190         }
191         return factory.PexMsg(), n
192 }