}
}
-func (cl *Client) sendInitialPEX(conn *PeerConn, t *Torrent) {
- xid, ok := conn.PeerExtensionIDs[pp.ExtensionNamePex]
- if !ok {
- return
- }
- m, seq := t.pex.Genmsg(0)
- conn.pexSeq = seq
- if m.Len() == 0 {
- cl.logger.Printf("no initial PEX this time")
- // FIXME see how can we schedule another initial for later
- return
- }
- conn.logger.Printf("sending initial PEX message: %v", m)
- conn.post(m.Message(xid))
-}
-
func (cl *Client) dhtPort() (ret uint16) {
cl.eachDhtServer(func(s DhtServer) {
ret = uint16(missinggo.AddrPort(s.Addr()))
return lenDropped
}
-func (pexMsg *PexMsg) Message(pexExtendedId ExtensionNumber) Message {
- payload := bencode.MustMarshal(pexMsg)
+func (m *PexMsg) Message(pexExtendedId ExtensionNumber) Message {
+ payload := bencode.MustMarshal(m)
return Message{
Type: Extended,
ExtendedID: pexExtendedId,
}
}
+func LoadPexMsg(b []byte) (*PexMsg, error) {
+ m := new(PexMsg)
+ if err := bencode.Unmarshal(b, m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+
type PexPeerFlags byte
func (me PexPeerFlags) Get(f PexPeerFlags) bool {
// response.
metadataRequests []bool
sentHaves bitmap.Bitmap
- pexSeq int
+ pex pexConnState
// Stuff controlled by the remote peer.
PeerID PeerID
if !cn.closed.Set() {
return
}
+ if cn.pex.IsEnabled() {
+ cn.pex.Close()
+ }
cn.tickleWriter()
cn.discardPieceInclination()
cn._pieceRequestOrder.Clear()
}
cn.requestsLowWater = len(cn.requests) / 2
}
-
+ if cn.pex.IsEnabled() {
+ cn.pex.Share(msg) // gated internally
+ }
cn.upload(msg)
}
}
}
c.requestPendingMetadata()
- if !cl.config.DisablePEX {
- cl.sendInitialPEX(c, t)
- // BUG no sending PEX updates yet
+ if !t.cl.config.DisablePEX {
+ t.pex.Add(c) // we learnt enough now
+ c.pex.Init(c)
}
return nil
case metadataExtendedId:
}
return nil
case pexExtendedId:
- if cl.config.DisablePEX {
- // TODO: Maybe close the connection. Check that we're not
- // advertising that we support PEX if it's disabled.
- return nil
- }
- c.logger.Printf("incoming PEX message")
- var pexMsg pp.PexMsg
- err := bencode.Unmarshal(payload, &pexMsg)
- if err != nil {
- return fmt.Errorf("error unmarshalling PEX message: %s", err)
+ if !c.pex.IsEnabled() {
+ return nil // or hang-up maybe?
}
- npeers := len(pexMsg.Added6) + len(pexMsg.Added)
- c.logger.Printf("adding %d peers from PEX", npeers)
- torrent.Add("pex added6 peers received", int64(len(pexMsg.Added6)))
- var peers Peers
- peers.AppendFromPex(pexMsg.Added6, pexMsg.Added6Flags)
- peers.AppendFromPex(pexMsg.Added, pexMsg.AddedFlags)
- t.addPeers(peers)
- return nil
+ return c.pex.Recv(payload)
default:
return fmt.Errorf("unexpected extended message ID: %v", id)
}
if c.outgoing {
f |= pp.PexOutgoingConn
}
- if c.utp() {
+ if c.remoteAddr != nil && strings.Contains(c.remoteAddr.Network(), "udp") {
f |= pp.PexSupportsUtp
}
return f
}
+func (c *PeerConn) dialAddr() net.Addr {
+ if !c.outgoing && c.PeerListenPort != 0 {
+ switch addr := c.remoteAddr.(type) {
+ case *net.TCPAddr:
+ dialAddr := *addr
+ dialAddr.Port = c.PeerListenPort
+ return &dialAddr
+ case *net.UDPAddr:
+ dialAddr := *addr
+ dialAddr.Port = c.PeerListenPort
+ return &dialAddr
+ }
+ }
+ return c.remoteAddr
+}
+
func (c *PeerConn) pexEvent(t pexEventType) pexEvent {
f := c.pexPeerFlags()
- return pexEvent{t, c.remoteAddr, f}
+ addr := c.dialAddr()
+ return pexEvent{t, addr, f}
}
func (c *PeerConn) String() string {
require.EqualValues(b, b.N, cn._stats.ChunksReadUseful.Int64())
}
-func TestPexPeerFlags(t *testing.T) {
+func TestConnPexPeerFlags(t *testing.T) {
+ var (
+ tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
+ udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
+ )
var testcases = []struct {
conn *PeerConn
f pp.PexPeerFlags
{&PeerConn{outgoing: false, PeerPrefersEncryption: true}, pp.PexPrefersEncryption},
{&PeerConn{outgoing: true, PeerPrefersEncryption: false}, pp.PexOutgoingConn},
{&PeerConn{outgoing: true, PeerPrefersEncryption: true}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
- {&PeerConn{network: "udp4"}, pp.PexSupportsUtp},
- {&PeerConn{outgoing: true, network: "udp6"}, pp.PexOutgoingConn | pp.PexSupportsUtp},
- {&PeerConn{outgoing: true, network: "tcp4"}, pp.PexOutgoingConn},
- {&PeerConn{network: "tcp6"}, 0},
+ {&PeerConn{remoteAddr: udpAddr}, pp.PexSupportsUtp},
+ {&PeerConn{remoteAddr: udpAddr, outgoing: true}, pp.PexOutgoingConn | pp.PexSupportsUtp},
+ {&PeerConn{remoteAddr: tcpAddr, outgoing: true}, pp.PexOutgoingConn},
+ {&PeerConn{remoteAddr: tcpAddr}, 0},
}
for i, tc := range testcases {
f := tc.conn.pexPeerFlags()
require.EqualValues(t, tc.f, f, i)
}
}
+
+func TestConnPexEvent(t *testing.T) {
+ var (
+ udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
+ tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
+ dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
+ dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747}
+ )
+ var testcases = []struct {
+ t pexEventType
+ c *PeerConn
+ e pexEvent
+ }{
+ {
+ pexAdd,
+ &PeerConn{remoteAddr: udpAddr},
+ pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp},
+ },
+ {
+ pexDrop,
+ &PeerConn{remoteAddr: tcpAddr, outgoing: true, PeerListenPort: dialTcpAddr.Port},
+ pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn},
+ },
+ {
+ pexAdd,
+ &PeerConn{remoteAddr: tcpAddr, PeerListenPort: dialTcpAddr.Port},
+ pexEvent{pexAdd, dialTcpAddr, 0},
+ },
+ {
+ pexDrop,
+ &PeerConn{remoteAddr: udpAddr, PeerListenPort: dialUdpAddr.Port},
+ pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp},
+ },
+ }
+ for i, tc := range testcases {
+ e := tc.c.pexEvent(tc.t)
+ require.EqualValues(t, tc.e, e, i)
+ }
+}
}
e := c.pexEvent(pexAdd)
s.ev = append(s.ev, e)
+ c.pex.Listed = true
}
func (s *pexState) Drop(c *PeerConn) {
+ if !c.pex.Listed {
+ // skip connections which were not previously Added
+ return
+ }
e := c.pexEvent(pexDrop)
s.nc--
if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
func TestPexDropped(t *testing.T) {
t.Run("belowTarg", func(t *testing.T) {
s := &pexState{nc: 1}
- s.Drop(&PeerConn{remoteAddr: addrs[0]})
+ s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: true}})
targ := &pexState{
hold: []pexEvent{pexEvent{pexDrop, addrs[0], 0}},
nc: 0,
})
t.Run("aboveTarg", func(t *testing.T) {
s := &pexState{nc: pexTargAdded + 1}
- s.Drop(&PeerConn{remoteAddr: addrs[0]})
+ s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: true}})
targ := &pexState{
ev: []pexEvent{pexEvent{pexDrop, addrs[0], 0}},
nc: pexTargAdded,
}
require.EqualValues(t, targ, s)
})
+ t.Run("aboveTargNotListed", func(t *testing.T) {
+ s := &pexState{nc: pexTargAdded + 1}
+ s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: false}})
+ targ := &pexState{nc: pexTargAdded + 1}
+ require.EqualValues(t, targ, s)
+ })
}
func TestPexReset(t *testing.T) {
--- /dev/null
+package torrent
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/anacrolix/log"
+
+ pp "github.com/anacrolix/torrent/peer_protocol"
+)
+
+const (
+ pexRetryDelay = 10 * time.Second
+ pexInterval = 1 * time.Minute
+)
+
+// per-connection PEX state
+type pexConnState struct {
+ enabled bool
+ xid pp.ExtensionNumber
+ seq int
+ timer *time.Timer
+ gate chan struct{}
+ readyfn func()
+ torrent *Torrent
+ Listed bool
+ info log.Logger
+ dbg log.Logger
+}
+
+func (s *pexConnState) IsEnabled() bool {
+ return s.enabled
+}
+
+// Init is called from the reader goroutine upon the extended handshake completion
+func (s *pexConnState) Init(c *PeerConn) {
+ xid, ok := c.PeerExtensionIDs[pp.ExtensionNamePex]
+ if !ok || xid == 0 || c.t.cl.config.DisablePEX {
+ return
+ }
+ s.xid = xid
+ s.seq = 0
+ s.torrent = c.t
+ s.info = c.t.cl.logger
+ s.dbg = c.logger
+ s.readyfn = c.tickleWriter
+ s.gate = make(chan struct{}, 1)
+ s.timer = time.AfterFunc(0, func() {
+ s.gate <- struct{}{}
+ s.readyfn() // wake up the writer
+ })
+ s.enabled = true
+}
+
+// schedule next PEX message
+func (s *pexConnState) sched(delay time.Duration) {
+ s.timer.Reset(delay)
+}
+
+// generate next PEX message for the peer; returns nil if nothing yet to send
+func (s *pexConnState) genmsg() *pp.PexMsg {
+ tx, seq := s.torrent.pex.Genmsg(s.seq)
+ if tx.Len() == 0 {
+ return nil
+ }
+ s.seq = seq
+ return tx
+}
+
+// Share is called from the writer goroutine if when it is woken up with the write buffers empty
+func (s *pexConnState) Share(postfn messageWriter) {
+ select {
+ case <-s.gate:
+ if tx := s.genmsg(); tx != nil {
+ s.dbg.Print("sending PEX message: ", tx)
+ postfn(tx.Message(s.xid))
+ s.sched(pexInterval)
+ } else {
+ // no PEX to send this time - try again shortly
+ s.sched(pexRetryDelay)
+ }
+ default:
+ return
+ }
+}
+
+// Recv is called from the reader goroutine
+func (s *pexConnState) Recv(payload []byte) error {
+ rx, err := pp.LoadPexMsg(payload)
+ if err != nil {
+ return fmt.Errorf("error unmarshalling PEX message: %s", err)
+ }
+ s.dbg.Print("incoming PEX message: ", rx)
+ torrent.Add("pex added peers received", int64(len(rx.Added)))
+ torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
+
+ var peers Peers
+ peers.AppendFromPex(rx.Added6, rx.Added6Flags)
+ peers.AppendFromPex(rx.Added, rx.AddedFlags)
+ s.dbg.Printf("adding %d peers from PEX", len(peers))
+ s.torrent.addPeers(peers)
+ // s.dbg.Print("known swarm now:", s.torrent.KnownSwarm())
+
+ // one day we may also want to:
+ // - check if the peer is not flooding us with PEX updates
+ // - handle drops somehow
+ // - detect malicious peers
+
+ return nil
+}
+
+func (s *pexConnState) Close() {
+ if s.timer != nil {
+ s.timer.Stop()
+ }
+}
--- /dev/null
+package torrent
+
+import (
+ "net"
+ "testing"
+
+ "github.com/anacrolix/dht/v2/krpc"
+ "github.com/stretchr/testify/require"
+
+ "github.com/anacrolix/torrent/metainfo"
+ pp "github.com/anacrolix/torrent/peer_protocol"
+)
+
+func TestPexConnState(t *testing.T) {
+ cl := Client{
+ config: TestingConfig(),
+ }
+ cl.initLogger()
+ torrent := cl.newTorrent(metainfo.Hash{}, nil)
+ addr := &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
+ c := cl.newConnection(nil, false, addr, "")
+ c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber)
+ c.PeerExtensionIDs[pp.ExtensionNamePex] = pexExtendedId
+ c.writerCond.L.Lock()
+ c.setTorrent(torrent)
+ torrent.addConnection(c)
+
+ c.pex.Init(c)
+ require.True(t, c.pex.IsEnabled(), "should get enabled")
+ defer c.pex.Close()
+
+ var out pp.Message
+ writerCalled := false
+ testWriter := func(m pp.Message) bool {
+ writerCalled = true
+ out = m
+ return true
+ }
+ c.writerCond.Wait()
+ c.pex.Share(testWriter)
+ require.True(t, writerCalled)
+ require.EqualValues(t, pp.Extended, out.Type)
+ require.EqualValues(t, pexExtendedId, out.ExtendedID)
+
+ x, err := pp.LoadPexMsg(out.ExtendedPayload)
+ require.NoError(t, err)
+ targx := &pp.PexMsg{
+ Added: krpc.CompactIPv4NodeAddrs(nil),
+ AddedFlags: []pp.PexPeerFlags{},
+ Added6: krpc.CompactIPv6NodeAddrs{
+ nodeAddr(addr),
+ },
+ Added6Flags: []pp.PexPeerFlags{0},
+ }
+ require.EqualValues(t, targx, x)
+}
if ipAddr, ok := tryIpPortFromNetAddr(p.Addr); ok {
if cl.badPeerIPPort(ipAddr.IP, ipAddr.Port) {
torrent.Add("peers not added because of bad addr", 1)
- cl.logger.Printf("peers not added because of bad addr: %v", p)
+ // cl.logger.Printf("peers not added because of bad addr: %v", p)
return
}
}
}
_, ret = t.conns[c]
delete(t.conns, c)
+ if !t.cl.config.DisablePEX {
+ t.pex.Drop(c)
+ }
torrent.Add("deleted connections", 1)
c.deleteAllRequests()
if len(t.conns) == 0 {
func (t *Torrent) dropConnection(c *PeerConn) {
t.cl.event.Broadcast()
- if !t.cl.config.DisablePEX {
- t.pex.Drop(c)
- }
c.close()
if t.deleteConnection(c) {
t.openNewConns()
panic(len(t.conns))
}
t.conns[c] = struct{}{}
- if !t.cl.config.DisablePEX {
- t.pex.Add(c)
+ if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
+ t.pex.Add(c) // as no further extended handshake expected
}
return nil
}