]> Sergey Matveev's repositories - btrtrc.git/blob - pex.go
PEX: add connection tracking
[btrtrc.git] / pex.go
1 package torrent
2
3 import (
4         "net"
5
6         "github.com/anacrolix/dht/v2/krpc"
7         pp "github.com/anacrolix/torrent/peer_protocol"
8 )
9
10 type pexEventType int
11
12 const (
13         pexAdd pexEventType = iota
14         pexDrop
15 )
16
17 // internal, based on BEP11
18 const (
19         pexTargAdded = 25 // put drops on hold when the number of alive connections is lower than this
20         pexMaxHold   = 25 // length of the drop hold-back buffer
21         pexMaxDelta  = 50 // upper bound on added+added6 and dropped+dropped6 in a single PEX message
22 )
23
24 // represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event
25 type pexEvent struct {
26         t    pexEventType
27         addr net.Addr
28         f    pp.PexPeerFlags
29 }
30
31 // records the event into the peer protocol PEX message
32 func (e *pexEvent) put(m *pp.PexMsg) {
33         switch e.t {
34         case pexAdd:
35                 m.Add(nodeAddr(e.addr), e.f)
36         case pexDrop:
37                 m.Drop(nodeAddr(e.addr))
38         }
39 }
40
41 func nodeAddr(addr net.Addr) krpc.NodeAddr {
42         ipport, _ := tryIpPortFromNetAddr(addr)
43         return krpc.NodeAddr{IP: shortestIP(ipport.IP), Port: ipport.Port}
44 }
45
46 // mainly for the krpc marshallers
47 func shortestIP(ip net.IP) net.IP {
48         if ip4 := ip.To4(); ip4 != nil {
49                 return ip4
50         }
51         return ip
52 }
53
54 // Per-torrent PEX state
55 type pexState struct {
56         ev   []pexEvent // event feed, append-only
57         hold []pexEvent // delayed drops
58         nc   int        // net number of alive conns
59 }
60
61 func (s *pexState) Reset() {
62         s.ev = nil
63         s.hold = nil
64         s.nc = 0
65 }
66
67 func (s *pexState) Add(c *PeerConn) {
68         s.nc++
69         if s.nc >= pexTargAdded {
70                 s.ev = append(s.ev, s.hold...)
71                 s.hold = s.hold[:0]
72         }
73         e := c.pexEvent(pexAdd)
74         s.ev = append(s.ev, e)
75 }
76
77 func (s *pexState) Drop(c *PeerConn) {
78         e := c.pexEvent(pexDrop)
79         s.nc--
80         if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
81                 s.hold = append(s.hold, e)
82         } else {
83                 s.ev = append(s.ev, e)
84         }
85 }
86
87 // Generate a PEX message based on the event feed.
88 // Also returns an index to pass to the subsequent calls, producing incremental deltas.
89 func (s *pexState) Genmsg(start int) (*pp.PexMsg, int) {
90         m := new(pp.PexMsg)
91         n := start
92         for _, e := range s.ev[start:] {
93                 if start > 0 && m.DeltaLen() >= pexMaxDelta {
94                         break
95                 }
96                 e.put(m)
97                 n++
98         }
99         return m, n
100 }