]> Sergey Matveev's repositories - btrtrc.git/blob - pexconn.go
PEX: integrate with send throttling
[btrtrc.git] / pexconn.go
1 package torrent
2
3 import (
4         "fmt"
5         "time"
6
7         "github.com/anacrolix/log"
8
9         pp "github.com/anacrolix/torrent/peer_protocol"
10 )
11
12 const (
13         pexRetryDelay = 10 * time.Second
14         pexInterval   = 1 * time.Minute
15 )
16
17 // per-connection PEX state
18 type pexConnState struct {
19         enabled bool
20         xid     pp.ExtensionNumber
21         seq     int
22         timer   *time.Timer
23         gate    chan struct{}
24         readyfn func()
25         torrent *Torrent
26         Listed  bool
27         info    log.Logger
28         dbg     log.Logger
29 }
30
31 func (s *pexConnState) IsEnabled() bool {
32         return s.enabled
33 }
34
35 // Init is called from the reader goroutine upon the extended handshake completion
36 func (s *pexConnState) Init(c *PeerConn) {
37         xid, ok := c.PeerExtensionIDs[pp.ExtensionNamePex]
38         if !ok || xid == 0 || c.t.cl.config.DisablePEX {
39                 return
40         }
41         s.xid = xid
42         s.seq = 0
43         s.torrent = c.t
44         s.info = c.t.cl.logger
45         s.dbg = c.logger
46         s.readyfn = c.tickleWriter
47         s.gate = make(chan struct{}, 1)
48         s.timer = time.AfterFunc(0, func() {
49                 s.gate <- struct{}{}
50                 s.readyfn() // wake up the writer
51         })
52         s.enabled = true
53 }
54
55 // schedule next PEX message
56 func (s *pexConnState) sched(delay time.Duration) {
57         s.timer.Reset(delay)
58 }
59
60 // generate next PEX message for the peer; returns nil if nothing yet to send
61 func (s *pexConnState) genmsg() *pp.PexMsg {
62         tx, seq := s.torrent.pex.Genmsg(s.seq)
63         if tx.Len() == 0 {
64                 return nil
65         }
66         s.seq = seq
67         return tx
68 }
69
70 // Share is called from the writer goroutine if when it is woken up with the write buffers empty
71 // Returns whether there's more room on the send buffer to write to.
72 func (s *pexConnState) Share(postfn messageWriter) bool {
73         select {
74         case <-s.gate:
75                 if tx := s.genmsg(); tx != nil {
76                         s.dbg.Print("sending PEX message: ", tx)
77                         flow := postfn(tx.Message(s.xid))
78                         s.sched(pexInterval)
79                         return flow
80                 } else {
81                         // no PEX to send this time - try again shortly
82                         s.sched(pexRetryDelay)
83                 }
84         default:
85         }
86         return true
87 }
88
89 // Recv is called from the reader goroutine
90 func (s *pexConnState) Recv(payload []byte) error {
91         rx, err := pp.LoadPexMsg(payload)
92         if err != nil {
93                 return fmt.Errorf("error unmarshalling PEX message: %s", err)
94         }
95         s.dbg.Print("incoming PEX message: ", rx)
96         torrent.Add("pex added peers received", int64(len(rx.Added)))
97         torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
98
99         var peers Peers
100         peers.AppendFromPex(rx.Added6, rx.Added6Flags)
101         peers.AppendFromPex(rx.Added, rx.AddedFlags)
102         s.dbg.Printf("adding %d peers from PEX", len(peers))
103         s.torrent.addPeers(peers)
104         // s.dbg.Print("known swarm now:", s.torrent.KnownSwarm())
105
106         // one day we may also want to:
107         // - check if the peer is not flooding us with PEX updates
108         // - handle drops somehow
109         // - detect malicious peers
110
111         return nil
112 }
113
114 func (s *pexConnState) Close() {
115         if s.timer != nil {
116                 s.timer.Stop()
117         }
118 }