Includes a test with unix sockets. Exposes AddDialer, AddListener, and reworks Peer.
// Peer connection info, handed about publicly.
type Peer struct {
Id [20]byte
- IP net.IP
- Port int
+ Addr net.Addr
Source peerSource
// Peer is known to support encryption.
SupportsEncryption bool
// FromPex generate Peer from peer exchange
func (me *Peer) FromPex(na krpc.NodeAddr, fs peer_protocol.PexPeerFlags) {
- me.IP = append([]byte(nil), na.IP...)
- me.Port = na.Port
+ me.Addr = ipPortAddr{append([]byte(nil), na.IP...), na.Port}
me.Source = peerSourcePex
// If they prefer encryption, they must support it.
if fs.Get(peer_protocol.PexPrefersEncryption) {
}
func (me Peer) addr() IpPort {
- return IpPort{IP: me.IP, Port: uint16(me.Port)}
+ return IpPort{IP: addrIpOrNil(me.Addr), Port: uint16(addrPortOrZero(me.Addr))}
}
func (ret Peers) AppendFromTracker(ps []tracker.Peer) Peers {
for _, p := range ps {
_p := Peer{
- IP: p.IP,
- Port: p.Port,
+ Addr: ipPortAddr{p.IP, p.Port},
Source: peerSourceTracker,
}
copy(_p.Id[:], p.ID)
peerID PeerID
defaultStorage *storage.Client
onClose []func()
- dialers []dialer
- listeners []listener
+ dialers []Dialer
+ listeners []Listener
dhtServers []*dht.Server
ipBlockList iplist.Ranger
// Our BitTorrent protocol extension bytes, sent in our BT handshakes.
return cl.peerID
}
+// Returns the port number for the first listener that has one. No longer assumes that all port
+// numbers are the same, due to support for custom listeners. Returns zero if no port number is
+// found.
func (cl *Client) LocalPort() (port int) {
- cl.eachListener(func(l listener) bool {
- _port := missinggo.AddrPort(l.Addr())
- if _port == 0 {
- panic(l)
- }
- if port == 0 {
- port = _port
- } else if port != _port {
- panic("mismatched ports")
- }
- return true
+ cl.eachListener(func(l Listener) bool {
+ port = addrPortOrZero(l.Addr())
+ return port == 0
})
return
}
return
}
+// Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
+// given address for any Torrent.
+func (cl *Client) AddDialer(d Dialer) {
+ cl.dialers = append(cl.dialers, d)
+}
+
+// Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
+// yourself.
+func (cl *Client) AddListener(l Listener) {
+ cl.listeners = append(cl.listeners, l)
+ go cl.acceptConnections(l)
+}
+
func (cl *Client) firewallCallback(net.Addr) bool {
cl.rLock()
block := !cl.wantConns()
}
}
+// TODO: Apply filters for non-standard networks, particularly rate-limiting.
func (cl *Client) rejectAccepted(conn net.Conn) error {
ra := conn.RemoteAddr()
- rip := missinggo.AddrIP(ra)
- if cl.config.DisableIPv4Peers && rip.To4() != nil {
- return errors.New("ipv4 peers disabled")
- }
- if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
- return errors.New("ipv4 disabled")
+ if rip := addrIpOrNil(ra); rip != nil {
+ if cl.config.DisableIPv4Peers && rip.To4() != nil {
+ return errors.New("ipv4 peers disabled")
+ }
+ if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
+ return errors.New("ipv4 disabled")
- }
- if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
- return errors.New("ipv6 disabled")
- }
- if cl.rateLimitAccept(rip) {
- return errors.New("source IP accepted rate limited")
- }
- if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
- return errors.New("bad source addr")
+ }
+ if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
+ return errors.New("ipv6 disabled")
+ }
+ if cl.rateLimitAccept(rip) {
+ return errors.New("source IP accepted rate limited")
+ }
+ if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
+ return errors.New("bad source addr")
+ }
}
return nil
}
} else {
go cl.incomingConnection(conn)
}
- log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
- torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
+ log.Fmsg("accepted %q connection at %q from %q",
+ l.Addr().Network(),
+ conn.LocalAddr(),
+ conn.RemoteAddr(),
+ ).AddValue(debugLogValue).Log(cl.logger)
+ torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
}()
if tc, ok := nc.(*net.TCPConn); ok {
tc.SetLinger(0)
}
- c := cl.newConnection(nc, false, missinggo.IpPortFromNetAddr(nc.RemoteAddr()), nc.RemoteAddr().Network())
+ c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network())
c.Discovery = peerSourceIncoming
cl.runReceivedConn(c)
}
func() {
cl.lock()
defer cl.unlock()
- cl.eachDialer(func(s dialer) bool {
+ cl.eachDialer(func(s Dialer) bool {
func() {
left++
//cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
return res
}
-func (cl *Client) dialFromSocket(ctx context.Context, s dialer, addr string) net.Conn {
+func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
network := s.LocalAddr().Network()
cte := cl.config.ConnTracker.Wait(
ctx,
}
return nil
}
- c, err := s.dial(ctx, addr)
+ c, err := s.Dial(ctx, addr)
// This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
// it now in case we close the connection forthwith.
if tc, ok := c.(*net.TCPConn); ok {
// Performs initiator handshakes and returns a connection. Returns nil
// *connection if no connection for valid reasons.
-func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr IpPort, network string) (c *connection, err error) {
+func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr net.Addr, network string) (c *connection, err error) {
c = cl.newConnection(nc, true, remoteAddr, network)
c.headerEncrypted = encryptHeader
ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
// Returns nil connection and nil error if no connection could be established
// for valid reasons.
-func (cl *Client) establishOutgoingConnEx(t *Torrent, addr IpPort, obfuscatedHeader bool) (*connection, error) {
+func (cl *Client) establishOutgoingConnEx(t *Torrent, addr net.Addr, obfuscatedHeader bool) (*connection, error) {
dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
cl.rLock()
defer cl.rUnlock()
// Returns nil connection and nil error if no connection could be established
// for valid reasons.
-func (cl *Client) establishOutgoingConn(t *Torrent, addr IpPort) (c *connection, err error) {
+func (cl *Client) establishOutgoingConn(t *Torrent, addr net.Addr) (c *connection, err error) {
torrent.Add("establish outgoing connection", 1)
obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
// Called to dial out and run a connection. The addr we're given is already
// considered half-open.
-func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource, trusted bool) {
+func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps peerSource, trusted bool) {
cl.dialRateLimiter.Wait(context.Background())
c, err := cl.establishOutgoingConn(t, addr)
cl.lock()
cl.runHandshookConn(c, t)
}
-// The port number for incoming peer connections. 0 if the client isn't
-// listening.
+// The port number for incoming peer connections. 0 if the client isn't listening.
func (cl *Client) incomingPeerPort() int {
return cl.LocalPort()
}
},
V: cl.config.ExtendedHandshakeClientVersion,
Reqq: 64, // TODO: Really?
- YourIp: pp.CompactIp(conn.remoteAddr.IP),
+ YourIp: pp.CompactIp(addrIpOrNil(conn.remoteAddr)),
Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
Port: cl.incomingPeerPort(),
MetadataSize: torrent.metadataSize(),
}
}
+func (cl *Client) badPeerAddr(addr net.Addr) bool {
+ if ipa, ok := tryIpPortFromNetAddr(addr); ok {
+ return cl.badPeerIPPort(ipa.IP, ipa.Port)
+ }
+ return false
+}
+
func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
if port == 0 {
return true
peers: prioritizedPeers{
om: btree.New(32),
getPrio: func(p Peer) peerPriority {
- return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
+ return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
},
},
conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
cl.badPeerIPs[ip.String()] = struct{}{}
}
-func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr IpPort, network string) (c *connection) {
+func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network string) (c *connection) {
c = &connection{
conn: nc,
outgoing: outgoing,
return
}
t.addPeers([]Peer{{
- IP: ip,
- Port: port,
+ Addr: ipPortAddr{ip, port},
Source: peerSourceDhtAnnouncePeer,
}})
}
return nil
}
-func (cl *Client) eachDialer(f func(dialer) bool) {
+func (cl *Client) eachDialer(f func(Dialer) bool) {
for _, s := range cl.dialers {
if !f(s) {
break
}
}
-func (cl *Client) eachListener(f func(listener) bool) {
+func (cl *Client) eachListener(f func(Listener) bool) {
for _, s := range cl.listeners {
if !f(s) {
break
}
func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
- cl.eachListener(func(l listener) bool {
+ cl.eachListener(func(l Listener) bool {
ret = l
return !f(l)
})
}
func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
- return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
- return f(missinggo.AddrIP(l.Addr()))
- }).Addr())
+ return addrIpOrNil(
+ cl.findListener(
+ func(l net.Listener) bool {
+ return f(addrIpOrNil(l.Addr()))
+ },
+ ).Addr(),
+ )
}
// Our IP as a peer should see it.
func (cl *Client) ListenAddrs() (ret []net.Addr) {
cl.lock()
defer cl.unlock()
- cl.eachListener(func(l listener) bool {
+ cl.eachListener(func(l Listener) bool {
ret = append(ret, l.Addr())
return true
})
return
}
-func (cl *Client) onBadAccept(addr IpPort) {
- ip := maskIpForAcceptLimiting(addr.IP)
+func (cl *Client) onBadAccept(addr net.Addr) {
+ ipa, ok := tryIpPortFromNetAddr(addr)
+ if !ok {
+ return
+ }
+ ip := maskIpForAcceptLimiting(ipa.IP)
if cl.acceptLimiter == nil {
cl.acceptLimiter = make(map[ipStr]int)
}
defer cl.Close()
port := cl.LocalPort()
assert.NotEqual(t, 0, port)
- cl.eachListener(func(s listener) bool {
+ cl.eachListener(func(s Listener) bool {
assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
return true
})
return
}
ret = append(ret, torrent.Peer{
- IP: addr.IP,
- Port: addr.Port,
+ Addr: addr,
})
}
return
t.AddPeers(func() (ret []torrent.Peer) {
for _, ta := range flags.TestPeer {
ret = append(ret, torrent.Peer{
- IP: ta.IP,
- Port: ta.Port,
+ Addr: ta,
})
}
return
func addTestPeer(client *torrent.Client) {
for _, t := range client.Torrents() {
t.AddPeers([]torrent.Peer{{
- IP: args.TestPeer.IP,
- Port: args.TestPeer.Port,
+ Addr: args.TestPeer,
}})
}
}
// Store torrent file data in this directory unless .DefaultStorage is
// specified.
DataDir string `long:"data-dir" description:"directory to store downloaded torrent data"`
- // The address to listen for new uTP and TCP bittorrent protocol
- // connections. DHT shares a UDP socket with uTP unless configured
- // otherwise.
+ // The address to listen for new uTP and TCP BitTorrent protocol connections. DHT shares a UDP
+ // socket with uTP unless configured otherwise.
ListenHost func(network string) string
ListenPort int
NoDefaultPortForwarding bool
conn net.Conn
outgoing bool
network string
- remoteAddr IpPort
+ remoteAddr net.Addr
// The Reader and Writer for this Conn, with hooks installed for stats,
// limiting, deadlines etc.
w io.Writer
// Returns true if the connection is over IPv6.
func (cn *connection) ipv6() bool {
- ip := cn.remoteAddr.IP
+ ip := addrIpOrNil(cn.remoteAddr)
if ip.To4() != nil {
return false
}
return len(ip) == net.IPv6len
}
-// Returns true the dialer has the lower client peer ID. TODO: Find the
+// Returns true the if the dialer/initiator has the lower client peer ID. TODO: Find the
// specification for this.
func (cn *connection) isPreferredDirection() bool {
return bytes.Compare(cn.t.cl.peerID[:], cn.PeerID[:]) < 0 == cn.outgoing
req := newRequestFromMessage(&msg)
c.onPeerSentCancel(req)
case pp.Port:
+ ipa, ok := tryIpPortFromNetAddr(c.remoteAddr)
+ if !ok {
+ break
+ }
pingAddr := net.UDPAddr{
- IP: c.remoteAddr.IP,
- Port: int(c.remoteAddr.Port),
+ IP: ipa.IP,
+ Port: ipa.Port,
}
if msg.Port != 0 {
pingAddr.Port = int(msg.Port)
}
func (c *connection) remoteIp() net.IP {
- return c.remoteAddr.IP
+ return addrIpOrNil(c.remoteAddr)
}
func (c *connection) remoteIpPort() IpPort {
- return c.remoteAddr
+ ipa, _ := tryIpPortFromNetAddr(c.remoteAddr)
+ return IpPort{ipa.IP, uint16(ipa.Port)}
}
func (c *connection) String() string {
config: TestingConfig(),
}
cl.initLogger()
- c := cl.newConnection(nil, false, IpPort{}, "")
+ c := cl.newConnection(nil, false, nil, "")
c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
c.t.setInfo(&metainfo.Info{
Pieces: make([]byte, metainfo.HashSize*3),
t.setChunkSize(defaultChunkSize)
t._pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority())
r, w := net.Pipe()
- cn := cl.newConnection(r, true, IpPort{}, "")
+ cn := cl.newConnection(r, true, nil, "")
cn.setTorrent(t)
mrlErr := make(chan error)
msg := pp.Message{
--- /dev/null
+package torrent
+
+import (
+ "context"
+ "net"
+
+ "github.com/anacrolix/missinggo/perf"
+)
+
+type Dialer interface {
+ // The network is implied by the instance.
+ Dial(_ context.Context, addr string) (net.Conn, error)
+ // This is required for registering with the connection tracker (router connection table
+ // emulating rate-limiter) before dialing. TODO: What about connections that wouldn't infringe
+ // on routers, like localhost or unix sockets.
+ LocalAddr() net.Addr
+}
+
+type NetDialer struct {
+ Network string
+ Dialer net.Dialer
+}
+
+func (me NetDialer) Dial(ctx context.Context, addr string) (_ net.Conn, err error) {
+ defer perf.ScopeTimerErr(&err)()
+ return me.Dialer.DialContext(ctx, me.Network, addr)
+}
+
+func (me NetDialer) LocalAddr() net.Addr {
+ return netDialerLocalAddr{me.Network, me.Dialer.LocalAddr}
+}
+
+type netDialerLocalAddr struct {
+ network string
+ addr net.Addr
+}
+
+func (me netDialerLocalAddr) Network() string { return me.network }
+
+func (me netDialerLocalAddr) String() string {
+ if me.addr == nil {
+ return ""
+ }
+ return me.addr.String()
+}
github.com/anacrolix/missinggo/perf v1.0.0
github.com/anacrolix/missinggo/v2 v2.3.2-0.20200110051601-fc3212fb3984
github.com/anacrolix/multiless v0.0.0-20191223025854-070b7994e841
- github.com/anacrolix/stm v0.2.0
github.com/anacrolix/sync v0.2.0
github.com/anacrolix/tagflag v1.0.1
github.com/anacrolix/upnp v0.1.1
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.4.0
github.com/tinylib/msgp v1.1.1 // indirect
- golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa
+ golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543
)
--- /dev/null
+package torrent
+
+import (
+ "net"
+ "strconv"
+)
+
+// Extracts the port as an integer from an address string.
+func addrPortOrZero(addr net.Addr) int {
+ switch raw := addr.(type) {
+ case *net.UDPAddr:
+ return raw.Port
+ case *net.TCPAddr:
+ return raw.Port
+ default:
+ _, port, err := net.SplitHostPort(addr.String())
+ if err != nil {
+ return 0
+ }
+ i64, err := strconv.ParseInt(port, 0, 0)
+ if err != nil {
+ panic(err)
+ }
+ return int(i64)
+ }
+}
+
+func addrIpOrNil(addr net.Addr) net.IP {
+ if addr == nil {
+ return nil
+ }
+ switch raw := addr.(type) {
+ case *net.UDPAddr:
+ return raw.IP
+ case *net.TCPAddr:
+ return raw.IP
+ default:
+ host, _, err := net.SplitHostPort(addr.String())
+ if err != nil {
+ return nil
+ }
+ return net.ParseIP(host)
+ }
+}
+
+type ipPortAddr struct {
+ IP net.IP
+ Port int
+}
+
+func (ipPortAddr) Network() string {
+ return ""
+}
+
+func (me ipPortAddr) String() string {
+ return net.JoinHostPort(me.IP.String(), strconv.FormatInt(int64(me.Port), 10))
+}
+
+func tryIpPortFromNetAddr(na net.Addr) (ret ipPortAddr, ok bool) {
+ ret.IP = addrIpOrNil(na)
+ if ret.IP == nil {
+ return
+ }
+ ret.Port = addrPortOrZero(na)
+ ok = true
+ return
+}
LocalAddr() net.Addr
}) bool {
ra := nc.LocalAddr()
- rip := missinggo.AddrIP(ra)
+ rip := addrIpOrNil(ra)
return rip.To4() == nil && rip.To16() != nil
}
assert.Panics(t, func() { pp.PopMax() })
assert.False(t, ok)
ps := []Peer{
- {IP: net.ParseIP("1.2.3.4")},
- {IP: net.ParseIP("1::2")},
- {IP: net.ParseIP("")},
- {IP: net.ParseIP(""), Trusted: true},
+ {Addr: ipPortAddr{IP: net.ParseIP("1.2.3.4")}},
+ {Addr: ipPortAddr{IP: net.ParseIP("1::2")}},
+ {Addr: ipPortAddr{IP: net.ParseIP("")}},
+ {Addr: ipPortAddr{IP: net.ParseIP("")}, Trusted: true},
}
for i, p := range ps {
t.Logf("peer %d priority: %08x trusted: %t\n", i, pp.getPrio(p), p.Trusted)
"github.com/pkg/errors"
)
-type dialer interface {
- dial(_ context.Context, addr string) (net.Conn, error)
- LocalAddr() net.Addr
-}
-
-type listener interface {
+type Listener interface {
net.Listener
}
type socket interface {
- listener
- dialer
+ Listener
+ Dialer
}
func listen(n network, addr string, f firewallCallback) (socket, error) {
l, err := net.Listen(network, address)
return tcpSocket{
Listener: l,
- network: network,
+ NetDialer: NetDialer{
+ Network: network,
+ },
}, err
}
type tcpSocket struct {
net.Listener
- network string
- dialer net.Dialer
+ NetDialer
}
-func (me tcpSocket) dial(ctx context.Context, addr string) (_ net.Conn, err error) {
- defer perf.ScopeTimerErr(&err)()
- return me.dialer.DialContext(ctx, me.network, addr)
-}
-
-func (me tcpSocket) LocalAddr() net.Addr {
- return tcpSocketLocalAddr{me.network, me.Listener.Addr().String()}
-}
-
-type tcpSocketLocalAddr struct {
- network string
- s string
-}
-
-func (me tcpSocketLocalAddr) Network() string { return me.network }
-
-func (me tcpSocketLocalAddr) String() string { return "" }
-
func listenAll(networks []network, getHost func(string) string, port int, f firewallCallback) ([]socket, error) {
if len(networks) == 0 {
return nil, nil
network string
}
-func (me utpSocketSocket) dial(ctx context.Context, addr string) (conn net.Conn, err error) {
+func (me utpSocketSocket) Dial(ctx context.Context, addr string) (conn net.Conn, err error) {
defer perf.ScopeTimerErr(&err)()
return me.utpSocket.DialContext(ctx, me.network, addr)
}
--- /dev/null
+package test
+
+import (
+ "github.com/anacrolix/torrent"
+)
+
+func init() {
+ torrent.TestingTempDir.Init("torrent-test.test")
+}
SeederStorage func(string) storage.ClientImpl
SeederUploadRateLimiter *rate.Limiter
LeecherDownloadRateLimiter *rate.Limiter
+ ConfigureSeeder ConfigureClient
+ ConfigureLeecher ConfigureClient
}
func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
} else {
cfg.DataDir = greetingTempDir
}
+ if ps.ConfigureSeeder.Config != nil {
+ ps.ConfigureSeeder.Config(cfg)
+ }
seeder, err := torrent.NewClient(cfg)
require.NoError(t, err)
+ if ps.ConfigureSeeder.Client != nil {
+ ps.ConfigureSeeder.Client(seeder)
+ }
if ps.ExportClientStatus {
defer testutil.ExportStatusWriter(seeder, "s")()
}
}
cfg.Seed = false
//cfg.Debug = true
+ if ps.ConfigureLeecher.Config != nil {
+ ps.ConfigureLeecher.Config(cfg)
+ }
leecher, err := torrent.NewClient(cfg)
require.NoError(t, err)
defer leecher.Close()
+ if ps.ConfigureLeecher.Client != nil {
+ ps.ConfigureLeecher.Client(leecher)
+ }
if ps.ExportClientStatus {
defer testutil.ExportStatusWriter(leecher, "l")()
}
}()
wg.Wait()
}
+
+type ConfigureClient struct {
+ Config func(*torrent.ClientConfig)
+ Client func(*torrent.Client)
+}
--- /dev/null
+package test
+
+import (
+ "io"
+ "log"
+ "net"
+ "path/filepath"
+ "testing"
+
+ "github.com/anacrolix/torrent"
+)
+
+func TestUnixConns(t *testing.T) {
+ var closers []io.Closer
+ defer func() {
+ for _, c := range closers {
+ c.Close()
+ }
+ }()
+ configure := ConfigureClient{
+ Config: func(cfg *torrent.ClientConfig) {
+ cfg.DisableUTP = true
+ cfg.DisableTCP = true
+ cfg.Debug = true
+ },
+ Client: func(cl *torrent.Client) {
+ cl.AddDialer(torrent.NetDialer{Network: "unix"})
+ l, err := net.Listen("unix", filepath.Join(torrent.TestingTempDir.NewSub(), "socket"))
+ if err != nil {
+ panic(err)
+ }
+ log.Printf("created listener %q", l)
+ closers = append(closers, l)
+ cl.AddListener(l)
+ },
+ }
+ testClientTransfer(t, testClientTransferParams{
+ ConfigureSeeder: configure,
+ ConfigureLeecher: configure,
+ })
+}
ks = append(ks, Peer{
Id: conn.PeerID,
- IP: conn.remoteAddr.IP,
- Port: int(conn.remoteAddr.Port),
+ Addr: conn.remoteAddr,
Source: conn.Discovery,
// > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
// > But if we're not connected to them with an encrypted connection, I couldn't say
if t.closed.IsSet() {
return
}
- if cl.badPeerIPPort(p.IP, p.Port) {
- torrent.Add("peers not added because of bad addr", 1)
- return
+ if ipAddr, ok := tryIpPortFromNetAddr(p.Addr); ok {
+ if cl.badPeerIPPort(ipAddr.IP, ipAddr.Port) {
+ torrent.Add("peers not added because of bad addr", 1)
+ return
+ }
}
if t.peers.Add(p) {
torrent.Add("peers replaced", 1)
continue
}
t.addPeer(Peer{
- IP: cp.IP[:],
- Port: cp.Port,
+ Addr: ipPortAddr{cp.IP, cp.Port},
Source: peerSourceDhtGetPeers,
})
}
peers[addr] = struct{}{}
}
t.peers.Each(func(peer Peer) {
- peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{}
+ peers[peer.Addr.String()] = struct{}{}
})
return len(peers)
}
if len(bannableTouchers) >= 1 {
c := bannableTouchers[0]
- t.cl.banPeerIP(c.remoteAddr.IP)
+ t.cl.banPeerIP(c.remoteIp())
c.Drop()
}
}
if peer.Id == t.cl.peerID {
return
}
- if t.cl.badPeerIPPort(peer.IP, peer.Port) && !peer.Trusted {
+
+ if t.cl.badPeerAddr(peer.Addr) && !peer.Trusted {
return
}
- addr := IpPort{peer.IP, uint16(peer.Port)}
+ addr := peer.Addr
if t.addrActive(addr.String()) {
return
}
t.AddPeers(func() (ps []Peer) {
for _, la := range cl.ListenAddrs() {
ps = append(ps, Peer{
- IP: missinggo.AddrIP(la),
- Port: missinggo.AddrPort(la),
+ Addr: la,
Trusted: true,
})
}