]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Misc debug status, pex conn tracking improvements
authorMatt Joiner <anacrolix@gmail.com>
Mon, 1 May 2023 00:19:24 +0000 (10:19 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 1 May 2023 00:19:24 +0000 (10:19 +1000)
12 files changed:
client.go
go.mod
go.sum
netip-addrport.go [new file with mode: 0644]
peer.go
peer_protocol/handshake.go
peer_protocol/pex.go
peerconn.go
pex.go
pexconn.go
test/transfer_test.go
torrent.go

index b6fed302f3a5dcca68ad1f0071cff0c05d62d440..9306624b96a0ff9b44b335415bb70fefe33f941a 100644 (file)
--- a/client.go
+++ b/client.go
@@ -686,7 +686,7 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
        }
 }
 
-// Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
+// Performs initiator handshakes and returns a connection. Returns nil *PeerConn if no connection
 // for valid reasons.
 func (cl *Client) initiateProtocolHandshakes(
        ctx context.Context,
@@ -730,14 +730,16 @@ func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfus
                return nil, errors.New("dial failed")
        }
        addrIpPort, _ := tryIpPortFromNetAddr(addr)
-       c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, obfuscatedHeader, newConnectionOpts{
-               outgoing:   true,
-               remoteAddr: addr,
-               // It would be possible to retrieve a public IP from the dialer used here?
-               localPublicAddr: cl.publicAddr(addrIpPort.IP),
-               network:         dr.Dialer.DialerNetwork(),
-               connString:      regularNetConnPeerConnConnString(nc),
-       })
+       c, err := cl.initiateProtocolHandshakes(
+               context.Background(), nc, t, obfuscatedHeader,
+               newConnectionOpts{
+                       outgoing:   true,
+                       remoteAddr: addr,
+                       // It would be possible to retrieve a public IP from the dialer used here?
+                       localPublicAddr: cl.publicAddr(addrIpPort.IP),
+                       network:         dr.Dialer.DialerNetwork(),
+                       connString:      regularNetConnPeerConnConnString(nc),
+               })
        if err != nil {
                nc.Close()
        }
@@ -1510,13 +1512,17 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon
                }
        }
        c.peerImpl = c
-       c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
+       c.logger = cl.logger.WithDefaultLevel(log.Warning)
        c.setRW(connStatsReadWriter{nc, c})
        c.r = &rateLimitedReader{
                l: cl.config.DownloadRateLimiter,
                r: c.r,
        }
-       c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", opts.remoteAddr, opts.network, opts.outgoing)
+       c.logger.Levelf(
+               log.Debug,
+               "new PeerConn %p [Client %p remoteAddr %v network %v outgoing %t]",
+               c, cl, opts.remoteAddr, opts.network, opts.outgoing,
+       )
        for _, f := range cl.config.Callbacks.NewPeer {
                f(&c.Peer)
        }
