]> Sergey Matveev's repositories - btrtrc.git/blob - pexconn.go
Merge branch 'webtorrent'
[btrtrc.git] / pexconn.go
1 package torrent
2
3 import (
4         "fmt"
5         "time"
6
7         "github.com/anacrolix/log"
8
9         pp "github.com/anacrolix/torrent/peer_protocol"
10 )
11
12 const (
13         pexRetryDelay = 10 * time.Second
14         pexInterval   = 1 * time.Minute
15 )
16
17 // per-connection PEX state
18 type pexConnState struct {
19         enabled bool
20         xid     pp.ExtensionNumber
21         seq     int
22         timer   *time.Timer
23         gate    chan struct{}
24         readyfn func()
25         torrent *Torrent
26         Listed  bool
27         info    log.Logger
28         dbg     log.Logger
29 }
30
31 func (s *pexConnState) IsEnabled() bool {
32         return s.enabled
33 }
34
35 // Init is called from the reader goroutine upon the extended handshake completion
36 func (s *pexConnState) Init(c *PeerConn) {
37         xid, ok := c.PeerExtensionIDs[pp.ExtensionNamePex]
38         if !ok || xid == 0 || c.t.cl.config.DisablePEX {
39                 return
40         }
41         s.xid = xid
42         s.seq = 0
43         s.torrent = c.t
44         s.info = c.t.cl.logger
45         s.dbg = c.logger
46         s.readyfn = c.tickleWriter
47         s.gate = make(chan struct{}, 1)
48         s.timer = time.AfterFunc(0, func() {
49                 s.gate <- struct{}{}
50                 s.readyfn() // wake up the writer
51         })
52         s.enabled = true
53 }
54
55 // schedule next PEX message
56 func (s *pexConnState) sched(delay time.Duration) {
57         s.timer.Reset(delay)
58 }
59
60 // generate next PEX message for the peer; returns nil if nothing yet to send
61 func (s *pexConnState) genmsg() *pp.PexMsg {
62         tx, seq := s.torrent.pex.Genmsg(s.seq)
63         if tx.Len() == 0 {
64                 return nil
65         }
66         s.seq = seq
67         return tx
68 }
69
70 // Share is called from the writer goroutine if when it is woken up with the write buffers empty
71 func (s *pexConnState) Share(postfn messageWriter) {
72         select {
73         case <-s.gate:
74                 if tx := s.genmsg(); tx != nil {
75                         s.dbg.Print("sending PEX message: ", tx)
76                         postfn(tx.Message(s.xid))
77                         s.sched(pexInterval)
78                 } else {
79                         // no PEX to send this time - try again shortly
80                         s.sched(pexRetryDelay)
81                 }
82         default:
83                 return
84         }
85 }
86
87 // Recv is called from the reader goroutine
88 func (s *pexConnState) Recv(payload []byte) error {
89         rx, err := pp.LoadPexMsg(payload)
90         if err != nil {
91                 return fmt.Errorf("error unmarshalling PEX message: %s", err)
92         }
93         s.dbg.Print("incoming PEX message: ", rx)
94         torrent.Add("pex added peers received", int64(len(rx.Added)))
95         torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
96
97         var peers Peers
98         peers.AppendFromPex(rx.Added6, rx.Added6Flags)
99         peers.AppendFromPex(rx.Added, rx.AddedFlags)
100         s.dbg.Printf("adding %d peers from PEX", len(peers))
101         s.torrent.addPeers(peers)
102         // s.dbg.Print("known swarm now:", s.torrent.KnownSwarm())
103
104         // one day we may also want to:
105         // - check if the peer is not flooding us with PEX updates
106         // - handle drops somehow
107         // - detect malicious peers
108
109         return nil
110 }
111
112 func (s *pexConnState) Close() {
113         if s.timer != nil {
114                 s.timer.Stop()
115         }
116 }