]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add new metrics and PeerStats
authorMatt Joiner <anacrolix@gmail.com>
Thu, 17 Apr 2025 00:34:42 +0000 (10:34 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 17 Apr 2025 00:34:42 +0000 (10:34 +1000)
14 files changed:
client.go
client_test.go
peer-conn-msg-writer.go
peer-conn-msg-writer_test.go
peer-impl.go
peer-stats.go [new file with mode: 0644]
peer.go
peerconn.go
peerconn_test.go
piece.go
storage/interface.go
torrent.go
torrent_test.go
webseed-peer.go

index 7b7d979dc79bc6bd752c12e628f1201c98c13c83..0561b00764a76fb870c2acacd04fdf014df35060 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1686,6 +1686,7 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon
                        c.bannableAddr = Some(netipAddrPort.Addr())
                }
        }
+       c.legacyPeerImpl = c
        c.peerImpl = c
        c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextText(fmt.Sprintf("%T %p", c, c))
        c.protocolLogger = c.logger.WithNames(protocolLoggingName)
index c71a8c43d94e655cbc65b30d07fea4c076775a58..b6ee1c8d2f8f22c3356eebae127248cce49f8353 100644 (file)
@@ -580,7 +580,7 @@ func TestPeerInvalidHave(t *testing.T) {
                callbacks: &cfg.Callbacks,
        }}
        tt.conns[cn] = struct{}{}
-       cn.peerImpl = cn
+       cn.legacyPeerImpl = cn
        cl.lock()
        defer cl.unlock()
        assert.NoError(t, cn.peerSentHave(0))
index 6132460823090e2a64b39d0fb9132e8ca67d929c..2300392cf72530344f0b611ced6ffd88971df1dd 100644 (file)
@@ -31,7 +31,7 @@ func (pc *PeerConn) initMessageWriter() {
                        defer pc.locker().RUnlock()
                        return pc.useful()
                },
-               writeBuffer: new(bytes.Buffer),
+               writeBuffer: new(peerConnMsgWriterBuffer),
        }
 }
 
@@ -47,6 +47,13 @@ func (pc *PeerConn) messageWriterRunner() {
        pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout)
 }
 
+type peerConnMsgWriterBuffer struct {
+       // The number of bytes in the buffer that are part of a piece message. When
+       // the whole buffer is written, we can count this many bytes.
+       pieceDataBytes int
+       bytes.Buffer
+}
+
 type peerConnMsgWriter struct {
        // Must not be called with the local mutex held, as it will call back into the write method.
        fillWriteBuffer func()
@@ -58,7 +65,12 @@ type peerConnMsgWriter struct {
        mu        sync.Mutex
        writeCond chansync.BroadcastCond
        // Pointer so we can swap with the "front buffer".
-       writeBuffer *bytes.Buffer
+       writeBuffer *peerConnMsgWriterBuffer
+
+       totalWriteDuration    time.Duration
+       totalBytesWritten     int64
+       totalDataBytesWritten int64
+       dataUploadRate        float64
 }
 
 // Routine that writes to the peer. Some of what to write is buffered by
@@ -67,7 +79,7 @@ type peerConnMsgWriter struct {
 func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
        lastWrite := time.Now()
        keepAliveTimer := time.NewTimer(keepAliveTimeout)
-       frontBuf := new(bytes.Buffer)
+       frontBuf := new(peerConnMsgWriterBuffer)
        for {
                if cn.closed.IsSet() {
                        return
@@ -96,6 +108,8 @@ func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
                        panic("expected non-empty front buffer")
                }
                var err error
+               startedWriting := time.Now()
+               startingBufLen := frontBuf.Len()
                for frontBuf.Len() != 0 {
                        next := frontBuf.Bytes()
                        var n int
@@ -112,6 +126,15 @@ func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
                        cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err)
                        return
                }
+               // Track what was sent and how long it took.
+               writeDuration := time.Since(startedWriting)
+               cn.mu.Lock()
+               cn.dataUploadRate = float64(frontBuf.pieceDataBytes) / writeDuration.Seconds()
+               cn.totalWriteDuration += writeDuration
+               cn.totalBytesWritten += int64(startingBufLen)
+               cn.totalDataBytesWritten += int64(frontBuf.pieceDataBytes)
+               cn.mu.Unlock()
+               frontBuf.pieceDataBytes = 0
                lastWrite = time.Now()
                keepAliveTimer.Reset(keepAliveTimeout)
        }
@@ -125,7 +148,11 @@ func (cn *peerConnMsgWriter) writeToBuffer(msg pp.Message) (err error) {
                        cn.writeBuffer.Truncate(originalLen)
                }
        }()