diff --git a/go.mod b/go.mod
index 64e1d63a4d1f779b3b243c9a64736e6c69c33a7a..cf5700a18b5243d9c981076946343b8aca24538f 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -12,7 +12,7 @@ require (
        github.com/anacrolix/dht/v2 v2.19.2-0.20221121215055-066ad8494444
        github.com/anacrolix/envpprof v1.2.1
        github.com/anacrolix/fuse v0.2.0
-       github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60
+       github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68
        github.com/anacrolix/go-libutp v1.2.0
        github.com/anacrolix/log v0.13.2-0.20221123232138-02e2764801c3
        github.com/anacrolix/missinggo v1.3.0
diff --git a/go.sum b/go.sum
index aef4f0eec853b5684e7c3da25166724d9b5a1366..4e9c7ec466a58915f1bd66ea079866dde6cb8c8c 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -78,6 +78,8 @@ github.com/anacrolix/fuse v0.2.0 h1:pc+To78kI2d/WUjIyrsdqeJQAesuwpGxlI3h1nAv3Do=
 github.com/anacrolix/fuse v0.2.0/go.mod h1:Kfu02xBwnySDpH3N23BmrP3MDfwAQGRLUCj6XyeOvBQ=
 github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60 h1:k4/h2B1gGF+PJGyGHxs8nmHHt1pzWXZWBj6jn4OBlRc=
 github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8=
+github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68 h1:fyXlBfnlFzZSFckJ8QLb2lfmWfY++4RiUnae7ZMuv0A=
+github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8=
 github.com/anacrolix/go-libutp v1.2.0 h1:sjxoB+/ARiKUR7IK/6wLWyADIBqGmu1fm0xo+8Yy7u0=
 github.com/anacrolix/go-libutp v1.2.0/go.mod h1:RrJ3KcaDcf9Jqp33YL5V/5CBEc6xMc7aJL8wXfuWL50=
 github.com/anacrolix/log v0.3.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgwU9jwU=
diff --git a/netip-addrport.go b/netip-addrport.go
new file mode 100644 (file)
index 0000000..cf9edfd
--- /dev/null
@@ -0,0 +1,44 @@
+package torrent
+
+import (
+       "fmt"
+       "net"
+       "net/netip"
+
+       "github.com/anacrolix/dht/v2/krpc"
+)
+
+func ipv4AddrPortFromKrpcNodeAddr(na krpc.NodeAddr) (_ netip.AddrPort, err error) {
+       ip4 := na.IP.To4()
+       if ip4 == nil {
+               err = fmt.Errorf("not an ipv4 address: %v", na.IP)
+               return
+       }
+       addr := netip.AddrFrom4([4]byte(ip4))
+       addrPort := netip.AddrPortFrom(addr, uint16(na.Port))
+       return addrPort, nil
+}
+
+func ipv6AddrPortFromKrpcNodeAddr(na krpc.NodeAddr) (_ netip.AddrPort, err error) {
+       ip6 := na.IP.To16()
+       if ip6 == nil {
+               err = fmt.Errorf("not an ipv4 address: %v", na.IP)
+               return
+       }
+       addr := netip.AddrFrom16([16]byte(ip6))
+       addrPort := netip.AddrPortFrom(addr, uint16(na.Port))
+       return addrPort, nil
+}
+
+func addrPortFromPeerRemoteAddr(pra PeerRemoteAddr) (netip.AddrPort, error) {
+       switch v := pra.(type) {
+       case *net.TCPAddr:
+               return v.AddrPort(), nil
+       case *net.UDPAddr:
+               return v.AddrPort(), nil
+       case netip.AddrPort:
+               return v, nil
+       default:
+               return netip.ParseAddrPort(pra.String())
+       }
+}
diff --git a/peer.go b/peer.go
index d5ed19e53ac45471b589327afe0443805194f742..e88485b296e5b93c44c8d91597c1f413192200f8 100644 (file)
--- a/peer.go
+++ b/peer.go
@@ -275,7 +275,7 @@ func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int))
        next(None[pieceIndex]())
 }
 
