]> Sergey Matveev's repositories - btrtrc.git/blob - pexconn.go
PEX: impede full-meching in tracker-less swarms by adding a cooldown minute
[btrtrc.git] / pexconn.go
1 package torrent
2
3 import (
4         "fmt"
5         "time"
6
7         "github.com/anacrolix/log"
8
9         pp "github.com/anacrolix/torrent/peer_protocol"
10 )
11
12 const (
13         pexRetryDelay = 10 * time.Second
14         pexInterval   = 1 * time.Minute
15 )
16
17 // per-connection PEX state
18 type pexConnState struct {
19         enabled bool
20         xid     pp.ExtensionNumber
21         seq     int
22         timer   *time.Timer
23         gate    chan struct{}
24         readyfn func()
25         torrent *Torrent
26         Listed  bool
27         info    log.Logger
28         dbg     log.Logger
29 }
30
31 func (s *pexConnState) IsEnabled() bool {
32         return s.enabled
33 }
34
35 // Init is called from the reader goroutine upon the extended handshake completion
36 func (s *pexConnState) Init(c *PeerConn) {
37         xid, ok := c.PeerExtensionIDs[pp.ExtensionNamePex]
38         if !ok || xid == 0 || c.t.cl.config.DisablePEX {
39                 return
40         }
41         s.xid = xid
42         s.seq = 0
43         s.torrent = c.t
44         s.info = c.t.cl.logger.WithDefaultLevel(log.Info)
45         s.dbg = c.logger.WithDefaultLevel(log.Debug)
46         s.readyfn = c.tickleWriter
47         s.gate = make(chan struct{}, 1)
48         s.timer = time.AfterFunc(0, func() {
49                 s.gate <- struct{}{}
50                 s.readyfn() // wake up the writer
51         })
52         s.enabled = true
53 }
54
55 // schedule next PEX message
56 func (s *pexConnState) sched(delay time.Duration) {
57         s.timer.Reset(delay)
58 }
59
60 // generate next PEX message for the peer; returns nil if nothing yet to send
61 func (s *pexConnState) genmsg() *pp.PexMsg {
62         tx, seq := s.torrent.pex.Genmsg(s.seq)
63         if tx.Len() == 0 {
64                 return nil
65         }
66         s.seq = seq
67         return &tx
68 }
69
70 // Share is called from the writer goroutine if when it is woken up with the write buffers empty
71 // Returns whether there's more room on the send buffer to write to.
72 func (s *pexConnState) Share(postfn messageWriter) bool {
73         select {
74         case <-s.gate:
75                 if tx := s.genmsg(); tx != nil {
76                         s.dbg.Print("sending PEX message: ", tx)
77                         flow := postfn(tx.Message(s.xid))
78                         s.sched(pexInterval)
79                         return flow
80                 } else {
81                         // no PEX to send this time - try again shortly
82                         s.sched(pexRetryDelay)
83                 }
84         default:
85         }
86         return true
87 }
88
89 // Recv is called from the reader goroutine
90 func (s *pexConnState) Recv(payload []byte) error {
91         if !s.torrent.wantPeers() {
92                 s.dbg.Printf("peer reserve ok, incoming PEX discarded")
93                 return nil
94         }
95         if time.Now().Before(s.torrent.pex.rest) {
96                 s.dbg.Printf("in cooldown period, incoming PEX discarded")
97                 return nil
98         }
99
100         rx, err := pp.LoadPexMsg(payload)
101         if err != nil {
102                 return fmt.Errorf("error unmarshalling PEX message: %s", err)
103         }
104         s.dbg.Print("incoming PEX message: ", rx)
105         torrent.Add("pex added peers received", int64(len(rx.Added)))
106         torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
107
108         var peers peerInfos
109         peers.AppendFromPex(rx.Added6, rx.Added6Flags)
110         peers.AppendFromPex(rx.Added, rx.AddedFlags)
111         s.dbg.Printf("adding %d peers from PEX", len(peers))
112         if len(peers) > 0 {
113                 s.torrent.pex.rest = time.Now().Add(pexInterval)
114                 s.torrent.addPeers(peers)
115         }
116
117         // one day we may also want to:
118         // - check if the peer is not flooding us with PEX updates
119         // - handle drops somehow
120         // - detect malicious peers
121
122         return nil
123 }
124
125 func (s *pexConnState) Close() {
126         if s.timer != nil {
127                 s.timer.Stop()
128         }
129 }