From: Matt Joiner Date: Mon, 1 May 2023 00:19:24 +0000 (+1000) Subject: Misc debug status, pex conn tracking improvements X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=1e13625c7398094de923257f5855ef9af8d20681;p=btrtrc.git Misc debug status, pex conn tracking improvements --- diff --git a/client.go b/client.go index b6fed302..9306624b 100644 --- a/client.go +++ b/client.go @@ -686,7 +686,7 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) { } } -// Performs initiator handshakes and returns a connection. Returns nil *connection if no connection +// Performs initiator handshakes and returns a connection. Returns nil *PeerConn if no connection // for valid reasons. func (cl *Client) initiateProtocolHandshakes( ctx context.Context, @@ -730,14 +730,16 @@ func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfus return nil, errors.New("dial failed") } addrIpPort, _ := tryIpPortFromNetAddr(addr) - c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, obfuscatedHeader, newConnectionOpts{ - outgoing: true, - remoteAddr: addr, - // It would be possible to retrieve a public IP from the dialer used here? - localPublicAddr: cl.publicAddr(addrIpPort.IP), - network: dr.Dialer.DialerNetwork(), - connString: regularNetConnPeerConnConnString(nc), - }) + c, err := cl.initiateProtocolHandshakes( + context.Background(), nc, t, obfuscatedHeader, + newConnectionOpts{ + outgoing: true, + remoteAddr: addr, + // It would be possible to retrieve a public IP from the dialer used here? + localPublicAddr: cl.publicAddr(addrIpPort.IP), + network: dr.Dialer.DialerNetwork(), + connString: regularNetConnPeerConnConnString(nc), + }) if err != nil { nc.Close() } @@ -1510,13 +1512,17 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon } } c.peerImpl = c - c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c) + c.logger = cl.logger.WithDefaultLevel(log.Warning) c.setRW(connStatsReadWriter{nc, c}) c.r = &rateLimitedReader{ l: cl.config.DownloadRateLimiter, r: c.r, } - c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", opts.remoteAddr, opts.network, opts.outgoing) + c.logger.Levelf( + log.Debug, + "new PeerConn %p [Client %p remoteAddr %v network %v outgoing %t]", + c, cl, opts.remoteAddr, opts.network, opts.outgoing, + ) for _, f := range cl.config.Callbacks.NewPeer { f(&c.Peer) } diff --git a/go.mod b/go.mod index 64e1d63a..cf5700a1 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/anacrolix/dht/v2 v2.19.2-0.20221121215055-066ad8494444 github.com/anacrolix/envpprof v1.2.1 github.com/anacrolix/fuse v0.2.0 - github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60 + github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68 github.com/anacrolix/go-libutp v1.2.0 github.com/anacrolix/log v0.13.2-0.20221123232138-02e2764801c3 github.com/anacrolix/missinggo v1.3.0 diff --git a/go.sum b/go.sum index aef4f0ee..4e9c7ec4 100644 --- a/go.sum +++ b/go.sum @@ -78,6 +78,8 @@ github.com/anacrolix/fuse v0.2.0 h1:pc+To78kI2d/WUjIyrsdqeJQAesuwpGxlI3h1nAv3Do= github.com/anacrolix/fuse v0.2.0/go.mod h1:Kfu02xBwnySDpH3N23BmrP3MDfwAQGRLUCj6XyeOvBQ= github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60 h1:k4/h2B1gGF+PJGyGHxs8nmHHt1pzWXZWBj6jn4OBlRc= github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8= +github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68 h1:fyXlBfnlFzZSFckJ8QLb2lfmWfY++4RiUnae7ZMuv0A= +github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8= github.com/anacrolix/go-libutp v1.2.0 h1:sjxoB+/ARiKUR7IK/6wLWyADIBqGmu1fm0xo+8Yy7u0= github.com/anacrolix/go-libutp v1.2.0/go.mod h1:RrJ3KcaDcf9Jqp33YL5V/5CBEc6xMc7aJL8wXfuWL50= github.com/anacrolix/log v0.3.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgwU9jwU= diff --git a/netip-addrport.go b/netip-addrport.go new file mode 100644 index 00000000..cf9edfd5 --- /dev/null +++ b/netip-addrport.go @@ -0,0 +1,44 @@ +package torrent + +import ( + "fmt" + "net" + "net/netip" + + "github.com/anacrolix/dht/v2/krpc" +) + +func ipv4AddrPortFromKrpcNodeAddr(na krpc.NodeAddr) (_ netip.AddrPort, err error) { + ip4 := na.IP.To4() + if ip4 == nil { + err = fmt.Errorf("not an ipv4 address: %v", na.IP) + return + } + addr := netip.AddrFrom4([4]byte(ip4)) + addrPort := netip.AddrPortFrom(addr, uint16(na.Port)) + return addrPort, nil +} + +func ipv6AddrPortFromKrpcNodeAddr(na krpc.NodeAddr) (_ netip.AddrPort, err error) { + ip6 := na.IP.To16() + if ip6 == nil { + err = fmt.Errorf("not an ipv4 address: %v", na.IP) + return + } + addr := netip.AddrFrom16([16]byte(ip6)) + addrPort := netip.AddrPortFrom(addr, uint16(na.Port)) + return addrPort, nil +} + +func addrPortFromPeerRemoteAddr(pra PeerRemoteAddr) (netip.AddrPort, error) { + switch v := pra.(type) { + case *net.TCPAddr: + return v.AddrPort(), nil + case *net.UDPAddr: + return v.AddrPort(), nil + case netip.AddrPort: + return v, nil + default: + return netip.ParseAddrPort(pra.String()) + } +} diff --git a/peer.go b/peer.go index d5ed19e5..e88485b2 100644 --- a/peer.go +++ b/peer.go @@ -275,7 +275,7 @@ func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) next(None[pieceIndex]()) } -func (cn *Peer) writeStatus(w io.Writer, t *Torrent) { +func (cn *Peer) writeStatus(w io.Writer) { // \t isn't preserved in
 blocks?
 	if cn.closed.IsSet() {
 		fmt.Fprint(w, "CLOSED: ")
diff --git a/peer_protocol/handshake.go b/peer_protocol/handshake.go
index acdc3da5..76dc2b05 100644
--- a/peer_protocol/handshake.go
+++ b/peer_protocol/handshake.go
@@ -5,7 +5,10 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"math/bits"
 	"strconv"
+	"strings"
+	"unsafe"
 
 	"github.com/anacrolix/torrent/metainfo"
 )
