]> Sergey Matveev's repositories - btrtrc.git/blob - pexconn.go
Misc debug status, pex conn tracking improvements
[btrtrc.git] / pexconn.go
1 package torrent
2
3 import (
4         "fmt"
5         "net/netip"
6         "time"
7
8         g "github.com/anacrolix/generics"
9
10         "github.com/anacrolix/log"
11
12         pp "github.com/anacrolix/torrent/peer_protocol"
13 )
14
15 const (
16         pexRetryDelay = 10 * time.Second
17         pexInterval   = 1 * time.Minute
18 )
19
20 // per-connection PEX state
21 type pexConnState struct {
22         enabled bool
23         xid     pp.ExtensionNumber
24         last    *pexEvent
25         timer   *time.Timer
26         gate    chan struct{}
27         readyfn func()
28         torrent *Torrent
29         Listed  bool
30         info    log.Logger
31         dbg     log.Logger
32         // Running record of live connections the remote end of the connection purports to have.
33         remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags]
34 }
35
36 func (s *pexConnState) IsEnabled() bool {
37         return s.enabled
38 }
39
40 // Init is called from the reader goroutine upon the extended handshake completion
41 func (s *pexConnState) Init(c *PeerConn) {
42         xid, ok := c.PeerExtensionIDs[pp.ExtensionNamePex]
43         if !ok || xid == 0 || c.t.cl.config.DisablePEX {
44                 return
45         }
46         s.xid = xid
47         s.last = nil
48         s.torrent = c.t
49         s.info = c.t.cl.logger.WithDefaultLevel(log.Info)
50         s.dbg = c.logger.WithDefaultLevel(log.Debug)
51         s.readyfn = c.tickleWriter
52         s.gate = make(chan struct{}, 1)
53         s.timer = time.AfterFunc(0, func() {
54                 s.gate <- struct{}{}
55                 s.readyfn() // wake up the writer
56         })
57         s.enabled = true
58 }
59
60 // schedule next PEX message
61 func (s *pexConnState) sched(delay time.Duration) {
62         s.timer.Reset(delay)
63 }
64
65 // generate next PEX message for the peer; returns nil if nothing yet to send
66 func (s *pexConnState) genmsg() *pp.PexMsg {
67         tx, last := s.torrent.pex.Genmsg(s.last)
68         if tx.Len() == 0 {
69                 return nil
70         }
71         s.last = last
72         return &tx
73 }
74
75 func (s *pexConnState) numPending() int {
76         if s.torrent == nil {
77                 return 0
78         }
79         return s.torrent.pex.numPending(s.last)
80 }
81
82 // Share is called from the writer goroutine if when it is woken up with the write buffers empty
83 // Returns whether there's more room on the send buffer to write to.
84 func (s *pexConnState) Share(postfn messageWriter) bool {
85         select {
86         case <-s.gate:
87                 if tx := s.genmsg(); tx != nil {
88                         s.dbg.Print("sending PEX message: ", tx)
89                         flow := postfn(tx.Message(s.xid))
90                         s.sched(pexInterval)
91                         return flow
92                 } else {
93                         // no PEX to send this time - try again shortly
94                         s.sched(pexRetryDelay)
95                 }
96         default:
97         }
98         return true
99 }
100
101 func (s *pexConnState) updateRemoteLiveConns(rx pp.PexMsg) (errs []error) {
102         for _, dropped := range rx.Dropped {
103                 addrPort, _ := ipv4AddrPortFromKrpcNodeAddr(dropped)
104                 delete(s.remoteLiveConns, addrPort)
105         }
106         for _, dropped := range rx.Dropped6 {
107                 addrPort, _ := ipv6AddrPortFromKrpcNodeAddr(dropped)
108                 delete(s.remoteLiveConns, addrPort)
109         }
110         for i, added := range rx.Added {
111                 addr := netip.AddrFrom4([4]byte(added.IP.To4()))
112                 addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
113                 flags := g.SliceGet(rx.AddedFlags, i)
114                 g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
115         }
116         for i, added := range rx.Added6 {
117                 addr := netip.AddrFrom16([16]byte(added.IP.To16()))
118                 addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
119                 flags := g.SliceGet(rx.Added6Flags, i)
120                 g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
121         }
122         return
123 }
124
125 // Recv is called from the reader goroutine
126 func (s *pexConnState) Recv(payload []byte) error {
127         rx, err := pp.LoadPexMsg(payload)
128         if err != nil {
129                 return fmt.Errorf("unmarshalling pex message: %w", err)
130         }
131         s.dbg.Printf("received pex message: %v", rx)
132         torrent.Add("pex added peers received", int64(len(rx.Added)))
133         torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
134         s.updateRemoteLiveConns(rx)
135
136         if !s.torrent.wantPeers() {
137                 s.dbg.Printf("peer reserve ok, incoming PEX discarded")
138                 return nil
139         }
140         // TODO: This should be per conn, not for the whole Torrent.
141         if time.Now().Before(s.torrent.pex.rest) {
142                 s.dbg.Printf("in cooldown period, incoming PEX discarded")
143                 return nil
144         }
145
146         var peers peerInfos
147         peers.AppendFromPex(rx.Added6, rx.Added6Flags)
148         peers.AppendFromPex(rx.Added, rx.AddedFlags)
149         s.dbg.Printf("adding %d peers from PEX", len(peers))
150         if len(peers) > 0 {
151                 s.torrent.pex.rest = time.Now().Add(pexInterval)
152                 s.torrent.addPeers(peers)
153         }
154
155         // one day we may also want to:
156         // - check if the peer is not flooding us with PEX updates
157         // - handle drops somehow
158         // - detect malicious peers
159
160         return nil
161 }
162
163 func (s *pexConnState) Close() {
164         if s.timer != nil {
165                 s.timer.Stop()
166         }
167 }