-func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
+func (cn *Peer) writeStatus(w io.Writer) {
        // \t isn't preserved in <pre> blocks?
        if cn.closed.IsSet() {
                fmt.Fprint(w, "CLOSED: ")
index acdc3da58faa2af1cb3e1bce906488751edaf148..76dc2b05d250f422b7dfd6d9bbbfcc5b985181a6 100644 (file)
@@ -5,7 +5,10 @@ import (
        "errors"
        "fmt"
        "io"
+       "math/bits"
        "strconv"
+       "strings"
+       "unsafe"
 
        "github.com/anacrolix/torrent/metainfo"
 )
@@ -33,8 +36,31 @@ type (
        PeerExtensionBits [8]byte
 )
 
+var bitTags = []struct {
+       bit ExtensionBit
+       tag string
+}{
+       // Ordered by their base protocol type values (PORT, fast.., EXTENDED)
+       {ExtensionBitDHT, "dht"},
+       {ExtensionBitFast, "fast"},
+       {ExtensionBitExtended, "ext"},
+}
+
 func (pex PeerExtensionBits) String() string {
-       return hex.EncodeToString(pex[:])
+       pexHex := hex.EncodeToString(pex[:])
+       tags := make([]string, 0, len(bitTags)+1)
+       for _, bitTag := range bitTags {
+               if pex.GetBit(bitTag.bit) {
+                       tags = append(tags, bitTag.tag)
+                       pex.SetBit(bitTag.bit, false)
+               }
+       }
+       unknownCount := bits.OnesCount64(*(*uint64)((unsafe.Pointer(unsafe.SliceData(pex[:])))))
+       if unknownCount != 0 {
+               tags = append(tags, fmt.Sprintf("%v unknown", unknownCount))
+       }
+       return fmt.Sprintf("%v (%s)", pexHex, strings.Join(tags, ", "))
+
 }
 
 func NewPeerExtensionBytes(bits ...ExtensionBit) (ret PeerExtensionBits) {
index 784aaa59257b6fcc15c3dc6bf997251ce5f2cff2..466548a30acf604c79a65b47c9438a5f772aaf64 100644 (file)
@@ -28,6 +28,7 @@ func (m *PexMsg) Message(pexExtendedId ExtensionNumber) Message {
        }
 }
 
+// Unmarshals and returns a PEX message.
 func LoadPexMsg(b []byte) (ret PexMsg, err error) {
        err = bencode.Unmarshal(b, &ret)
        return
index 0318cb6e33d7e7df3f2e63f03f715c937c437f8d..1c55db625825f132d9e2314d975de09e53b84f86 100644 (file)
@@ -62,7 +62,19 @@ type PeerConn struct {
 }
 
 func (cn *PeerConn) peerImplStatusLines() []string {
-       return []string{fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString)}
+       lines := make([]string, 0, 2)
+       lines = append(
+               lines,
+               fmt.Sprintf("%+-55q %v %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString))
+       if cn.supportsExtension(pp.ExtensionNamePex) {
+               lines = append(
+                       lines,
+                       fmt.Sprintf(
+                               "pex: %v conns, %v unsent events",
+                               cn.pex.remoteLiveConns,
+                               cn.pex.numPending()))
+       }
+       return lines
 }
 
 // Returns true if the connection is over IPv6.
@@ -848,6 +860,7 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err
                c.requestPendingMetadata()
                if !t.cl.config.DisablePEX {
                        t.pex.Add(c) // we learnt enough now
+                       // This checks the extension is supported internally.
                        c.pex.Init(c)
                }
                return nil
diff --git a/pex.go b/pex.go
index 7fa0be887d4c83e785cb60181cf4d561cdf0c720..4561c5e4d281ab8dea3d1269dab4ddba86f8fa0d 100644 (file)
--- a/pex.go
+++ b/pex.go
@@ -145,8 +145,8 @@ func (me *pexMsgFactory) append(event pexEvent) {
        }
 }
 
-func (me *pexMsgFactory) PexMsg() pp.PexMsg {
-       return me.msg
+func (me *pexMsgFactory) PexMsg() *pp.PexMsg {
+       return &me.msg
 }
 
 // Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr
@@ -225,7 +225,7 @@ func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) {
        s.RLock()
        defer s.RUnlock()
        if start == nil {
-               return s.msg0.PexMsg(), s.tail
+               return *s.msg0.PexMsg(), s.tail
        }
        var msg pexMsgFactory
        last := start
@@ -236,5 +236,18 @@ func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) {
                msg.append(*e)
                last = e
        }
-       return msg.PexMsg(), last
+       return *msg.PexMsg(), last
+}
+
+// The same as Genmsg but just counts up the distinct events that haven't been sent.
+func (s *pexState) numPending(start *pexEvent) (num int) {
+       s.RLock()
+       defer s.RUnlock()
+       if start == nil {
+               return s.msg0.PexMsg().Len()
+       }
+       for e := start.next; e != nil; e = e.next {
+               num++
+       }
+       return
 }
index d0308f756db44ddc9d134842f7f85ed5521fa529..5ccc02200d96a8f4bb8cc01b3e8e0d84a91f1faa 100644 (file)
@@ -2,8 +2,11 @@ package torrent
 
 import (
        "fmt"
+       "net/netip"
        "time"
 
+       g "github.com/anacrolix/generics"
+
        "github.com/anacrolix/log"
 
        pp "github.com/anacrolix/torrent/peer_protocol"
@@ -26,6 +29,8 @@ type pexConnState struct {
        Listed  bool
        info    log.Logger
        dbg     log.Logger
+       // Running record of live connections the remote end of the connection purports to have.
+       remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags]
 }
 
 func (s *pexConnState) IsEnabled() bool {
@@ -67,6 +72,13 @@ func (s *pexConnState) genmsg() *pp.PexMsg {
        return &tx
 }
 
+func (s *pexConnState) numPending() int {
+       if s.torrent == nil {
+               return 0
+       }
+       return s.torrent.pex.numPending(s.last)
+}
+
 // Share is called from the writer goroutine if when it is woken up with the write buffers empty
 // Returns whether there's more room on the send buffer to write to.
 func (s *pexConnState) Share(postfn messageWriter) bool {
@@ -86,25 +98,51 @@ func (s *pexConnState) Share(postfn messageWriter) bool {
        return true
 }
 
+func (s *pexConnState) updateRemoteLiveConns(rx pp.PexMsg) (errs []error) {
+       for _, dropped := range rx.Dropped {
+               addrPort, _ := ipv4AddrPortFromKrpcNodeAddr(dropped)
+               delete(s.remoteLiveConns, addrPort)
+       }
+       for _, dropped := range rx.Dropped6 {
+               addrPort, _ := ipv6AddrPortFromKrpcNodeAddr(dropped)
+               delete(s.remoteLiveConns, addrPort)
+       }
+       for i, added := range rx.Added {
+               addr := netip.AddrFrom4([4]byte(added.IP.To4()))
+               addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
+               flags := g.SliceGet(rx.AddedFlags, i)
+               g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
+       }
+       for i, added := range rx.Added6 {
+               addr := netip.AddrFrom16([16]byte(added.IP.To16()))
+               addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
+               flags := g.SliceGet(rx.Added6Flags, i)
+               g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
+       }
+       return
+}
+
 // Recv is called from the reader goroutine
 func (s *pexConnState) Recv(payload []byte) error {
+       rx, err := pp.LoadPexMsg(payload)
+       if err != nil {
+               return fmt.Errorf("unmarshalling pex message: %w", err)
+       }
+       s.dbg.Printf("received pex message: %v", rx)
+       torrent.Add("pex added peers received", int64(len(rx.Added)))
+       torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
+       s.updateRemoteLiveConns(rx)
+
        if !s.torrent.wantPeers() {
                s.dbg.Printf("peer reserve ok, incoming PEX discarded")
                return nil
        }
+       // TODO: This should be per conn, not for the whole Torrent.
        if time.Now().Before(s.torrent.pex.rest) {
                s.dbg.Printf("in cooldown period, incoming PEX discarded")
                return nil
        }
 
-       rx, err := pp.LoadPexMsg(payload)
-       if err != nil {
-               return fmt.Errorf("error unmarshalling PEX message: %s", err)
-       }
-       s.dbg.Print("incoming PEX message: ", rx)
-       torrent.Add("pex added peers received", int64(len(rx.Added)))
-       torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
-
        var peers peerInfos
        peers.AppendFromPex(rx.Added6, rx.Added6Flags)
        peers.AppendFromPex(rx.Added, rx.AddedFlags)
index b96c94d3298d931fcb5f0768373ef50b97119c54..fa159326a908f9526a1a730e45296adfe82d18ce 100644 (file)
@@ -13,7 +13,7 @@ import (
        "github.com/anacrolix/torrent"
        "github.com/anacrolix/torrent/internal/testutil"
        "github.com/anacrolix/torrent/storage"
-       "github.com/frankban/quicktest"
+       qt "github.com/frankban/quicktest"
        "golang.org/x/time/rate"
 
        "github.com/stretchr/testify/assert"
@@ -188,7 +188,7 @@ func TestSeedAfterDownloading(t *testing.T) {
                defer wg.Done()
                r := llg.NewReader()
                defer r.Close()
-               quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
+               qt.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), qt.IsNil)
        }()
        done := make(chan struct{})
        defer close(done)
index f8d31b647d5a9949686d98a3f1c3007117935170..2f6832e9703d943f15b7525b4052e81805007713 100644 (file)
@@ -108,7 +108,7 @@ type Torrent struct {
        // them. That encourages us to reconnect to peers that are well known in
        // the swarm.
        peers prioritizedPeers
-       // Whether we want to know to know more peers.
+       // Whether we want to know more peers.
        wantPeersEvent missinggo.Event
        // An announcer for each tracker URL.
        trackerAnnouncers map[string]torrentTrackerAnnouncer
@@ -774,7 +774,7 @@ func (t *Torrent) writeStatus(w io.Writer) {
        for i, c := range peers {
                fmt.Fprintf(w, "%2d. ", i+1)
                buf.Reset()
-               c.writeStatus(&buf, t)
+               c.writeStatus(&buf)
                w.Write(bytes.TrimRight(
                        bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n    ")),
                        " "))
@@ -1983,6 +1983,7 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
                panic(len(t.conns))
        }
        t.conns[c] = struct{}{}
+       t.cl.event.Broadcast()
        if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
                t.pex.Add(c) // as no further extended handshake expected
        }