@@ -33,8 +36,31 @@ type (
 	PeerExtensionBits [8]byte
 )
 
+var bitTags = []struct {
+	bit ExtensionBit
+	tag string
+}{
+	// Ordered by their base protocol type values (PORT, fast.., EXTENDED)
+	{ExtensionBitDHT, "dht"},
+	{ExtensionBitFast, "fast"},
+	{ExtensionBitExtended, "ext"},
+}
+
 func (pex PeerExtensionBits) String() string {
-	return hex.EncodeToString(pex[:])
+	pexHex := hex.EncodeToString(pex[:])
+	tags := make([]string, 0, len(bitTags)+1)
+	for _, bitTag := range bitTags {
+		if pex.GetBit(bitTag.bit) {
+			tags = append(tags, bitTag.tag)
+			pex.SetBit(bitTag.bit, false)
+		}
+	}
+	unknownCount := bits.OnesCount64(*(*uint64)((unsafe.Pointer(unsafe.SliceData(pex[:])))))
+	if unknownCount != 0 {
+		tags = append(tags, fmt.Sprintf("%v unknown", unknownCount))
+	}
+	return fmt.Sprintf("%v (%s)", pexHex, strings.Join(tags, ", "))
+
 }
 
 func NewPeerExtensionBytes(bits ...ExtensionBit) (ret PeerExtensionBits) {
diff --git a/peer_protocol/pex.go b/peer_protocol/pex.go
index 784aaa59..466548a3 100644
--- a/peer_protocol/pex.go
+++ b/peer_protocol/pex.go
@@ -28,6 +28,7 @@ func (m *PexMsg) Message(pexExtendedId ExtensionNumber) Message {
 	}
 }
 