-       return msg.WriteTo(cn.writeBuffer)
+       err = msg.WriteTo(cn.writeBuffer)
+       if err == nil {
+               cn.writeBuffer.pieceDataBytes += len(msg.Piece)
+       }
+       return
 }
 
 func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
index 308d18e5af200530c780a6c750d000e0cb20abbb..a0db83a67375ba833804fd1afc3b893b559ed492 100644 (file)
@@ -1,7 +1,6 @@
 package torrent
 
 import (
-       "bytes"
        "testing"
 
        "github.com/dustin/go-humanize"
@@ -22,7 +21,7 @@ var benchmarkPieceLengths = []int{defaultChunkSize, 1 << 20, 4 << 20, 8 << 20}
 
 func runBenchmarkWriteToBuffer(b *testing.B, length int64) {
        writer := &peerConnMsgWriter{
-               writeBuffer: &bytes.Buffer{},
+               writeBuffer: new(peerConnMsgWriterBuffer),
        }
        msg := PieceMsg(length)
 
@@ -53,7 +52,7 @@ func BenchmarkWritePieceMsg(b *testing.B) {
 
 func runBenchmarkMarshalBinaryWrite(b *testing.B, length int64) {
        writer := &peerConnMsgWriter{
-               writeBuffer: &bytes.Buffer{},
+               writeBuffer: &peerConnMsgWriterBuffer{},
        }
        msg := PieceMsg(length)
 
index f9f9096b198ea5d247d157fb940fbc9172ace1ac..2a036aea4979b87691887818bdd85f7bfbf9bdd2 100644 (file)
@@ -7,9 +7,11 @@ import (
 )
 
 // Contains implementation details that differ between peer types, like Webseeds and regular
-// BitTorrent protocol connections. Some methods are underlined so as to avoid collisions with
-// legacy PeerConn methods.
-type peerImpl interface {
+// BitTorrent protocol connections. These methods are embedded in the child types of Peer for legacy
+// expectations that they exist on the child type. Some methods are underlined to avoid collisions
+// with legacy PeerConn methods. New methods and calls that are fixed up should be migrated over to
+// newHotPeerImpl.
+type legacyPeerImpl interface {
        // Trigger the actual request state to get updated
        handleUpdateRequests()
        writeInterested(interested bool) bool
@@ -35,3 +37,8 @@ type peerImpl interface {
        peerHasAllPieces() (all, known bool)
        peerPieces() *roaring.Bitmap
 }
+
+// Abstract methods implemented by subclasses of Peer.
+type newHotPeerImpl interface {
+       lastWriteUploadRate() float64
+}
diff --git a/peer-stats.go b/peer-stats.go
new file mode 100644 (file)
index 0000000..5f228d5
--- /dev/null
@@ -0,0 +1,10 @@
+package torrent
+
+type PeerStats struct {
+       ConnStats
+
+       DownloadRate        float64
+       LastWriteUploadRate float64
+       // How many pieces the peer has.
+       RemotePieceCount int
+}
diff --git a/peer.go b/peer.go
index 35d7066b7dcdf40db2fd17355573b4cd4e6c02f4..27f094fafd0ef0b164d3371890fb5f15bcfef036 100644 (file)
--- a/peer.go
+++ b/peer.go
@@ -31,7 +31,8 @@ type (
 
                t *Torrent
 
-               peerImpl
+               legacyPeerImpl
+               peerImpl  newHotPeerImpl
                callbacks *Callbacks
 
                outgoing   bool
@@ -142,6 +143,16 @@ func (p *Peer) Torrent() *Torrent {
        return p.t
 }
 
+func (p *Peer) Stats() (ret PeerStats) {
+       p.locker().RLock()
+       defer p.locker().RUnlock()
+       ret.ConnStats = p._stats.Copy()
+       ret.DownloadRate = p.downloadRate()
+       ret.LastWriteUploadRate = p.peerImpl.lastWriteUploadRate()
+       ret.RemotePieceCount = p.remotePieceCount()
+       return
+}
+
 func (p *Peer) initRequestState() {
        p.requestState.Requests = &peerRequests{}
 }
@@ -210,12 +221,17 @@ func (cn *Peer) bestPeerNumPieces() pieceIndex {
        return cn.peerMinPieces
 }
 
-func (cn *Peer) completedString() string {
+// How many pieces we think the peer has.
+func (cn *Peer) remotePieceCount() pieceIndex {
        have := pieceIndex(cn.peerPieces().GetCardinality())
        if all, _ := cn.peerHasAllPieces(); all {
                have = cn.bestPeerNumPieces()
        }
-       return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces())
+       return have
+}
+
+func (cn *Peer) completedString() string {
+       return fmt.Sprintf("%d/%d", cn.remotePieceCount(), cn.bestPeerNumPieces())
 }
 
 func eventAgeString(t time.Time) string {
@@ -256,11 +272,9 @@ func (cn *Peer) downloadRate() float64 {
        return float64(num) / cn.totalExpectingTime().Seconds()
 }
 
+// Deprecated: Use Peer.Stats.
 func (p *Peer) DownloadRate() float64 {
-       p.locker().RLock()
-       defer p.locker().RUnlock()
-
-       return p.downloadRate()
+       return p.Stats().DownloadRate
 }
 
 func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) {
@@ -336,7 +350,7 @@ func (p *Peer) close() {
        for _, prs := range p.peerRequests {
                prs.allocReservation.Drop()
        }
-       p.peerImpl.onClose()
+       p.legacyPeerImpl.onClose()
        if p.t != nil {
                p.t.decPeerPieceAvailability(p)
        }
@@ -476,7 +490,7 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) {
        for _, f := range cn.callbacks.SentRequest {
                f(PeerRequestEvent{cn, ppReq})
        }
-       return cn.peerImpl._request(ppReq), nil
+       return cn.legacyPeerImpl._request(ppReq), nil
 }
 
 func (me *Peer) cancel(r RequestIndex) {
@@ -877,7 +891,7 @@ func (cn *Peer) stats() *ConnStats {
 }
 
 func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
-       pc, ok := p.peerImpl.(*PeerConn)
+       pc, ok := p.legacyPeerImpl.(*PeerConn)
        return pc, ok
 }
 
index ad069a293dc416b9d65a76476bdb02357c0c6d41..2a3fd94720068b5ca03393b9b56f0905da58d65a 100644 (file)
@@ -98,6 +98,12 @@ type PeerConn struct {
        receivedHashPieces map[[32]byte][][32]byte
 }
 
+func (cn *PeerConn) lastWriteUploadRate() float64 {
+       cn.messageWriter.mu.Lock()
+       defer cn.messageWriter.mu.Unlock()
+       return cn.messageWriter.dataUploadRate
+}
+
 func (cn *PeerConn) pexStatus() string {
        if !cn.bitExtensionEnabled(pp.ExtensionBitLtep) {
                return "extended protocol disabled"
index 7dc28bd1ac999454670fcf7438956446c6b794bb..e3d1c8a3bd9e4ae276864016e58c0b8cdf918098 100644 (file)
@@ -277,7 +277,7 @@ func TestHaveAllThenBitfield(t *testing.T) {
                Peer: Peer{t: tt},
        }
        pc.initRequestState()
-       pc.peerImpl = &pc
+       pc.legacyPeerImpl = &pc
        tt.conns[&pc] = struct{}{}
        c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)
        c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}})
index 66ae2b0c862a98a8bf0b91d553607c58bf858181..074f6601cd33e3c7642574634e9b9fe096da9637 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -30,9 +30,10 @@ type Piece struct {
 
        readerCond chansync.BroadcastCond
 
-       numVerifies         pieceVerifyCount
-       numVerifiesCond     chansync.BroadcastCond
-       hashing             bool
+       numVerifies     pieceVerifyCount
+       numVerifiesCond chansync.BroadcastCond
+       hashing         bool
+       // The piece state may have changed, and is being synchronized with storage.
        marking             bool
        storageCompletionOk bool
 
@@ -268,7 +269,9 @@ func (p *Piece) effectivePriority() (ret PiecePriority) {
 }
 
 // Tells the Client to refetch the completion status from storage, updating priority etc. if
-// necessary. Might be useful if you know the state of the piece data has changed externally.
+// necessary. Might be useful if you know the state of the piece data has
+// changed externally. TODO: Document why this is public, maybe change name to
+// SyncCompletion or something.
 func (p *Piece) UpdateCompletion() {
        p.t.cl.lock()
        defer p.t.cl.unlock()
index 981ed88d55d13ee1c5093475d8de7c4494a8d34d..f7ccd42f0c68985f96c5f4c8f31a1fd95b8d6321 100644 (file)
@@ -55,6 +55,7 @@ type PieceImpl interface {
        Completion() Completion
 }
 
+// TODO: Yo where the fuck is the documentation.
 type Completion struct {
        Complete bool
        Ok       bool
index c1a381db150e8831d676b92146c8acf22d1f9379..312e4509f5d0f4c43f3bbe38b693ff7772467d15 100644 (file)
@@ -1152,7 +1152,7 @@ func (t *Torrent) countBytesHashed(n int64) {
 
 func (t *Torrent) hashPiece(piece pieceIndex) (
        correct bool,
-// These are peers that sent us blocks that differ from what we hash here.
+       // These are peers that sent us blocks that differ from what we hash here.
        differingPeers map[bannableAddr]struct{},
        err error,
 ) {
@@ -1226,7 +1226,7 @@ func sumExactly(dst []byte, sum func(b []byte) []byte) {
 }
 
 func (t *Torrent) hashPieceWithSpecificHash(piece pieceIndex, h hash.Hash) (
-// These are peers that sent us blocks that differ from what we hash here.
+       // These are peers that sent us blocks that differ from what we hash here.
        differingPeers map[bannableAddr]struct{},
        err error,
 ) {
@@ -1268,8 +1268,8 @@ func (t *Torrent) havePiece(index pieceIndex) bool {
 }
 
 func (t *Torrent) maybeDropMutuallyCompletePeer(
-// I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's
-// okay?
+       // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's
+       // okay?
        p *PeerConn,
 ) {
        if !t.cl.config.DropMutuallyCompletePeers {
@@ -2956,6 +2956,8 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
                f(&ws.peer)
        }
        ws.peer.logger = t.logger.WithContextValue(&ws).WithNames("webseed")
+       // TODO: Abstract out a common struct initializer for this...
+       ws.peer.legacyPeerImpl = &ws
        ws.peer.peerImpl = &ws
        if t.haveInfo() {
                ws.onGotInfo(t.info)
index bd409b6834dc68b025c356030fcc87334b1481e0..663716df006b5437280aa8a44ceb9dd2290ab968 100644 (file)
@@ -238,7 +238,7 @@ func TestRelativeAvailabilityHaveNone(t *testing.T) {
        g.MakeMapIfNil(&tt.conns)
        pc := PeerConn{}
        pc.t = &tt
-       pc.peerImpl = &pc
+       pc.legacyPeerImpl = &pc
        pc.initRequestState()
        g.InitNew(&pc.callbacks)
        tt.conns[&pc] = struct{}{}
index aeddbaa54dcc54df0adcdf7eb058c73fc996f516..c6da58fffff33d4ab5c6cc9daa40ff78f94edcad 100644 (file)
@@ -29,7 +29,12 @@ type webseedPeer struct {
        lastUnhandledErr time.Time
 }
 
-var _ peerImpl = (*webseedPeer)(nil)
+func (me *webseedPeer) lastWriteUploadRate() float64 {
+       // We never upload to webseeds.
+       return 0
+}
+
+var _ legacyPeerImpl = (*webseedPeer)(nil)
 
 func (me *webseedPeer) peerImplStatusLines() []string {
        return []string{