}
}
+func (cl *Client) sendInitialPEX(conn *PeerConn, t *Torrent) {
+ peerPexExtendedId, ok := conn.PeerExtensionIDs[pp.ExtensionNamePex]
+ if !ok {
+ // peer did not advertise support for the PEX extension
+ conn.logger.Printf("no PEX support - not sending initial")
+ return
+ }
+ pexMsg := t.pexInitial()
+ if pexMsg == nil {
+ // not enough peers to share — e.g. len(t.conns < 50)
+ conn.logger.Printf("skipping PEX initial")
+ return
+ }
+ log.Printf("preparing PEX initial message: %v", pexMsg)
+ tx := pexMsg.Message(peerPexExtendedId)
+ conn.post(tx)
+}
+
func (cl *Client) dhtPort() (ret uint16) {
cl.eachDhtServer(func(s DhtServer) {
ret = uint16(missinggo.AddrPort(s.Addr()))
package peer_protocol
-import "github.com/anacrolix/dht/v2/krpc"
+import (
+ "net"
+
+ "github.com/anacrolix/dht/v2/krpc"
+ "github.com/anacrolix/torrent/bencode"
+)
type PexMsg struct {
Added krpc.CompactIPv4NodeAddrs `bencode:"added"`
Dropped6 krpc.CompactIPv6NodeAddrs `bencode:"dropped6"`
}
+func (m *PexMsg) AppendAdded(addr krpc.NodeAddr, f PexPeerFlags) {
+ ip := addr.IP
+ if ip.To4() != nil {
+ m.Added = append(m.Added, addr)
+ m.AddedFlags = append(m.AddedFlags, f)
+ } else if len(ip) == net.IPv6len {
+ m.Added6 = append(m.Added6, addr)
+ m.Added6Flags = append(m.Added6Flags, f)
+ }
+}
+
+func (m *PexMsg) AppendDropped(addr krpc.NodeAddr) {
+ ip := addr.IP
+ if ip.To4() != nil {
+ m.Dropped = append(m.Dropped, addr)
+ } else if len(ip) == net.IPv6len {
+ m.Dropped6 = append(m.Dropped6, addr)
+ }
+}
+
+func (pexMsg *PexMsg) Message(pexExtendedId ExtensionNumber) Message {
+ payload := bencode.MustMarshal(pexMsg)
+ return Message{
+ Type: Extended,
+ ExtendedID: pexExtendedId,
+ ExtendedPayload: payload,
+ }
+}
+
type PexPeerFlags byte
func (me PexPeerFlags) Get(f PexPeerFlags) bool {
}
const (
- PexPrefersEncryption = 0x01
- PexSeedUploadOnly = 0x02
- PexSupportsUtp = 0x04
- PexHolepunchSupport = 0x08
- PexOutgoingConn = 0x10
+ PexPrefersEncryption PexPeerFlags = 1 << iota
+ PexSeedUploadOnly
+ PexSupportsUtp
+ PexHolepunchSupport
+ PexOutgoingConn
)
package peer_protocol
import (
+ "bufio"
+ "bytes"
+ "net"
"testing"
"github.com/stretchr/testify/require"
+ "github.com/anacrolix/dht/v2/krpc"
"github.com/anacrolix/torrent/bencode"
)
require.NoError(t, err)
require.NoError(t, bencode.Unmarshal(b, &pm))
}
+
+func TestPexAppendAdded(t *testing.T) {
+ t.Run("ipv4", func(t *testing.T) {
+ addr := krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}
+ f := PexPrefersEncryption | PexOutgoingConn
+ xm := PexMsg{}
+ xm.AppendAdded(addr, f)
+ require.EqualValues(t, len(xm.Added), 1)
+ require.EqualValues(t, len(xm.AddedFlags), 1)
+ require.EqualValues(t, len(xm.Added6), 0)
+ require.EqualValues(t, len(xm.Added6Flags), 0)
+ require.True(t, xm.Added[0].IP.Equal(addr.IP), "IPs should match")
+ require.EqualValues(t, xm.Added[0].Port, addr.Port)
+ require.EqualValues(t, xm.AddedFlags[0], f)
+ })
+ t.Run("ipv6", func(t *testing.T) {
+ addr := krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}
+ f := PexPrefersEncryption | PexOutgoingConn
+ xm := PexMsg{}
+ xm.AppendAdded(addr, f)
+ require.EqualValues(t, len(xm.Added), 0)
+ require.EqualValues(t, len(xm.AddedFlags), 0)
+ require.EqualValues(t, len(xm.Added6), 1)
+ require.EqualValues(t, len(xm.Added6Flags), 1)
+ require.True(t, xm.Added6[0].IP.Equal(addr.IP), "IPs should match")
+ require.EqualValues(t, xm.Added6[0].Port, addr.Port)
+ require.EqualValues(t, xm.Added6Flags[0], f)
+ })
+ t.Run("unspecified", func(t *testing.T) {
+ addr := krpc.NodeAddr{}
+ xm := PexMsg{}
+ xm.AppendAdded(addr, 0)
+ require.EqualValues(t, len(xm.Added), 0)
+ require.EqualValues(t, len(xm.AddedFlags), 0)
+ require.EqualValues(t, len(xm.Added6), 0)
+ require.EqualValues(t, len(xm.Added6Flags), 0)
+ })
+}
+
+func TestPexAppendDropped(t *testing.T) {
+ t.Run("ipv4", func(t *testing.T) {
+ addr := krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}
+ xm := PexMsg{}
+ xm.AppendDropped(addr)
+ require.EqualValues(t, len(xm.Dropped), 1)
+ require.EqualValues(t, len(xm.Dropped6), 0)
+ require.True(t, xm.Dropped[0].IP.Equal(addr.IP), "IPs should match")
+ require.EqualValues(t, xm.Dropped[0].Port, addr.Port)
+ })
+ t.Run("ipv6", func(t *testing.T) {
+ addr := krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}
+ xm := PexMsg{}
+ xm.AppendDropped(addr)
+ require.EqualValues(t, len(xm.Dropped), 0)
+ require.EqualValues(t, len(xm.Dropped6), 1)
+ require.True(t, xm.Dropped6[0].IP.Equal(addr.IP), "IPs should match")
+ require.EqualValues(t, xm.Dropped6[0].Port, addr.Port)
+ })
+ t.Run("unspecified", func(t *testing.T) {
+ addr := krpc.NodeAddr{}
+ xm := PexMsg{}
+ xm.AppendDropped(addr)
+ require.EqualValues(t, len(xm.Dropped), 0)
+ require.EqualValues(t, len(xm.Dropped6), 0)
+ })
+}
+
+func TestMarshalPexMessage(t *testing.T) {
+ addr := krpc.NodeAddr{IP: net.IP{127, 0, 0, 1}, Port: 0x55aa}
+ f := PexPrefersEncryption | PexOutgoingConn
+ pm := PexMsg{}
+ pm.AppendAdded(addr, f)
+
+ b, err := bencode.Marshal(pm)
+ require.NoError(t, err)
+
+ pexExtendedId := ExtensionNumber(7)
+ msg := pm.Message(pexExtendedId)
+ expected := []byte("\x00\x00\x00\x4c\x14\x07d5:added6:\x7f\x00\x00\x01\x55\xaa7:added.f1:\x116:added60:8:added6.f0:7:dropped0:8:dropped60:e")
+ b, err = msg.MarshalBinary()
+ require.NoError(t, err)
+ require.EqualValues(t, b, expected)
+
+ msg = Message{}
+ dec := Decoder{
+ R: bufio.NewReader(bytes.NewBuffer(b)),
+ MaxLength: 128,
+ }
+ pmOut := PexMsg{}
+ err = dec.Decode(&msg)
+ require.NoError(t, err)
+ require.EqualValues(t, Extended, msg.Type)
+ require.EqualValues(t, pexExtendedId, msg.ExtendedID)
+ err = bencode.Unmarshal(msg.ExtendedPayload, &pmOut)
+ require.NoError(t, err)
+ require.EqualValues(t, len(pm.Added), len(pmOut.Added))
+ require.EqualValues(t, pm.Added[0].IP, pmOut.Added[0].IP)
+ require.EqualValues(t, pm.Added[0].Port, pmOut.Added[0].Port)
+}
peerChoking bool
peerRequests map[request]struct{}
PeerExtensionBytes pp.PeerExtensionBits
+ PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake
+ PeerListenPort int
// The pieces the peer has claimed to have.
_peerPieces bitmap.Bitmap
// The peer has everything. This can occur due to a special message, when
if c.PeerExtensionIDs == nil {
c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber, len(d.M))
}
+ c.PeerListenPort = d.Port
+ c.PeerPrefersEncryption = d.Encryption
for name, id := range d.M {
if _, ok := c.PeerExtensionIDs[name]; !ok {
torrent.Add(fmt.Sprintf("peers supporting extension %q", name), 1)
}
}
c.requestPendingMetadata()
+ if !cl.config.DisablePEX {
+ cl.sendInitialPEX(c, t)
+ // BUG no sending PEX updates yet
+ }
return nil
case metadataExtendedId:
err := cl.gotMetadataExtensionMsg(payload, t, c)
// advertising that we support PEX if it's disabled.
return nil
}
+ c.logger.Printf("incoming PEX message")
var pexMsg pp.PexMsg
err := bencode.Unmarshal(payload, &pexMsg)
if err != nil {
return fmt.Errorf("error unmarshalling PEX message: %s", err)
}
+ npeers := len(pexMsg.Added6) + len(pexMsg.Added)
+ c.logger.Printf("adding %d peers from PEX", npeers)
torrent.Add("pex added6 peers received", int64(len(pexMsg.Added6)))
var peers Peers
peers.AppendFromPex(pexMsg.Added6, pexMsg.Added6Flags)
return IpPort{ipa.IP, uint16(ipa.Port)}
}
+func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags {
+ f := pp.PexPeerFlags(0)
+ if c.PeerPrefersEncryption {
+ f |= pp.PexPrefersEncryption
+ }
+ if c.outgoing {
+ f |= pp.PexOutgoingConn
+ }
+ if c.utp() {
+ f |= pp.PexSupportsUtp
+ }
+ return f
+}
+
func (c *PeerConn) String() string {
return fmt.Sprintf("connection %p", c)
}
require.NoError(b, <-mrlErr)
require.EqualValues(b, b.N, cn._stats.ChunksReadUseful.Int64())
}
+
+func TestPexPeerFlags(t *testing.T) {
+ var testcases = []struct {
+ conn *PeerConn
+ f pp.PexPeerFlags
+ }{
+ {&PeerConn{outgoing: false, PeerPrefersEncryption: false}, 0},
+ {&PeerConn{outgoing: false, PeerPrefersEncryption: true}, pp.PexPrefersEncryption},
+ {&PeerConn{outgoing: true, PeerPrefersEncryption: false}, pp.PexOutgoingConn},
+ {&PeerConn{outgoing: true, PeerPrefersEncryption: true}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
+ {&PeerConn{network: "udp4"}, pp.PexSupportsUtp},
+ {&PeerConn{outgoing: true, network: "udp6"}, pp.PexOutgoingConn | pp.PexSupportsUtp},
+ {&PeerConn{outgoing: true, network: "tcp4"}, pp.PexOutgoingConn},
+ {&PeerConn{network: "tcp6"}, 0},
+ }
+ for i, tc := range testcases {
+ f := tc.conn.pexPeerFlags()
+ require.EqualValues(t, tc.f, f, i)
+ }
+}
"fmt"
"io"
"math/rand"
+ "net"
"net/url"
"sync"
"text/tabwriter"
"github.com/davecgh/go-spew/spew"
"github.com/anacrolix/dht/v2"
+ "github.com/anacrolix/dht/v2/krpc"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/perf"
// Count of each request across active connections.
pendingRequests map[request]int
+
+ pex pexState
}
func (t *Torrent) numConns() int {
if ipAddr, ok := tryIpPortFromNetAddr(p.Addr); ok {
if cl.badPeerIPPort(ipAddr.IP, ipAddr.Port) {
torrent.Add("peers not added because of bad addr", 1)
+ cl.logger.Printf("peers not added because of bad addr: %v", p)
return
}
}
for conn := range t.conns {
conn.close()
}
+ // PEX wipe state here
t.cl.event.Broadcast()
t.pieceStateChanges.Close()
t.updateWantPeersEvent()
func (t *Torrent) dropConnection(c *PeerConn) {
t.cl.event.Broadcast()
+ if !t.cl.config.DisablePEX {
+ t.pex.dropped(c)
+ }
c.close()
if t.deleteConnection(c) {
t.openNewConns()
panic(len(t.conns))
}
t.conns[c] = struct{}{}
+ if !t.cl.config.DisablePEX {
+ t.pex.added(c)
+ }
return nil
}
defer t.cl.unlock()
t.userOnWriteChunkErr = f
}
+
+func nodeAddr(addr net.Addr) krpc.NodeAddr {
+ ipport, _ := tryIpPortFromNetAddr(addr)
+ ip := ipport.IP
+ if ip4 := ip.To4(); ip4 != nil && len(ip) != net.IPv4len {
+ ip = ip4
+ }
+ return krpc.NodeAddr{IP: ip, Port: ipport.Port}
+}
+
+func (t *Torrent) pexInitial() *pp.PexMsg {
+ // BUG FIXME PEX prepare 25 recently connected peers
+ tx := &pp.PexMsg{}
+ for c := range t.conns {
+ addr := nodeAddr(c.remoteAddr)
+ f := c.pexPeerFlags()
+ tx.AppendAdded(addr, f)
+ }
+ nc := len(tx.Added) + len(tx.Added6)
+ // BUG if nc < 50 {
+ if nc < 1 {
+ return nil
+ }
+ return tx
+}
+
+type pexState struct{}
+
+func (s *pexState) added(c *PeerConn) {
+ return
+}
+
+func (s *pexState) dropped(c *PeerConn) {
+ return
+}
assert.False(t, tt.haveAllMetadataPieces())
assert.Nil(t, tt.Metainfo().InfoBytes)
}
+
+func TestTorrentPexInitial(t *testing.T) {
+ v := []*PeerConn{
+ &PeerConn{
+ remoteAddr: &net.UDPAddr{IP: net.IPv4(172, 17, 0, 2), Port: 5555},
+ },
+ &PeerConn{
+ remoteAddr: &net.UDPAddr{
+ IP: net.IP{0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
+ Port: 11111,
+ },
+ outgoing: true,
+ },
+ &PeerConn{
+ remoteAddr: &net.UDPAddr{IP: net.IP(nil), Port: 0},
+ },
+ }
+ torrent := &Torrent{conns: make(map[*PeerConn]struct{})}
+ for _, conn := range v {
+ torrent.conns[conn] = struct{}{}
+ }
+ tx := torrent.pexInitial()
+ require.NotNil(t, tx)
+ require.EqualValues(t, 1, len(tx.Added))
+ require.EqualValues(t, tx.Added[0].UDP().Network(), v[0].remoteAddr.Network())
+ require.EqualValues(t, tx.Added[0].UDP().String(), v[0].remoteAddr.String())
+ require.Zero(t, tx.AddedFlags[0])
+ require.EqualValues(t, 1, len(tx.Added6))
+ require.EqualValues(t, tx.Added6[0].UDP().Network(), v[1].remoteAddr.Network())
+ require.EqualValues(t, tx.Added6[0].UDP().String(), v[1].remoteAddr.String())
+ require.NotZero(t, tx.Added6Flags[0]&pp.PexOutgoingConn)
+}