From fae873df28762ace768a4ef4485eae80fbd30d42 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 17 Apr 2025 10:34:42 +1000 Subject: [PATCH] Add new metrics and PeerStats --- client.go | 1 + client_test.go | 2 +- peer-conn-msg-writer.go | 35 +++++++++++++++++++++++++++++++---- peer-conn-msg-writer_test.go | 5 ++--- peer-impl.go | 13 ++++++++++--- peer-stats.go | 10 ++++++++++ peer.go | 34 ++++++++++++++++++++++++---------- peerconn.go | 6 ++++++ peerconn_test.go | 2 +- piece.go | 11 +++++++---- storage/interface.go | 1 + torrent.go | 10 ++++++---- torrent_test.go | 2 +- webseed-peer.go | 7 ++++++- 14 files changed, 107 insertions(+), 32 deletions(-) create mode 100644 peer-stats.go diff --git a/client.go b/client.go index 7b7d979d..0561b007 100644 --- a/client.go +++ b/client.go @@ -1686,6 +1686,7 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon c.bannableAddr = Some(netipAddrPort.Addr()) } } + c.legacyPeerImpl = c c.peerImpl = c c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextText(fmt.Sprintf("%T %p", c, c)) c.protocolLogger = c.logger.WithNames(protocolLoggingName) diff --git a/client_test.go b/client_test.go index c71a8c43..b6ee1c8d 100644 --- a/client_test.go +++ b/client_test.go @@ -580,7 +580,7 @@ func TestPeerInvalidHave(t *testing.T) { callbacks: &cfg.Callbacks, }} tt.conns[cn] = struct{}{} - cn.peerImpl = cn + cn.legacyPeerImpl = cn cl.lock() defer cl.unlock() assert.NoError(t, cn.peerSentHave(0)) diff --git a/peer-conn-msg-writer.go b/peer-conn-msg-writer.go index 61324608..2300392c 100644 --- a/peer-conn-msg-writer.go +++ b/peer-conn-msg-writer.go @@ -31,7 +31,7 @@ func (pc *PeerConn) initMessageWriter() { defer pc.locker().RUnlock() return pc.useful() }, - writeBuffer: new(bytes.Buffer), + writeBuffer: new(peerConnMsgWriterBuffer), } } @@ -47,6 +47,13 @@ func (pc *PeerConn) messageWriterRunner() { pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout) } +type peerConnMsgWriterBuffer struct { + // The number of bytes in the buffer that are part of a piece message. When + // the whole buffer is written, we can count this many bytes. + pieceDataBytes int + bytes.Buffer +} + type peerConnMsgWriter struct { // Must not be called with the local mutex held, as it will call back into the write method. fillWriteBuffer func() @@ -58,7 +65,12 @@ type peerConnMsgWriter struct { mu sync.Mutex writeCond chansync.BroadcastCond // Pointer so we can swap with the "front buffer". - writeBuffer *bytes.Buffer + writeBuffer *peerConnMsgWriterBuffer + + totalWriteDuration time.Duration + totalBytesWritten int64 + totalDataBytesWritten int64 + dataUploadRate float64 } // Routine that writes to the peer. Some of what to write is buffered by @@ -67,7 +79,7 @@ type peerConnMsgWriter struct { func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) { lastWrite := time.Now() keepAliveTimer := time.NewTimer(keepAliveTimeout) - frontBuf := new(bytes.Buffer) + frontBuf := new(peerConnMsgWriterBuffer) for { if cn.closed.IsSet() { return @@ -96,6 +108,8 @@ func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) { panic("expected non-empty front buffer") } var err error + startedWriting := time.Now() + startingBufLen := frontBuf.Len() for frontBuf.Len() != 0 { next := frontBuf.Bytes() var n int @@ -112,6 +126,15 @@ func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) { cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err) return } + // Track what was sent and how long it took. + writeDuration := time.Since(startedWriting) + cn.mu.Lock() + cn.dataUploadRate = float64(frontBuf.pieceDataBytes) / writeDuration.Seconds() + cn.totalWriteDuration += writeDuration + cn.totalBytesWritten += int64(startingBufLen) + cn.totalDataBytesWritten += int64(frontBuf.pieceDataBytes) + cn.mu.Unlock() + frontBuf.pieceDataBytes = 0 lastWrite = time.Now() keepAliveTimer.Reset(keepAliveTimeout) } @@ -125,7 +148,11 @@ func (cn *peerConnMsgWriter) writeToBuffer(msg pp.Message) (err error) { cn.writeBuffer.Truncate(originalLen) } }() - return msg.WriteTo(cn.writeBuffer) + err = msg.WriteTo(cn.writeBuffer) + if err == nil { + cn.writeBuffer.pieceDataBytes += len(msg.Piece) + } + return } func (cn *peerConnMsgWriter) write(msg pp.Message) bool { diff --git a/peer-conn-msg-writer_test.go b/peer-conn-msg-writer_test.go index 308d18e5..a0db83a6 100644 --- a/peer-conn-msg-writer_test.go +++ b/peer-conn-msg-writer_test.go @@ -1,7 +1,6 @@ package torrent import ( - "bytes" "testing" "github.com/dustin/go-humanize" @@ -22,7 +21,7 @@ var benchmarkPieceLengths = []int{defaultChunkSize, 1 << 20, 4 << 20, 8 << 20} func runBenchmarkWriteToBuffer(b *testing.B, length int64) { writer := &peerConnMsgWriter{ - writeBuffer: &bytes.Buffer{}, + writeBuffer: new(peerConnMsgWriterBuffer), } msg := PieceMsg(length) @@ -53,7 +52,7 @@ func BenchmarkWritePieceMsg(b *testing.B) { func runBenchmarkMarshalBinaryWrite(b *testing.B, length int64) { writer := &peerConnMsgWriter{ - writeBuffer: &bytes.Buffer{}, + writeBuffer: &peerConnMsgWriterBuffer{}, } msg := PieceMsg(length) diff --git a/peer-impl.go b/peer-impl.go index f9f9096b..2a036aea 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -7,9 +7,11 @@ import ( ) // Contains implementation details that differ between peer types, like Webseeds and regular -// BitTorrent protocol connections. Some methods are underlined so as to avoid collisions with -// legacy PeerConn methods. -type peerImpl interface { +// BitTorrent protocol connections. These methods are embedded in the child types of Peer for legacy +// expectations that they exist on the child type. Some methods are underlined to avoid collisions +// with legacy PeerConn methods. New methods and calls that are fixed up should be migrated over to +// newHotPeerImpl. +type legacyPeerImpl interface { // Trigger the actual request state to get updated handleUpdateRequests() writeInterested(interested bool) bool @@ -35,3 +37,8 @@ type peerImpl interface { peerHasAllPieces() (all, known bool) peerPieces() *roaring.Bitmap } + +// Abstract methods implemented by subclasses of Peer. +type newHotPeerImpl interface { + lastWriteUploadRate() float64 +} diff --git a/peer-stats.go b/peer-stats.go new file mode 100644 index 00000000..5f228d56 --- /dev/null +++ b/peer-stats.go @@ -0,0 +1,10 @@ +package torrent + +type PeerStats struct { + ConnStats + + DownloadRate float64 + LastWriteUploadRate float64 + // How many pieces the peer has. + RemotePieceCount int +} diff --git a/peer.go b/peer.go index 35d7066b..27f094fa 100644 --- a/peer.go +++ b/peer.go @@ -31,7 +31,8 @@ type ( t *Torrent - peerImpl + legacyPeerImpl + peerImpl newHotPeerImpl callbacks *Callbacks outgoing bool @@ -142,6 +143,16 @@ func (p *Peer) Torrent() *Torrent { return p.t } +func (p *Peer) Stats() (ret PeerStats) { + p.locker().RLock() + defer p.locker().RUnlock() + ret.ConnStats = p._stats.Copy() + ret.DownloadRate = p.downloadRate() + ret.LastWriteUploadRate = p.peerImpl.lastWriteUploadRate() + ret.RemotePieceCount = p.remotePieceCount() + return +} + func (p *Peer) initRequestState() { p.requestState.Requests = &peerRequests{} } @@ -210,12 +221,17 @@ func (cn *Peer) bestPeerNumPieces() pieceIndex { return cn.peerMinPieces } -func (cn *Peer) completedString() string { +// How many pieces we think the peer has. +func (cn *Peer) remotePieceCount() pieceIndex { have := pieceIndex(cn.peerPieces().GetCardinality()) if all, _ := cn.peerHasAllPieces(); all { have = cn.bestPeerNumPieces() } - return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces()) + return have +} + +func (cn *Peer) completedString() string { + return fmt.Sprintf("%d/%d", cn.remotePieceCount(), cn.bestPeerNumPieces()) } func eventAgeString(t time.Time) string { @@ -256,11 +272,9 @@ func (cn *Peer) downloadRate() float64 { return float64(num) / cn.totalExpectingTime().Seconds() } +// Deprecated: Use Peer.Stats. func (p *Peer) DownloadRate() float64 { - p.locker().RLock() - defer p.locker().RUnlock() - - return p.downloadRate() + return p.Stats().DownloadRate } func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) { @@ -336,7 +350,7 @@ func (p *Peer) close() { for _, prs := range p.peerRequests { prs.allocReservation.Drop() } - p.peerImpl.onClose() + p.legacyPeerImpl.onClose() if p.t != nil { p.t.decPeerPieceAvailability(p) } @@ -476,7 +490,7 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) { for _, f := range cn.callbacks.SentRequest { f(PeerRequestEvent{cn, ppReq}) } - return cn.peerImpl._request(ppReq), nil + return cn.legacyPeerImpl._request(ppReq), nil } func (me *Peer) cancel(r RequestIndex) { @@ -877,7 +891,7 @@ func (cn *Peer) stats() *ConnStats { } func (p *Peer) TryAsPeerConn() (*PeerConn, bool) { - pc, ok := p.peerImpl.(*PeerConn) + pc, ok := p.legacyPeerImpl.(*PeerConn) return pc, ok } diff --git a/peerconn.go b/peerconn.go index ad069a29..2a3fd947 100644 --- a/peerconn.go +++ b/peerconn.go @@ -98,6 +98,12 @@ type PeerConn struct { receivedHashPieces map[[32]byte][][32]byte } +func (cn *PeerConn) lastWriteUploadRate() float64 { + cn.messageWriter.mu.Lock() + defer cn.messageWriter.mu.Unlock() + return cn.messageWriter.dataUploadRate +} + func (cn *PeerConn) pexStatus() string { if !cn.bitExtensionEnabled(pp.ExtensionBitLtep) { return "extended protocol disabled" diff --git a/peerconn_test.go b/peerconn_test.go index 7dc28bd1..e3d1c8a3 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -277,7 +277,7 @@ func TestHaveAllThenBitfield(t *testing.T) { Peer: Peer{t: tt}, } pc.initRequestState() - pc.peerImpl = &pc + pc.legacyPeerImpl = &pc tt.conns[&pc] = struct{}{} c.Assert(pc.onPeerSentHaveAll(), qt.IsNil) c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}}) diff --git a/piece.go b/piece.go index 66ae2b0c..074f6601 100644 --- a/piece.go +++ b/piece.go @@ -30,9 +30,10 @@ type Piece struct { readerCond chansync.BroadcastCond - numVerifies pieceVerifyCount - numVerifiesCond chansync.BroadcastCond - hashing bool + numVerifies pieceVerifyCount + numVerifiesCond chansync.BroadcastCond + hashing bool + // The piece state may have changed, and is being synchronized with storage. marking bool storageCompletionOk bool @@ -268,7 +269,9 @@ func (p *Piece) effectivePriority() (ret PiecePriority) { } // Tells the Client to refetch the completion status from storage, updating priority etc. if -// necessary. Might be useful if you know the state of the piece data has changed externally. +// necessary. Might be useful if you know the state of the piece data has +// changed externally. TODO: Document why this is public, maybe change name to +// SyncCompletion or something. func (p *Piece) UpdateCompletion() { p.t.cl.lock() defer p.t.cl.unlock() diff --git a/storage/interface.go b/storage/interface.go index 981ed88d..f7ccd42f 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -55,6 +55,7 @@ type PieceImpl interface { Completion() Completion } +// TODO: Yo where the fuck is the documentation. type Completion struct { Complete bool Ok bool diff --git a/torrent.go b/torrent.go index c1a381db..312e4509 100644 --- a/torrent.go +++ b/torrent.go @@ -1152,7 +1152,7 @@ func (t *Torrent) countBytesHashed(n int64) { func (t *Torrent) hashPiece(piece pieceIndex) ( correct bool, -// These are peers that sent us blocks that differ from what we hash here. + // These are peers that sent us blocks that differ from what we hash here. differingPeers map[bannableAddr]struct{}, err error, ) { @@ -1226,7 +1226,7 @@ func sumExactly(dst []byte, sum func(b []byte) []byte) { } func (t *Torrent) hashPieceWithSpecificHash(piece pieceIndex, h hash.Hash) ( -// These are peers that sent us blocks that differ from what we hash here. + // These are peers that sent us blocks that differ from what we hash here. differingPeers map[bannableAddr]struct{}, err error, ) { @@ -1268,8 +1268,8 @@ func (t *Torrent) havePiece(index pieceIndex) bool { } func (t *Torrent) maybeDropMutuallyCompletePeer( -// I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's -// okay? + // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's + // okay? p *PeerConn, ) { if !t.cl.config.DropMutuallyCompletePeers { @@ -2956,6 +2956,8 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) { f(&ws.peer) } ws.peer.logger = t.logger.WithContextValue(&ws).WithNames("webseed") + // TODO: Abstract out a common struct initializer for this... + ws.peer.legacyPeerImpl = &ws ws.peer.peerImpl = &ws if t.haveInfo() { ws.onGotInfo(t.info) diff --git a/torrent_test.go b/torrent_test.go index bd409b68..663716df 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -238,7 +238,7 @@ func TestRelativeAvailabilityHaveNone(t *testing.T) { g.MakeMapIfNil(&tt.conns) pc := PeerConn{} pc.t = &tt - pc.peerImpl = &pc + pc.legacyPeerImpl = &pc pc.initRequestState() g.InitNew(&pc.callbacks) tt.conns[&pc] = struct{}{} diff --git a/webseed-peer.go b/webseed-peer.go index aeddbaa5..c6da58ff 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -29,7 +29,12 @@ type webseedPeer struct { lastUnhandledErr time.Time } -var _ peerImpl = (*webseedPeer)(nil) +func (me *webseedPeer) lastWriteUploadRate() float64 { + // We never upload to webseeds. + return 0 +} + +var _ legacyPeerImpl = (*webseedPeer)(nil) func (me *webseedPeer) peerImplStatusLines() []string { return []string{ -- 2.48.1