+// Unmarshals and returns a PEX message.
 func LoadPexMsg(b []byte) (ret PexMsg, err error) {
 	err = bencode.Unmarshal(b, &ret)
 	return
diff --git a/peerconn.go b/peerconn.go
index 0318cb6e..1c55db62 100644
--- a/peerconn.go
+++ b/peerconn.go
@@ -62,7 +62,19 @@ type PeerConn struct {
 }
 
 func (cn *PeerConn) peerImplStatusLines() []string {
-	return []string{fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString)}
+	lines := make([]string, 0, 2)
+	lines = append(
+		lines,
+		fmt.Sprintf("%+-55q %v %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString))
+	if cn.supportsExtension(pp.ExtensionNamePex) {
+		lines = append(
+			lines,
+			fmt.Sprintf(
+				"pex: %v conns, %v unsent events",
+				cn.pex.remoteLiveConns,
+				cn.pex.numPending()))
+	}
+	return lines
 }
 
 // Returns true if the connection is over IPv6.
@@ -848,6 +860,7 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err
 		c.requestPendingMetadata()
 		if !t.cl.config.DisablePEX {
 			t.pex.Add(c) // we learnt enough now
+			// This checks the extension is supported internally.
 			c.pex.Init(c)
 		}
 		return nil
diff --git a/pex.go b/pex.go
index 7fa0be88..4561c5e4 100644
--- a/pex.go
+++ b/pex.go
@@ -145,8 +145,8 @@ func (me *pexMsgFactory) append(event pexEvent) {
 	}
 }
 
-func (me *pexMsgFactory) PexMsg() pp.PexMsg {
-	return me.msg
+func (me *pexMsgFactory) PexMsg() *pp.PexMsg {
+	return &me.msg
 }
 
 // Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr
@@ -225,7 +225,7 @@ func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) {
 	s.RLock()
 	defer s.RUnlock()
 	if start == nil {
-		return s.msg0.PexMsg(), s.tail
+		return *s.msg0.PexMsg(), s.tail
 	}
 	var msg pexMsgFactory
 	last := start
@@ -236,5 +236,18 @@ func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) {
 		msg.append(*e)
 		last = e
 	}
-	return msg.PexMsg(), last
+	return *msg.PexMsg(), last
+}
+
+// The same as Genmsg but just counts up the distinct events that haven't been sent.
+func (s *pexState) numPending(start *pexEvent) (num int) {
+	s.RLock()
+	defer s.RUnlock()
+	if start == nil {
+		return s.msg0.PexMsg().Len()
+	}
+	for e := start.next; e != nil; e = e.next {
+		num++
+	}
+	return
 }
