}
}
-// Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
+// Performs initiator handshakes and returns a connection. Returns nil *PeerConn if no connection
// for valid reasons.
func (cl *Client) initiateProtocolHandshakes(
ctx context.Context,
return nil, errors.New("dial failed")
}
addrIpPort, _ := tryIpPortFromNetAddr(addr)
- c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, obfuscatedHeader, newConnectionOpts{
- outgoing: true,
- remoteAddr: addr,
- // It would be possible to retrieve a public IP from the dialer used here?
- localPublicAddr: cl.publicAddr(addrIpPort.IP),
- network: dr.Dialer.DialerNetwork(),
- connString: regularNetConnPeerConnConnString(nc),
- })
+ c, err := cl.initiateProtocolHandshakes(
+ context.Background(), nc, t, obfuscatedHeader,
+ newConnectionOpts{
+ outgoing: true,
+ remoteAddr: addr,
+ // It would be possible to retrieve a public IP from the dialer used here?
+ localPublicAddr: cl.publicAddr(addrIpPort.IP),
+ network: dr.Dialer.DialerNetwork(),
+ connString: regularNetConnPeerConnConnString(nc),
+ })
if err != nil {
nc.Close()
}
}
}
c.peerImpl = c
- c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
+ c.logger = cl.logger.WithDefaultLevel(log.Warning)
c.setRW(connStatsReadWriter{nc, c})
c.r = &rateLimitedReader{
l: cl.config.DownloadRateLimiter,
r: c.r,
}
- c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", opts.remoteAddr, opts.network, opts.outgoing)
+ c.logger.Levelf(
+ log.Debug,
+ "new PeerConn %p [Client %p remoteAddr %v network %v outgoing %t]",
+ c, cl, opts.remoteAddr, opts.network, opts.outgoing,
+ )
for _, f := range cl.config.Callbacks.NewPeer {
f(&c.Peer)
}
github.com/anacrolix/dht/v2 v2.19.2-0.20221121215055-066ad8494444
github.com/anacrolix/envpprof v1.2.1
github.com/anacrolix/fuse v0.2.0
- github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60
+ github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68
github.com/anacrolix/go-libutp v1.2.0
github.com/anacrolix/log v0.13.2-0.20221123232138-02e2764801c3
github.com/anacrolix/missinggo v1.3.0
github.com/anacrolix/fuse v0.2.0/go.mod h1:Kfu02xBwnySDpH3N23BmrP3MDfwAQGRLUCj6XyeOvBQ=
github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60 h1:k4/h2B1gGF+PJGyGHxs8nmHHt1pzWXZWBj6jn4OBlRc=
github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8=
+github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68 h1:fyXlBfnlFzZSFckJ8QLb2lfmWfY++4RiUnae7ZMuv0A=
+github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8=
github.com/anacrolix/go-libutp v1.2.0 h1:sjxoB+/ARiKUR7IK/6wLWyADIBqGmu1fm0xo+8Yy7u0=
github.com/anacrolix/go-libutp v1.2.0/go.mod h1:RrJ3KcaDcf9Jqp33YL5V/5CBEc6xMc7aJL8wXfuWL50=
github.com/anacrolix/log v0.3.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgwU9jwU=
--- /dev/null
+package torrent
+
+import (
+ "fmt"
+ "net"
+ "net/netip"
+
+ "github.com/anacrolix/dht/v2/krpc"
+)
+
+func ipv4AddrPortFromKrpcNodeAddr(na krpc.NodeAddr) (_ netip.AddrPort, err error) {
+ ip4 := na.IP.To4()
+ if ip4 == nil {
+ err = fmt.Errorf("not an ipv4 address: %v", na.IP)
+ return
+ }
+ addr := netip.AddrFrom4([4]byte(ip4))
+ addrPort := netip.AddrPortFrom(addr, uint16(na.Port))
+ return addrPort, nil
+}
+
+func ipv6AddrPortFromKrpcNodeAddr(na krpc.NodeAddr) (_ netip.AddrPort, err error) {
+ ip6 := na.IP.To16()
+ if ip6 == nil {
+ err = fmt.Errorf("not an ipv4 address: %v", na.IP)
+ return
+ }
+ addr := netip.AddrFrom16([16]byte(ip6))
+ addrPort := netip.AddrPortFrom(addr, uint16(na.Port))
+ return addrPort, nil
+}
+
+func addrPortFromPeerRemoteAddr(pra PeerRemoteAddr) (netip.AddrPort, error) {
+ switch v := pra.(type) {
+ case *net.TCPAddr:
+ return v.AddrPort(), nil
+ case *net.UDPAddr:
+ return v.AddrPort(), nil
+ case netip.AddrPort:
+ return v, nil
+ default:
+ return netip.ParseAddrPort(pra.String())
+ }
+}
next(None[pieceIndex]())
}
-func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
+func (cn *Peer) writeStatus(w io.Writer) {
// \t isn't preserved in <pre> blocks?
if cn.closed.IsSet() {
fmt.Fprint(w, "CLOSED: ")
"errors"
"fmt"
"io"
+ "math/bits"
"strconv"
+ "strings"
+ "unsafe"
"github.com/anacrolix/torrent/metainfo"
)
PeerExtensionBits [8]byte
)
+var bitTags = []struct {
+ bit ExtensionBit
+ tag string
+}{
+ // Ordered by their base protocol type values (PORT, fast.., EXTENDED)
+ {ExtensionBitDHT, "dht"},
+ {ExtensionBitFast, "fast"},
+ {ExtensionBitExtended, "ext"},
+}
+
func (pex PeerExtensionBits) String() string {
- return hex.EncodeToString(pex[:])
+ pexHex := hex.EncodeToString(pex[:])
+ tags := make([]string, 0, len(bitTags)+1)
+ for _, bitTag := range bitTags {
+ if pex.GetBit(bitTag.bit) {
+ tags = append(tags, bitTag.tag)
+ pex.SetBit(bitTag.bit, false)
+ }
+ }
+ unknownCount := bits.OnesCount64(*(*uint64)((unsafe.Pointer(unsafe.SliceData(pex[:])))))
+ if unknownCount != 0 {
+ tags = append(tags, fmt.Sprintf("%v unknown", unknownCount))
+ }
+ return fmt.Sprintf("%v (%s)", pexHex, strings.Join(tags, ", "))
+
}
func NewPeerExtensionBytes(bits ...ExtensionBit) (ret PeerExtensionBits) {
}
}
+// Unmarshals and returns a PEX message.
func LoadPexMsg(b []byte) (ret PexMsg, err error) {
err = bencode.Unmarshal(b, &ret)
return
}
func (cn *PeerConn) peerImplStatusLines() []string {
- return []string{fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString)}
+ lines := make([]string, 0, 2)
+ lines = append(
+ lines,
+ fmt.Sprintf("%+-55q %v %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString))
+ if cn.supportsExtension(pp.ExtensionNamePex) {
+ lines = append(
+ lines,
+ fmt.Sprintf(
+ "pex: %v conns, %v unsent events",
+ cn.pex.remoteLiveConns,
+ cn.pex.numPending()))
+ }
+ return lines
}
// Returns true if the connection is over IPv6.
c.requestPendingMetadata()
if !t.cl.config.DisablePEX {
t.pex.Add(c) // we learnt enough now
+ // This checks the extension is supported internally.
c.pex.Init(c)
}
return nil
}
}
-func (me *pexMsgFactory) PexMsg() pp.PexMsg {
- return me.msg
+func (me *pexMsgFactory) PexMsg() *pp.PexMsg {
+ return &me.msg
}
// Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr
s.RLock()
defer s.RUnlock()
if start == nil {
- return s.msg0.PexMsg(), s.tail
+ return *s.msg0.PexMsg(), s.tail
}
var msg pexMsgFactory
last := start
msg.append(*e)
last = e
}
- return msg.PexMsg(), last
+ return *msg.PexMsg(), last
+}
+
+// The same as Genmsg but just counts up the distinct events that haven't been sent.
+func (s *pexState) numPending(start *pexEvent) (num int) {
+ s.RLock()
+ defer s.RUnlock()
+ if start == nil {
+ return s.msg0.PexMsg().Len()
+ }
+ for e := start.next; e != nil; e = e.next {
+ num++
+ }
+ return
}
import (
"fmt"
+ "net/netip"
"time"
+ g "github.com/anacrolix/generics"
+
"github.com/anacrolix/log"
pp "github.com/anacrolix/torrent/peer_protocol"
Listed bool
info log.Logger
dbg log.Logger
+ // Running record of live connections the remote end of the connection purports to have.
+ remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags]
}
func (s *pexConnState) IsEnabled() bool {
return &tx
}
+func (s *pexConnState) numPending() int {
+ if s.torrent == nil {
+ return 0
+ }
+ return s.torrent.pex.numPending(s.last)
+}
+
// Share is called from the writer goroutine if when it is woken up with the write buffers empty
// Returns whether there's more room on the send buffer to write to.
func (s *pexConnState) Share(postfn messageWriter) bool {
return true
}
+func (s *pexConnState) updateRemoteLiveConns(rx pp.PexMsg) (errs []error) {
+ for _, dropped := range rx.Dropped {
+ addrPort, _ := ipv4AddrPortFromKrpcNodeAddr(dropped)
+ delete(s.remoteLiveConns, addrPort)
+ }
+ for _, dropped := range rx.Dropped6 {
+ addrPort, _ := ipv6AddrPortFromKrpcNodeAddr(dropped)
+ delete(s.remoteLiveConns, addrPort)
+ }
+ for i, added := range rx.Added {
+ addr := netip.AddrFrom4([4]byte(added.IP.To4()))
+ addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
+ flags := g.SliceGet(rx.AddedFlags, i)
+ g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
+ }
+ for i, added := range rx.Added6 {
+ addr := netip.AddrFrom16([16]byte(added.IP.To16()))
+ addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
+ flags := g.SliceGet(rx.Added6Flags, i)
+ g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
+ }
+ return
+}
+
// Recv is called from the reader goroutine
func (s *pexConnState) Recv(payload []byte) error {
+ rx, err := pp.LoadPexMsg(payload)
+ if err != nil {
+ return fmt.Errorf("unmarshalling pex message: %w", err)
+ }
+ s.dbg.Printf("received pex message: %v", rx)
+ torrent.Add("pex added peers received", int64(len(rx.Added)))
+ torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
+ s.updateRemoteLiveConns(rx)
+
if !s.torrent.wantPeers() {
s.dbg.Printf("peer reserve ok, incoming PEX discarded")
return nil
}
+ // TODO: This should be per conn, not for the whole Torrent.
if time.Now().Before(s.torrent.pex.rest) {
s.dbg.Printf("in cooldown period, incoming PEX discarded")
return nil
}
- rx, err := pp.LoadPexMsg(payload)
- if err != nil {
- return fmt.Errorf("error unmarshalling PEX message: %s", err)
- }
- s.dbg.Print("incoming PEX message: ", rx)
- torrent.Add("pex added peers received", int64(len(rx.Added)))
- torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
-
var peers peerInfos
peers.AppendFromPex(rx.Added6, rx.Added6Flags)
peers.AppendFromPex(rx.Added, rx.AddedFlags)
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/internal/testutil"
"github.com/anacrolix/torrent/storage"
- "github.com/frankban/quicktest"
+ qt "github.com/frankban/quicktest"
"golang.org/x/time/rate"
"github.com/stretchr/testify/assert"
defer wg.Done()
r := llg.NewReader()
defer r.Close()
- quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
+ qt.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), qt.IsNil)
}()
done := make(chan struct{})
defer close(done)
// them. That encourages us to reconnect to peers that are well known in
// the swarm.
peers prioritizedPeers
- // Whether we want to know to know more peers.
+ // Whether we want to know more peers.
wantPeersEvent missinggo.Event
// An announcer for each tracker URL.
trackerAnnouncers map[string]torrentTrackerAnnouncer
for i, c := range peers {
fmt.Fprintf(w, "%2d. ", i+1)
buf.Reset()
- c.writeStatus(&buf, t)
+ c.writeStatus(&buf)
w.Write(bytes.TrimRight(
bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n ")),
" "))
panic(len(t.conns))
}
t.conns[c] = struct{}{}
+ t.cl.event.Broadcast()
if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
t.pex.Add(c) // as no further extended handshake expected
}