]> Sergey Matveev's repositories - btrtrc.git/blob - pex.go
PEX: add periodic deltas
[btrtrc.git] / pex.go
1 package torrent
2
3 import (
4         "net"
5
6         "github.com/anacrolix/dht/v2/krpc"
7         pp "github.com/anacrolix/torrent/peer_protocol"
8 )
9
10 type pexEventType int
11
12 const (
13         pexAdd pexEventType = iota
14         pexDrop
15 )
16
17 // internal, based on BEP11
18 const (
19         pexTargAdded = 25 // put drops on hold when the number of alive connections is lower than this
20         pexMaxHold   = 25 // length of the drop hold-back buffer
21         pexMaxDelta  = 50 // upper bound on added+added6 and dropped+dropped6 in a single PEX message
22 )
23
24 // represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event
25 type pexEvent struct {
26         t    pexEventType
27         addr net.Addr
28         f    pp.PexPeerFlags
29 }
30
31 // records the event into the peer protocol PEX message
32 func (e *pexEvent) put(m *pp.PexMsg) {
33         switch e.t {
34         case pexAdd:
35                 m.Add(nodeAddr(e.addr), e.f)
36         case pexDrop:
37                 m.Drop(nodeAddr(e.addr))
38         }
39 }
40
41 func nodeAddr(addr net.Addr) krpc.NodeAddr {
42         ipport, _ := tryIpPortFromNetAddr(addr)
43         return krpc.NodeAddr{IP: shortestIP(ipport.IP), Port: ipport.Port}
44 }
45
46 // mainly for the krpc marshallers
47 func shortestIP(ip net.IP) net.IP {
48         if ip4 := ip.To4(); ip4 != nil {
49                 return ip4
50         }
51         return ip
52 }
53
54 // Per-torrent PEX state
55 type pexState struct {
56         ev   []pexEvent // event feed, append-only
57         hold []pexEvent // delayed drops
58         nc   int        // net number of alive conns
59 }
60
61 func (s *pexState) Reset() {
62         s.ev = nil
63         s.hold = nil
64         s.nc = 0
65 }
66
67 func (s *pexState) Add(c *PeerConn) {
68         s.nc++
69         if s.nc >= pexTargAdded {
70                 s.ev = append(s.ev, s.hold...)
71                 s.hold = s.hold[:0]
72         }
73         e := c.pexEvent(pexAdd)
74         s.ev = append(s.ev, e)
75         c.pex.Listed = true
76 }
77
78 func (s *pexState) Drop(c *PeerConn) {
79         if !c.pex.Listed {
80                 // skip connections which were not previously Added
81                 return
82         }
83         e := c.pexEvent(pexDrop)
84         s.nc--
85         if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
86                 s.hold = append(s.hold, e)
87         } else {
88                 s.ev = append(s.ev, e)
89         }
90 }
91
92 // Generate a PEX message based on the event feed.
93 // Also returns an index to pass to the subsequent calls, producing incremental deltas.
94 func (s *pexState) Genmsg(start int) (*pp.PexMsg, int) {
95         m := new(pp.PexMsg)
96         n := start
97         for _, e := range s.ev[start:] {
98                 if start > 0 && m.DeltaLen() >= pexMaxDelta {
99                         break
100                 }
101                 e.put(m)
102                 n++
103         }
104         return m, n
105 }