diff --git a/pexconn.go b/pexconn.go
index d0308f75..5ccc0220 100644
--- a/pexconn.go
+++ b/pexconn.go
@@ -2,8 +2,11 @@ package torrent
 
 import (
 	"fmt"
+	"net/netip"
 	"time"
 
+	g "github.com/anacrolix/generics"
+
 	"github.com/anacrolix/log"
 
 	pp "github.com/anacrolix/torrent/peer_protocol"
@@ -26,6 +29,8 @@ type pexConnState struct {
 	Listed  bool
 	info    log.Logger
 	dbg     log.Logger
+	// Running record of live connections the remote end of the connection purports to have.
+	remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags]
 }
 
 func (s *pexConnState) IsEnabled() bool {
@@ -67,6 +72,13 @@ func (s *pexConnState) genmsg() *pp.PexMsg {
 	return &tx
 }
 
+func (s *pexConnState) numPending() int {
+	if s.torrent == nil {
+		return 0
+	}
+	return s.torrent.pex.numPending(s.last)
+}
+
 // Share is called from the writer goroutine if when it is woken up with the write buffers empty
 // Returns whether there's more room on the send buffer to write to.
 func (s *pexConnState) Share(postfn messageWriter) bool {
@@ -86,25 +98,51 @@ func (s *pexConnState) Share(postfn messageWriter) bool {
 	return true
 }
 
+func (s *pexConnState) updateRemoteLiveConns(rx pp.PexMsg) (errs []error) {
+	for _, dropped := range rx.Dropped {
+		addrPort, _ := ipv4AddrPortFromKrpcNodeAddr(dropped)
+		delete(s.remoteLiveConns, addrPort)
+	}
+	for _, dropped := range rx.Dropped6 {
+		addrPort, _ := ipv6AddrPortFromKrpcNodeAddr(dropped)
+		delete(s.remoteLiveConns, addrPort)
+	}
+	for i, added := range rx.Added {
+		addr := netip.AddrFrom4([4]byte(added.IP.To4()))
+		addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
+		flags := g.SliceGet(rx.AddedFlags, i)
+		g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
+	}
+	for i, added := range rx.Added6 {
+		addr := netip.AddrFrom16([16]byte(added.IP.To16()))
+		addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
+		flags := g.SliceGet(rx.Added6Flags, i)
+		g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
+	}
+	return
+}
+
 // Recv is called from the reader goroutine
 func (s *pexConnState) Recv(payload []byte) error {
+	rx, err := pp.LoadPexMsg(payload)
+	if err != nil {
+		return fmt.Errorf("unmarshalling pex message: %w", err)
+	}
+	s.dbg.Printf("received pex message: %v", rx)
+	torrent.Add("pex added peers received", int64(len(rx.Added)))
+	torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
+	s.updateRemoteLiveConns(rx)
+
 	if !s.torrent.wantPeers() {
 		s.dbg.Printf("peer reserve ok, incoming PEX discarded")
 		return nil
 	}
+	// TODO: This should be per conn, not for the whole Torrent.
 	if time.Now().Before(s.torrent.pex.rest) {
 		s.dbg.Printf("in cooldown period, incoming PEX discarded")
 		return nil
 	}
 
-	rx, err := pp.LoadPexMsg(payload)
-	if err != nil {
-		return fmt.Errorf("error unmarshalling PEX message: %s", err)
-	}
-	s.dbg.Print("incoming PEX message: ", rx)
-	torrent.Add("pex added peers received", int64(len(rx.Added)))
-	torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
-
 	var peers peerInfos
 	peers.AppendFromPex(rx.Added6, rx.Added6Flags)
 	peers.AppendFromPex(rx.Added, rx.AddedFlags)
diff --git a/test/transfer_test.go b/test/transfer_test.go
index b96c94d3..fa159326 100644
--- a/test/transfer_test.go
+++ b/test/transfer_test.go
@@ -13,7 +13,7 @@ import (
 	"github.com/anacrolix/torrent"
 	"github.com/anacrolix/torrent/internal/testutil"
 	"github.com/anacrolix/torrent/storage"
-	"github.com/frankban/quicktest"
+	qt "github.com/frankban/quicktest"
 	"golang.org/x/time/rate"
 
 	"github.com/stretchr/testify/assert"
@@ -188,7 +188,7 @@ func TestSeedAfterDownloading(t *testing.T) {
 		defer wg.Done()
 		r := llg.NewReader()
 		defer r.Close()
-		quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
+		qt.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), qt.IsNil)
 	}()
 	done := make(chan struct{})
 	defer close(done)
diff --git a/torrent.go b/torrent.go
index f8d31b64..2f6832e9 100644
--- a/torrent.go
+++ b/torrent.go
@@ -108,7 +108,7 @@ type Torrent struct {
 	// them. That encourages us to reconnect to peers that are well known in
 	// the swarm.
 	peers prioritizedPeers
-	// Whether we want to know to know more peers.
+	// Whether we want to know more peers.
 	wantPeersEvent missinggo.Event
 	// An announcer for each tracker URL.
 	trackerAnnouncers map[string]torrentTrackerAnnouncer
@@ -774,7 +774,7 @@ func (t *Torrent) writeStatus(w io.Writer) {
 	for i, c := range peers {
 		fmt.Fprintf(w, "%2d. ", i+1)
 		buf.Reset()
-		c.writeStatus(&buf, t)
+		c.writeStatus(&buf)
 		w.Write(bytes.TrimRight(
 			bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n    ")),
 			" "))
@@ -1983,6 +1983,7 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
 		panic(len(t.conns))
 	}
 	t.conns[c] = struct{}{}
+	t.cl.event.Broadcast()
 	if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
 		t.pex.Add(c) // as no further extended handshake expected
 	}