]> Sergey Matveev's repositories - btrtrc.git/blob - pexconn.go
Drop support for go 1.20
[btrtrc.git] / pexconn.go
1 package torrent
2
3 import (
4         "fmt"
5         "net/netip"
6         "time"
7
8         g "github.com/anacrolix/generics"
9         "github.com/anacrolix/log"
10
11         pp "github.com/anacrolix/torrent/peer_protocol"
12 )
13
14 const (
15         pexRetryDelay = 10 * time.Second
16         pexInterval   = 1 * time.Minute
17 )
18
19 // per-connection PEX state
20 type pexConnState struct {
21         enabled bool
22         xid     pp.ExtensionNumber
23         last    *pexEvent
24         timer   *time.Timer
25         gate    chan struct{}
26         readyfn func()
27         torrent *Torrent
28         Listed  bool
29         info    log.Logger
30         dbg     log.Logger
31         // Running record of live connections the remote end of the connection purports to have.
32         remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags]
33         lastRecv        time.Time
34 }
35
36 func (s *pexConnState) IsEnabled() bool {
37         return s.enabled
38 }
39
40 // Init is called from the reader goroutine upon the extended handshake completion
41 func (s *pexConnState) Init(c *PeerConn) {
42         xid, ok := c.PeerExtensionIDs[pp.ExtensionNamePex]
43         if !ok || xid == 0 || c.t.cl.config.DisablePEX {
44                 return
45         }
46         s.xid = xid
47         s.last = nil
48         s.torrent = c.t
49         s.info = c.t.cl.logger.WithDefaultLevel(log.Info)
50         s.dbg = c.logger.WithDefaultLevel(log.Debug)
51         s.readyfn = c.tickleWriter
52         s.gate = make(chan struct{}, 1)
53         s.timer = time.AfterFunc(0, func() {
54                 s.gate <- struct{}{}
55                 s.readyfn() // wake up the writer
56         })
57         s.enabled = true
58 }
59
60 // schedule next PEX message
61 func (s *pexConnState) sched(delay time.Duration) {
62         s.timer.Reset(delay)
63 }
64
65 // generate next PEX message for the peer; returns nil if nothing yet to send
66 func (s *pexConnState) genmsg() *pp.PexMsg {
67         tx, last := s.torrent.pex.Genmsg(s.last)
68         if tx.Len() == 0 {
69                 return nil
70         }
71         s.last = last
72         return &tx
73 }
74
75 func (s *pexConnState) numPending() int {
76         if s.torrent == nil {
77                 return 0
78         }
79         return s.torrent.pex.numPending(s.last)
80 }
81
82 // Share is called from the writer goroutine if when it is woken up with the write buffers empty
83 // Returns whether there's more room on the send buffer to write to.
84 func (s *pexConnState) Share(postfn messageWriter) bool {
85         select {
86         case <-s.gate:
87                 if tx := s.genmsg(); tx != nil {
88                         s.dbg.Print("sending PEX message: ", tx)
89                         flow := postfn(tx.Message(s.xid))
90                         s.sched(pexInterval)
91                         return flow
92                 } else {
93                         // no PEX to send this time - try again shortly
94                         s.sched(pexRetryDelay)
95                 }
96         default:
97         }
98         return true
99 }
100
101 func (s *pexConnState) updateRemoteLiveConns(rx pp.PexMsg) (errs []error) {
102         for _, dropped := range rx.Dropped {
103                 addrPort, _ := ipv4AddrPortFromKrpcNodeAddr(dropped)
104                 delete(s.remoteLiveConns, addrPort)
105         }
106         for _, dropped := range rx.Dropped6 {
107                 addrPort, _ := ipv6AddrPortFromKrpcNodeAddr(dropped)
108                 delete(s.remoteLiveConns, addrPort)
109         }
110         for i, added := range rx.Added {
111                 addr := netip.AddrFrom4(*(*[4]byte)(added.IP.To4()))
112                 addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
113                 flags := g.SliceGet(rx.AddedFlags, i)
114                 g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
115         }
116         for i, added := range rx.Added6 {
117                 addr := netip.AddrFrom16(*(*[16]byte)(added.IP.To16()))
118                 addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
119                 flags := g.SliceGet(rx.Added6Flags, i)
120                 g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
121         }
122         return
123 }
124
125 // Recv is called from the reader goroutine
126 func (s *pexConnState) Recv(payload []byte) error {
127         rx, err := pp.LoadPexMsg(payload)
128         if err != nil {
129                 return fmt.Errorf("unmarshalling pex message: %w", err)
130         }
131         s.dbg.Printf("received pex message: %v", rx)
132         torrent.Add("pex added peers received", int64(len(rx.Added)))
133         torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
134
135         // "Clients must batch updates to send no more than 1 PEX message per minute."
136         timeSinceLastRecv := time.Since(s.lastRecv)
137         if timeSinceLastRecv < 45*time.Second {
138                 return fmt.Errorf("last received only %v ago", timeSinceLastRecv)
139         }
140         s.lastRecv = time.Now()
141         s.updateRemoteLiveConns(rx)
142
143         var peers peerInfos
144         peers.AppendFromPex(rx.Added6, rx.Added6Flags)
145         peers.AppendFromPex(rx.Added, rx.AddedFlags)
146         if time.Now().Before(s.torrent.pex.rest) {
147                 s.dbg.Printf("in cooldown period, incoming PEX discarded")
148                 return nil
149         }
150         added := s.torrent.addPeers(peers)
151         s.dbg.Printf("got %v peers over pex, added %v", len(peers), added)
152
153         if len(peers) > 0 {
154                 s.torrent.pex.rest = time.Now().Add(pexInterval)
155         }
156
157         // one day we may also want to:
158         // - handle drops somehow
159         // - detect malicious peers
160
161         return nil
162 }
163
164 func (s *pexConnState) Close() {
165         if s.timer != nil {
166                 s.timer.Stop()
167         }
168 }