7 "github.com/anacrolix/log"
9 pp "github.com/anacrolix/torrent/peer_protocol"
13 pexRetryDelay = 10 * time.Second
14 pexInterval = 1 * time.Minute
17 // per-connection PEX state
18 type pexConnState struct {
20 xid pp.ExtensionNumber
31 func (s *pexConnState) IsEnabled() bool {
35 // Init is called from the reader goroutine upon completion of the extended handshake
36 func (s *pexConnState) Init(c *PeerConn) {
37 xid, ok := c.PeerExtensionIDs[pp.ExtensionNamePex]
38 if !ok || xid == 0 || c.t.cl.config.DisablePEX {
44 s.info = c.t.cl.logger.WithDefaultLevel(log.Info)
45 s.dbg = c.logger.WithDefaultLevel(log.Debug)
46 s.readyfn = c.tickleWriter
47 s.gate = make(chan struct{}, 1)
48 s.timer = time.AfterFunc(0, func() {
50 s.readyfn() // wake up the writer
55 // schedule next PEX message
56 func (s *pexConnState) sched(delay time.Duration) {
60 // generate next PEX message for the peer; returns nil if nothing yet to send
61 func (s *pexConnState) genmsg() *pp.PexMsg {
62 tx, last := s.torrent.pex.Genmsg(s.last)
70 // Share is called from the writer goroutine when it is woken up with the write buffers empty
71 // Returns whether there's more room on the send buffer to write to.
72 func (s *pexConnState) Share(postfn messageWriter) bool {
75 if tx := s.genmsg(); tx != nil {
76 s.dbg.Print("sending PEX message: ", tx)
77 flow := postfn(tx.Message(s.xid))
81 // no PEX to send this time - try again shortly
82 s.sched(pexRetryDelay)
89 // Recv is called from the reader goroutine
90 func (s *pexConnState) Recv(payload []byte) error {
91 if !s.torrent.wantPeers() {
92 s.dbg.Printf("peer reserve ok, incoming PEX discarded")
95 if time.Now().Before(s.torrent.pex.rest) {
96 s.dbg.Printf("in cooldown period, incoming PEX discarded")
100 rx, err := pp.LoadPexMsg(payload)
102 return fmt.Errorf("error unmarshalling PEX message: %s", err)
104 s.dbg.Print("incoming PEX message: ", rx)
105 torrent.Add("pex added peers received", int64(len(rx.Added)))
106 torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
109 peers.AppendFromPex(rx.Added6, rx.Added6Flags)
110 peers.AppendFromPex(rx.Added, rx.AddedFlags)
111 s.dbg.Printf("adding %d peers from PEX", len(peers))
113 s.torrent.pex.rest = time.Now().Add(pexInterval)
114 s.torrent.addPeers(peers)
117 // one day we may also want to:
118 // - check if the peer is not flooding us with PEX updates
119 // - handle drops somehow
120 // - detect malicious peers
125 func (s *pexConnState) Close() {