]> Sergey Matveev's repositories - btrtrc.git/blob - pexconn.go
cmd/btrtrc client
[btrtrc.git] / pexconn.go
1 package torrent
2
3 import (
4         "fmt"
5         "net/netip"
6         "time"
7
8         g "github.com/anacrolix/generics"
9         "github.com/anacrolix/log"
10
11         pp "github.com/anacrolix/torrent/peer_protocol"
12 )
13
14 const (
15         pexRetryDelay = 10 * time.Second
16         pexInterval   = 1 * time.Minute
17 )
18
19 // per-connection PEX state
20 type pexConnState struct {
21         enabled bool
22         xid     pp.ExtensionNumber
23         last    *pexEvent
24         timer   *time.Timer
25         gate    chan struct{}
26         readyfn func()
27         torrent *Torrent
28         Listed  bool
29         logger  log.Logger
30         // Running record of live connections the remote end of the connection purports to have.
31         remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags]
32         lastRecv        time.Time
33 }
34
35 func (s *pexConnState) IsEnabled() bool {
36         return s.enabled
37 }
38
39 // Init is called from the reader goroutine upon the extended handshake completion
40 func (s *pexConnState) Init(c *PeerConn) {
41         xid, ok := c.PeerExtensionIDs[pp.ExtensionNamePex]
42         if !ok || xid == 0 || c.t.cl.config.DisablePEX {
43                 return
44         }
45         s.xid = xid
46         s.last = nil
47         s.torrent = c.t
48         s.logger = c.logger.WithDefaultLevel(log.Debug).WithNames("pex")
49         s.readyfn = c.tickleWriter
50         s.gate = make(chan struct{}, 1)
51         s.timer = time.AfterFunc(0, func() {
52                 s.gate <- struct{}{}
53                 s.readyfn() // wake up the writer
54         })
55         s.enabled = true
56 }
57
58 // schedule next PEX message
59 func (s *pexConnState) sched(delay time.Duration) {
60         s.timer.Reset(delay)
61 }
62
63 // generate next PEX message for the peer; returns nil if nothing yet to send
64 func (s *pexConnState) genmsg() *pp.PexMsg {
65         tx, last := s.torrent.pex.Genmsg(s.last)
66         if tx.Len() == 0 {
67                 return nil
68         }
69         s.last = last
70         return &tx
71 }
72
73 func (s *pexConnState) numPending() int {
74         if s.torrent == nil {
75                 return 0
76         }
77         return s.torrent.pex.numPending(s.last)
78 }
79
80 // Share is called from the writer goroutine if when it is woken up with the write buffers empty
81 // Returns whether there's more room on the send buffer to write to.
82 func (s *pexConnState) Share(postfn messageWriter) bool {
83         select {
84         case <-s.gate:
85                 if tx := s.genmsg(); tx != nil {
86                         s.logger.Print("sending PEX message: ", tx)
87                         flow := postfn(tx.Message(s.xid))
88                         s.sched(pexInterval)
89                         return flow
90                 } else {
91                         // no PEX to send this time - try again shortly
92                         s.sched(pexRetryDelay)
93                 }
94         default:
95         }
96         return true
97 }
98
99 func (s *pexConnState) updateRemoteLiveConns(rx pp.PexMsg) (errs []error) {
100         for _, dropped := range rx.Dropped {
101                 addrPort, _ := ipv4AddrPortFromKrpcNodeAddr(dropped)
102                 delete(s.remoteLiveConns, addrPort)
103         }
104         for _, dropped := range rx.Dropped6 {
105                 addrPort, _ := ipv6AddrPortFromKrpcNodeAddr(dropped)
106                 delete(s.remoteLiveConns, addrPort)
107         }
108         for i, added := range rx.Added {
109                 addr := netip.AddrFrom4(*(*[4]byte)(added.IP.To4()))
110                 addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
111                 flags := g.SliceGet(rx.AddedFlags, i)
112                 g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
113         }
114         for i, added := range rx.Added6 {
115                 addr := netip.AddrFrom16(*(*[16]byte)(added.IP.To16()))
116                 addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
117                 flags := g.SliceGet(rx.Added6Flags, i)
118                 g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
119         }
120         return
121 }
122
123 // Recv is called from the reader goroutine
124 func (s *pexConnState) Recv(payload []byte) error {
125         rx, err := pp.LoadPexMsg(payload)
126         if err != nil {
127                 return fmt.Errorf("unmarshalling pex message: %w", err)
128         }
129         s.logger.Printf("received pex message: %v", rx)
130         torrent.Add("pex added peers received", int64(len(rx.Added)))
131         torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
132
133         // "Clients must batch updates to send no more than 1 PEX message per minute."
134         timeSinceLastRecv := time.Since(s.lastRecv)
135         if timeSinceLastRecv < 45*time.Second {
136                 return fmt.Errorf("last received only %v ago", timeSinceLastRecv)
137         }
138         s.lastRecv = time.Now()
139         s.updateRemoteLiveConns(rx)
140
141         var peers peerInfos
142         peers.AppendFromPex(rx.Added6, rx.Added6Flags)
143         peers.AppendFromPex(rx.Added, rx.AddedFlags)
144         if time.Now().Before(s.torrent.pex.rest) {
145                 s.logger.Printf("in cooldown period, incoming PEX discarded")
146                 return nil
147         }
148         added := s.torrent.addPeers(peers)
149         s.logger.Printf("got %v peers over pex, added %v", len(peers), added)
150
151         if len(peers) > 0 {
152                 s.torrent.pex.rest = time.Now().Add(pexInterval)
153         }
154
155         // one day we may also want to:
156         // - handle drops somehow
157         // - detect malicious peers
158
159         return nil
160 }
161
162 func (s *pexConnState) Close() {
163         if s.timer != nil {
164                 s.timer.Stop()
165         }
166 }