]> Sergey Matveev's repositories - btrtrc.git/blob - pexconn.go
Rate limit received PEX messages per connection
[btrtrc.git] / pexconn.go
1 package torrent
2
3 import (
4         "fmt"
5         "net/netip"
6         "time"
7
8         g "github.com/anacrolix/generics"
9
10         "github.com/anacrolix/log"
11
12         pp "github.com/anacrolix/torrent/peer_protocol"
13 )
14
15 const (
16         pexRetryDelay = 10 * time.Second
17         pexInterval   = 1 * time.Minute
18 )
19
20 // per-connection PEX state
21 type pexConnState struct {
22         enabled bool
23         xid     pp.ExtensionNumber
24         last    *pexEvent
25         timer   *time.Timer
26         gate    chan struct{}
27         readyfn func()
28         torrent *Torrent
29         Listed  bool
30         info    log.Logger
31         dbg     log.Logger
32         // Running record of live connections the remote end of the connection purports to have.
33         remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags]
34         lastRecv        time.Time
35 }
36
37 func (s *pexConnState) IsEnabled() bool {
38         return s.enabled
39 }
40
41 // Init is called from the reader goroutine upon the extended handshake completion
42 func (s *pexConnState) Init(c *PeerConn) {
43         xid, ok := c.PeerExtensionIDs[pp.ExtensionNamePex]
44         if !ok || xid == 0 || c.t.cl.config.DisablePEX {
45                 return
46         }
47         s.xid = xid
48         s.last = nil
49         s.torrent = c.t
50         s.info = c.t.cl.logger.WithDefaultLevel(log.Info)
51         s.dbg = c.logger.WithDefaultLevel(log.Debug)
52         s.readyfn = c.tickleWriter
53         s.gate = make(chan struct{}, 1)
54         s.timer = time.AfterFunc(0, func() {
55                 s.gate <- struct{}{}
56                 s.readyfn() // wake up the writer
57         })
58         s.enabled = true
59 }
60
61 // schedule next PEX message
62 func (s *pexConnState) sched(delay time.Duration) {
63         s.timer.Reset(delay)
64 }
65
66 // generate next PEX message for the peer; returns nil if nothing yet to send
67 func (s *pexConnState) genmsg() *pp.PexMsg {
68         tx, last := s.torrent.pex.Genmsg(s.last)
69         if tx.Len() == 0 {
70                 return nil
71         }
72         s.last = last
73         return &tx
74 }
75
76 func (s *pexConnState) numPending() int {
77         if s.torrent == nil {
78                 return 0
79         }
80         return s.torrent.pex.numPending(s.last)
81 }
82
83 // Share is called from the writer goroutine if when it is woken up with the write buffers empty
84 // Returns whether there's more room on the send buffer to write to.
85 func (s *pexConnState) Share(postfn messageWriter) bool {
86         select {
87         case <-s.gate:
88                 if tx := s.genmsg(); tx != nil {
89                         s.dbg.Print("sending PEX message: ", tx)
90                         flow := postfn(tx.Message(s.xid))
91                         s.sched(pexInterval)
92                         return flow
93                 } else {
94                         // no PEX to send this time - try again shortly
95                         s.sched(pexRetryDelay)
96                 }
97         default:
98         }
99         return true
100 }
101
102 func (s *pexConnState) updateRemoteLiveConns(rx pp.PexMsg) (errs []error) {
103         for _, dropped := range rx.Dropped {
104                 addrPort, _ := ipv4AddrPortFromKrpcNodeAddr(dropped)
105                 delete(s.remoteLiveConns, addrPort)
106         }
107         for _, dropped := range rx.Dropped6 {
108                 addrPort, _ := ipv6AddrPortFromKrpcNodeAddr(dropped)
109                 delete(s.remoteLiveConns, addrPort)
110         }
111         for i, added := range rx.Added {
112                 addr := netip.AddrFrom4([4]byte(added.IP.To4()))
113                 addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
114                 flags := g.SliceGet(rx.AddedFlags, i)
115                 g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
116         }
117         for i, added := range rx.Added6 {
118                 addr := netip.AddrFrom16([16]byte(added.IP.To16()))
119                 addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
120                 flags := g.SliceGet(rx.Added6Flags, i)
121                 g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
122         }
123         return
124 }
125
126 // Recv is called from the reader goroutine
127 func (s *pexConnState) Recv(payload []byte) error {
128         rx, err := pp.LoadPexMsg(payload)
129         if err != nil {
130                 return fmt.Errorf("unmarshalling pex message: %w", err)
131         }
132         s.dbg.Printf("received pex message: %v", rx)
133         torrent.Add("pex added peers received", int64(len(rx.Added)))
134         torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
135
136         // "Clients must batch updates to send no more than 1 PEX message per minute."
137         timeSinceLastRecv := time.Since(s.lastRecv)
138         if timeSinceLastRecv < 45*time.Second {
139                 return fmt.Errorf("last received only %v ago", timeSinceLastRecv)
140         }
141         s.lastRecv = time.Now()
142         s.updateRemoteLiveConns(rx)
143
144         var peers peerInfos
145         peers.AppendFromPex(rx.Added6, rx.Added6Flags)
146         peers.AppendFromPex(rx.Added, rx.AddedFlags)
147         added := s.torrent.addPeers(peers)
148         s.dbg.Printf("got %v peers over pex, added %v", len(peers), added)
149
150         // one day we may also want to:
151         // - check if the peer is not flooding us with PEX updates
152         // - handle drops somehow
153         // - detect malicious peers
154
155         return nil
156 }
157
158 func (s *pexConnState) Close() {
159         if s.timer != nil {
160                 s.timer.Stop()
161         }
162 }